API Reference

Core Package Async

A module providing async support for messaging and communication using the configured broker as the backend.

This module includes functionality for publishing, subscribing, and managing message queues, as well as dynamically managing imports and configurations for the backend.

exception protobunny.asyncio.ConnectionError[source]

Raised when connection operations fail.

exception protobunny.asyncio.RequeueMessage[source]

Raised when a message could not be handled and should be requeued.

protobunny.asyncio.config_lib() None[source]

Adds the generated package root to sys.path.

async protobunny.asyncio.connect(**kwargs) BaseAsyncConnection[source]

Establishes an asynchronous connection to the configured messaging broker.

This method initializes and returns a singleton async connection. Subsequent calls return the existing connection instance unless it has been disconnected.

Parameters:

**kwargs – Backend-specific connection arguments (e.g., host, port, credentials, or protocol-specific tuning parameters).

Returns:

The active asynchronous connection singleton.

Return type:

BaseAsyncConnection

async protobunny.asyncio.disconnect() None[source]

Closes the active asynchronous connection to the broker.

Gracefully terminates heartbeats and background networking tasks. Safe to call even if no connection is active.

async protobunny.asyncio.get_message_count(msg_type: PBM | type[PBM] | ModuleType) int | None[source]

Asynchronously retrieves the current number of pending messages in a queue.

Parameters:

msg_type – The message instance, class, or module representing the queue.

Returns:

The count of messages waiting to be processed, or None if the backend does not support count retrieval for this type.

Return type:

int | None

protobunny.asyncio.get_queue(pkg_or_msg: ProtoBunnyMessage | type['ProtoBunnyMessage'] | ModuleType, backend_name: str | None = None) BaseSyncQueue | BaseAsyncQueue[source]
Factory method to get an AsyncQueue/SyncQueue instance based on
  • the message type (e.g. mylib.subpackage.subsubpackage.MyMessage)

  • the mode (e.g. async)

  • the configured backend or the parameter passed (e.g. “rabbitmq”)

Parameters:
  • pkg_or_msg – A message instance, a message class, or a module containing message definitions.

  • backend_name – The backend name to use; defaults to the configured backend.

Returns:

A queue instance configured for the relevant topic.

Return type:

BaseSyncQueue | BaseAsyncQueue

async protobunny.asyncio.publish(message: PBM) None[source]

Asynchronously publishes a Protobuf message to its corresponding topic.

The destination topic is automatically derived from the message class and package structure. Messages within a ‘.tasks’ package are automatically treated as persistent tasks requiring reliable delivery and queuing logic.

Parameters:

message – An instance of a class derived from ProtoBunnyMessage.

async protobunny.asyncio.publish_result(result: Result, topic: str | None = None, correlation_id: str | None = None) None[source]

Asynchronously publishes a processing result to the results topic of the source message.

Parameters:
  • result – The Result object containing the response payload and source message.

  • topic – Optional override for the result topic. Defaults to the automatically generated ‘.result’ topic associated with the source message.

  • correlation_id – Optional ID used to link this result to a specific request.
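
As a sketch, a worker callback typically builds a Result from the incoming message (via MessageMixin.make_result) and publishes it back; the import is deferred here so the sketch stays importable, and a running broker is assumed:

```python
async def handle_task(message) -> None:
    # Deferred import keeps this sketch importable without a running broker.
    import protobunny.asyncio as pb

    # Build a Result carrying the source message and a payload, then publish
    # it to the automatically derived '.result' topic.
    result = message.make_result(return_code=0, return_value={"ok": True})
    await pb.publish_result(result)
```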

async protobunny.asyncio.reset_connection() BaseAsyncConnection[source]

Resets the singleton connection and returns it.

protobunny.asyncio.run_forever(main: Callable[[...], Awaitable[None]]) None[source]

Starts the event loop and keeps the process alive to consume messages.

Installs signal handlers for SIGINT and SIGTERM to trigger an orderly async shutdown.

Parameters:

main – The entry point async function to run before entering the permanent wait state.
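
The behavior described above can be sketched in plain asyncio (a minimal illustration of the pattern, not protobunny's actual implementation; the stop-event mechanism is an assumption):

```python
import asyncio
import signal
from typing import Awaitable, Callable

def run_forever(main: Callable[[], Awaitable[None]]) -> None:
    """Run ``main``, then block until SIGINT/SIGTERM triggers shutdown."""
    async def _runner() -> None:
        stop = asyncio.Event()
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, stop.set)  # orderly shutdown trigger
        await main()       # user entry point
        await stop.wait()  # permanent wait state until a signal arrives

    asyncio.run(_runner())
```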

async protobunny.asyncio.subscribe(pkg: type[PBM] | ModuleType, callback: AsyncCallback) BaseAsyncQueue[source]

Registers an async callback to consume messages from a specific topic or package.

If a message class is provided, subscribes to that specific topic. If a module is provided, subscribes to all message types defined within that module. For shared tasks (identified by the ‘.tasks’ convention), Protobunny automatically manages shared consumer groups and load balancing.

Parameters:
  • pkg – The message class (type[PBM]) or module to subscribe to.

  • callback – An async callable that accepts the received message.

Returns:

The queue object managing the active subscription.

Return type:

BaseAsyncQueue
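
Tying connect, subscribe, and publish together, a minimal consumer might look like the sketch below (mylib.tasks.TaskMessage is a hypothetical generated class, and a running, configured broker is assumed):

```python
import asyncio

async def main() -> None:
    # Deferred imports keep this sketch importable without a running broker.
    import protobunny.asyncio as pb
    from mylib.tasks import TaskMessage  # hypothetical generated message class

    async def handle_task(message: "TaskMessage") -> None:
        print("received:", message)

    await pb.connect()                            # singleton connection
    await pb.subscribe(TaskMessage, handle_task)  # topic derived from the class
    await pb.publish(TaskMessage())

# Hand the entry point to protobunny.asyncio.run_forever(main) to keep
# consuming until SIGINT/SIGTERM triggers an orderly shutdown.
```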

async protobunny.asyncio.subscribe_logger(log_callback: LoggerCallback | None = None, prefix: str | None = None) LoggingAsyncQueue[source]

Asynchronously subscribes a logging callback to monitor message traffic.

Parameters:
  • log_callback – A custom function to handle log messages. Defaults to default_log_callback.

  • prefix – An optional subject/topic prefix to filter logged messages.

Returns:

The specialized async queue object for logging.

Return type:

LoggingAsyncQueue

async protobunny.asyncio.subscribe_results(pkg: type[PBM] | ModuleType, callback: AsyncCallback) BaseAsyncQueue[source]

Asynchronously subscribes to result topics for a message type or package.

Used by services that need to listen for completion signals or data returned by workers processing specific message types.

Parameters:
  • pkg – The message class or module whose results should be monitored.

  • callback – The async function to execute when a result is received.

Returns:

The queue object managing the result subscription.

Return type:

BaseAsyncQueue

async protobunny.asyncio.unsubscribe(pkg: type[PBM] | ModuleType, if_unused: bool = True, if_empty: bool = True) None[source]

Asynchronously removes a subscription for a specific message or package.

Parameters:
  • pkg – The message class or module to unsubscribe from.

  • if_unused – If True, only unsubscribes if no other callbacks are attached.

  • if_empty – If True, only unsubscribes if the local message buffer is empty.

async protobunny.asyncio.unsubscribe_all(if_unused: bool = True, if_empty: bool = True) None[source]

Asynchronously stops all message consumption by canceling every subscription.

Clears standard subscriptions, result listeners, and task workers. Typically invoked during graceful application shutdown.

Parameters:
  • if_unused – Policy for evaluating unused standard queues.

  • if_empty – Policy for evaluating empty standard queues.

async protobunny.asyncio.unsubscribe_results(pkg: type[PBM] | ModuleType) None[source]

Removes all in-process subscriptions for a message or package result topic.

Core Package Sync

A module providing support for messaging and communication using the configured broker as the backend.

This module includes functionality for publishing, subscribing, and managing message queues, as well as dynamically managing imports and configurations for the backend.

exception protobunny.ConnectionError[source]

Raised when connection operations fail.

exception protobunny.RequeueMessage[source]

Raised when a message could not be handled and should be requeued.

protobunny.connect(**kwargs: Any) BaseSyncConnection[source]

Establishes a connection to the configured messaging broker.

This method initializes and returns a singleton connection. Subsequent calls return the existing connection instance unless it has been explicitly disconnected.

Parameters:

**kwargs – Backend-specific connection arguments (e.g., host, port, credentials, or protocol-specific tuning parameters).

Returns:

The active connection singleton.

Return type:

BaseSyncConnection

protobunny.disconnect() None[source]

Closes the active connection to the broker.

Gracefully terminates heartbeats and background networking tasks. Safe to call even if no connection is active.

protobunny.get_backend(backend: str | None = None) ModuleType[source]

Retrieve and import the specified backend module.

Loads the backend module based on the provided name, falling back to the default backend specified in the configuration. If the backend is unavailable or cannot be imported, the program exits.

Parameters:

backend (str | None) – The name of the backend to import. If None, the backend from the configuration is used.

Returns:

The imported backend module.

protobunny.get_message_count(msg_type: PBM | type[PBM]) int | None[source]

Retrieves the current number of pending messages in a queue.

Parameters:

msg_type – The message instance or class representing the queue.

Returns:

The count of messages waiting to be processed, or None if the backend does not support count retrieval for this queue type.

Return type:

int | None

protobunny.publish(message: PBM) None[source]

Publishes a Protobuf message to its corresponding topic.

The destination topic is automatically derived from the message class and package structure. Messages within a ‘.tasks’ package are automatically treated as persistent tasks requiring reliable delivery and queuing logic across all supported backends.

Parameters:

message – An instance of a class derived from ProtoBunnyMessage.

protobunny.publish_result(result: Result, topic: str | None = None, correlation_id: str | None = None) None[source]

Publishes a processing result to be consumed by results subscribers (See subscribe_results).

Parameters:
  • result – The Result object containing the response payload and source message from which the Result was generated.

  • topic – Optional override for the result topic. Defaults to the automatically generated ‘.result’ topic associated with the source message.

  • correlation_id – Optional ID used to link this result to a specific request.

protobunny.reset_connection(**kwargs: Any) BaseSyncConnection[source]

Resets the singleton connection and returns it.

protobunny.run_forever() None[source]

Blocks the main thread to maintain active message consumption.

Installs signal handlers for SIGINT and SIGTERM to trigger an orderly shutdown, ensuring all subscriptions are canceled and the connection is closed before the process exits.

protobunny.subscribe(pkg_or_msg: type[PBM] | ModuleType, callback: SyncCallback) BaseSyncQueue[source]

Registers a callback to consume messages from a specific topic or package.

If a message class is provided, subscribes to that specific topic. If a module is provided, subscribes to all message types defined within that module. For shared tasks (identified by the ‘.tasks’ convention), Protobunny automatically manages shared consumer groups and load balancing.

Parameters:
  • pkg_or_msg – The message class (type[PBM]) or module to subscribe to.

  • callback – The function to execute when a message is received.

Returns:

The queue object managing the active subscription.

Return type:

BaseSyncQueue
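
A synchronous counterpart to the async workflow, as a sketch (mylib.tasks.TaskMessage is a hypothetical generated class, and a running, configured broker is assumed):

```python
def run_worker() -> None:
    # Deferred imports keep this sketch importable without a running broker.
    import protobunny as pb
    from mylib.tasks import TaskMessage  # hypothetical generated message class

    def handle_task(message: "TaskMessage") -> None:
        print("received:", message)

    pb.connect()
    pb.subscribe(TaskMessage, handle_task)
    pb.publish(TaskMessage())
    pb.run_forever()  # blocks the main thread until SIGINT/SIGTERM
```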

protobunny.subscribe_logger(log_callback: LoggerCallback | None = None, prefix: str | None = None) LoggingSyncQueue[source]

Subscribes a specialized logging callback to monitor message traffic.

This creates a logging-specific queue that captures and logs metadata for messages matching the optional prefix.

Parameters:
  • log_callback – A custom function to handle log messages. Defaults to default_log_callback (logs routing key, cid, and content).

  • prefix – An optional subject/topic prefix to filter which messages are logged.

Returns:

The specialized queue object for logging.

Return type:

LoggingSyncQueue

protobunny.subscribe_results(pkg: type[PBM] | ModuleType, callback: SyncCallback) BaseSyncQueue[source]

Subscribes to result topics for a specific message type or package.

Used by services that need to listen for completion signals or data returned by workers processing specific message types.

Parameters:
  • pkg – The message class or module whose results should be monitored.

  • callback – The function to execute when a result message is received.

Returns:

The queue object managing the result subscription.

Return type:

BaseSyncQueue

protobunny.unsubscribe(pkg: type[PBM] | ModuleType, if_unused: bool = True, if_empty: bool = True) None[source]

Cancels an active subscription for a specific message type or package.

Parameters:
  • pkg – The message class or module to unsubscribe from.

  • if_unused – If True, only unsubscribes if no other callbacks are attached.

  • if_empty – If True, only unsubscribes if the buffer is empty.

protobunny.unsubscribe_all(if_unused: bool = True, if_empty: bool = True) None[source]

Stops all message consumption by canceling every in-process subscription.

Clears standard subscriptions, result listeners, and task workers. Typically invoked during graceful application shutdown.

Parameters:
  • if_unused – Policy for evaluating unused standard queues.

  • if_empty – Policy for evaluating empty standard queues.

protobunny.unsubscribe_results(pkg: type[PBM] | ModuleType) None[source]

Removes all in-process subscriptions for a message or package results topic.

Parameters:

pkg – The message class or module to unsubscribe from its results topic.

Models

class protobunny.models.BaseQueue(topic: str)[source]
class protobunny.models.Envelope(body: bytes, correlation_id: str = '', delivery_mode: str = '', routing_key: str = '')[source]
class protobunny.models.IncomingMessageProtocol(*args, **kwargs)[source]

Defines a protocol for incoming messages in protobunny messaging system.

This protocol establishes the set of attributes and methods required for handling an incoming message.

body

The raw message content.

Type:

bytes

routing_key

The routing key associated with the message, which determines the message’s destination.

Type:

Optional[str]

correlation_id

An identifier that correlates the message with a specific request or context.

Type:

Optional[str]

delivery_mode

The delivery mode of the message, which could signify options like persistence or transient state.

Type:

Any

ack()[source]

Acknowledges the successful processing of the message.

reject(requeue)[source]

Rejects the message, with an optional flag indicating whether it should be requeued for processing.

class protobunny.models.MessageMixin[source]

Utility mixin for protobunny messages.

property json_content_fields: Iterable[str]

Returns:

The list of field names that are of type commons.JsonContent.

make_result(return_code: ReturnCode | int | None = None, error: str = '', return_value: dict[str, Any] | None = None) Result[source]

Returns a pb.results.Result message for the message, using the betterproto.lib.std.google.protobuf.Any message type.

The property result.source represents the source message.

Parameters:
  • return_code – the ReturnCode (or int) for the result

  • error – an error message, if any

  • return_value – a JSON-serializable dict payload

Returns: a Result message.

property result_topic: str

Build the result topic name for the message.

property source: PBM

Return the source message from a Result.

The source message is stored as a protobuf.Any message, with its type info and serialized value. The Result.source_message.type_url is used to instantiate the right class to deserialize the source message.

to_dict(casing: ~typing.Callable[[str, bool], str] = <function camel_case>, include_default_values: bool = False) dict[str, Any][source]

Returns a JSON serializable dict representation of this object.

Note: betterproto's to_dict converts INT64 values to strings for JavaScript compatibility.

to_json(indent: None | int | str = None, include_default_values: bool = False, casing: ~typing.Callable[[str, bool], str] = <function camel_case>) str[source]

Overwrite the betterproto to_json to use the custom encoder

to_pydict(casing: ~typing.Callable[[str, bool], str] = <function camel_case>, include_default_values: bool = False) dict[str, Any][source]

Returns a dict representation of this object. Uses enum names instead of int values. Useful for logging.

Unlike the to_dict method, betterproto's to_pydict does not convert INT64 values to strings.

property topic: str

Build the topic name for the message.

property type_url: str

Return the fully qualified class name (FQN) for this message.
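
The derivation behind the topic and type_url properties can be illustrated with a small stand-in (the exact delimiter handling is an assumption; compare the Redis-style mapping described under get_message_class_from_topic):

```python
class DemoMessage:
    """Stand-in for a generated message class living in mylib.tasks."""
    __module__ = "mylib.tasks"

def type_url(cls: type) -> str:
    # Fully qualified name: module path plus class name.
    return f"{cls.__module__}.{cls.__qualname__}"

def topic(cls: type, delimiter: str = ".") -> str:
    # The topic mirrors the package path, joined by a backend delimiter.
    return type_url(cls).replace(".", delimiter)

print(type_url(DemoMessage))    # mylib.tasks.DemoMessage
print(topic(DemoMessage, ":"))  # mylib:tasks:DemoMessage (Redis-style)
```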

validate_required_fields() None[source]

Raises MissingRequiredFields if non-optional fields are missing. Note: missing repeated fields are ignored. This check happens during serialization (see the MessageMixin.__bytes__ method).

exception protobunny.models.MissingRequiredFields(msg: PBM, missing_fields: list[str])[source]

Exception raised by MessageMixin.validate_required_fields when required fields are missing.

class protobunny.models.ProtoBunnyMessage[source]

Base class for all protobunny messages.

protobunny.models.deserialize_message(topic: str | None, body: bytes) ProtoBunnyMessage | None[source]

Deserialize the body of a serialized pika message.

Parameters:
  • topic – str. The topic, used to determine the message type.

  • body – bytes. The serialized message.

Returns:

A deserialized message.

protobunny.models.deserialize_result_message(body: bytes) Result[source]

Deserialize the result message.

Parameters:

body – bytes. The serialized protobunny.core.results.Result

Returns:

Instance of Result

protobunny.models.get_body(message: IncomingMessageProtocol) str[source]

Get the JSON string representation of the message body for use by the logger service. If the message couldn't be parsed, returns the raw content.

protobunny.models.get_message_class_from_topic(topic: str) type[ProtoBunnyMessage] | None | Result[source]

Return the message class from a topic with lazy import of the user library

Parameters:

topic – the topic that represents the message queue, mapped to the message class (e.g., for Redis, the topic mylib:tasks:TaskMessage maps to the mylib.tasks.TaskMessage class)

Returns: the message class for the topic, or None if the topic is not recognized.

protobunny.models.get_message_class_from_type_url(url: str) type[ProtoBunnyMessage][source]

Return the message class from a type URL with lazy import of the user library.

Parameters:

url – the fully qualified name of the message class

Returns: the message class

protobunny.models.to_json_content(data: dict) JsonContent | None[source]

Serialize an object and build a JsonContent message.

Parameters:

data – A json-serializable object

Returns: A pb.commons.JsonContent instance

class protobunny.backends.BaseAsyncConnection(**kwargs)[source]
class protobunny.backends.BaseConnection(*args, **kwargs)[source]
class protobunny.backends.BaseSyncConnection(**kwargs)[source]
connect(**kwargs) None[source]

Establish Sync connection.

Parameters:

timeout – Maximum time to wait for connection (seconds)

Raises:
  • ConnectionError – If connection fails

  • TimeoutError – If connection times out

disconnect(timeout: float = 10.0) None[source]

Close the sync connection and the underlying async connection, and stop the event loop.

Parameters:

timeout – Maximum time to wait for cleanup (seconds)

classmethod get_connection(vhost: str = '') BaseSyncConnection[source]

Get singleton instance (sync).

get_consumer_count(topic: str, timeout: float = 10.0) int[source]

Get the number of consumers for a queue.

Parameters:
  • topic – The queue topic

  • timeout – Maximum time to wait (seconds)

Returns:

Number of consumers for the queue

Raises:
  • ConnectionError – If not connected

  • TimeoutError – If operation times out

get_message_count(topic: str, timeout: float = 10.0) int[source]

Get the number of messages in a queue.

Parameters:
  • topic – The queue topic

  • timeout – Maximum time to wait (seconds)

Returns:

Number of messages in the queue

Raises:
  • ConnectionError – If not connected

  • TimeoutError – If operation times out

is_connected() bool[source]

Check if connection is established.

publish(topic: str, message: IncomingMessageProtocol, mandatory: bool = False, immediate: bool = False, timeout: float = 10.0) None[source]

Publish a message to a topic.

Parameters:
  • topic – The routing key/topic

  • message – The message to publish

  • mandatory – If True, raise error if message cannot be routed

  • immediate – If True, publish message immediately to the queue

  • timeout – Maximum time to wait for publish (seconds)

Raises:
  • ConnectionError – If not connected

  • TimeoutError – If operation times out

purge(topic: str, timeout: float = 10.0, **kwargs) None[source]

Empty a queue of all messages.

Parameters:
  • topic – The queue topic to purge

  • timeout – Maximum time to wait (seconds)

Raises:
  • ConnectionError – If not connected

  • TimeoutError – If operation times out

subscribe(topic: str, callback: Callable, shared: bool = False, timeout: float = 10.0) str[source]

Subscribe to a queue/topic.

Parameters:
  • topic – The routing key/topic to subscribe to

  • callback – Function to handle incoming messages

  • shared – if True, use shared queue (round-robin delivery)

  • timeout – Maximum time to wait for subscription (seconds)

Returns:

Subscription tag identifier

Raises:
  • ConnectionError – If not connected

  • TimeoutError – If operation times out

unsubscribe(tag: str, timeout: float = 10.0, if_unused: bool = True, if_empty: bool = True) None[source]

Unsubscribe from a queue.

Parameters:
  • tag – Subscription identifier returned from subscribe()

  • if_unused – If True, only delete the queue if it has no remaining consumers

  • if_empty – If True, only delete the queue if it is empty

  • timeout – Maximum time to wait (seconds)

Raises:

TimeoutError – If operation times out

class protobunny.backends.BaseSyncQueue(topic: str)[source]
get_consumer_count() int[source]

Get current consumer count.

get_message_count() int[source]

Get current message count.

publish(message: ProtoBunnyMessage) None[source]

Publish a message to the queue.

Parameters:

message – a ProtoBunnyMessage message

publish_result(result: Result, topic: str | None = None, correlation_id: str | None = None) None[source]

Publish a message to the results topic.

Parameters:
  • result – a Result message

  • topic – Optional override for the result topic

  • correlation_id – Optional ID linking the result to a specific request

purge(**kwargs) None[source]

Delete all messages from the queue.

subscribe(callback: Callable[[PBM], Any] | Callable[[Any, str], Any]) None[source]

Subscribe to messages from the queue.

Parameters:

callback – function to call when a message is received

subscribe_results(callback: Callable[[Result], Any]) None[source]

Subscribe to results from the queue.

See the deserialize_result method for return params.

Parameters:

callback – function to call when results come in.

unsubscribe(if_unused: bool = True, if_empty: bool = True) None[source]

Unsubscribe from the queue.

unsubscribe_results() None[source]

Unsubscribe from results. This always deletes the underlying queues.

class protobunny.backends.LoggingSyncQueue(prefix: str)[source]

Represents a specialized queue for logging purposes.

>>> import protobunny as pb
>>> pb.subscribe_logger()  # uses the default log callback

You can add a custom callback that accepts the envelope message from the backend and msg_content: str as arguments. The message type will conform to the IncomingMessageProtocol protocol.

>>> def log_callback(message: "IncomingMessageProtocol", msg_content: str):
...     print(message.body)
>>> pb.subscribe_logger(log_callback)

You can use functools.partial to add more arguments.

>>> def log_callback_with_args(message: aio_pika.IncomingMessage, msg_content: str, maxlength: int):
...     print(message.body[:maxlength])
>>> import functools
>>> callback = functools.partial(log_callback_with_args, maxlength=100)
>>> pb.subscribe_logger(callback)
publish(message: ProtoBunnyMessage) None[source]

Publish a message to the queue.

Parameters:

message – a ProtoBunnyMessage message

publish_result(result: Result, topic: str | None = None, correlation_id: str | None = None) None[source]

Publish a message to the results topic.

Parameters:
  • result – a Result message

  • topic – Optional override for the result topic

  • correlation_id – Optional ID linking the result to a specific request

RabbitMQ aio backend

Implements a RabbitMQ Connection with both sync and async support using aio_pika.

class protobunny.asyncio.backends.rabbitmq.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, vhost: str = '', worker_threads: int = 2, prefetch_count: int = 1, requeue_delay: int = 3, exchange_name: str = 'amq.topic', dl_exchange: str = 'protobunny-dlx', dl_queue: str = 'protobunny-dlq', heartbeat: int = 1200, timeout: int = 1500, url: str | None = None)[source]

Async RabbitMQ Connection wrapper.

async connect(timeout: float = 30.0) Connection[source]

Establish RabbitMQ connection.

Parameters:

timeout – Maximum time to wait for connection establishment (seconds)

Raises:
  • ConnectionError – If connection fails

  • asyncio.TimeoutError – If connection times out

property connection: AbstractRobustConnection

Get the connection object.

Raises:

ConnectionError – If not connected

async disconnect(timeout: float = 10.0) None[source]

Close RabbitMQ connection and cleanup resources.

Parameters:

timeout – Maximum time to wait for cleanup (seconds)

property exchange: AbstractExchange

Get the exchange object.

Raises:

ConnectionError – If not connected

async get_consumer_count(topic: str) int[source]

Get the number of consumers for a queue.

Parameters:

topic – The queue topic

Raises:

ConnectionError – If not connected

async get_message_count(topic: str) int | None[source]

Get the number of messages in a queue.

Parameters:

topic – The queue topic

Returns:

Number of messages currently in the queue

Raises:

ConnectionError – If not connected

property is_connected_event: Event

Lazily create the event in the current running loop.

property lock: Lock

Lazy instance lock.

async publish(topic: str, message: Message, mandatory: bool = True, immediate: bool = False) None[source]

Publish a message to a topic.

Parameters:
  • topic – The routing key/topic

  • message – The message to publish

  • mandatory – If True, raise an error if message cannot be routed

  • immediate – If True, send the message immediately to the queue

Raises:

ConnectionError – If not connected

async purge(topic: str, **kwargs) None[source]

Empty a queue of all messages.

Parameters:

topic – The queue topic to purge

Raises:

ConnectionError – If not connected

async setup_queue(topic: str, shared: bool = False) AbstractQueue[source]

Set up a RabbitMQ queue.

Parameters:
  • topic – the queue/routing key topic

  • shared – if True, all clients share the same queue and receive messages round-robin (task queue). If False, each client has its own anonymous queue and all receive copies of each message (pub/sub).

Returns:

The configured queue

Raises:

ConnectionError – If not connected

async subscribe(topic: str, callback: Callable, shared: bool = False) str[source]

Subscribe to a queue/topic.

Parameters:
  • topic – The routing key/topic to subscribe to

  • callback – Function to handle incoming messages. Should accept an aio_pika.IncomingMessage parameter.

  • shared – if True, use shared queue for round-robin delivery (task queue). If False, use anonymous queue where all subscribers receive all messages (pub/sub).

Returns:

Subscription tag identifier needed to unsubscribe later

Raises:

ConnectionError – If not connected

Example

def handle_message(message: aio_pika.IncomingMessage):
    print(f"Received: {message.body.decode()}")

tag = await conn.subscribe("my.events.*", handle_message)
async unsubscribe(tag: str, if_unused: bool = True, if_empty: bool = True) None[source]

Unsubscribe from a queue.

Parameters:
  • tag – The subscription identifier returned from subscribe()

  • if_unused – If False, delete the queue even if it still has consumers

  • if_empty – If False, delete the queue even if it is not empty

Raises:

ValueError – If tag is not found

class protobunny.asyncio.backends.rabbitmq.queues.AsyncQueue(topic: str)[source]
async static send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True) None[source]

Low-level message sending implementation.

Parameters:
  • topic – a topic name for direct routing or a routing key with special binding keys

  • body – serialized message (e.g. a serialized protobuf message or a json string)

  • correlation_id – is present for result messages

  • persistent – if True, will use aio_pika.DeliveryMode.PERSISTENT

Mosquitto aio backend

Implements a Mosquitto (MQTT) Connection with both sync and async support

class protobunny.asyncio.backends.mosquitto.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, url: str | None = None, worker_threads: int = 2, requeue_delay: int = 3, **kwargs)[source]

Async Mosquitto Connection wrapper using aiomqtt.

build_topic_key(topic: str) str[source]

Joins project prefix with topic using the configured delimiter.

property is_connected_event: Event

Lazily create the event in the current running loop.

async purge(topic: str, **kwargs) None[source]

Clears the retained message for the topic. Note: This does not affect messages currently in flight to offline clients with persistent sessions.

async subscribe(topic: str, callback: Callable, shared: bool = False) str[source]

Subscribes to a topic and starts a consumer loop in the background.

Parameters:
  • topic – The topic to subscribe to

  • callback – Function to handle incoming messages

  • shared – If True, use a shared subscription (round-robin delivery)

Returns:

The consumer tag

async unsubscribe(tag: str, **kwargs)[source]

Gracefully stops a consumer loop and unsubscribes from the broker.

class protobunny.asyncio.backends.mosquitto.queues.AsyncQueue(topic: str)[source]

Redis aio backend

Implements a Redis Connection

class protobunny.asyncio.backends.redis.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, db: int | None = None, vhost: str = '', url: str | None = None, worker_threads: int = 2, prefetch_count: int = 1, requeue_delay: int = 3, **kwargs)[source]

Async Redis Connection wrapper.

async connect(**kwargs) Connection[source]

Establish Redis connection.

Raises:
  • ConnectionError – If connection fails

  • asyncio.TimeoutError – If connection times out

property connection: Redis

Get the connection object.

Raises:

ConnectionError – If not connected

async disconnect(timeout: float = 10.0) None[source]

Close Redis connection and cleanup resources.

Parameters:

timeout – Maximum time to wait for cleanup (seconds)

async get_consumer_count(topic: str) int[source]

Get the total number of consumers across all groups for a topic.

Parameters:

topic – The stream topic

Returns:

Total number of consumers

async get_message_count(topic: str) int[source]

Get the number of messages in the Redis Stream.

Parameters:

topic – The stream topic name

Returns:

Number of entries currently in the stream.

property is_connected_event: Event

Lazily create the event in the current running loop.

property lock: Lock

Lazy instance lock.

async publish(topic: str, message: IncomingMessageProtocol, **kwargs) None[source]

Simulates Topic Exchange routing.

async purge(topic: str, reset_groups: bool = False) None[source]

Empty a Redis Stream and optionally clear all consumer groups.

Parameters:
  • topic – The stream/topic name to purge

  • reset_groups – If True, deletes all consumer groups (resets consumer count to 0)

async reset_stream_groups(stream_key: str) None[source]

Hard reset: Deletes all consumer groups for a topic. To be used with caution.

async setup_queue(topic: str, shared: bool = False) dict[source]

Set up a Redis Stream and Consumer Group for tasks if shared, otherwise use Pub/Sub.

Parameters:
  • topic – The stream key / routing key (queue name)

  • shared – If True, uses a fixed group name for round-robin delivery. If False, just creates the metadata used to subscribe the callback

Returns:

The name of the consumer group to use

async subscribe(topic: str, callback: Callable, shared: bool = False) str[source]

Subscribe to Redis.

Parameters:
  • topic – The stream key/topic to subscribe to

  • callback – Function to handle incoming messages.

  • shared – If True, uses a shared consumer group (round-robin). If False, uses a unique group for this instance and lets Redis Pub/Sub manage the routing.

Returns:

A unique subscription tag used to stop the consumer later.
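Putting these methods together, a subscribe-and-inspect round trip might look like the following sketch. The topic name and handler are hypothetical, the import path is inferred from the queue class entry below, and a reachable Redis broker is assumed when main() actually runs:

```python
import asyncio

async def handle_message(msg) -> None:
    # Application-specific handling; raising RequeueMessage would requeue.
    print("received:", msg)

async def main() -> None:
    # Import deferred; the path is an assumption mirroring the NATS entry below.
    from protobunny.asyncio.backends.redis.connection import Connection

    conn = Connection()
    await conn.connect()
    # shared=True consumes via a fixed consumer group (round-robin).
    tag = await conn.subscribe("acme.tasks", handle_message, shared=True)
    pending = await conn.get_message_count("acme.tasks")
    print(f"subscribed as {tag}; {pending} message(s) pending")
    await conn.purge("acme.tasks")      # empty the stream, keep the groups
    await conn.disconnect(timeout=10.0)

if __name__ == "__main__":
    asyncio.run(main())
```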

class protobunny.asyncio.backends.redis.queues.AsyncQueue(topic: str)[source]

NATS aio backend

Implements a NATS Connection

class protobunny.asyncio.backends.nats.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, vhost: str = '', url: str | None = None, worker_threads: int = 2, prefetch_count: int = 1, requeue_delay: int = 3, heartbeat: int = 1200)[source]

Async NATS Connection wrapper.

async connect(**kwargs) Connection[source]

Establish NATS connection.

Raises:
  • ConnectionError – If connection fails

  • asyncio.TimeoutError – If connection times out

property connection: Client

Get the connection object.

Raises:

ConnectionError – If not connected

async disconnect(timeout: float = 10.0) None[source]

Close NATS connection and cleanup resources.

Parameters:

timeout – Maximum time to wait for cleanup (seconds)

property is_connected_event: Event

Lazily create the event in the current running loop.

property lock: Lock

Lazy instance lock.
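A connect/disconnect round trip with the failure modes documented above might look like this sketch. The host and port values are hypothetical (4222 is the conventional NATS default), and a running NATS server is assumed when main() is executed:

```python
import asyncio

async def main() -> None:
    # Import paths are assumptions based on the entries in this section.
    from protobunny.asyncio import ConnectionError as BrokerConnectionError
    from protobunny.asyncio.backends.nats.connection import Connection

    conn = Connection(host="localhost", port=4222)
    try:
        await conn.connect()
    except (BrokerConnectionError, asyncio.TimeoutError) as exc:
        print(f"could not reach NATS: {exc}")
        return
    try:
        client = conn.connection  # raises ConnectionError when not connected
        print("connected:", client)
    finally:
        await conn.disconnect(timeout=10.0)

if __name__ == "__main__":
    asyncio.run(main())
```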

class protobunny.asyncio.backends.nats.queues.AsyncQueue(topic: str)[source]

Python aio backend

class protobunny.asyncio.backends.python.connection.BaseLocalConnection(vhost: str = '/', requeue_delay: int = 3)[source]

Base class with shared logic for python “connections”.

class protobunny.asyncio.backends.python.connection.Connection(*args, **kwargs)[source]

Asynchronous local connection using asyncio.

static create_tag(topic: str, shared: bool) str[source]

Generate subscription tag.

is_connected() bool[source]

Check whether the connection is established.

property is_connected_event: Event

Lazily create the event in the current running loop.

async setup_queue(topic: str, shared: bool = False) Queue[source]

Create appropriate queue type.

class protobunny.asyncio.backends.python.connection.MessageBroker[source]

Centralized message broker

async create_exclusive_queue(topic: str) Queue[source]

Create an exclusive queue for a topic.

async create_shared_queue(topic: str) Queue[source]

Get or create a shared queue.

async get_message_count(topic: str) int[source]

Get queue size of the shared queues.

async publish(topic: str, message: Envelope) bool[source]

Publish a message to all relevant queues.

async purge_queue(topic: str) None[source]

Empty a shared queue.

async remove_exclusive_queue(topic: str, queue: Queue) None[source]

Remove an exclusive queue.

async remove_shared_queues(topic: str) None[source]

Remove all in process subscriptions for a shared queue.
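The broker's two queue kinds differ in delivery semantics: a shared queue hands each message to exactly one consumer (round-robin), while every exclusive queue receives its own copy (fan-out). The following stdlib-only sketch illustrates that distinction; it is an illustration of the semantics, not the MessageBroker API itself:

```python
import asyncio

async def demo() -> tuple[list[str], list[str]]:
    # One shared queue: each message is consumed exactly once.
    shared: asyncio.Queue = asyncio.Queue()
    # One exclusive queue per subscriber: every subscriber sees every message.
    exclusive_a: asyncio.Queue = asyncio.Queue()
    exclusive_b: asyncio.Queue = asyncio.Queue()

    for i in range(4):
        msg = f"m{i}"
        await shared.put(msg)       # single copy for the consumer group
        await exclusive_a.put(msg)  # private copy per exclusive queue
        await exclusive_b.put(msg)

    assert exclusive_a.qsize() == exclusive_b.qsize() == 4  # full fan-out copies

    # Two workers draining the shared queue split the messages between them.
    workers: tuple[list[str], list[str]] = ([], [])
    while not shared.empty():
        for consumed in workers:
            if shared.empty():
                break
            consumed.append(shared.get_nowait())
    return workers

a, b = asyncio.run(demo())
print("worker A:", a)  # each shared message went to exactly one worker
print("worker B:", b)
```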

class protobunny.asyncio.backends.python.queues.AsyncQueue(topic: str)[source]

Message queue backed by asyncio.Queue.

async send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True)[source]

Low-level message sending implementation.

Parameters:
  • topic – a topic name for direct routing or a routing key with special binding keys

  • body – serialized message (e.g. a serialized protobuf message or a json string)

  • correlation_id – set on result messages

  • persistent – if True, uses aio_pika.DeliveryMode.PERSISTENT

RabbitMQ backend

Implements a RabbitMQ Connection with both sync and async support using aio_pika.

class protobunny.backends.rabbitmq.connection.Connection(**kwargs)[source]

Synchronous wrapper around the async RabbitMQ connection.

Manages a dedicated event loop in a background thread to run async operations.

Example

with Connection() as conn:
    conn.publish("my.topic", message)
    tag = conn.subscribe("my.topic", callback)
async_class

alias of Connection

class protobunny.backends.rabbitmq.queues.SyncQueue(topic: str)[source]

Message queue backed by pika and RabbitMQ.

send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True)[source]

Low-level message sending implementation.

Parameters:
  • topic – a topic name for direct routing or a routing key with special binding keys

  • body – serialized message (e.g. a serialized protobuf message or a json string)

  • correlation_id – set on result messages

  • persistent – if True, uses aio_pika.DeliveryMode.PERSISTENT

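send_message takes an already-serialized body. A sketch of preparing one follows; the queue and topic names are invented, and the broker calls are shown commented because they need a configured RabbitMQ instance:

```python
import json

# Serialize an application payload to bytes, as send_message expects.
payload = {"task": "resize", "width": 640}
body = json.dumps(payload).encode("utf-8")

# Hypothetical usage against a configured broker:
# from protobunny.backends.rabbitmq.queues import SyncQueue
# queue = SyncQueue("acme.tasks")
# queue.send_message("acme.tasks", body,
#                    correlation_id="req-123",  # set when replying with a result
#                    persistent=True)           # aio_pika.DeliveryMode.PERSISTENT

print(len(body), "bytes")
```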
Mosquitto backend

Implements a Mosquitto Connection with sync support.

class protobunny.backends.mosquitto.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, url: str | None = None, worker_threads: int = 2, requeue_delay: int = 3, **kwargs)[source]

Synchronous Mosquitto Connection wrapper using paho-mqtt.

classmethod get_connection(vhost: str = '') Connection[source]

Get singleton instance (sync).

purge(topic: str, **kwargs) None[source]

Clears the retained message for the topic by publishing an empty payload. In Paho, we must wait for the publication to confirm the purge is complete.

class protobunny.backends.mosquitto.queues.SyncQueue(topic: str)[source]

Message queue backed by paho-mqtt and Mosquitto.

send_message(topic: str, body: bytes, correlation_id: str | None = None, **kwargs)[source]

Low-level message sending implementation.

Parameters:
  • topic – a topic name for direct routing or a routing key with special binding keys

  • body – serialized message (e.g. a serialized protobuf message or a json string)

  • correlation_id – set on result messages

Redis backend

Implements a Redis Connection with both sync and async support

class protobunny.backends.redis.connection.Connection(**kwargs)[source]

Synchronous wrapper around the async connection

Example

with Connection() as conn:
    conn.publish("my.topic", message)
    tag = conn.subscribe("my.topic", callback)
async_class

alias of Connection

class protobunny.backends.redis.queues.SyncQueue(topic: str)[source]

Message queue backed by Redis Streams.

send_message(topic: str, body: bytes, correlation_id: str | None = None, **kwargs)[source]

Low-level message sending implementation.

Parameters:
  • topic – a topic name for direct routing or a routing key with special binding keys

  • body – serialized message (e.g. a serialized protobuf message or a json string)

  • correlation_id – set on result messages

NATS backend

Implements a NATS Connection with sync methods

class protobunny.backends.nats.connection.Connection(**kwargs)[source]

Synchronous wrapper around the async NATS connection.

Manages a dedicated event loop in a background thread to run async operations.

Example

with Connection() as conn:
    conn.publish("my.topic", message)
    tag = conn.subscribe("my.topic", callback)
async_class

alias of Connection

class protobunny.backends.nats.queues.SyncQueue(topic: str)[source]

Message queue backed by NATS.

send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True)[source]

Low-level message sending implementation.

Parameters:
  • topic – a topic name for direct routing or a routing key with special binding keys

  • body – serialized message (e.g. a serialized protobuf message or a json string)

  • correlation_id – set on result messages

  • persistent – if True, uses aio_pika.DeliveryMode.PERSISTENT

Python backend

class protobunny.backends.python.connection.BaseLocalConnection(vhost: str = '/', requeue_delay: int = 3)[source]

Base class with shared logic for python “connections”.

class protobunny.backends.python.connection.Connection(*args, **kwargs)[source]

Synchronous local connection using threads.

static create_tag(topic: str, shared: bool) str[source]

Generate subscription tag.

property is_connected_event: Event

Lazily create the event in the current running loop.

setup_queue(topic: str, shared: bool) Queue[source]

Create appropriate queue type.

class protobunny.backends.python.queues.SyncQueue(topic: str)[source]

Message queue backed by an in-process Python queue.

send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True)[source]

Low-level message sending implementation.

Parameters:
  • topic – a topic name for direct routing or a routing key with special binding keys

  • body – serialized message (e.g. a serialized protobuf message or a json string)

  • correlation_id – set on result messages

  • persistent – if True, uses aio_pika.DeliveryMode.PERSISTENT

Logger service

Logging Service

A program for logging MQTT messages with optional message filtering and length truncation.

This program subscribes to an MQTT queue and logs incoming messages based on user-defined parameters such as filtering by regex, truncating message content to a maximum length, and setting logging mode (asynchronous or synchronous). Signal handling is provided to ensure graceful shutdown.

Modules:

  • log_callback – Logs incoming MQTT messages to stdout with optional filtering and truncation.

  • main_sync – Entry point for synchronous logging mode with signal handling.

  • main – Entry point for asynchronous logging mode with signal handling.

usage: python -m protobunny.logger [-h] [-f FILTER] [-l MAX_LENGTH] [-m MODE] [-p PREFIX]

MQTT Logger

options:
-h, --help

show this help message and exit

-f FILTER, --filter FILTER

filter messages matching this regex

-l MAX_LENGTH, --max-length MAX_LENGTH

cut off messages longer than this

-m MODE, --mode MODE

Set async or sync mode. Default is async.

-p PREFIX, --prefix PREFIX

Set the prefix for the logger if different from the configured messages-prefix

protobunny utility

Protobunny tool

protobunny generate

Generates betterproto classes, automatically includes the path to the custom proto types, and adds the ProtoBunny mixin for the configured package (i.e. generated-package-name).

See protobunny generate --help for more options.

protobunny log

Start a logger in console. See protobunny log --help for more options.

Full configuration for pyproject.toml

[tool.protobunny]
messages-directory = "messages"
messages-prefix = "acme"
generated-package-name = "mymessagelib.codegen"
generated-package-root = "./"
backend = "rabbitmq"
force-required-fields = true
mode = "async"
log-redis-tasks = true

The following command will generate protobunny decorated betterproto classes in the mymessagelib/codegen directory:

protobunny generate