import math import time import os import multiprocessing as mp from multiprocessing import Queue, Process, Value from queue import Empty import mp # Импортируйте ваши классы from Generator.LogGenerator import LogGenerator from Processor.StreamingLogCluster import StreamingLogCluster # --- ПРОЦЕСС 1: ГЕНЕРАТОР НАГРУЗКИ --- def load_generator(queue: Queue, target_rps: int, total_logs: int): """Генерирует логи с заданной частотой (RPS) и кладет в очередь.""" print(f"[ГЕНЕРАТОР] Запущен. Цель: {target_rps} логов/сек, Всего: {total_logs}") gen = LogGenerator() delay_between_logs = 1.0 / target_rps for i in range(total_logs): start_time = time.time() # Генерируем лог term = gen.generate() log_text = term.render(0.5).text # Кладем в очередь queue.put(log_text) # Пытаемся выдерживать заданный RPS elapsed = time.time() - start_time sleep_time = delay_between_logs - elapsed if sleep_time > 0: time.sleep(sleep_time) # Кладем "ядовитую пилюлю" (сигнал остановки для воркера) queue.put(None) print(f"[ГЕНЕРАТОР] Завершил работу. Все {total_logs} логов отправлены в очередь.") def load_generator_sin( queue: Queue, min_rps: float, max_rps: float, period_sec: float, duration_sec: float, current_rps_var: Value): """ Генерирует логи волнообразно (по синусоиде) от min_rps до max_rps. period_sec - за сколько секунд проходит одна полная волна (от минимума до минимума) duration_sec - общая длительность теста """ ## print(f"[ГЕНЕРАТОР] Волнообразный старт: {min_rps} -> {max_rps} RPS.") ## print(f"[ГЕНЕРАТОР] Длина волны: {period_sec} сек, Тест идет: {duration_sec} сек.") gen = LogGenerator() # Математика волны amplitude = (max_rps - min_rps) / 2.0 # Размах волны offset = (max_rps + min_rps) / 2.0 # Центр волны start_time = time.time() logs_sent = 0 last_print_sec = -1 while True: elapsed = time.time() - start_time if elapsed >= duration_sec: break # Вычисляем текущий RPS по формуле: Offset - Amplitude * cos(2 * pi * t / T) # Начинаем с -cos, чтобы старт был ровно с min_rps, а не с середины current_rps = offset - amplitude * math.cos(2 * math.pi * elapsed / period_sec) with current_rps_var.get_lock(): current_rps_var.value = current_rps # Защита от деления на ноль (если задали min_rps = 0) current_rps = max(0.1, current_rps) delay = 1.0 / current_rps loop_start = time.time() # 1. Генерируем и отправляем лог term = gen.generate() log_text = term.render(0.5).text queue.put(log_text) logs_sent += 1 # --- Блок красивого вывода (раз в секунду показываем текущий напор) --- current_sec = int(elapsed) if current_sec > last_print_sec: # Рисуем "градусник" нагрузки для наглядности bar_len = int((current_rps / max_rps) * 20) bar = "█" * bar_len + "░" * (20 - bar_len) ## print(f"[ГЕНЕРАТОР] Нагрузка: {current_rps:5.1f} RPS | {bar} | Отправлено: {logs_sent}") last_print_sec = current_sec # ---------------------------------------------------------------------- # 2. Ждем оставшееся время до следующего лога work_time = time.time() - loop_start sleep_time = delay - work_time if sleep_time > 0: time.sleep(sleep_time) # Завершаем работу queue.put(None) print(f"[ГЕНЕРАТОР] Завершен. Всего сгенерировано логов: {logs_sent}") # --- ПРОЦЕСС 2: ОБРАБОТЧИК (ВАШ КЛАСС) --- def log_processor(queue: Queue, model_path: str, db_path: str, processed_count: Value): """Достает логи из очереди и обрабатывает их. Замеряет свою скорость.""" ## print(f"[ОБРАБОТЧИК] Инициализация модели и БД...") # ВАЖНО: Инициализировать кластер нужно ВНУТРИ процесса, # чтобы SQLite и PyTorch не сошли с ума при передаче между процессами. clusterer = StreamingLogCluster(model_path, db_path) ## print(f"[ОБРАБОТЧИК] Готов к приему данных!") start_time = time.time() while True: try: # Ждем лог из очереди (не более 5 секунд) log_text = queue.get(timeout=50) # Если пришел сигнал остановки - выходим if log_text is None: break # Обрабатываем лог clusterer.process(log_text) with processed_count.get_lock(): processed_count.value += 1 # Каждые 50 логов выводим статистику # if processed_count % 50 == 0: # q_size = queue.qsize() # Сколько логов скопилось в очереди # elapsed = time.time() - start_time # current_rps = processed_count / elapsed # print( # f"[ОБРАБОТЧИК] Обработано: {processed_count} | Скорость: {current_rps:.1f} логов/сек | В очереди ждет: {q_size}") except Empty: print("[ОБРАБОТЧИК] Очередь пуста слишком долго. Завершаю работу.") break total_time = time.time() - start_time print("-" * 40) print(f"[ОБРАБОТЧИК] ИТОГИ:") print(f" Всего обработано: {processed_count.value}") print(f" Затрачено времени: {total_time:.2f} сек") print(f" Средняя скорость: {processed_count.value / total_time:.2f} логов/сек") print("-" * 40) clusterer.close() def monitor_process(queue: Queue, duration_sec: float, processed_count: Value, current_rps_generation: Value): """Монитор с расчетом реального RPS и состояния очереди.""" start_time = time.time() last_print_time = 0 last_processed_count = 0 # Сколько логов мы обработали в прошлый раз print(f"\n{'Время(с)'} | {'RPS (обработка)'} | {'RPS (генератор)'} | {'Очередь (логов)'}") print("-" * 45) while True: elapsed = time.time() - start_time # Условие выхода: прошло время теста + небольшой запас if elapsed > duration_sec + 2: break # Выводим отчет каждые 2 секунды if elapsed - last_print_time >= 2.0: current_processed = processed_count.value # Считаем RPS за прошедший интервал (2 секунды) delta_logs = current_processed - last_processed_count current_rps = delta_logs / (elapsed - last_print_time) # Размер очереди q_size = queue.qsize() print(f"{int(elapsed)} | {current_rps} | {current_rps_generation.value} | {q_size}") # Обновляем "состояние" для следующей итерации last_print_time = elapsed last_processed_count = current_processed time.sleep(0.5) # --- ТОЧКА ВХОДА --- if __name__ == '__main__': # Настройки Синусоиды MIN_RPS = 1 # Минимум логов в секунду (на спаде) MAX_RPS = 100 # Максимум логов в секунду (на пике) PERIOD_SEC = 20.0 # Полный цикл от минимума до минимума займет 20 секунд DURATION_SEC = 120.0 # Тестируем ровно 2 минуту (получится ровно 3 волны) MODEL_PATH = '../Resources/model' DB_FILE = "../Resources/logs.db" if os.path.exists(DB_FILE): os.remove(DB_FILE) # 1. Общие переменные для мониторинга processed_counter = Value('i', 0) # Счетчик обработанных логов current_rps = Value('f', 0.0) # Счетчик генерируемых rps log_queue = Queue() # 2. Запуск процессов proc_processor = Process(target=log_processor, args=(log_queue, MODEL_PATH, DB_FILE, processed_counter)) proc_generator = Process(target=load_generator_sin, args=(log_queue, MIN_RPS, MAX_RPS, PERIOD_SEC, DURATION_SEC, current_rps)) proc_monitor = Process(target=monitor_process, args=(log_queue, DURATION_SEC, processed_counter, current_rps)) proc_monitor.start() proc_processor.start() time.sleep(2) proc_generator.start() proc_generator.join() proc_processor.join() proc_monitor.join()