Files
LogPatternExtractor/Processor/StreamingLogCluster.py
2026-05-02 18:33:38 +03:00

417 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import difflib
import os
import re
import time
from typing import List, Dict, Any, Union, Optional
import numpy as np
from sentence_transformers import SentenceTransformer, util
from Processor.Models.LogTemplate import LogTemplate
from Processor.Models.LogVariable import LogVariable
from Processor.TemplateDatabase import TemplateDatabase
class StreamingLogCluster:
# --- Константы класса для удобства настройки ---
THRESHOLD_CREATE_NEW = 0.7 #0.70
SCORE_EXACT_MATCH = 0.85
SCORE_PARTIAL_MATCH = 0.6
MAX_VAR_LEN = 32
HARD_DELIMITERS = {'|', ';', ','}
SOFT_DELIMITERS = {'=', ':', '-', '>', '<', '[', ']', '(', ')', '{', '}', '"', "'"}
def __init__(self, model_path: str, db_path: str = "logs_knowledge.db"):
self.model = SentenceTransformer(model_path)
self.db = TemplateDatabase(db_path)
# Компилируем регулярные выражения один раз
self.mask_regex = {
'guid': re.compile(r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-...'),
'ip': re.compile(r'\d{1,3}(?:\.\d{1,3}){3}'),
'ver': re.compile(r'\d{1,3}(?:\.\d{1,3}){2}'),
'num': re.compile(r'-?\d+(\.\d+)?'),
'base64': re.compile(r'(?<![A-Za-z0-9+/])(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?(?![A-Za-z0-9+/])')
}
token_patterns = [
r'(?P<DATE>\d{4}-\d{2}-\d{2}|\d{2}\.\d{2}\.\d{4}|\d{2}/\d{2}/\d{4})',
r'(?P<TIME>\d{2}:\d{2}:\d{2}(?:\.\d+)?)',
r'(?P<EMAIL>[\w\.-]+@[\w\.-]+\.\w+)',
r'(?P<IP>\d{1,3}(?:\.\d{1,3}){3})',
r'(?P<VER>\d{1,3}(?:\.\d{1,3}){2})',
r'(?P<GUID>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-...)',
r'(?P<WORD>[a-zA-Z0-9_]+)',
r'(?P<SYMBOL>[^\w\s])',
r'(?P<SPACE>\s+)'
]
self.master_regex = re.compile('|'.join(token_patterns))
self.var_type_names = {'DATE', 'TIME', 'EMAIL', 'IP', 'GUID', "VER"}
# --- Легковесный индекс в ОЗУ ---
self.template_ids: List[int] = []
self.embeddings: Optional[np.ndarray] = None
self.template_id_counter = self.db.get_max_id() + 1
self._load_index()
def _load_index(self):
"""Загружает ТОЛЬКО векторы и ID из БД, экономя оперативную память."""
print("📥 Загрузка векторного индекса из БД...")
# Принимаем в одну переменную (это просто список)
index_data = self.db.load_index_data()
# Если список пуст (БД пустая), безопасно выходим
if not index_data:
print("✅ База пуста.")
self.template_ids = []
self.embeddings = None
return
raw_templates, _ = index_data
ids = []
vecs = []
for row in raw_templates:
uid, _, emb_blob, _, _ = row
ids.append(uid)
vecs.append(np.frombuffer(emb_blob, dtype=np.float32))
self.template_ids = ids
self.embeddings = np.array(vecs)
print(f"✅ Готово. В индексе шаблонов: {len(self.template_ids)}")
def close(self):
self.db.close()
# --- Утилиты ---
def _tokenize(self, text: str) -> List[str]:
return [m.group() for m in self.master_regex.finditer(text)]
def _mask_for_search(self, text: str) -> str:
text = self.mask_regex['guid'].sub('<GUID>', text)
text = self.mask_regex['ip'].sub('<IP>', text)
text = self.mask_regex['num'].sub('<NUM>', text)
return text
def _detect_var_type(self, value: str) -> str:
match = self.master_regex.fullmatch(value)
return match.lastgroup if match and match.lastgroup in self.var_type_names else "VAR"
# --- Логика Кластеризации ---
def _find_best_match(self, input_vec: np.ndarray, log_text: str) -> Optional[int]:
"""Ищет лучший шаблон по косинусному сходству, используя только RAM-индекс."""
if self.embeddings is None or len(self.template_ids) == 0:
return None
scores = util.cos_sim(input_vec, self.embeddings)[0]
best_idx = scores.argmax().item()
best_score = scores[best_idx].item()
best_id = self.template_ids[best_idx]
if best_score > self.SCORE_EXACT_MATCH:
return best_id
if best_score > self.SCORE_PARTIAL_MATCH:
# Для проверки токенов придется подгрузить кандидата из БД
cand = self. _load_template_from_db(best_id)
cand_tokens = cand.get_tokens_as_str_list()
new_tokens = self._tokenize(log_text)
ratio = difflib.SequenceMatcher(None, cand_tokens, new_tokens).ratio()
if ratio > self.THRESHOLD_CREATE_NEW:
return best_id
return None
def process(self, log_text: str) -> Dict[str, Any]:
"""Основной пайплайн обработки лога."""
masked_input = self._mask_for_search(log_text)
input_vec = self.model.encode(masked_input)
best_id = self._find_best_match(input_vec, log_text)
if best_id is not None:
# Шаблон найден -> Грузим его из БД (ленивая загрузка)
template = self._load_template_from_db(best_id)
# Обновляем вектор скользящим средним
n = template.hits
updated_vec = (template.embedding * n + input_vec) / (n + 1)
template.embedding = updated_vec
# Обновляем вектор в RAM
idx = self.template_ids.index(best_id)
self.embeddings[idx] = updated_vec
return self._update_and_extract(template, log_text)
else:
# Шаблон не найден -> Создаем новый
return self._create_new_template(log_text, input_vec)
def process_time_measure(self,log_text: str) -> (float, float, float):
"""Основной пайплайн обработки лога."""
t1 = time.time()
masked_input = self._mask_for_search(log_text)
input_vec = self.model.encode(masked_input)
t2 = time.time()
best_id = self._find_best_match(input_vec, log_text)
if best_id is not None:
# Шаблон найден -> Грузим его из БД (ленивая загрузка)
template = self._load_template_from_db(best_id)
# Обновляем вектор скользящим средним
n = template.hits
updated_vec = (template.embedding * n + input_vec) / (n + 1)
template.embedding = updated_vec
# Обновляем вектор в RAM
idx = self.template_ids.index(best_id)
self.embeddings[idx] = updated_vec
t3 = time.time()
self._update_and_extract(template, log_text)
else:
t3 = time.time()
# Шаблон не найден -> Создаем новый
self._create_new_template(log_text, input_vec)
t4 = time.time()
return t2-t1, t3-t2, t4-t3
# --- Создание и обновление шаблонов ---
def _create_new_template(self, log_text: str, vector: np.ndarray) -> Dict[str, Any]:
tokens = self._tokenize(log_text)
new_tpl = LogTemplate(self.template_id_counter, tokens, log_text)
new_tpl.embedding = vector
# Добавляем в RAM индекс
self.template_ids.append(new_tpl.uid)
if self.embeddings is None:
self.embeddings = np.array([vector])
else:
self.embeddings = np.vstack([self.embeddings, vector])
self.template_id_counter += 1
self.db.save_template(new_tpl)
return {
'template_id': new_tpl.uid,
'template_view': new_tpl.render(),
'variables': [],
'status': 'created'
}
def _update_and_extract(self, template: LogTemplate, log_text: str) -> Dict[str, Any]:
new_tokens = self._tokenize(log_text)
old_tokens_str = template.get_tokens_as_str_list()
matcher = difflib.SequenceMatcher(None, old_tokens_str, new_tokens)
updated_template_tokens = []
extracted_variables = []
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == 'equal':
updated_template_tokens.extend(template.tokens[i1:i2])
elif tag == 'replace':
log_vals = new_tokens[j1:j2]
tpl_seg = template.tokens[i1:i2]
# Если заменяем существующую переменную
if len(tpl_seg) == 1 and isinstance(tpl_seg[0], LogVariable):
var = tpl_seg[0]
full_text = "".join(log_vals)
is_bloated = len(full_text) > self.MAX_VAR_LEN
has_hard = any(t.strip() in self.HARD_DELIMITERS for t in log_vals)
has_space = any(t.isspace() for t in log_vals)
has_soft = any(t.strip() in self.SOFT_DELIMITERS for t in log_vals)
if has_hard or has_space or (is_bloated and has_soft):
decomposed, new_vars = self._decompose_segment(log_vals, template, var.initial_value)
updated_template_tokens.extend(decomposed)
extracted_variables.extend(new_vars)
else:
updated_template_tokens.append(var)
if full_text != var.initial_value:
extracted_variables.append(self._make_delta(var, full_text))
else:
# Заменяем текст -> формируем новые переменные
init_hint = "".join(t.initial_value if isinstance(t, LogVariable) else str(t) for t in tpl_seg)
decomposed, new_vars = self._decompose_segment(log_vals, template, init_hint)
updated_template_tokens.extend(decomposed)
extracted_variables.extend(new_vars)
elif tag == 'delete':
tpl_seg = template.tokens[i1:i2]
if len(tpl_seg) == 1 and isinstance(tpl_seg[0], LogVariable):
var = tpl_seg[0]
updated_template_tokens.append(var)
if var.initial_value != "":
extracted_variables.append(self._make_delta(var, ""))
else:
new_var = LogVariable(template.get_next_var_id(), initial_value="".join(str(t) for t in tpl_seg))
updated_template_tokens.append(new_var)
if new_var.initial_value != "":
extracted_variables.append(self._make_delta(new_var, ""))
elif tag == 'insert':
decomposed, new_vars = self._decompose_segment(new_tokens[j1:j2], template, "")
updated_template_tokens.extend(decomposed)
extracted_variables.extend(new_vars)
template.tokens = updated_template_tokens
template.hits += 1
self.db.save_template(template)
return {
'template_id': template.uid,
'template_view': template.render(),
'variables': extracted_variables,
'status': 'updated'
}
# --- Вспомогательные методы для логики извлечения ---
def _decompose_segment(self, tokens_list: List[str], template: LogTemplate, initial_hint: str):
"""Разбивает сегмент на переменные и статические токены."""
full_text = "".join(tokens_list)
is_bloated = len(full_text) > self.MAX_VAR_LEN
result_structure = []
extracted_vars = []
current_var_tokens = []
def flush_var():
if not current_var_tokens:
return
val = "".join(current_var_tokens)
v_type = self._detect_var_type(val)
init = initial_hint if len(result_structure) == 0 else ""
new_v = LogVariable(template.get_next_var_id(), initial_value=init, var_type=v_type)
result_structure.append(new_v)
if val != new_v.initial_value:
extracted_vars.append(self._make_delta(new_v, val))
current_var_tokens.clear()
for token in tokens_list:
t_strip = token.strip()
should_split = (t_strip in self.HARD_DELIMITERS) or token.isspace() or (
is_bloated and t_strip in self.SOFT_DELIMITERS)
if should_split:
flush_var()
result_structure.append(token)
else:
current_var_tokens.append(token)
flush_var()
return result_structure, extracted_vars
def _make_delta(self, var: LogVariable, actual_value: str) -> Dict[str, Any]:
"""Формирует словарь дельты (изменения) для переменной."""
return {
'uid': var.uid,
'name': str(var),
'value': actual_value,
'initial': var.initial_value
}
# --- Интеграция с БД (Ленивая загрузка) ---
def _load_template_from_db(self, uid: int) -> LogTemplate:
"""Восстанавливает конкретный шаблон из БД."""
row, vars_map = self.db.get_template_data_by_id(uid)
if not row:
raise ValueError(f"Шаблон с ID {uid} не найден в БД!")
template_id, pattern, emb_blob, hits, local_cnt = row
# Передаем vars_map напрямую, так как там уже лежат переменные только этого шаблона
tokens = self._hydrate_pattern(pattern, vars_map)
tpl = LogTemplate(template_id, tokens, pattern)
tpl.embedding = np.frombuffer(emb_blob, dtype=np.float32)
tpl.hits = hits
tpl.local_var_counter = local_cnt
return tpl
def _hydrate_pattern(self, pattern: str, tpl_vars: Dict[int, LogVariable]) -> List:
parts = re.split(r'(<[A-Z]+_\d+>)', pattern)
tokens = []
for part in parts:
if not part: continue
if part.startswith('<') and part.endswith('>'):
match = re.match(r'<([A-Z]+)_(\d+)>', part)
if match:
v_type, v_id_str = match.groups()
v_id = int(v_id_str)
if v_id in tpl_vars:
tokens.append(tpl_vars[v_id])
else:
tokens.append(LogVariable(v_id, var_type=v_type))
continue
tokens.extend(self._tokenize(part))
return tokens
if __name__ == '__main__':
MODEL_PATH = '../Resources/model'
DB_FILE = "logs.db"
if os.path.exists(DB_FILE):
os.remove(DB_FILE)
print("--- ЗАПУСК: Delta Mode ---")
clusterer = StreamingLogCluster(MODEL_PATH, db_path=DB_FILE)
# 1. Создаем шаблон.
# Переменных нет, так как все значения становятся "дефолтными" (initial).
log1 = "2025-01-01 User admin login"
res1 = clusterer.process(log1)
print(f"Log 1: {log1} -> ID: {res1['template_id']}")
print(f" VARS (Delta): {res1['variables']}")
# Ожидание: [], так как при создании шаблона текущие значения становятся Initial.
# 2. Меняем admin -> guest.
# Должна вернуться ТОЛЬКО переменная гостя. Дата та же - она не вернется!
log2 = "2025-01-01 User guest login"
res2 = clusterer.process(log2)
print(f"\nLog 2: {log2} -> ID: {res2['template_id']}")
# Красивый вывод дельты
if res2['variables']:
print(" CHANGES DETECTED:")
for v in res2['variables']:
print(f" * {v['name']} changed from '{v['initial']}' to '{v['value']}'")
else:
print(" NO CHANGES (Full match with template defaults)")
# 3. Меняем всё (Дата + Юзер)
log3 = "2025-02-02 User root login"
res3 = clusterer.process(log3)
print(f"\nLog 3: {log3} -> ID: {res3['template_id']}")
if res3['variables']:
print(" CHANGES DETECTED:")
for v in res3['variables']:
print(f" * {v['name']} ('{v['initial']}') to '{v['value']}'")
# 4. Возвращаемся к оригиналу (admin + старая дата)
# Должен вернуться пустой список, так как это идеальное совпадение с Initials
log4 = "2025-01-01 User admin login"
res4 = clusterer.process(log4)
print(f"\nLog 4 (Revert): {log4} -> ID: {res4['template_id']}")
print(f" VARS (Delta): {res4['variables']}")