Source code for protobunny.backends.nats.queues

import logging

from protobunny.backends import (
    BaseSyncQueue,
)
from protobunny.models import Envelope

log = logging.getLogger(__name__)


class SyncQueue(BaseSyncQueue):
    """Synchronous message queue for this backend.

    NOTE(review): the previous docstring read "Message queue backed by pika
    and RabbitMQ", yet this module is ``protobunny.backends.nats.queues``.
    Presumably it was copied from the RabbitMQ backend -- confirm and reword.
    """

    def get_tag(self) -> str:
        """Return the value held in ``self.subscription`` as this queue's tag."""
        return self.subscription

    def send_message(
        self,
        topic: str,
        body: bytes,
        correlation_id: str | None = None,
        persistent: bool = True,
    ):
        """Wrap ``body`` in an ``Envelope`` and publish it on ``topic``.

        Low-level sending implementation: the envelope is handed to the
        ``publish`` method of the object returned by ``self.get_connection()``.

        Args:
            topic: a topic name for direct routing or a routing key with
                special binding keys
            body: serialized message (e.g. a serialized protobuf message or
                a json string)
            correlation_id: is present for result messages
            persistent: accepted but not read anywhere in this method.
                NOTE(review): the previous docs mentioned
                ``aio_pika.DeliveryMode.PERSISTENT``; confirm whether this
                backend is expected to honour the flag.
        """
        # Build the envelope before touching the connection, so a failure
        # while constructing it never reaches the connection.
        envelope = Envelope(body=body, correlation_id=correlation_id)
        connection = self.get_connection()
        connection.publish(topic, envelope)