Recipes
These examples use the synchronous API.
For async code, import the asyncio module instead; the logic stays the same, with async/await added.
from protobunny import asyncio as pb
Subscribe to a queue
To subscribe to a specific message type, use the subscribe method. This creates an exclusive queue by default (one consumer per queue instance).
import protobunny as pb
import mymessagelib as mml
def on_message(message: mml.tests.TestMessage) -> None:
    print("Received:", message.content)
# Subscribe to the message class
pb.subscribe(mml.tests.TestMessage, on_message)
# Block and wait for messages
pb.run_forever()
For the async version, pass your main coroutine function to run_forever; that function contains the await pb.subscribe calls.
from protobunny import asyncio as pb
import mymessagelib as mml
async def on_message(message: mml.tests.TestMessage) -> None:
    print("Received:", message.content)
async def main():
    await pb.subscribe(mml.tests.TestMessage, on_message)
pb.run_forever(main)
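The shape of this pattern, an entry point that awaits async handlers as messages arrive, can be sketched with the standard library alone. Everything below is a stand-in: the `dispatch` helper, the in-memory `asyncio.Queue`, and the `None` sentinel are hypothetical and are not part of Protobunny, which manages its own event loop and broker connection.

```python
import asyncio


async def on_message(message: str) -> None:
    # Same handler shape as above, with a plain string instead of a message object.
    print("Received:", message)


async def dispatch(queue: asyncio.Queue, handler) -> int:
    """Await the handler for each queued message until a None sentinel arrives."""
    handled = 0
    while (message := await queue.get()) is not None:
        await handler(message)
        handled += 1
    return handled


async def main() -> int:
    queue: asyncio.Queue = asyncio.Queue()
    for item in ("Hello World", None):  # None stops the loop in this sketch
        queue.put_nowait(item)
    return await dispatch(queue, on_message)


handled = asyncio.run(main())
```

A real consumer would not stop on a sentinel; the bounded loop only keeps the sketch runnable as a script.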
Publish
Publishing is straightforward. Protobunny automatically determines the correct topic and queue routing based on the message class.
import protobunny as pb
import mymessagelib as mml
# Create the message instance
msg = mml.tests.TestMessage(content="Hello World", number=42)
# Publish it
pb.publish(msg)
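As a rough illustration of class-based routing, the sketch below derives a topic string from a message class. The `topic_for` helper, the `DemoMessage` class, and the dotted naming scheme are all assumptions made for this example; Protobunny's actual topic format is not described here and may differ.

```python
from dataclasses import dataclass


@dataclass
class DemoMessage:
    """Hypothetical stand-in for a generated message class."""
    content: str
    number: int


def topic_for(message_cls: type, package: str = "mymessagelib.tests") -> str:
    """Build a dotted topic from a package path and the class name (assumed scheme)."""
    return f"{package}.{message_cls.__name__}"


msg = DemoMessage(content="Hello World", number=42)
# The topic comes from the instance's class, so the caller never passes one.
print(topic_for(type(msg)))
```

Because the routing key is a function of the class alone, publishers and subscribers only need to agree on the message type.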
Results workflow
The results workflow allows you to send and receive feedback for a specific message, using the built-in Result message type.
Publishing a Result
Inside a message handler, you can create and publish a result tied to the source message.
import protobunny as pb
import mymessagelib as mml
def on_message(message: mml.tests.TestMessage) -> None:
    # ... process message ...
    # Create a result from the source message
    result = message.make_result(
        return_value={"status": "success", "processed_at": "12:00"}
    )
    pb.publish_result(result)
Subscribing to Results
To listen for results of a specific message type:
import protobunny as pb
import mymessagelib as mml
def on_result(res: pb.results.Result) -> None:
    # Access the original message that triggered this result
    print("Source message:", res.source)
    print("Data:", res.return_value)
pb.subscribe_results(mml.tests.TestMessage, on_result)
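To make the round trip concrete, here is a minimal in-memory sketch of the same idea: a result carries its source message, and result handlers are looked up by the class of that source. `DemoMessage`, `DemoResult`, and the local `subscribe_results` and `publish_result` functions are hypothetical stand-ins that only mirror the names used above; no broker is involved and this is not Protobunny's implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DemoMessage:
    """Hypothetical stand-in for a message class."""
    content: str


@dataclass
class DemoResult:
    """Hypothetical stand-in for a result: the source message plus a return value."""
    source: DemoMessage
    return_value: dict[str, Any]


# Result handlers registered per source message class (in-memory, no broker).
result_handlers: dict[type, list[Callable[[DemoResult], None]]] = {}


def subscribe_results(message_cls: type, handler: Callable[[DemoResult], None]) -> None:
    result_handlers.setdefault(message_cls, []).append(handler)


def publish_result(result: DemoResult) -> None:
    # Route by the class of the message that produced the result.
    for handler in result_handlers.get(type(result.source), []):
        handler(result)


received: list[DemoResult] = []
subscribe_results(DemoMessage, received.append)
publish_result(DemoResult(DemoMessage("Hello World"), {"status": "success"}))
print(received[0].source.content, received[0].return_value["status"])
```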
Requeuing
If message processing fails and you want the broker to requeue it for another attempt, raise the RequeueMessage exception.
from protobunny import RequeueMessage
import mymessagelib as mml
def on_message(message: mml.tests.TestMessage) -> None:
    try:
        # Attempt processing
        do_work(message)
    except Exception:
        # This tells the backend to put the message back in the queue
        raise RequeueMessage("Service busy, retrying...")
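What a consumer loop might do with that exception can be sketched without a broker. The local `RequeueMessage` class, the `consume` function, and the `MAX_ATTEMPTS` cap below are assumptions for illustration only: where a requeued message lands and whether retries are bounded depend on the backend and are not specified here.

```python
from collections import deque


class RequeueMessage(Exception):
    """Local stand-in for the exception of the same name (assumption)."""


MAX_ATTEMPTS = 3  # hypothetical cap; not a documented Protobunny setting


def consume(queue: deque, handler) -> list:
    """Drain the queue, putting a message back when the handler asks for a requeue."""
    dropped = []
    while queue:
        payload, attempts = queue.popleft()
        try:
            handler(payload)
        except RequeueMessage:
            if attempts + 1 < MAX_ATTEMPTS:
                queue.append((payload, attempts + 1))  # back of the queue
            else:
                dropped.append(payload)  # give up after the cap
    return dropped


calls = []


def flaky_handler(payload: str) -> None:
    calls.append(payload)
    # Fail the first two deliveries of "a", succeed on the third.
    if payload == "a" and calls.count("a") < 3:
        raise RequeueMessage("Service busy, retrying...")


dropped = consume(deque([("a", 0), ("b", 0)]), flaky_handler)
print(calls, dropped)
```

Without some cap or delay, a handler that always raises would see the same message redelivered indefinitely, so it is worth reserving the exception for failures that are likely to be transient.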