User Guide
The guide will walk you through building a simple producer and consumer using the
blazingmq package. This guide however does not cover some of the more complex
concepts including message properties, queue options, queue configuration and the
Message object. For reference documentation see the API Reference.
Simple Producer
This is the basic example for a BlazingMQ producer posting a single message:
import blazingmq
queue_uri = "bmq://bmq.tutorial.workqueue/example_queue"
with blazingmq.Session(blazingmq.session_events.log_session_event) as session:
session.open_queue(queue_uri, write=True)
session.post(queue_uri, b"Some message here")
session.close_queue(queue_uri)
First, you need create an instance of the Session object.
The only required positional argument is a callback that takes a single argument of
type SessionEvent. For example, the on_session_event callback could look like:
def on_session_event(event):
print(event)
In the following example however, we use the library function
session_events.log_session_event which provides some default configured logging for
incoming session events:
import blazingmq
with blazingmq.Session(blazingmq.session_events.log_session_event) as session:
# session can be used inside this block
Note
There should be only one Session object per process since it is very
heavyweight, holds a lot of state, and consumes both broker and operating system
resources.
The session is the object responsible for network connections, thread pools, internal
memory and storage. The context manager will make sure that its resources are
correctly managed even if an exception is raised in the block. On enter, the context
manager will ensure that the Session is started and valid and on exit, it will
ensure that the Session is stopped and cleaned up.
Using the Session, you can open a queue in write mode. Using write=True enables
our queue for posting messages:
session.open_queue(queue_uri, write=True)
Once opened, you can use the queue URI to post a message on the session like:
session.post(queue_uri, b"Some message here")
The Session.post optionally also takes an on_ack callback if the user wants to
receive an acknowledgment for the message being posted. This on_ack callback will
be invoked with the result of the post.
from blazingmq
def on_ack_callback(ack):
if ack.status != blazingmq.AckStatus.SUCCESS:
print("Post failed")
print("Post success")
session.post(queue_uri, b"Some message here", on_ack=on_ack_callback)
Note
This is the only way for a producer to ensure that a message was received by the BlazingMQ framework via the BlazingMQ broker.
Warning
Invoking any queue related method on the Session object that invoked
the on_ack or on_session_event callback will lead to a deadlock. That
means invoking Session.open_queue, Session.configure_queue, or
Session.close_queue in the callback will deadlock. Additionally, invoking
Session.stop will also deadlock. This implies that the only acceptable Session
method to be called from the callback is is Session.confirm, and consequently
MessageHandle.confirm.
You can use this callback to implement a few useful patterns that are documented in the Acknowledgments section.
Additionally, you can associate message properties with messages being posted. This is documented in Message Properties
Finally, you need to close the queue when you have finished using it.
session.close_queue(queue_uri)
Note
Any acknowledgments that are still outstanding will be negatively acknowledged
before Session.close_queue returns.
Simple Consumer
This is a basic example for a BlazingMQ consumer, printing and confirming all incoming messages:
import blazingmq
import signal
import threading
def on_message_callback(message, message_handle):
print(message.data)
message_handle.confirm()
event = threading.Event()
def handler(signum, frame):
print("Goodbye!")
event.set()
signal.signal(signal.SIGINT, handler)
queue_uri = "bmq://bmq.tutorial.workqueue/example_queue"
with blazingmq.Session(blazingmq.session_events.log_session_event,
on_message=on_message_callback) as session:
session.open_queue(queue_uri, read=True)
event.wait()
session.configure_queue(queue_uri, blazingmq.QueueOptions(0, 0, 0))
session.close_queue(queue_uri)
The first thing you need to do for any BlazingMQ application is to create the
Session. Since we intend to consume messages from a queue opened in read
mode, we also want to specify the optional on_message callback in addition
to the required on_session_event callback:
with blazingmq.Session(blazingmq.session_events.log_session_event,
on_message=on_message_callback) as session:
# session can be used inside this block
Note
There should be only one Session object per process since it is very
heavyweight, holds a lot of state, and consumes both broker and operating system
resources.
Note
The on_message callback will receive messages for all queues in read mode. If
the program is reading from multiple queues, Message.queue_uri will indicate which
queue this message is associated with.
Warning
Invoking any queue related method on the Session object that invoked
the on_message or on_session_event callback will lead to a deadlock. That
means invoking Session.open_queue, Session.configure_queue, or
Session.close_queue in the callback will deadlock. Additionally, invoking
Session.stop will also deadlock. This implies that the only acceptable Session
method to be called from the callback is Session.confirm, and consequently
MessageHandle.confirm.
You can then use the Session to open a queue. When you are opening a queue in read
mode, you must specify an on_message callback to process incoming messages as
documented above:
session.open_queue(queue_uri, read=True)
Note
You can create a queue that is both a producer and a consumer,
by passing in both read=True and write=True.
To open the queue you need to provide the URI that uniquely identifies the
queue within the BlazingMQ framework
(bmq://bmq.tutorial.workqueue/example_queue). To open it in read mode,
read=True is used.
Note
The QueueOptions parameter has been elided, and
the default is being used.
When Session.open_queue method returns, messages directed towards the specified
queue will start being received in the on_message_callback.
Once you get a Message object in the callback, you can inspect the data inside
the message:
print(message.data)
The data contained inside will be of type bytes. To correctly decode the data
inside the Message object you need to know the encoding that the producer
used when it placed the message in the queue. This could be JSON, XML, BER or
any other type of encoding. From the perspective of BlazingMQ, the encoding
does not matter since only bytes are transmitted.
Assuming at this point the processing of the message was successful and you do
not want to receive it again, you can call Session.confirm with this message
passed as an argument. This will notify the BlazingMQ broker that the message
should not be re-delivered to another consumer.
session.confirm(message)
Alternatively, an instance of blazingmq.MessageHandle is received along with every message.
It can be used to confirm the message with which it was received. Notice that you don’t
need to pass the message as an argument.
message_handle.confirm()
At the end, when the queue has served its purpose, you want to first pause incoming
messages and ensure in-flight callbacks to finish processing by calling
Session.configure_queue with zero-ed queue options:
session.configure_queue(queue_uri, blazingmq.QueueOptions(0, 0, 0))
For more about QueueOptions and Session.configure_queue, please refer to
the Queue Options section.
Finally, once this returns successfully, you can safely close it by calling
Session.close_queue with the appropriate queue URI:
session.close_queue(queue_uri)
Once this method returns, you will no longer receive messages for the queue and the
queue URI can no longer be used for any operations, other than Session.open_queue.
Host Health Monitoring
You can pass host_health_monitor=None to the Session constructor if you
don’t want any host health monitoring, in which case you won’t be able to use
the suspends_on_bad_host_health queue option, and you will never get any host
health related session events.
For testing purposes, you can pass an instance of BasicHealthMonitor as the
host_health_monitor argument for the Session constructor, and your tests
can control whether the Session believes the host is healthy or not by
calling the set_healthy and set_unhealthy methods of that instance.
New in version 0.7.0: Host health monitoring and queue suspension