import pika import sys class RabbitMQMessenger: def __init__(self, host='k8s.worker', username='rabbit', password='rabbit', port=32294): """ Инициализация подключения к RabbitMQ. """ self.credentials = pika.PlainCredentials(username, password) self.parameters = pika.ConnectionParameters( host=host, port=port, credentials=self.credentials, # heartbeat нужен, чтобы соединение не рвалось при долгом ожидании heartbeat=600 ) self.connection = None self.channel = None self._connect() def _connect(self): """Создаем соединение и канал.""" try: self.connection = pika.BlockingConnection(self.parameters) self.channel = self.connection.channel() except Exception as e: print(f"Ошибка подключения к RabbitMQ: {e}") sys.exit(1) def send_message(self, queue_name: str, message: str): """ Отправка сообщения в очередь. :param queue_name: Имя очереди, куда отправляем данные. :param message: Данные (текст). """ # Объявляем очередь (durable=True значит, что очередь переживет перезагрузку RabbitMQ) self.channel.queue_declare(queue=queue_name, durable=True) self.channel.basic_publish( exchange='', routing_key=queue_name, body=message.encode('utf-8'), # Превращаем строку в байты properties=pika.BasicProperties( delivery_mode=2, # Сделать сообщение персистентным (сохранить на диске) )) print(f"[x] Отправлено в '{queue_name}': {message}") def send_binary_message(self, queue_name: str,message: bytes): # Объявляем очередь (durable=True значит, что очередь переживет перезагрузку RabbitMQ) self.channel.queue_declare(queue=queue_name, durable=True) self.channel.basic_publish( exchange='', routing_key=queue_name, body=message, # Превращаем строку в байты properties=pika.BasicProperties( delivery_mode=2, # Сделать сообщение персистентным (сохранить на диске) )) print(f"[x] Отправлено в '{queue_name}': {message}") def start_listening(self, queue_name: str, callback_function): """ Запуск прослушивания очереди (блокирует выполнение скрипта). :param queue_name: Имя очереди, которую слушаем (ответы). :param callback_function: Функция, которая будет вызвана при получении сообщения. Должна принимать один аргумент (текст сообщения). """ self.channel.queue_declare(queue=queue_name, durable=True) # prefetch_count=1 говорит RabbitMQ не давать работнику больше 1 сообщения за раз, # пока он не обработает предыдущее. self.channel.basic_qos(prefetch_count=1) # Внутренняя обертка, чтобы декодировать байты в текст перед передачей в ваш callback def internal_callback(ch, method, properties, body): text_data = body.decode('utf-8') print(f"[v] Получено из '{queue_name}'") # Вызываем вашу логику обработки callback_function(text_data) # Подтверждаем выполнение (ACK), чтобы сообщение удалилось из очереди ch.basic_ack(delivery_tag=method.delivery_tag) self.channel.basic_consume(queue=queue_name, on_message_callback=internal_callback) print(f"[*] Ожидание сообщений в очереди '{queue_name}'. Нажмите CTRL+C для выхода.") try: self.channel.start_consuming() except KeyboardInterrupt: self.close() def close(self): """Закрытие соединения.""" if self.connection and not self.connection.is_closed: self.connection.close() print("\n[!] Соединение закрыто")