231 lines
9.7 KiB
Python
231 lines
9.7 KiB
Python
import math
|
||
import time
|
||
import os
|
||
import multiprocessing as mp
|
||
from multiprocessing import Queue, Process, Value
|
||
from queue import Empty
|
||
|
||
import mp
|
||
|
||
# Импортируйте ваши классы
|
||
from Generator.LogGenerator import LogGenerator
|
||
from Processor.StreamingLogCluster import StreamingLogCluster
|
||
|
||
|
||
# --- ПРОЦЕСС 1: ГЕНЕРАТОР НАГРУЗКИ ---
|
||
def load_generator(queue: Queue, target_rps: int, total_logs: int):
|
||
"""Генерирует логи с заданной частотой (RPS) и кладет в очередь."""
|
||
print(f"[ГЕНЕРАТОР] Запущен. Цель: {target_rps} логов/сек, Всего: {total_logs}")
|
||
|
||
gen = LogGenerator()
|
||
delay_between_logs = 1.0 / target_rps
|
||
|
||
for i in range(total_logs):
|
||
start_time = time.time()
|
||
|
||
# Генерируем лог
|
||
term = gen.generate()
|
||
log_text = term.render(0.5).text
|
||
|
||
# Кладем в очередь
|
||
queue.put(log_text)
|
||
|
||
# Пытаемся выдерживать заданный RPS
|
||
elapsed = time.time() - start_time
|
||
sleep_time = delay_between_logs - elapsed
|
||
if sleep_time > 0:
|
||
time.sleep(sleep_time)
|
||
|
||
# Кладем "ядовитую пилюлю" (сигнал остановки для воркера)
|
||
queue.put(None)
|
||
print(f"[ГЕНЕРАТОР] Завершил работу. Все {total_logs} логов отправлены в очередь.")
|
||
|
||
|
||
def load_generator_sin(
|
||
queue: Queue,
|
||
min_rps: float,
|
||
max_rps: float,
|
||
period_sec: float,
|
||
duration_sec: float,
|
||
current_rps_var: Value):
|
||
"""
|
||
Генерирует логи волнообразно (по синусоиде) от min_rps до max_rps.
|
||
period_sec - за сколько секунд проходит одна полная волна (от минимума до минимума)
|
||
duration_sec - общая длительность теста
|
||
"""
|
||
## print(f"[ГЕНЕРАТОР] Волнообразный старт: {min_rps} -> {max_rps} RPS.")
|
||
## print(f"[ГЕНЕРАТОР] Длина волны: {period_sec} сек, Тест идет: {duration_sec} сек.")
|
||
|
||
gen = LogGenerator()
|
||
|
||
# Математика волны
|
||
amplitude = (max_rps - min_rps) / 2.0 # Размах волны
|
||
offset = (max_rps + min_rps) / 2.0 # Центр волны
|
||
|
||
start_time = time.time()
|
||
logs_sent = 0
|
||
last_print_sec = -1
|
||
|
||
while True:
|
||
elapsed = time.time() - start_time
|
||
if elapsed >= duration_sec:
|
||
break
|
||
|
||
# Вычисляем текущий RPS по формуле: Offset - Amplitude * cos(2 * pi * t / T)
|
||
# Начинаем с -cos, чтобы старт был ровно с min_rps, а не с середины
|
||
current_rps = offset - amplitude * math.cos(2 * math.pi * elapsed / period_sec)
|
||
|
||
with current_rps_var.get_lock():
|
||
current_rps_var.value = current_rps
|
||
|
||
# Защита от деления на ноль (если задали min_rps = 0)
|
||
current_rps = max(0.1, current_rps)
|
||
delay = 1.0 / current_rps
|
||
|
||
loop_start = time.time()
|
||
|
||
# 1. Генерируем и отправляем лог
|
||
term = gen.generate()
|
||
log_text = term.render(0.5).text
|
||
queue.put(log_text)
|
||
logs_sent += 1
|
||
|
||
# --- Блок красивого вывода (раз в секунду показываем текущий напор) ---
|
||
current_sec = int(elapsed)
|
||
if current_sec > last_print_sec:
|
||
# Рисуем "градусник" нагрузки для наглядности
|
||
bar_len = int((current_rps / max_rps) * 20)
|
||
bar = "█" * bar_len + "░" * (20 - bar_len)
|
||
## print(f"[ГЕНЕРАТОР] Нагрузка: {current_rps:5.1f} RPS | {bar} | Отправлено: {logs_sent}")
|
||
last_print_sec = current_sec
|
||
# ----------------------------------------------------------------------
|
||
|
||
# 2. Ждем оставшееся время до следующего лога
|
||
work_time = time.time() - loop_start
|
||
sleep_time = delay - work_time
|
||
|
||
if sleep_time > 0:
|
||
time.sleep(sleep_time)
|
||
|
||
# Завершаем работу
|
||
queue.put(None)
|
||
print(f"[ГЕНЕРАТОР] Завершен. Всего сгенерировано логов: {logs_sent}")
|
||
|
||
|
||
# --- ПРОЦЕСС 2: ОБРАБОТЧИК (ВАШ КЛАСС) ---
|
||
def log_processor(queue: Queue, model_path: str, db_path: str, processed_count: Value):
|
||
"""Достает логи из очереди и обрабатывает их. Замеряет свою скорость."""
|
||
## print(f"[ОБРАБОТЧИК] Инициализация модели и БД...")
|
||
|
||
# ВАЖНО: Инициализировать кластер нужно ВНУТРИ процесса,
|
||
# чтобы SQLite и PyTorch не сошли с ума при передаче между процессами.
|
||
clusterer = StreamingLogCluster(model_path, db_path)
|
||
## print(f"[ОБРАБОТЧИК] Готов к приему данных!")
|
||
|
||
start_time = time.time()
|
||
|
||
while True:
|
||
try:
|
||
# Ждем лог из очереди (не более 5 секунд)
|
||
log_text = queue.get(timeout=50)
|
||
|
||
# Если пришел сигнал остановки - выходим
|
||
if log_text is None:
|
||
break
|
||
|
||
# Обрабатываем лог
|
||
clusterer.process(log_text)
|
||
with processed_count.get_lock():
|
||
processed_count.value += 1
|
||
|
||
# Каждые 50 логов выводим статистику
|
||
# if processed_count % 50 == 0:
|
||
# q_size = queue.qsize() # Сколько логов скопилось в очереди
|
||
# elapsed = time.time() - start_time
|
||
# current_rps = processed_count / elapsed
|
||
# print(
|
||
# f"[ОБРАБОТЧИК] Обработано: {processed_count} | Скорость: {current_rps:.1f} логов/сек | В очереди ждет: {q_size}")
|
||
|
||
except Empty:
|
||
print("[ОБРАБОТЧИК] Очередь пуста слишком долго. Завершаю работу.")
|
||
break
|
||
|
||
total_time = time.time() - start_time
|
||
print("-" * 40)
|
||
print(f"[ОБРАБОТЧИК] ИТОГИ:")
|
||
print(f" Всего обработано: {processed_count.value}")
|
||
print(f" Затрачено времени: {total_time:.2f} сек")
|
||
print(f" Средняя скорость: {processed_count.value / total_time:.2f} логов/сек")
|
||
print("-" * 40)
|
||
clusterer.close()
|
||
|
||
|
||
def monitor_process(queue: Queue, duration_sec: float, processed_count: Value, current_rps_generation: Value):
|
||
"""Монитор с расчетом реального RPS и состояния очереди."""
|
||
start_time = time.time()
|
||
last_print_time = 0
|
||
last_processed_count = 0 # Сколько логов мы обработали в прошлый раз
|
||
|
||
print(f"\n{'Время(с)'} | {'RPS (обработка)'} | {'RPS (генератор)'} | {'Очередь (логов)'}")
|
||
print("-" * 45)
|
||
|
||
while True:
|
||
elapsed = time.time() - start_time
|
||
|
||
# Условие выхода: прошло время теста + небольшой запас
|
||
if elapsed > duration_sec + 2:
|
||
break
|
||
|
||
# Выводим отчет каждые 2 секунды
|
||
if elapsed - last_print_time >= 2.0:
|
||
current_processed = processed_count.value
|
||
|
||
# Считаем RPS за прошедший интервал (2 секунды)
|
||
delta_logs = current_processed - last_processed_count
|
||
current_rps = delta_logs / (elapsed - last_print_time)
|
||
|
||
# Размер очереди
|
||
q_size = queue.qsize()
|
||
|
||
print(f"{int(elapsed)} | {current_rps} | {current_rps_generation.value} | {q_size}")
|
||
|
||
# Обновляем "состояние" для следующей итерации
|
||
last_print_time = elapsed
|
||
last_processed_count = current_processed
|
||
|
||
time.sleep(0.5)
|
||
|
||
|
||
# --- ТОЧКА ВХОДА ---
|
||
if __name__ == '__main__':
|
||
# Настройки Синусоиды
|
||
MIN_RPS = 1 # Минимум логов в секунду (на спаде)
|
||
MAX_RPS = 100 # Максимум логов в секунду (на пике)
|
||
PERIOD_SEC = 20.0 # Полный цикл от минимума до минимума займет 20 секунд
|
||
DURATION_SEC = 120.0 # Тестируем ровно 2 минуту (получится ровно 3 волны)
|
||
MODEL_PATH = '../Resources/model'
|
||
DB_FILE = "../Resources/logs.db"
|
||
|
||
if os.path.exists(DB_FILE):
|
||
os.remove(DB_FILE)
|
||
|
||
# 1. Общие переменные для мониторинга
|
||
processed_counter = Value('i', 0) # Счетчик обработанных логов
|
||
current_rps = Value('f', 0.0) # Счетчик генерируемых rps
|
||
log_queue = Queue()
|
||
|
||
# 2. Запуск процессов
|
||
proc_processor = Process(target=log_processor, args=(log_queue, MODEL_PATH, DB_FILE, processed_counter))
|
||
proc_generator = Process(target=load_generator_sin,
|
||
args=(log_queue, MIN_RPS, MAX_RPS, PERIOD_SEC, DURATION_SEC, current_rps))
|
||
proc_monitor = Process(target=monitor_process, args=(log_queue, DURATION_SEC, processed_counter, current_rps))
|
||
|
||
proc_monitor.start()
|
||
proc_processor.start()
|
||
time.sleep(2)
|
||
proc_generator.start()
|
||
|
||
proc_generator.join()
|
||
proc_processor.join()
|
||
proc_monitor.join()
|