Source code for protobunny.backends.redis.queues

import logging

from protobunny.backends import (
    BaseSyncQueue,
)
from protobunny.models import Envelope

log = logging.getLogger(__name__)


[docs] class SyncQueue(BaseSyncQueue): """Message queue backed by pika and RabbitMQ.""" def get_tag(self) -> str: return self.subscription
[docs] def send_message(self, topic: str, body: bytes, correlation_id: str | None = None, **kwargs): """Low-level message sending implementation. Args: topic: a topic name for direct routing or a routing key with special binding keys body: serialized message (e.g. a serialized protobuf message or a json string) correlation_id: is present for result messages Returns: """ message = Envelope( body=body, correlation_id=correlation_id or b"", ) self.get_connection().publish(topic, message)