User Guide
This guide walks you through building a simple producer and consumer using the blazingmq package. It does not cover some of the more complex concepts, including message properties, queue options, queue configuration, and the Message object. For reference documentation, see the API Reference.
Simple Producer
This is the basic example for a BlazingMQ producer posting a single message:
import blazingmq

queue_uri = "bmq://bmq.tutorial.workqueue/example_queue"
with blazingmq.Session(blazingmq.session_events.log_session_event) as session:
    session.open_queue(queue_uri, write=True)
    session.post(queue_uri, b"Some message here")
    session.close_queue(queue_uri)
First, you need to create an instance of the Session object. The only required positional argument is a callback that takes a single argument of type SessionEvent. For example, the on_session_event callback could look like:
def on_session_event(event):
    print(event)
In the following example, however, we use the library function session_events.log_session_event, which provides default logging for incoming session events:
import blazingmq

with blazingmq.Session(blazingmq.session_events.log_session_event) as session:
    # session can be used inside this block
Note
There should be only one Session object per process since it is very heavyweight, holds a lot of state, and consumes both broker and operating system resources.
The session is the object responsible for network connections, thread pools, internal memory, and storage. The context manager makes sure that its resources are correctly managed even if an exception is raised in the block. On enter, the context manager ensures that the Session is started and valid; on exit, it ensures that the Session is stopped and cleaned up.
Using the Session, you can open a queue in write mode. Passing write=True enables the queue for posting messages:
session.open_queue(queue_uri, write=True)
Once opened, you can use the queue URI to post a message on the session like:
session.post(queue_uri, b"Some message here")
Session.post also optionally takes an on_ack callback if you want to receive an acknowledgment for the message being posted. This on_ack callback will be invoked with the result of the post.
import blazingmq

def on_ack_callback(ack):
    if ack.status != blazingmq.AckStatus.SUCCESS:
        print("Post failed")
    else:
        print("Post success")

session.post(queue_uri, b"Some message here", on_ack=on_ack_callback)
Note
This is the only way for a producer to ensure that a message was received by the BlazingMQ framework via the BlazingMQ broker.
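One way to use the acknowledgment from the thread that posted the message is to hand the result over through a threading.Event. The following is a minimal sketch, not part of the blazingmq package: AckWaiter is a hypothetical helper, and the sketch assumes only that on_ack accepts any callable taking a single ack argument, as described above.

```python
import threading


class AckWaiter:
    """Callable intended for use as an `on_ack` callback (hypothetical helper)."""

    def __init__(self):
        self._event = threading.Event()
        self.ack = None

    def __call__(self, ack):
        # Invoked with the result of the post: store it and wake up the waiter.
        self.ack = ack
        self._event.set()

    def wait(self, timeout=None):
        """Block until the ack arrives; return it, or None if the wait timed out."""
        if self._event.wait(timeout):
            return self.ack
        return None
```

With this helper, a producer could create waiter = AckWaiter(), pass on_ack=waiter to Session.post, and then call waiter.wait(timeout=5.0) from the posting thread before comparing the returned ack's status against blazingmq.AckStatus.SUCCESS. The wait must happen outside the callback itself.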
Warning
Invoking any queue-related method on the Session object that invoked the on_ack or on_session_event callback will lead to a deadlock. That means invoking Session.open_queue, Session.configure_queue, or Session.close_queue in the callback will deadlock. Additionally, invoking Session.stop will also deadlock. This implies that the only acceptable Session method to be called from the callback is Session.confirm, and consequently MessageHandle.confirm.
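A common way to stay clear of this deadlock is to have the callback only record what happened and let another thread (for example, the main thread) perform any queue operations. The sketch below illustrates that hand-off with a standard-library queue; the names pending and drain are hypothetical and not part of the blazingmq package.

```python
import queue

# Work items produced by callbacks and consumed by the main thread.
pending = queue.Queue()


def on_session_event(event):
    # Do not call open_queue/configure_queue/close_queue/stop here.
    # Record the event and return immediately.
    pending.put(("session_event", event))


def drain(handler):
    """Call handler(kind, payload) on the caller's thread for each queued item.

    Returns the number of items handled.
    """
    handled = 0
    while True:
        try:
            kind, payload = pending.get_nowait()
        except queue.Empty:
            return handled
        handler(kind, payload)
        handled += 1
```

The main thread can call drain periodically and, inside the handler it passes in, safely invoke methods such as Session.open_queue, because that code no longer runs inside a Session callback.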
You can use this callback to implement a few useful patterns that are documented in the Acknowledgments section.
Additionally, you can associate message properties with messages being posted. This is documented in the Message Properties section.
Finally, you need to close the queue when you have finished using it.
session.close_queue(queue_uri)
Note
Any acknowledgments that are still outstanding will be negatively acknowledged before Session.close_queue returns.
Simple Consumer
This is a basic example for a BlazingMQ consumer, printing and confirming all incoming messages:
import blazingmq
import signal
import threading

def on_message_callback(message, message_handle):
    print(message.data)
    message_handle.confirm()

event = threading.Event()

def handler(signum, frame):
    print("Goodbye!")
    event.set()

signal.signal(signal.SIGINT, handler)
queue_uri = "bmq://bmq.tutorial.workqueue/example_queue"
with blazingmq.Session(blazingmq.session_events.log_session_event,
                       on_message=on_message_callback) as session:
    session.open_queue(queue_uri, read=True)
    event.wait()
    session.configure_queue(queue_uri, blazingmq.QueueOptions(0, 0, 0))
    session.close_queue(queue_uri)
The first thing you need to do for any BlazingMQ application is to create the Session. Since we intend to consume messages from a queue opened in read mode, we also want to specify the optional on_message callback in addition to the required on_session_event callback:
with blazingmq.Session(blazingmq.session_events.log_session_event,
                       on_message=on_message_callback) as session:
    # session can be used inside this block
Note
There should be only one Session object per process since it is very heavyweight, holds a lot of state, and consumes both broker and operating system resources.
Note
The on_message callback will receive messages for all queues in read mode. If the program is reading from multiple queues, Message.queue_uri will indicate which queue this message is associated with.
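When reading from several queues, the single on_message callback can route each message by its queue URI. The following is a minimal sketch of such routing; the handlers table and the register function are hypothetical helpers, and the only library attributes assumed are Message.queue_uri and MessageHandle.confirm, both described in this guide.

```python
# Hypothetical per-queue handlers, keyed by queue URI.
handlers = {}


def register(queue_uri, handler):
    """Associate a handler function with a queue URI."""
    handlers[queue_uri] = handler


def on_message_callback(message, message_handle):
    # Message.queue_uri identifies the queue this message arrived on.
    handler = handlers.get(message.queue_uri)
    if handler is None:
        # No handler registered for this queue: leave the message unconfirmed.
        return
    handler(message)
    # Confirm only after the handler has returned without raising.
    message_handle.confirm()
```

Leaving unrouted messages unconfirmed is a design choice made for this sketch; an application might prefer to log or confirm them instead.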
Warning
Invoking any queue-related method on the Session object that invoked the on_message or on_session_event callback will lead to a deadlock. That means invoking Session.open_queue, Session.configure_queue, or Session.close_queue in the callback will deadlock. Additionally, invoking Session.stop will also deadlock. This implies that the only acceptable Session method to be called from the callback is Session.confirm, and consequently MessageHandle.confirm.
You can then use the Session to open a queue. When you open a queue in read mode, the Session must have been created with an on_message callback to process incoming messages, as documented above:
session.open_queue(queue_uri, read=True)
Note
You can open a queue for both producing and consuming by passing both read=True and write=True.
To open the queue, you need to provide the URI that uniquely identifies the queue within the BlazingMQ framework (bmq://bmq.tutorial.workqueue/example_queue). To open it in read mode, read=True is used.
Note
The QueueOptions parameter has been omitted, so the default is used.
When the Session.open_queue method returns, messages directed towards the specified queue will start being received in on_message_callback.
Once you get a Message object in the callback, you can inspect the data inside the message:
print(message.data)
The data contained inside will be of type bytes. To correctly decode the data inside the Message object, you need to know the encoding that the producer used when it placed the message in the queue. This could be JSON, XML, BER, or any other type of encoding. From the perspective of BlazingMQ, the encoding does not matter since only bytes are transmitted.
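As an illustration, suppose the producer and consumer agree on UTF-8 encoded JSON. The helper names below are hypothetical, and JSON is just one possible choice; the sketch only shows the round trip between Python objects and the bytes that would be passed to Session.post and read back from message.data.

```python
import json


def encode_payload(obj):
    """Serialize a JSON-compatible object to UTF-8 bytes for posting."""
    return json.dumps(obj).encode("utf-8")


def decode_payload(data):
    """Inverse of encode_payload: parse UTF-8 JSON bytes such as message.data."""
    return json.loads(data.decode("utf-8"))
```

A producer would post encode_payload(...) and the consumer would call decode_payload(message.data) inside its on_message callback.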
Assuming that the message was processed successfully and you do not want to receive it again, you can call Session.confirm with this message passed as an argument. This will notify the BlazingMQ broker that the message should not be re-delivered to another consumer.
session.confirm(message)
Alternatively, an instance of blazingmq.MessageHandle is received along with every message. It can be used to confirm the message with which it was received. Notice that you don’t need to pass the message as an argument.
message_handle.confirm()
At the end, when the queue has served its purpose, you should first pause incoming messages and ensure that in-flight callbacks finish processing by calling Session.configure_queue with zeroed queue options:
session.configure_queue(queue_uri, blazingmq.QueueOptions(0, 0, 0))
For more about QueueOptions and Session.configure_queue, please refer to the Queue Options section.
Finally, once this returns successfully, you can safely close the queue by calling Session.close_queue with the appropriate queue URI:
session.close_queue(queue_uri)
Once this method returns, you will no longer receive messages for the queue, and the queue URI can no longer be used for any operation other than Session.open_queue.
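The two shutdown steps can be bundled into a small helper so that they always happen in the right order. This is only a sketch: pause_and_close is a hypothetical name, and the caller is assumed to supply the zeroed options object, for example blazingmq.QueueOptions(0, 0, 0) as used above.

```python
def pause_and_close(session, queue_uri, paused_options):
    """Pause delivery for queue_uri, then close the queue.

    paused_options is expected to be the zeroed options object,
    for example blazingmq.QueueOptions(0, 0, 0).
    """
    # Step 1: stop new deliveries and let in-flight callbacks finish.
    session.configure_queue(queue_uri, paused_options)
    # Step 2: runs only if configure_queue returned normally; if it raised,
    # the exception propagates and close_queue is skipped.
    session.close_queue(queue_uri)
```

Like the calls it wraps, this helper must not be invoked from inside a Session callback.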
Host Health Monitoring
You can pass host_health_monitor=None to the Session constructor if you don’t want any host health monitoring, in which case you won’t be able to use the suspends_on_bad_host_health queue option, and you will never get any host health related session events.
For testing purposes, you can pass an instance of BasicHealthMonitor as the host_health_monitor argument for the Session constructor, and your tests can control whether the Session believes the host is healthy or not by calling the set_healthy and set_unhealthy methods of that instance.
New in version 0.7.0: Host health monitoring and queue suspension
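In tests, it can be convenient to flip the monitor to unhealthy for a limited scope and have it restored automatically. The context manager below is a hypothetical helper, not part of the blazingmq package; it assumes only that the monitor object exposes the set_healthy and set_unhealthy methods mentioned above.

```python
from contextlib import contextmanager


@contextmanager
def unhealthy_host(monitor):
    """Report the host as unhealthy for the duration of a `with` block."""
    monitor.set_unhealthy()
    try:
        yield monitor
    finally:
        # Always restore the healthy state, even if the test body raises.
        monitor.set_healthy()
```

A test would wrap the code under test in "with unhealthy_host(monitor):", where monitor is the same instance that was passed as host_health_monitor to the Session constructor.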