Files
LogPatternExtractor/Infrostructure/ProtocolCoder/MessageEncoder.py
2026-05-02 18:33:38 +03:00

219 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from Infrostructure.ProtocolCoder.BitReader import BitReader
from Infrostructure.ProtocolCoder.BitWriter import BitWriter
class MessageEncoder:
def __init__(self):
pass
def encode_protocol(self, template_id, variables, section_power=3):
# --- 1. Секция заголовков ---
writer = BitWriter()
# Поле 1: Размер секции (1 байт)
# Здесь указываем саму степень (например, 3)
writer.add_bits(section_power, 8)
# Вычисляем размер одной секции в битах (S)
section_size_bits = 1 << section_power
# Максимальное число, которое можно записать в поле, описывающее длину (например, для 8 бит это 255)
max_len_per_section = (1 << section_size_bits) - 1
# Поле 2: Зарезервированная область (4 секции)
# 4 секции * section_size_bits
writer.add_bits(0, 4 * section_size_bits)
# --- 2. Секция шаблона ---
# Определяем битовую длину ID шаблона
# Если ID=0, нужно хотя бы 1 бит, но bit_length() вернет 0, обрабатываем это
tn = template_id.bit_length() if template_id > 0 else 1
# Поле 3: Размер следующей секции (tn) в секциях (размер поля = 1 секция)
# Внимание: в ТЗ написано "1 секция размер следующей секции ... в битах".
writer.add_bits(tn, section_size_bits)
# Поле 4: Идентификатор шаблона (tn бит)
writer.add_bits(template_id, tn)
# --- 3. Секции данных ---
for var_id, var_val in variables:
# Подготовка значения переменной
if isinstance(var_val, str):
# Если строка, берем код первого символа (для примера 'A' -> 65)
# Для полноценных строк нужно кодировать в байты, здесь упрощение под "числовые переменные"
if len(var_val) == 1:
val_int = ord(var_val)
else:
# Если пришла длинная строка, кодируем как большое число
val_bytes = var_val.encode('utf-8')
val_int = int.from_bytes(val_bytes, byteorder='big')
else:
val_int = var_val
# Определяем необходимые биты для значения и ID
# Используем bit_length для максимальной компактности
# Однако, в примере ID=1 (1 бит) записан в 4 бита.
# Алгоритм: берем минимально необходимый размер, либо выравниваем, если требуется.
# ТЗ: "вписываются в максимально компактном виде". Значит, берем реальный bit_length.
# Биты для значения
val_total_bits = val_int.bit_length() if val_int > 0 else 1
# Биты для ID
id_bits = var_id.bit_length() if var_id > 0 else 1
# Логика разбиения на секции, если значение не влезает в одну секцию описания размера.
# Поле размера (xn) само имеет размер 1 секцию (например, 8 бит).
# Значит, максимальная длина блока данных = 255 бит.
# Если val_total_bits > 255, нужно разбивать на несколько секций данных.
bits_left = val_total_bits
# Для корректной нарезки битов большого числа нам удобно преобразовать его в строку или срезать маской
# Но проще математически брать куски от старших бит к младшим или наоборот.
# Порядок записи битов: обычно Big Endian.
while bits_left > 0:
# Определяем, сколько бит значения запишем в этот блок
# Либо всё что осталось, либо максимум, который можно описать одним числом в поле размера
chunk_size = min(bits_left, max_len_per_section)
# Вырезаем нужный кусок (chunk) из числа val_int
# Нам нужны старшие биты из оставшихся.
# Пример: всего 10 бит, берем 8. Нужно сдвинуть (10-8)=2 раза вправо.
shift = bits_left - chunk_size
chunk_val = (val_int >> shift) & ((1 << chunk_size) - 1)
# Поле 5: Размер ID в битах (n) - занимает 1 секцию
writer.add_bits(id_bits, section_size_bits)
# Поле 6: Размер блока значения в битах (xn) - занимает 1 секцию
writer.add_bits(chunk_size, section_size_bits)
# Поле 7: Идентификатор (n бит)
writer.add_bits(var_id, id_bits)
# Поле 8: Блок значения (xn бит)
writer.add_bits(chunk_val, chunk_size)
bits_left -= chunk_size
return writer.get_bytes()
def decode_protocol(self, data):
"""
Декодирует бинарные данные обратно в ID шаблона и список переменных.
:param data: bytes объект
:return: кортеж (template_id, list_of_variables)
где list_of_variables это список кортежей (var_id, value)
"""
reader = BitReader(data)
# --- 1. Секция заголовков ---
if not reader.has_bits(8):
raise ValueError("Пустые данные или некорректный заголовок")
# 1. Размер секции (степень двойки)
section_power = reader.read_bits(8)
section_size = 1 << section_power # 2^power
# 2. Пропускаем зарезервированную область (4 секции)
reader.read_bits(4 * section_size)
# --- 2. Секция шаблона ---
# 3. Размер ID шаблона (1 секция)
tn = reader.read_bits(section_size)
# 4. Идентификатор шаблона (tn бит)
template_id = reader.read_bits(tn)
# --- 3. Секции данных ---
variables = []
last_var_id = None
# Читаем, пока есть данные.
# Минимальный блок данных требует 2 секции заголовков (размер ID и размер значения)
while reader.has_bits(2 * section_size):
# 5. Размер ID переменной (1 секция)
n = reader.read_bits(section_size)
# 6. Размер значения переменной (1 секция)
xn = reader.read_bits(section_size)
# Проверяем, хватает ли бит на само тело данных
# (Это может случиться, если в конце файла "мусорные" нули для выравнивания байта)
if not reader.has_bits(n + xn):
break
# 7. Идентификатор переменной
var_id = reader.read_bits(n)
# 8. Значение переменной (часть значения)
chunk_value = reader.read_bits(xn)
# Логика склеивания (Reassembly):
# Если ID текущей переменной совпадает с ID последней добавленной,
# значит это продолжение большого числа, которое было разбито на секции.
# Энкодер писал старшие части первыми (Big Endian logic в чанках),
# поэтому мы сдвигаем старое значение и добавляем новый кусок.
if last_var_id is not None and var_id == last_var_id:
# Получаем предыдущее значение
_, prev_val = variables.pop()
# Сдвигаем его влево на размер нового куска и добавляем новый кусок
new_val = (prev_val << xn) | chunk_value
variables.append((var_id, new_val))
else:
# Новая переменная
variables.append((var_id, chunk_value))
last_var_id = var_id
return template_id, variables
def get_hex(self, data):
return " ".join(f"{b:02X}" for b in data)
def from_hex(self, hex_str):
return bytes.fromhex(hex_str)
def int_to_str(self, number):
if number == 0:
return ""
# 1. Вычисляем, сколько байт занимает число
# (bit_length() + 7) // 8 — это округление вверх до целого байта
num_bytes = (number.bit_length() + 7) // 8
# 2. Превращаем число в байты
# Важно использовать byteorder='big', так как энкодер записывал старшие байты первыми
bytes_data = number.to_bytes(num_bytes, byteorder='big')
# 3. Декодируем байты в строку
try:
return bytes_data.decode('utf-8')
except UnicodeDecodeError:
# Если число не является валидной utf-8 строкой, возвращаем как есть или hex
return f"<Binary: {bytes_data.hex()}>"
if __name__ == '__main__':
me = MessageEncoder()
hex = "03 00 00 00 00 01 81 27 59 18 19 1A 96 98 19 16 98 19 00 8F F9 37 B7 BA 00"
# Генерируем
binary_data = me.from_hex(hex)
t = time.time()
for i in range(1000):
data = me.decode_protocol(binary_data)
print((time.time() - t )*1000)
tmp = [(i[0], me.int_to_str(i[1])) if i[1] > 100000 else i for i in data[1]]
print(data[0], tmp)