API Reference
Core Package Async
A module providing async support for messaging and communication using the configured broker as the backend.
This module includes functionality for publishing, subscribing, and managing message queues, as well as dynamically managing imports and configurations for the backend.
- exception protobunny.asyncio.RequeueMessage[source]¶
Raise when a message could not be handled but should be requeued.
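A minimal sketch of how a requeue signal like this can drive a consumer loop, using an in-process asyncio.Queue instead of a real broker. The RequeueMessage class below is a local stand-in, and consume, demo, and flaky_handler are hypothetical names; protobunny's actual requeue handling may differ.

```python
import asyncio


class RequeueMessage(Exception):
    """Local stand-in for the library exception (illustration only)."""


async def consume(queue: asyncio.Queue, callback, requeue_delay: float = 0.0) -> list:
    """Drain the queue, putting a message back whenever the callback asks for it."""
    handled = []
    while not queue.empty():
        message = queue.get_nowait()
        try:
            await callback(message)
        except RequeueMessage:
            # The handler could not process the message now: wait, then retry later.
            await asyncio.sleep(requeue_delay)
            queue.put_nowait(message)
        else:
            handled.append(message)
    return handled


async def demo():
    attempts = {}

    async def flaky_handler(message: str) -> None:
        attempts[message] = attempts.get(message, 0) + 1
        # Fail the first delivery of "b" to force exactly one requeue.
        if message == "b" and attempts[message] == 1:
            raise RequeueMessage()

    queue = asyncio.Queue()
    for item in ("a", "b", "c"):
        queue.put_nowait(item)
    handled = await consume(queue, flaky_handler)
    return handled, attempts
```

In this sketch the requeued message goes to the back of the queue, so it is handled after the messages that were already waiting.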
- async protobunny.asyncio.connect(**kwargs) BaseAsyncConnection[source]¶
Establishes an asynchronous connection to the configured messaging broker.
This method initializes and returns a singleton async connection. Subsequent calls return the existing connection instance unless it has been disconnected.
- Parameters:
**kwargs – Backend-specific connection arguments (e.g., host, port, credentials, or protocol-specific tuning parameters).
- Returns:
The active asynchronous connection singleton.
- Return type:
BaseAsyncConnection
- async protobunny.asyncio.disconnect() None[source]¶
Closes the active asynchronous connection to the broker.
Gracefully terminates heartbeats and background networking tasks. Safe to call even if no connection is active.
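The singleton behaviour described for connect and disconnect can be sketched with a module-level slot. This is only an illustration under stated assumptions: FakeConnection is a hypothetical stand-in for a broker connection, and the real functions may manage their state differently.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a broker connection."""

    def __init__(self, **kwargs):
        self.options = kwargs
        self.is_open = False

    async def open(self) -> None:
        self.is_open = True

    async def close(self) -> None:
        self.is_open = False


_connection = None  # module-level singleton slot


async def connect(**kwargs) -> FakeConnection:
    """Return the existing connection, creating and opening it on first use."""
    global _connection
    if _connection is None:
        _connection = FakeConnection(**kwargs)
        await _connection.open()
    return _connection


async def disconnect() -> None:
    """Close the connection if one exists; do nothing otherwise."""
    global _connection
    if _connection is not None:
        await _connection.close()
        _connection = None
```

Calling connect twice returns the same object, and a connect after disconnect builds a fresh one.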
- async protobunny.asyncio.get_message_count(msg_type: PBM | type[PBM] | ModuleType) int | None[source]¶
Asynchronously retrieves the current number of pending messages in a queue.
- Parameters:
msg_type – The message instance, class, or module representing the queue.
- Returns:
The count of messages waiting to be processed, or None if the backend does not support count retrieval for this type.
- Return type:
int | None
- protobunny.asyncio.get_queue(pkg_or_msg: ProtoBunnyMessage | type['ProtoBunnyMessage'] | ModuleType, backend_name: str | None = None) BaseSyncQueue | BaseAsyncQueue[source]¶
Factory method to get an AsyncQueue/SyncQueue instance based on:
- the message type (e.g. mylib.subpackage.subsubpackage.MyMessage)
- the mode (e.g. async)
- the configured backend or the parameter passed (e.g. “rabbitmq”)
- Parameters:
pkg_or_msg – A message instance, a message class, or a module containing message definitions.
backend_name – Name of the backend to use. If None, the configured backend is used.
- Returns:
A queue instance configured for the relevant topic.
- Return type:
Async/SyncQueue
- async protobunny.asyncio.publish(message: PBM) None[source]¶
Asynchronously publishes a Protobuf message to its corresponding topic.
The destination topic is automatically derived from the message class and package structure. Messages within a ‘.tasks’ package are automatically treated as persistent tasks requiring reliable delivery and queuing logic.
- Parameters:
message – An instance of a class derived from ProtoBunnyMessage.
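A rough sketch of deriving a topic from a class and its package path. The exact naming rule is an assumption here (module path plus class name, joined with dots), and topic_for and is_task are hypothetical helpers rather than protobunny functions.

```python
def topic_for(message_cls: type) -> str:
    """Build a dotted topic from the class's module path and name (assumed rule)."""
    return f"{message_cls.__module__}.{message_cls.__qualname__}"


def is_task(message_cls: type) -> bool:
    """Treat any class living under a 'tasks' package as a task message."""
    return "tasks" in message_cls.__module__.split(".")


# type() lets the example fake a package path without creating real modules.
TaskMessage = type("TaskMessage", (), {"__module__": "mylib.tasks"})
StatusEvent = type("StatusEvent", (), {"__module__": "mylib.events"})
```

With these stand-ins, topic_for(TaskMessage) gives "mylib.tasks.TaskMessage", and only TaskMessage is classified as a task.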
- async protobunny.asyncio.publish_result(result: Result, topic: str | None = None, correlation_id: str | None = None) None[source]¶
Asynchronously publishes a processing result to the results topic of the source message.
- Parameters:
result – The Result object containing the response payload and source message.
topic – Optional override for the result topic. Defaults to the automatically generated ‘.result’ topic associated with the source message.
correlation_id – Optional ID used to link this result to a specific request.
- async protobunny.asyncio.reset_connection() BaseAsyncConnection[source]¶
Resets the singleton connection and returns it.
- protobunny.asyncio.run_forever(main: Callable[[...], Awaitable[None]]) None[source]¶
Starts the event loop and keeps the process alive to consume messages.
Installs signal handlers for SIGINT and SIGTERM to trigger an orderly async shutdown.
- Parameters:
main – The entry point async function to run before entering the permanent wait state.
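The pattern described above (run an entry point, then wait until SIGINT or SIGTERM arrives) can be sketched with the standard library alone. The names serve and run_forever_sketch are hypothetical, and the sketch assumes a Unix event loop, since loop.add_signal_handler is not available on Windows.

```python
import asyncio
import signal


async def serve(main) -> None:
    """Run `main`, then wait until SIGINT or SIGTERM requests shutdown."""
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    signals = (signal.SIGINT, signal.SIGTERM)
    for sig in signals:
        # The handler only sets the event; cleanup happens in the coroutine.
        loop.add_signal_handler(sig, stop.set)
    try:
        await main()
        await stop.wait()
    finally:
        for sig in signals:
            loop.remove_signal_handler(sig)


def run_forever_sketch(main) -> None:
    """Blocking entry point: start the loop and keep it alive until signalled."""
    asyncio.run(serve(main))
```

Keeping the signal handler down to setting an event means any unsubscribe or disconnect logic can run as ordinary awaited code after stop.wait() returns.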
- async protobunny.asyncio.subscribe(pkg: type[PBM] | ModuleType, callback: AsyncCallback) BaseAsyncQueue[source]¶
Registers an async callback to consume messages from a specific topic or package.
If a message class is provided, subscribes to that specific topic. If a module is provided, subscribes to all message types defined within that module. For shared tasks (identified by the ‘.tasks’ convention), Protobunny automatically manages shared consumer groups and load balancing.
- Parameters:
pkg – The message class (type[PBM]) or module to subscribe to.
callback – An async callable that accepts the received message.
- Returns:
The queue object managing the active subscription.
- Return type:
BaseAsyncQueue
- async protobunny.asyncio.subscribe_logger(log_callback: LoggerCallback | None = None, prefix: str | None = None) LoggingAsyncQueue[source]¶
Asynchronously subscribes a logging callback to monitor message traffic.
- Parameters:
log_callback – A custom function to handle log messages. Defaults to default_log_callback.
prefix – An optional subject/topic prefix to filter logged messages.
- Returns:
The specialized async queue object for logging.
- Return type:
LoggingAsyncQueue
- async protobunny.asyncio.subscribe_results(pkg: type[PBM] | ModuleType, callback: AsyncCallback) BaseAsyncQueue[source]¶
Asynchronously subscribes to result topics for a message type or package.
Used by services that need to listen for completion signals or data returned by workers processing specific message types.
- Parameters:
pkg – The message class or module whose results should be monitored.
callback – The async function to execute when a result is received.
- Returns:
The queue object managing the result subscription.
- Return type:
BaseAsyncQueue
- async protobunny.asyncio.unsubscribe(pkg: type[PBM] | ModuleType, if_unused: bool = True, if_empty: bool = True) None[source]¶
Asynchronously removes a subscription for a specific message or package.
- Parameters:
pkg – The message class or module to unsubscribe from.
if_unused – If True, only unsubscribes if no other callbacks are attached.
if_empty – If True, only unsubscribes if the local message buffer is empty.
- async protobunny.asyncio.unsubscribe_all(if_unused: bool = True, if_empty: bool = True) None[source]¶
Asynchronously stops all message consumption by canceling every subscription.
Clears standard subscriptions, result listeners, and task workers. Typically invoked during graceful application shutdown.
- Parameters:
if_unused – Policy for evaluating unused standard queues.
if_empty – Policy for evaluating empty standard queues.
Core Package Sync
A module providing support for messaging and communication using the configured broker as the backend.
This module includes functionality for publishing, subscribing, and managing message queues, as well as dynamically managing imports and configurations for the backend.
- exception protobunny.ConnectionError[source]
Raised when connection operations fail.
- exception protobunny.RequeueMessage[source]
Raise when a message could not be handled but should be requeued.
- protobunny.connect(**kwargs: Any) BaseSyncConnection[source]
Establishes a connection to the configured messaging broker.
This method initializes and returns a singleton connection. Subsequent calls return the existing connection instance unless it has been explicitly disconnected.
- Parameters:
**kwargs – Backend-specific connection arguments (e.g., host, port, credentials, or protocol-specific tuning parameters).
- Returns:
The active connection singleton.
- Return type:
BaseSyncConnection
- protobunny.disconnect() None[source]
Closes the active connection to the broker.
Gracefully terminates heartbeats and background networking tasks. Safe to call if no connection is active.
- protobunny.get_backend(backend: str | None = None) ModuleType[source]
Retrieve and import the specified backend module.
Loads the backend module with the given name, or falls back to the default backend specified in the configuration. If the backend is unavailable or cannot be imported, the program exits.
- Parameters:
backend (str | None) – The name of the backend to import. If None, the backend from the configuration is used.
- Returns:
The imported backend module.
- protobunny.get_message_count(msg_type: PBM | type[PBM]) int | None[source]
Retrieves the current number of pending messages in a queue.
- Parameters:
msg_type – The message instance or class.
- Returns:
The count of messages waiting to be processed, or None if the backend does not support count retrieval for this queue type.
- Return type:
int | None
- protobunny.publish(message: PBM) None[source]
Publishes a Protobuf message to its corresponding topic.
The destination topic is automatically derived from the message class and package structure. Messages within a ‘.tasks’ package are automatically treated as persistent tasks requiring reliable delivery and queuing logic across all supported backends.
- Parameters:
message – An instance of a class derived from ProtoBunnyMessage.
- protobunny.publish_result(result: Result, topic: str | None = None, correlation_id: str | None = None) None[source]
Publishes a processing result to be consumed by results subscribers (See subscribe_results).
- Parameters:
result – The Result object containing the response payload and source message from which the Result was generated.
topic – Optional override for the result topic. Defaults to the automatically generated ‘.result’ topic associated with the source message.
correlation_id – Optional ID used to link this result to a specific request.
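A small sketch of how a result topic and a correlation ID can tie a result back to its request. The ".result" suffix rule and the PendingRequests helper are assumptions made for illustration; they are not taken from protobunny's source.

```python
import uuid


def result_topic_for(topic: str) -> str:
    """Assumed naming rule: append a '.result' suffix to the source topic."""
    return f"{topic}.result"


class PendingRequests:
    """Hypothetical bookkeeping that matches results to requests by correlation ID."""

    def __init__(self) -> None:
        self._pending = {}

    def register(self, request) -> str:
        correlation_id = uuid.uuid4().hex
        self._pending[correlation_id] = request
        return correlation_id

    def resolve(self, correlation_id: str):
        # Returns the original request, or None for unknown or already resolved IDs.
        return self._pending.pop(correlation_id, None)
```

A caller would register the request before publishing it, then call resolve with the correlation ID carried by the incoming result.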
- protobunny.reset_connection(**kwargs: Any) BaseSyncConnection[source]
Resets the singleton connection and returns it.
- protobunny.run_forever() None[source]
Blocks the main thread to maintain active message consumption.
Installs signal handlers for SIGINT and SIGTERM to trigger an orderly shutdown, ensuring all subscriptions are canceled and the connection is closed before the process exits.
- protobunny.subscribe(pkg_or_msg: type[PBM] | ModuleType, callback: SyncCallback) BaseSyncQueue[source]
Registers a callback to consume messages from a specific topic or package.
If a message class is provided, subscribes to that specific topic. If a module is provided, subscribes to all message types defined within that module. For shared tasks (identified by the ‘.tasks’ convention), Protobunny automatically manages shared consumer groups and load balancing.
- Parameters:
pkg_or_msg – The message class (type[PBM]) or module to subscribe to.
callback – The function to execute when a message is received.
- Returns:
The queue object managing the active subscription.
- Return type:
BaseSyncQueue
- protobunny.subscribe_logger(log_callback: LoggerCallback | None = None, prefix: str | None = None) LoggingSyncQueue[source]
Subscribes a specialized logging callback to monitor message traffic.
This creates a logging-specific queue that captures and logs metadata for messages matching the optional prefix.
- Parameters:
log_callback – A custom function to handle log messages. Defaults to default_log_callback (logs routing key, cid, and content).
prefix – An optional subject/topic prefix to filter which messages are logged.
- Returns:
The specialized queue object for logging.
- Return type:
LoggingSyncQueue
- protobunny.subscribe_results(pkg: type[PBM] | ModuleType, callback: SyncCallback) BaseSyncQueue[source]
Subscribes to result topics for a specific message type or package.
Used by services that need to listen for completion signals or data returned by workers processing specific message types.
- Parameters:
pkg – The message class or module whose results should be monitored.
callback – The function to execute when a result message is received.
- Returns:
The queue object managing the result subscription.
- Return type:
BaseSyncQueue
- protobunny.unsubscribe(pkg: type[PBM] | ModuleType, if_unused: bool = True, if_empty: bool = True) None[source]
Cancels an active subscription for a specific message type or package.
- Parameters:
pkg – The message class or module to unsubscribe from.
if_unused – If True, only unsubscribes if no other callbacks are attached.
if_empty – If True, only unsubscribes if the buffer is empty.
- protobunny.unsubscribe_all(if_unused: bool = True, if_empty: bool = True) None[source]
Stops all message consumption by canceling every in-process subscription.
Clears standard subscriptions, result listeners, and task workers. Typically invoked during graceful application shutdown.
- Parameters:
if_unused – Policy for evaluating unused standard queues.
if_empty – Policy for evaluating empty standard queues.
- protobunny.unsubscribe_results(pkg: type[PBM] | ModuleType) None[source]
Removes all in-process subscriptions for a message results topic.
- Parameters:
pkg – The message class or module to unsubscribe from its results topic.
Models
- class protobunny.models.Envelope(body: bytes, correlation_id: str = '', delivery_mode: str = '', routing_key: str = '')[source]¶
- class protobunny.models.IncomingMessageProtocol(*args, **kwargs)[source]¶
Defines a protocol for incoming messages in the protobunny messaging system.
This protocol establishes the set of attributes and methods required for handling an incoming message.
- body¶
The raw message content.
- Type:
bytes
- routing_key¶
The routing key associated with the message, which determines the message’s destination.
- Type:
Optional[str]
- correlation_id¶
An identifier that correlates the message with a specific request or context.
- Type:
Optional[str]
- delivery_mode¶
The delivery mode of the message, which could signify options like persistence or transient state.
- Type:
Any
- class protobunny.models.MessageMixin[source]¶
Utility mixin for protobunny messages.
- property json_content_fields: Iterable[str]¶
Returns the list of field names that are of type commons.JsonContent.
- make_result(return_code: ReturnCode | int | None = None, error: str = '', return_value: dict[str, Any] | None = None) Result[source]¶
Returns a pb.results.Result message for the message, using the betterproto.lib.std.google.protobuf.Any message type.
The property result.source represents the source message.
- Parameters:
return_code
error
return_value
- Returns:
A Result message.
- property result_topic: str¶
Build the result topic name for the message.
- property source: PBM¶
Return the source message from a Result
The source message is stored as a protobuf.Any message, with its type info and serialized value. The Result.source_message.type_url is used to instantiate the right class to deserialize the source message.
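The type_url lookup described above can be illustrated without protobuf. In this sketch, JSON stands in for protobuf serialization, and the Ping and Job classes, the TYPE_URLS registry, and the pack/unpack helpers are all hypothetical.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Ping:
    seq: int


@dataclass
class Job:
    name: str


# Hypothetical registry mapping a type URL to the class that can decode the payload.
TYPE_URLS = {"demo.Ping": Ping, "demo.Job": Job}
URL_BY_CLASS = {cls: url for url, cls in TYPE_URLS.items()}


def pack(message) -> dict:
    """Store the message as an Any-like pair: type info plus serialized value."""
    return {
        "type_url": URL_BY_CLASS[type(message)],
        "value": json.dumps(asdict(message)).encode(),
    }


def unpack(packed: dict):
    """Use the type URL to pick the right class, then rebuild the message."""
    cls = TYPE_URLS[packed["type_url"]]
    return cls(**json.loads(packed["value"]))
```

The receiver needs no prior knowledge of which message type was wrapped, because the type URL travels with the payload.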
- to_dict(casing: ~typing.Callable[[str, bool], str] = <function camel_case>, include_default_values: bool = False) dict[str, Any][source]¶
Returns a JSON serializable dict representation of this object.
Note: betterproto to_dict converts INT64 to strings, to allow js compatibility.
- to_json(indent: None | int | str = None, include_default_values: bool = False, casing: ~typing.Callable[[str, bool], str] = <function camel_case>) str[source]¶
Overrides the betterproto to_json to use the custom encoder.
- to_pydict(casing: ~typing.Callable[[str, bool], str] = <function camel_case>, include_default_values: bool = False) dict[str, Any][source]¶
Returns a dict representation of this object. Uses enum names instead of int values. Useful for logging
Unlike to_dict, the betterproto to_pydict method doesn’t convert INT64 to strings.
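The reason for string-encoding 64-bit integers is that JavaScript numbers are IEEE 754 doubles, which represent integers exactly only up to 2**53 - 1. The sketch below shows the precision loss and a simple conversion; stringify_int64 and its int64_fields argument are hypothetical and only mimic the idea, not betterproto's implementation.

```python
import json

MAX_SAFE_INTEGER = 2**53 - 1  # largest integer a double represents exactly


def loses_precision(value: int) -> bool:
    """True when the integer does not survive a round trip through a double."""
    return int(float(value)) != value


def stringify_int64(data: dict, int64_fields: set) -> dict:
    """Return a copy of data with the named 64-bit fields rendered as strings."""
    return {
        key: str(value) if key in int64_fields else value
        for key, value in data.items()
    }


record = {"id": 2**53 + 1, "count": 3}
js_safe_json = json.dumps(stringify_int64(record, {"id"}))
```

Here js_safe_json carries the id as a quoted string, so a JavaScript client can parse it without rounding.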
- property topic: str¶
Build the topic name for the message.
- property type_url: str¶
Return the class fqn for this message.
- exception protobunny.models.MissingRequiredFields(msg: PBM, missing_fields: list[str])[source]¶
Exception raised by MessageMixin.validate_required_fields when required fields are missing.
- protobunny.models.deserialize_message(topic: str | None, body: bytes) ProtoBunnyMessage | None[source]¶
Deserialize the body of a serialized pika message.
- Parameters:
topic – str. The topic. It’s used to determine the type of message.
body – bytes. The serialized message
- Returns:
A deserialized message.
- protobunny.models.deserialize_result_message(body: bytes) Result[source]¶
Deserialize the result message.
- Parameters:
body – bytes. The serialized protobunny.core.results.Result
- Returns:
Instance of Result
- protobunny.models.get_body(message: IncomingMessageProtocol) str[source]¶
Get the json string representation of the message body to use for the logger service. If message couldn’t be parsed, it returns the raw content.
- protobunny.models.get_message_class_from_topic(topic: str) type[ProtoBunnyMessage] | None | Result[source]¶
Return the message class from a topic with lazy import of the user library
- Parameters:
topic – The topic that represents the message queue, mapped to the message class. For example, with Redis, mylib:tasks:TaskMessage maps to the mylib.tasks.TaskMessage class.
- Returns:
The message class for the topic, or None if the topic is not recognized.
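A sketch of mapping a delimited topic to a class with a lazy import, in the spirit of the mylib:tasks:TaskMessage example. The function class_from_topic is hypothetical, and standard-library classes stand in for user message classes.

```python
import importlib


def class_from_topic(topic: str, delimiter: str = ":"):
    """Resolve 'pkg:module:ClassName' to a class, importing the module on demand."""
    *module_parts, class_name = topic.split(delimiter)
    if not module_parts:
        return None  # a bare name carries no module to import
    try:
        module = importlib.import_module(".".join(module_parts))
    except ImportError:
        return None  # unknown package: the topic is not recognized
    return getattr(module, class_name, None)
```

The module is imported only when a topic that needs it is first seen, which keeps startup independent of the size of the user library.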
- protobunny.models.get_message_class_from_type_url(url: str) type[ProtoBunnyMessage][source]¶
Return the message class from a type URL, with lazy import of the user library.
- Parameters:
url – The fully qualified name of the message class.
- Returns:
The message class.
- protobunny.models.to_json_content(data: dict) JsonContent | None[source]¶
Serialize an object and build a JsonContent message.
- Parameters:
data – A json-serializable object
- Returns:
A pb.commons.JsonContent instance.
- class protobunny.backends.BaseSyncConnection(**kwargs)[source]¶
- connect(**kwargs) None[source]¶
Establish Sync connection.
- Parameters:
timeout – Maximum time to wait for connection (seconds)
- Raises:
ConnectionError – If connection fails
TimeoutError – If connection times out
- disconnect(timeout: float = 10.0) None[source]¶
Close sync and the underlying async connections and stop event loop.
- Parameters:
timeout – Maximum time to wait for cleanup (seconds)
- classmethod get_connection(vhost: str = '') BaseSyncConnection[source]¶
Get singleton instance (sync).
- get_consumer_count(topic: str, timeout: float = 10.0) int[source]¶
Get the number of consumers attached to a queue.
- Parameters:
topic – The queue topic
timeout – Maximum time to wait (seconds)
- Returns:
Number of consumers attached to the queue
- Raises:
ConnectionError – If not connected
TimeoutError – If operation times out
- get_message_count(topic: str, timeout: float = 10.0) int[source]¶
Get the number of messages in a queue.
- Parameters:
topic – The queue topic
timeout – Maximum time to wait (seconds)
- Returns:
Number of messages in the queue
- Raises:
ConnectionError – If not connected
TimeoutError – If operation times out
- publish(topic: str, message: IncomingMessageProtocol, mandatory: bool = False, immediate: bool = False, timeout: float = 10.0) None[source]¶
Publish a message to a topic.
- Parameters:
topic – The routing key/topic
message – The message to publish
mandatory – If True, raise error if message cannot be routed
immediate – If True, publish message immediately to the queue
timeout – Maximum time to wait for publish (seconds)
- Raises:
ConnectionError – If not connected
TimeoutError – If operation times out
- purge(topic: str, timeout: float = 10.0, **kwargs) None[source]¶
Empty a queue of all messages.
- Parameters:
topic – The queue topic to purge
timeout – Maximum time to wait (seconds)
- Raises:
ConnectionError – If not connected
TimeoutError – If operation times out
- subscribe(topic: str, callback: Callable, shared: bool = False, timeout: float = 10.0) str[source]¶
Subscribe to a queue/topic.
- Parameters:
topic – The routing key/topic to subscribe to
callback – Function to handle incoming messages
shared – if True, use shared queue (round-robin delivery)
timeout – Maximum time to wait for subscription (seconds)
- Returns:
Subscription tag identifier
- Raises:
ConnectionError – If not connected
TimeoutError – If operation times out
- unsubscribe(tag: str, timeout: float = 10.0, if_unused: bool = True, if_empty: bool = True) None[source]¶
Unsubscribe from a queue.
- Parameters:
if_unused
if_empty
tag – Subscription identifier returned from subscribe()
timeout – Maximum time to wait (seconds)
- Raises:
TimeoutError – If operation times out
- class protobunny.backends.BaseSyncQueue(topic: str)[source]¶
- publish(message: ProtoBunnyMessage) None[source]¶
Publish a message to the queue.
- Parameters:
message – a ProtoBunnyMessage message
- publish_result(result: Result, topic: str | None = None, correlation_id: str | None = None) None[source]¶
Publish a message to the results topic.
- Parameters:
result – an amlogic_messages.results.Result message
topic
correlation_id
- subscribe(callback: Callable[[PBM], Any] | Callable[[Any, str], Any]) None[source]¶
Subscribe to messages from the queue.
- Parameters:
callback
- subscribe_results(callback: Callable[[Result], Any]) None[source]¶
Subscribe to results from the queue.
See deserialize_result_message for the Result object passed to the callback.
- Parameters:
callback – function to call when results come in.
- class protobunny.backends.LoggingSyncQueue(prefix: str)[source]¶
Represents a specialized queue for logging purposes.
>>> import protobunny as pb
>>> pb.subscribe_logger()  # uses the default callback, default_log_callback
You can add a custom callback that accepts the envelope message from the backend and msg_content: str as arguments. The message conforms to the IncomingMessageProtocol protocol.
>>> def log_callback(message: "IncomingMessageProtocol", msg_content: str):
...     print(message.body)
>>> pb.subscribe_logger(log_callback)
You can use functools.partial to bind additional arguments:
>>> import functools
>>> import aio_pika
>>> def log_callback_with_args(message: aio_pika.IncomingMessage, msg_content: str, maxlength: int):
...     print(message.body[:maxlength])  # truncate the body to maxlength bytes
>>> pb.subscribe_logger(functools.partial(log_callback_with_args, maxlength=100))
- publish(message: ProtoBunnyMessage) None[source]¶
Publish a message to the queue.
- Parameters:
message – a ProtoBunnyMessage message
RabbitMQ aio backend
Implements a RabbitMQ Connection with both sync and async support using aio_pika.
- class protobunny.asyncio.backends.rabbitmq.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, vhost: str = '', worker_threads: int = 2, prefetch_count: int = 1, requeue_delay: int = 3, exchange_name: str = 'amq.topic', dl_exchange: str = 'protobunny-dlx', dl_queue: str = 'protobunny-dlq', heartbeat: int = 1200, timeout: int = 1500, url: str | None = None)[source]¶
Async RabbitMQ Connection wrapper.
- async connect(timeout: float = 30.0) Connection[source]¶
Establish RabbitMQ connection.
- Parameters:
timeout – Maximum time to wait for connection establishment (seconds)
- Raises:
ConnectionError – If connection fails
asyncio.TimeoutError – If connection times out
- property connection: AbstractRobustConnection¶
Get the connection object.
- Raises:
ConnectionError – If not connected
- async disconnect(timeout: float = 10.0) None[source]¶
Close RabbitMQ connection and cleanup resources.
- Parameters:
timeout – Maximum time to wait for cleanup (seconds)
- property exchange: AbstractExchange¶
Get the exchange object.
- Raises:
ConnectionError – If not connected
- async get_consumer_count(topic: str) int[source]¶
Get the number of consumers attached to a queue.
- Parameters:
topic – The queue topic
- Raises:
ConnectionError – If not connected
- async get_message_count(topic: str) int | None[source]¶
Get the number of messages in a queue.
- Parameters:
topic – The queue topic
- Returns:
Number of messages currently in the queue
- Raises:
ConnectionError – If not connected
- property is_connected_event: Event¶
Lazily create the event in the current running loop.
- property lock: Lock¶
Lazy instance lock.
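A sketch of the lazy-creation pattern behind properties like these. One common reason for it is that, before Python 3.10, asyncio primitives were bound to an event loop at construction time, so creating them in __init__ outside the running loop could attach them to the wrong loop. LazyPrimitives is a hypothetical class, not the library's Connection.

```python
import asyncio


class LazyPrimitives:
    """Creates asyncio primitives on first access instead of in __init__."""

    def __init__(self) -> None:
        self._event = None
        self._lock = None

    @property
    def is_connected_event(self) -> asyncio.Event:
        if self._event is None:
            # First access is expected to happen inside the running loop.
            self._event = asyncio.Event()
        return self._event

    @property
    def lock(self) -> asyncio.Lock:
        if self._lock is None:
            self._lock = asyncio.Lock()
        return self._lock
```

The object can then be constructed from synchronous code and used later from whichever loop first touches the properties.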
- async publish(topic: str, message: Message, mandatory: bool = True, immediate: bool = False) None[source]¶
Publish a message to a topic.
- Parameters:
topic – The routing key/topic
message – The message to publish
mandatory – If True, raise an error if message cannot be routed
immediate – If True, send message immediately to the queue
- Raises:
ConnectionError – If not connected
- async purge(topic: str, **kwargs) None[source]¶
Empty a queue of all messages.
- Parameters:
topic – The queue topic to purge
- Raises:
ConnectionError – If not connected
- async setup_queue(topic: str, shared: bool = False) AbstractQueue[source]¶
Set up a RabbitMQ queue.
- Parameters:
topic – the queue/routing key topic
shared – if True, all clients share the same queue and receive messages round-robin (task queue). If False, each client has its own anonymous queue and all receive copies of each message (pub/sub).
- Returns:
The configured queue
- Raises:
ConnectionError – If not connected
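The difference between a shared queue (task queue) and per-client queues (pub/sub) can be sketched in-process. MiniBroker is a hypothetical toy built on asyncio.Queue; it only mirrors the semantics described for the shared flag and says nothing about how the RabbitMQ backend declares queues.

```python
import asyncio
from collections import defaultdict


class MiniBroker:
    """Hypothetical in-process broker; not protobunny's implementation."""

    def __init__(self) -> None:
        self.shared = {}                    # topic -> single queue for all workers
        self.exclusive = defaultdict(list)  # topic -> one queue per subscriber

    def setup_queue(self, topic: str, shared: bool = False) -> asyncio.Queue:
        if shared:
            if topic not in self.shared:
                self.shared[topic] = asyncio.Queue()
            return self.shared[topic]
        queue = asyncio.Queue()
        self.exclusive[topic].append(queue)
        return queue

    def publish(self, topic: str, body: str) -> None:
        # A shared queue gets one copy, so each message reaches one worker only.
        if topic in self.shared:
            self.shared[topic].put_nowait(body)
        # Every exclusive queue gets its own copy of the message.
        for queue in self.exclusive[topic]:
            queue.put_nowait(body)
```

Two workers calling setup_queue with shared=True receive the same queue object and split the messages between them, while two calls with shared=False produce independent queues that each see every message.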
- async subscribe(topic: str, callback: Callable, shared: bool = False) str[source]¶
Subscribe to a queue/topic.
- Parameters:
topic – The routing key/topic to subscribe to
callback – Function to handle incoming messages. Should accept an aio_pika.IncomingMessage parameter.
shared – if True, use shared queue for round-robin delivery (task queue). If False, use anonymous queue where all subscribers receive all messages (pub/sub).
- Returns:
Subscription tag identifier needed to unsubscribe later
- Raises:
ConnectionError – If not connected
Example
def handle_message(message: aio_pika.IncomingMessage):
    print(f"Received: {message.body.decode()}")

tag = await conn.subscribe("my.events.*", handle_message)
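The "my.events.*" pattern in the example follows AMQP topic-exchange rules, where "*" matches exactly one dot-separated word and "#" matches zero or more words. The matcher below is an illustrative re-implementation of those rules; topic_matches is a hypothetical helper, and with RabbitMQ the broker itself performs this matching.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Check a routing key against an AMQP-style binding pattern."""

    def match(p: list, k: list) -> bool:
        if not p:
            return not k  # pattern exhausted: match only if the key is too
        head, rest = p[0], p[1:]
        if head == "#":
            # "#" absorbs zero or more words, so try every possible split.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))
```

For instance, "my.events.*" accepts "my.events.created" but rejects "my.events.user.created", which "my.events.#" would accept.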
- async unsubscribe(tag: str, if_unused: bool = True, if_empty: bool = True) None[source]¶
Unsubscribe from a queue.
- Parameters:
if_empty – If False, the queue is deleted even when it is not empty
if_unused – If False, the queue is deleted even when it is still in use
tag – The subscription identifier returned from subscribe()
- Raises:
ValueError – If tag is not found
- class protobunny.asyncio.backends.rabbitmq.queues.AsyncQueue(topic: str)[source]¶
- async static send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True) None[source]¶
Low-level message sending implementation.
- Parameters:
topic – a topic name for direct routing or a routing key with special binding keys
body – serialized message (e.g. a serialized protobuf message or a json string)
correlation_id – is present for result messages
persistent – if true will use aio_pika.DeliveryMode.PERSISTENT
Mosquitto aio backend
Implements a Mosquitto (MQTT) Connection with both sync and async support.
- class protobunny.asyncio.backends.mosquitto.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, url: str | None = None, worker_threads: int = 2, requeue_delay: int = 3, **kwargs)[source]¶
Async Mosquitto Connection wrapper using aiomqtt.
- build_topic_key(topic: str) str[source]¶
Joins project prefix with topic using the configured delimiter.
- property is_connected_event: Event¶
Lazily create the event in the current running loop.
- async purge(topic: str, **kwargs) None[source]¶
Clears the retained message for the topic. Note: This does not affect messages currently in flight to offline clients with persistent sessions.
- class protobunny.asyncio.backends.mosquitto.queues.AsyncQueue(topic: str)[source]
Redis aio backend
Implements a Redis Connection.
- class protobunny.asyncio.backends.redis.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, db: int | None = None, vhost: str = '', url: str | None = None, worker_threads: int = 2, prefetch_count: int = 1, requeue_delay: int = 3, **kwargs)[source]¶
Async Redis Connection wrapper.
- async connect(**kwargs) Connection[source]¶
Establish Redis connection.
- Raises:
ConnectionError – If connection fails
asyncio.TimeoutError – If connection times out
- property connection: Redis¶
Get the connection object.
- Raises:
ConnectionError – If not connected
- async disconnect(timeout: float = 10.0) None[source]¶
Close Redis connection and cleanup resources.
- Parameters:
timeout – Maximum time to wait for cleanup (seconds)
- async get_consumer_count(topic: str) int[source]¶
Get the total number of consumers across all groups for a topic.
- Parameters:
topic – The stream topic
- Returns:
Total number of consumers
- async get_message_count(topic: str) int[source]¶
Get the number of messages in the Redis Stream.
- Parameters:
topic – The stream topic name
- Returns:
Number of entries currently in the stream.
- property is_connected_event: Event¶
Lazily create the event in the current running loop.
- property lock: Lock¶
Lazy instance lock.
- async publish(topic: str, message: IncomingMessageProtocol, **kwargs) None[source]¶
Simulates Topic Exchange routing.
- async purge(topic: str, reset_groups: bool = False) None[source]¶
Empty a Redis Stream and optionally clear all consumer groups.
- Parameters:
topic – The stream/topic name to purge
reset_groups – If True, deletes all consumer groups (resets consumer count to 0)
- async reset_stream_groups(stream_key: str) None[source]¶
Hard reset: Deletes all consumer groups for a topic. To be used with caution.
- async setup_queue(topic: str, shared: bool = False) dict[source]¶
Set up a Redis Stream and Consumer Group for tasks if shared, otherwise use Pub/Sub.
- Parameters:
topic – The stream key / routing key (queue name)
shared – If True, uses a fixed group name for round-robin. If False, just creates metadata to use to subscribe the callback
- Returns:
The name of the consumer group to use
- async subscribe(topic: str, callback: Callable, shared: bool = False) str[source]¶
Subscribe to Redis.
- Parameters:
topic – The stream key/topic to subscribe to
callback – Function to handle incoming messages.
shared – If True, uses a shared consumer group (round-robin). If False, uses a unique group for this instance and lets Redis Pub/Sub manage the routing.
- Returns:
A unique subscription tag used to stop the consumer later.
- class protobunny.asyncio.backends.redis.queues.AsyncQueue(topic: str)[source]
NATS aio backend
Implements a NATS Connection.
- class protobunny.asyncio.backends.nats.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, vhost: str = '', url: str | None = None, worker_threads: int = 2, prefetch_count: int = 1, requeue_delay: int = 3, heartbeat: int = 1200)[source]¶
Async NATS Connection wrapper.
- async connect(**kwargs) Connection[source]¶
Establish NATS connection.
- Raises:
ConnectionError – If connection fails
asyncio.TimeoutError – If connection times out
- property connection: Client¶
Get the connection object.
- Raises:
ConnectionError – If not connected
- async disconnect(timeout: float = 10.0) None[source]¶
Close NATS connection and cleanup resources.
- Parameters:
timeout – Maximum time to wait for cleanup (seconds)
- property is_connected_event: Event¶
Lazily create the event in the current running loop.
- property lock: Lock¶
Lazy instance lock.
- class protobunny.asyncio.backends.nats.queues.AsyncQueue(topic: str)[source]
Python aio backend
- class protobunny.asyncio.backends.python.connection.BaseLocalConnection(vhost: str = '/', requeue_delay: int = 3)[source]
Base class with shared logic for python “connections”.
- class protobunny.asyncio.backends.python.connection.Connection(*args, **kwargs)[source]
Asynchronous local connection using asyncio.
- static create_tag(topic: str, shared: bool) str[source]
Generate subscription tag.
- is_connected() bool[source]
Check if the connection is established.
- property is_connected_event: Event
Lazily create the event in the current running loop.
- async setup_queue(topic: str, shared: bool = False) Queue[source]
Create appropriate queue type.
- class protobunny.asyncio.backends.python.connection.MessageBroker[source]
Centralized message broker
- async create_exclusive_queue(topic: str) Queue[source]
Create an exclusive queue for a topic.
- async create_shared_queue(topic: str) Queue[source]
Get or create a shared queue.
- async get_message_count(topic: str) int[source]
Get queue size of the shared queues.
- async purge_queue(topic: str) None[source]
Empty a shared queue.
- async remove_exclusive_queue(topic: str, queue: Queue) None[source]
Remove an exclusive queue.
- async remove_shared_queues(topic: str) None[source]
Remove all in-process subscriptions for a shared queue.
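To make the shared and exclusive queue roles more concrete, here is a rough sketch of an in-process broker built on `asyncio.Queue`. It assumes that consumers of a shared queue compete for messages on one queue per topic, while each exclusive queue receives its own copy of every message. `MiniBroker`, its `publish` method, and `demo` are hypothetical and synchronous for brevity; this is not protobunny's `MessageBroker`.

```python
import asyncio
from collections import defaultdict


class MiniBroker:
    """Hypothetical stand-in for illustration; not protobunny's MessageBroker."""

    def __init__(self) -> None:
        self._shared: dict[str, asyncio.Queue] = {}
        self._exclusive: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def create_shared_queue(self, topic: str) -> asyncio.Queue:
        # Get or create: all consumers of a topic share a single queue.
        if topic not in self._shared:
            self._shared[topic] = asyncio.Queue()
        return self._shared[topic]

    def create_exclusive_queue(self, topic: str) -> asyncio.Queue:
        # Each subscriber gets a private queue (assumed fan-out semantics).
        queue: asyncio.Queue = asyncio.Queue()
        self._exclusive[topic].append(queue)
        return queue

    def remove_exclusive_queue(self, topic: str, queue: asyncio.Queue) -> None:
        if queue in self._exclusive.get(topic, []):
            self._exclusive[topic].remove(queue)

    def get_message_count(self, topic: str) -> int:
        queue = self._shared.get(topic)
        return queue.qsize() if queue is not None else 0

    def purge_queue(self, topic: str) -> None:
        queue = self._shared.get(topic)
        while queue is not None and not queue.empty():
            queue.get_nowait()

    def publish(self, topic: str, body: bytes) -> None:
        # Deliver once to the shared queue and once per exclusive queue.
        if topic in self._shared:
            self._shared[topic].put_nowait(body)
        for queue in self._exclusive.get(topic, []):
            queue.put_nowait(body)


async def demo() -> tuple:
    broker = MiniBroker()
    shared = broker.create_shared_queue("acme.tasks")
    first = broker.create_exclusive_queue("acme.tasks")
    second = broker.create_exclusive_queue("acme.tasks")
    broker.publish("acme.tasks", b"payload")
    counts = (broker.get_message_count("acme.tasks"), first.qsize(), second.qsize())
    broker.purge_queue("acme.tasks")
    same_queue = shared is broker.create_shared_queue("acme.tasks")
    return counts, broker.get_message_count("acme.tasks"), same_queue
```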
- class protobunny.asyncio.backends.python.queues.AsyncQueue(topic: str)[source]
Message queue backed by asyncio.Queue.
- async send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True)[source]
Low-level message sending implementation.
- Parameters:
topic – a topic name for direct routing or a routing key with special binding keys
body – serialized message (e.g. a serialized protobuf message or a json string)
correlation_id – is present for result messages
persistent – if true will use aio_pika.DeliveryMode.PERSISTENT
Returns:
RabbitMQ backend¶
Implements a RabbitMQ Connection with both sync and async support using aio_pika.
- class protobunny.backends.rabbitmq.connection.Connection(**kwargs)[source]¶
Synchronous wrapper around the async RabbitMQ connection.
Manages a dedicated event loop in a background thread to run async operations.
Example:
    with Connection() as conn:
        conn.publish("my.topic", message)
        tag = conn.subscribe("my.topic", callback)
- async_class¶
alias of Connection
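The idea of a dedicated event loop in a background thread, as described for the synchronous wrapper, can be sketched with the standard library alone. This is a generic illustration of the technique under that assumption, not protobunny's actual implementation; `LoopThread` and `fake_publish` are hypothetical names.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper; runs coroutines on a loop owned by a background thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)

    def __enter__(self) -> "LoopThread":
        self._thread.start()
        return self

    def __exit__(self, *exc_info) -> None:
        # Stop the loop from its own thread, then wait for the thread to end.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def run(self, coro, timeout: float = 10.0):
        # Submit the coroutine to the background loop and block for its result.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)


async def fake_publish(topic: str, body: bytes) -> str:
    await asyncio.sleep(0)  # stands in for real network I/O
    return f"{topic}:{len(body)}"


with LoopThread() as runner:
    result = runner.run(fake_publish("my.topic", b"hello"))
```

The calling thread never runs an event loop itself; it only blocks on the future returned by `asyncio.run_coroutine_threadsafe`.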
- class protobunny.backends.rabbitmq.queues.SyncQueue(topic: str)[source]¶
Message queue backed by pika and RabbitMQ.
- send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True)[source]¶
Low-level message sending implementation.
- Parameters:
topic – a topic name for direct routing or a routing key with special binding keys
body – serialized message (e.g. a serialized protobuf message or a json string)
correlation_id – is present for result messages
persistent – if true will use aio_pika.DeliveryMode.PERSISTENT
Returns:
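As an illustration of the `topic` parameter, the sketch below assumes that "special binding keys" refers to the wildcards of AMQP topic exchanges, where `*` matches exactly one dot-separated word and `#` matches zero or more words. `topic_matches` is a hypothetical helper for reasoning about routing; matching is performed by the broker, not by protobunny.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """AMQP-style topic match: '*' is exactly one word, '#' is zero or more."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: succeed only if all words were consumed.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb any number of remaining words, including none.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        return pattern[p] in ("*", words[w]) and match(p + 1, w + 1)

    return match(0, 0)
```

For example, `topic_matches("acme.tasks.*", "acme.tasks.created")` holds, while the same binding key does not match `"acme.tasks.created.v2"`.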
Mosquitto backend¶
Implements a Mosquitto Connection with sync support.
- class protobunny.backends.mosquitto.connection.Connection(username: str | None = None, password: str | None = None, host: str | None = None, port: int | None = None, url: str | None = None, worker_threads: int = 2, requeue_delay: int = 3, **kwargs)[source]¶
Synchronous Mosquitto Connection wrapper using paho-mqtt.
- classmethod get_connection(vhost: str = '') Connection[source]¶
Get singleton instance (sync).
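A singleton accessor such as `get_connection` is commonly implemented with a class-level registry guarded by a lock. The sketch below assumes one instance per `vhost` value, which the signature suggests but the documentation does not confirm; `SingletonConnection` is a hypothetical class, not the Mosquitto `Connection`.

```python
import threading


class SingletonConnection:
    """Hypothetical class; shows one way to keep a single instance per vhost."""

    _instances: dict = {}
    _instances_lock = threading.Lock()

    def __init__(self, vhost: str = "") -> None:
        self.vhost = vhost

    @classmethod
    def get_connection(cls, vhost: str = "") -> "SingletonConnection":
        # The lock prevents two threads from creating the instance at once.
        with cls._instances_lock:
            if vhost not in cls._instances:
                cls._instances[vhost] = cls(vhost=vhost)
            return cls._instances[vhost]
```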
- class protobunny.backends.mosquitto.queues.SyncQueue(topic: str)[source]
Message queue backed by Mosquitto.
- send_message(topic: str, body: bytes, correlation_id: str | None = None, **kwargs)[source]
Low-level message sending implementation.
- Parameters:
topic – a topic name for direct routing or a routing key with special binding keys
body – serialized message (e.g. a serialized protobuf message or a json string)
correlation_id – is present for result messages
Returns:
Redis backend¶
Implements a Redis Connection with both sync and async support
- class protobunny.backends.redis.connection.Connection(**kwargs)[source]¶
Synchronous wrapper around the async connection.
Example:
    with SyncRedisConnection() as conn:
        conn.publish("my.topic", message)
        tag = conn.subscribe("my.topic", callback)
- async_class¶
alias of Connection
- class protobunny.backends.redis.queues.SyncQueue(topic: str)[source]
Message queue backed by Redis.
- send_message(topic: str, body: bytes, correlation_id: str | None = None, **kwargs)[source]
Low-level message sending implementation.
- Parameters:
topic – a topic name for direct routing or a routing key with special binding keys
body – serialized message (e.g. a serialized protobuf message or a json string)
correlation_id – is present for result messages
Returns:
NATS backend¶
Implements a NATS Connection with sync methods
- class protobunny.backends.nats.connection.Connection(**kwargs)[source]¶
Synchronous wrapper around the async NATS connection.
Manages a dedicated event loop in a background thread to run async operations.
Example:
    with Connection() as conn:
        conn.publish("my.topic", message)
        tag = conn.subscribe("my.topic", callback)
- async_class¶
alias of Connection
- class protobunny.backends.nats.queues.SyncQueue(topic: str)[source]
Message queue backed by NATS.
- send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True)[source]
Low-level message sending implementation.
- Parameters:
topic – a topic name for direct routing or a routing key with special binding keys
body – serialized message (e.g. a serialized protobuf message or a json string)
correlation_id – is present for result messages
persistent – if true will use aio_pika.DeliveryMode.PERSISTENT
Returns:
Python backend¶
- class protobunny.backends.python.connection.BaseLocalConnection(vhost: str = '/', requeue_delay: int = 3)[source]
Base class with shared logic for python “connections”.
- class protobunny.backends.python.connection.Connection(*args, **kwargs)[source]
Synchronous local connection using threads.
- static create_tag(topic: str, shared: bool) str[source]
Generate subscription tag.
- property is_connected_event: Event
Lazily create the event in the current running loop.
- setup_queue(topic: str, shared: bool) Queue[source]
Create appropriate queue type.
- class protobunny.backends.python.queues.SyncQueue(topic: str)[source]
Message queue backed by the local Python backend.
- send_message(topic: str, body: bytes, correlation_id: str | None = None, persistent: bool = True)[source]
Low-level message sending implementation.
- Parameters:
topic – a topic name for direct routing or a routing key with special binding keys
body – serialized message (e.g. a serialized protobuf message or a json string)
correlation_id – is present for result messages
persistent – if true will use aio_pika.DeliveryMode.PERSISTENT
Returns:
Logger service¶
Logging Service¶
A program for logging MQTT messages with optional message filtering and length truncation.
This program subscribes to an MQTT queue and logs incoming messages based on user-defined parameters such as filtering by regex, truncating message content to a maximum length, and setting logging mode (asynchronous or synchronous). Signal handling is provided to ensure graceful shutdown.
- Modules:
log_callback: Logs incoming MQTT messages to stdout with optional filtering and truncation.
main_sync: Entry point for synchronous logging mode with signal handling.
main: Entry point for asynchronous logging mode with signal handling.
usage: python -m protobunny.logger [-h] [-f FILTER] [-l MAX_LENGTH] [-m MODE] [-p PREFIX]
MQTT Logger
- options:
- -h, --help
show this help message and exit
- -f FILTER, --filter FILTER
filter messages matching this regex
- -l MAX_LENGTH, --max-length MAX_LENGTH
cut off messages longer than this
- -m MODE, --mode MODE
Set async or sync mode. Default is async.
- -p PREFIX, --prefix PREFIX
Set the prefix for the logger if different from the configured messages-prefix
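The filtering and truncation behaviour behind `--filter` and `--max-length` might look roughly like the sketch below. It assumes that only messages matching the regex are logged and that the regex is applied to the whole log line; the logger's actual rules may differ. `format_for_log` is a hypothetical function, not the module's `log_callback`.

```python
import re
from typing import Optional


def format_for_log(
    topic: str,
    body: str,
    pattern: Optional[str] = None,
    max_length: Optional[int] = None,
) -> Optional[str]:
    """Return the line to log, or None when the filter rejects the message."""
    line = f"{topic}: {body}"
    # Assumption: a message is kept only if the regex matches somewhere in the line.
    if pattern is not None and re.search(pattern, line) is None:
        return None
    # Cut off long lines and mark the truncation.
    if max_length is not None and len(line) > max_length:
        line = line[:max_length] + "..."
    return line
```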
protobunny utility¶
Protobunny tool
protobunny generate
Generate betterproto classes, automatically include the path to the custom proto types, and add the ProtoBunny mixin for the configured package (i.e. generated-package-name). See protobunny generate --help for more options.
protobunny log
Start a logger in the console. See protobunny log --help for more options.
Full configuration for pyproject.toml:
[tool.protobunny]
messages-directory = 'messages'
messages-prefix = 'acme'
generated-package-name = 'mymessagelib.codegen'
generated-package-root = "./"
backend = "rabbitmq"
force-required-fields = true
mode = "async"
log-redis-tasks = true
The following command will generate protobunny-decorated betterproto classes in the mymessagelib/codegen directory:
protobunny generate