API Reference
Session
- class blazingmq.Session(on_session_event, *, on_message=None, broker='tcp://localhost:30114', message_compression_algorithm=CompressionAlgorithmType.NONE, timeout=..., host_health_monitor=None, num_processing_threads=None, blob_buffer_size=None, channel_high_watermark=None, event_queue_watermarks=None, stats_dump_interval=None)[source]
Represents a connection with the BlazingMQ broker.
The session represents a connection to the broker. This object can be manipulated to modify the state of the application from the point of view of the broker, including opening queues, and starting or stopping the connection with the broker.
- Parameters:
on_session_event (Callable[[SessionEvent], None]) – a required callback to process SessionEvent events received by the session.
on_message (Optional[Callable[[Message, MessageHandle], None]]) – an optional callback to process Message objects received by the session.
broker (str) – TCP address of the broker (default: ‘tcp://localhost:30114’). If the environment variable BMQ_BROKER_URI is set, its value will override whatever broker address is passed via this argument.
message_compression_algorithm (CompressionAlgorithmType) – the type of compression to apply to messages being posted via this session object.
timeout (Union[Timeouts, float]) – maximum number of seconds to wait for requests on this session. If not provided, reasonable defaults are used. This argument may either be a simple float, which sets the same timeout for each operation, or an instance of the Timeouts class, which allows setting the timeout for each operation independently.
host_health_monitor (Union[BasicHealthMonitor, None]) – A BasicHealthMonitor is used by default, so your tests can control whether the session sees the machine as healthy or not by calling set_healthy and set_unhealthy on that instance. If you instead pass None, the session will always see the machine as healthy, HostUnhealthy and HostHealthRestored events will never be emitted, and the suspends_on_bad_host_health option of QueueOptions cannot be used.
num_processing_threads (Optional[int]) – The number of threads for the SDK to use for processing events. This defaults to 1.
blob_buffer_size (Optional[int]) – The size (in bytes) of the blob buffers to use. This defaults to 4k.
channel_high_watermark (Optional[int]) – The size (in bytes) to use for the write cache high watermark on the channel. The default value is 128MB. Note that BlazingMQ reserves 4MB of this value for control messages, so the actual watermark for data published is channel_high_watermark - 4MB.
event_queue_watermarks (Optional[tuple[int, int]]) – A tuple containing the low and high notification watermark thresholds for the buffer containing all incoming messages from the broker, respectively. A warning SlowConsumerHighWaterMark is emitted when the buffer reaches the high watermark value, and a notification SlowConsumerNormal is emitted when the buffer is back to the low watermark.
stats_dump_interval (Optional[float]) – The interval (in seconds) at which to dump stats into the logs. If 0, disable the recurring dump of stats (final stats are always dumped at the end of the session). The default is 5min; the value must be a multiple of 30s, in the range [0s - 60min].
- Raises:
Error – If the session start request was not successful.
BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.
ValueError – If any of the timeouts are provided and not > 0.0, or if the stats_dump_interval is provided and is < 0.0.
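The lifecycle described above (constructing the Session starts the connection, and stop tears it down) can be wrapped so that the session is always stopped. This is only a sketch: run_with_session and its session_cls parameter are hypothetical names introduced here for illustration, and the 30-second timeout is an arbitrary choice.

```python
def run_with_session(work, on_session_event, *, broker="tcp://localhost:30114",
                     timeout=30.0, session_cls=None):
    """Start a session, call work(session), and always stop the session."""
    if session_cls is None:
        # Imported lazily so this helper can be loaded without the SDK installed.
        from blazingmq import Session as session_cls
    # A single float applies the same timeout to every operation.
    session = session_cls(on_session_event, broker=broker, timeout=timeout)
    try:
        return work(session)
    finally:
        # Runs on the caller's thread, never inside an SDK callback.
        session.stop()
```

Passing session_cls explicitly is only meant for substituting a stand-in class in tests; normal callers would leave it unset.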
- close_queue(queue_uri, *, timeout=...)[source]
Close an opened queue at the specified queue_uri.
Close the queue at the specified queue_uri. After this method successfully returns, the queue_uri will no longer correspond to a valid queue for any operations.
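Note
Invoking this method from the on_message or on_session_event of the Session or the on_ack callback of a posted message will cause a deadlock.
- Parameters:
queue_uri (str) – unique resource identifier for the queue to be closed.
timeout (float) – maximum number of seconds to wait for this request. If not provided, the timeout provided to the Session when it was created is used. If that was not provided either, a reasonable default is used.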
- Raises:
Error – If the close queue request was not successful.
BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.
ValueError – If timeout is not > 0.0.
- configure_queue(queue_uri, options, *, timeout=...)[source]
Modify the options on an opened queue at the specified queue_uri.
Modify the options of the queue at the specified queue_uri. After this method successfully returns, the queue_uri will be configured with the specified options.
Note
Invoking this method from the on_message or on_session_event of the Session or the on_ack callback of a posted message will cause a deadlock.
- Parameters:
queue_uri (str) – unique resource identifier for the queue to be configured.
options (QueueOptions) – options to configure the queue with.
timeout (float) – maximum number of seconds to wait for this request. If not provided, the timeout provided to the Session when it was created is used. If that was not provided either, a reasonable default is used.
- Raises:
Error – If the configure queue request was not successful.
BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.
ValueError – If timeout is not > 0.0.
- confirm(message)[source]
Confirm the specified message from this queue.
Mark the specified message as confirmed. If successful, this indicates to the BlazingMQ framework that processing of this message has been completed and that the message must not be redelivered if this queue is opened again by any consumer.
It’s often more convenient to use MessageHandle.confirm instead. An instance of MessageHandle is received with every new Message delivered via the on_message callback.
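As a sketch of the confirm-after-processing pattern, the factory below builds an on_message callback that confirms a message through its MessageHandle only when the application's own handler returns normally. The names make_on_message and process are hypothetical; leaving a failed message unconfirmed is one possible policy, not something the SDK mandates.

```python
import logging

logger = logging.getLogger(__name__)


def make_on_message(process):
    """Build an on_message callback that confirms only after process() succeeds."""
    def on_message(message, message_handle):
        try:
            process(message)
        except Exception:
            # The message stays unconfirmed, so it remains eligible for redelivery.
            logger.exception("processing failed; message left unconfirmed")
            return
        message_handle.confirm()
    return on_message
```

The returned function has the (Message, MessageHandle) signature expected by the on_message parameter of Session.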
- get_queue_options(queue_uri)[source]
Get configured options of an opened queue.
Get the previously set options of an opened queue at the specified queue_uri.
- Parameters:
queue_uri (str) – unique resource identifier for the queue whose options are to be retrieved.
- Returns:
The queue’s configured options.
- Return type:
QueueOptions
- Raises:
Error – If the queue with the given URI is not open, or its options cannot be retrieved.
Note
Options that only affect message consumption, including consumer_priority, max_unconfirmed_messages, and max_unconfirmed_bytes, are ignored when opening or configuring a write-only queue, so any attempt to set those options on a write-only queue won’t be reflected in the QueueOptions returned by a later call to get_queue_options.
- open_queue(queue_uri, *, read=False, write=False, options=QueueOptions(), timeout=...)[source]
Open a queue with the specified parameters
Open a queue at the specified queue_uri. The queue_uri is the identifier for any future interactions with this opened queue.
Note
Invoking this method from the on_message or on_session_event of the Session or the on_ack callback of a posted message will cause a deadlock.
Note
It’s possible to override the default tier by adding an optional tier to the queue URI. See URI format.
- Parameters:
queue_uri (str) – unique resource identifier for the queue to be opened.
read (bool) – open the queue for reading, enabling the Session to receive Message objects for this queue.
write (bool) – open the queue for writing, allowing posting to this queue.
options (QueueOptions) – options to configure the queue with.
timeout (float) – maximum number of seconds to wait for this request. If not provided, the timeout provided to the Session when it was created is used. If that was not provided either, a reasonable default is used.
- Raises:
Error – If the open queue request was not successful.
BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.
ValueError – If timeout is not > 0.0.
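Because every open_queue should eventually be paired with a close_queue, a small context manager can keep the two together. This is a sketch under the assumption that the code runs on an application thread (not in a callback, per the deadlock note above); opened_queue is a hypothetical helper name.

```python
from contextlib import contextmanager


@contextmanager
def opened_queue(session, queue_uri, **open_kwargs):
    """Open queue_uri on entry and close it on exit, even if the body raises."""
    # If open_queue itself raises, there is nothing to close.
    session.open_queue(queue_uri, **open_kwargs)
    try:
        yield queue_uri
    finally:
        session.close_queue(queue_uri)
```

A producer might use it as `with opened_queue(session, uri, write=True) as q: session.post(q, b"payload")`, where uri is whatever queue URI the application already uses.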
- post(queue_uri, message, *, properties=None, property_type_overrides=None, on_ack=None)[source]
Post a message to an opened queue specified by queue_uri.
Post the payload and optional properties and overrides to the opened queue specified by queue_uri. Optionally take an on_ack callback that is invoked with the incoming acknowledgment for the message posted.
- Parameters:
queue_uri (str) – unique resource identifier for the queue to be posted to.
message (bytes) – the payload of the message.
properties (Optional[PropertyValueDict]) – optionally provided properties to be associated with the message.
property_type_overrides (Optional[PropertyTypeDict]) – optionally provided type overrides for the properties.
on_ack (Optional[Callable[[Ack], None]]) – optionally specified callback which is invoked with the acknowledgment status of the message being posted.
- Raises:
Error – If the post request was not successful.
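Since the on_ack callback is invoked asynchronously, a producer that needs the result of a post has to hand it back to its own thread. The sketch below does this with a threading.Event; AckCollector and post_and_wait are hypothetical names, and the 5-second wait is an arbitrary choice.

```python
import threading


class AckCollector:
    """Callable suitable for on_ack that lets another thread wait for the Ack."""

    def __init__(self):
        self._event = threading.Event()
        self.ack = None

    def __call__(self, ack):
        # Invoked by the SDK: record the Ack and signal, nothing else.
        self.ack = ack
        self._event.set()

    def wait(self, timeout=None):
        """Return the Ack, or None if it did not arrive within timeout seconds."""
        return self.ack if self._event.wait(timeout) else None


def post_and_wait(session, queue_uri, payload, *, timeout=5.0):
    """Post payload and block the calling thread until its Ack arrives."""
    collector = AckCollector()
    session.post(queue_uri, payload, on_ack=collector)
    return collector.wait(timeout)
```

The caller would then compare the returned Ack's status against AckStatus.SUCCESS; a None result only means that no Ack was seen within the wait, not that the post failed.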
- stop()[source]
Tear down the broker connection
Stop the session with the connected BlazingMQ broker, which implies tearing down the connection. This session cannot be used to execute any actions after this method returns.
Note
Invoking this method from the on_message or on_session_event of the Session or the on_ack callback of a posted message will cause a deadlock.
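One way to respect the deadlock note is to have callbacks merely request a shutdown and let an application thread perform it. The following is a sketch of that idea; StopSignal is a hypothetical helper, not part of the SDK.

```python
import threading


class StopSignal:
    """Lets a callback request shutdown without calling stop() itself."""

    def __init__(self):
        self._event = threading.Event()

    def request_stop(self):
        """Safe to call from on_message, on_session_event, or on_ack."""
        self._event.set()

    def wait_then_stop(self, session, timeout=None):
        """Block the calling (non-callback) thread, then stop the session.

        Returns True if a stop was requested, False if the wait timed out.
        """
        requested = self._event.wait(timeout)
        session.stop()
        return requested
```

The same pattern applies to close_queue, configure_queue, and open_queue, which carry the same restriction.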
- classmethod with_options(on_session_event, on_message=None, broker='tcp://localhost:30114', session_options=SessionOptions())[source]
Construct a Session instance using SessionOptions.
This is the recommended way to construct a new session, as the SessionOptions class provides an easier-to-use interface for configuring only those options you need.
- Parameters:
on_session_event (Callable[[SessionEvent], None]) – a required callback to process SessionEvent events received by the session.
on_message (Callable[[Message, MessageHandle], None] | None) – an optional callback to process Message objects received by the session.
broker (str) – TCP address of the broker (default: ‘tcp://localhost:30114’). If the environment variable BMQ_BROKER_URI is set, its value will override whatever broker address is passed via this argument.
session_options (SessionOptions) – an instance of SessionOptions that represents the session’s configuration.
- Raises:
Error – If the session start request was not successful.
BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.
ValueError – If any of the timeouts are provided and not > 0.0, or if the stats_dump_interval is provided and is < 0.0.
- class blazingmq.Timeouts(connect_timeout=None, disconnect_timeout=None, open_queue_timeout=None, configure_queue_timeout=None, close_queue_timeout=None)[source]
A value semantic type representing session timeouts.
Each option can be set either by passing it as a keyword argument when constructing a Timeouts instance, or by setting it as an attribute on a constructed instance.
The default for every option is None. When constructing a Session, either directly or using SessionOptions, options set to None are given reasonable default values.
- Parameters:
connect_timeout (Optional[float]) – The maximum number of seconds to wait for connection requests on this session.
disconnect_timeout (Optional[float]) – The maximum number of seconds to wait for disconnection requests on this session.
open_queue_timeout (Optional[float]) – The maximum number of seconds to wait for open queue requests on this session.
configure_queue_timeout (Optional[float]) – The maximum number of seconds to wait for configure queue requests on this session.
close_queue_timeout (Optional[float]) – The maximum number of seconds to wait for close queue requests on this session.
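A sketch of how the five per-operation timeouts might be assembled and checked before constructing a Timeouts instance. timeout_kwargs is a hypothetical helper; it only mirrors the documented rule that a provided timeout must be greater than 0.0 and that None leaves the choice to the session.

```python
TIMEOUT_FIELDS = (
    "connect_timeout",
    "disconnect_timeout",
    "open_queue_timeout",
    "configure_queue_timeout",
    "close_queue_timeout",
)


def timeout_kwargs(default=None, **overrides):
    """Return keyword arguments for Timeouts, applying default to unset fields."""
    unknown = sorted(set(overrides) - set(TIMEOUT_FIELDS))
    if unknown:
        raise TypeError(f"unknown timeout option(s): {', '.join(unknown)}")
    kwargs = {name: overrides.get(name, default) for name in TIMEOUT_FIELDS}
    for name, value in kwargs.items():
        # None means "let the session pick a default"; anything else must be > 0.
        if value is not None and not value > 0.0:
            raise ValueError(f"{name} must be > 0.0, got {value!r}")
    return kwargs
```

The result could then be passed along as `blazingmq.Timeouts(**timeout_kwargs(10.0, connect_timeout=60.0))`.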
- class blazingmq.SessionOptions(message_compression_algorithm=None, timeouts=None, host_health_monitor=None, num_processing_threads=None, blob_buffer_size=None, channel_high_watermark=None, event_queue_watermarks=None, stats_dump_interval=None)[source]
A value semantic type representing session options.
Each option can be set either by passing it as a keyword argument when constructing a SessionOptions instance, or by setting it as an attribute on a constructed instance.
The default for every option is None. When constructing a Session, options set to None are given reasonable default values.
- Parameters:
message_compression_algorithm (Optional[CompressionAlgorithmType]) – The type of compression to apply to messages being posted via the session this object is configuring.
timeouts (Optional[Timeouts]) – The maximum number of seconds to wait for requests for each operation on this session. If not provided, reasonable defaults are used.
host_health_monitor (Union[BasicHealthMonitor, None]) – A BasicHealthMonitor is used by default, so your tests can control whether the session sees the machine as healthy or not by calling set_healthy and set_unhealthy on that instance. If you instead pass None, the session will always see the machine as healthy, HostUnhealthy and HostHealthRestored events will never be emitted, and the suspends_on_bad_host_health option of QueueOptions cannot be used.
num_processing_threads (Optional[int]) – The number of threads for the SDK to use for processing events. This defaults to 1.
blob_buffer_size (Optional[int]) – The size (in bytes) of the blob buffers to use. This defaults to 4k.
channel_high_watermark (Optional[int]) – The size (in bytes) to use for the write cache high watermark on the channel. The default value is 128MB. Note that BlazingMQ reserves 4MB of this value for control messages, so the actual watermark for data published is channel_high_watermark - 4MB.
event_queue_watermarks (Optional[tuple[int, int]]) – A tuple containing the low and high notification watermark thresholds for the buffer containing all incoming messages from the broker, respectively. A warning SlowConsumerHighWaterMark is emitted when the buffer reaches the high watermark value, and a notification SlowConsumerNormal is emitted when the buffer is back to the low watermark.
stats_dump_interval (Optional[float]) – The interval (in seconds) at which to dump stats into the logs. If 0, disable the recurring dump of stats (final stats are always dumped at the end of the session). The default is 5min; the value must be a multiple of 30s, in the range [0s - 60min].
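The numeric constraints above can be checked before a session is constructed. The sketch below is not SDK code: both function names are hypothetical, and it assumes that "4MB" means 4 × 1024 × 1024 bytes, which the text does not state.

```python
CONTROL_RESERVE_BYTES = 4 * 1024 * 1024  # assumption: "4MB" is read as 4 MiB


def check_stats_dump_interval(seconds):
    """Return seconds if it meets the documented constraints, else raise ValueError."""
    if seconds < 0 or seconds > 60 * 60:
        raise ValueError("stats_dump_interval must be within [0s, 60min]")
    if seconds % 30 != 0:
        raise ValueError("stats_dump_interval must be a multiple of 30s")
    return seconds


def data_watermark(channel_high_watermark):
    """Bytes left for published data once the control-message reserve is removed."""
    return channel_high_watermark - CONTROL_RESERVE_BYTES
```

Under the same assumption, a channel_high_watermark of 128 × 1024 × 1024 bytes would leave 124 × 1024 × 1024 bytes for published data.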
- class blazingmq.QueueOptions(max_unconfirmed_messages=None, max_unconfirmed_bytes=None, consumer_priority=None, suspends_on_bad_host_health=None)[source]
A value semantic type representing the settings for a queue.
Each option can be set either by passing it as a keyword argument when constructing a QueueOptions instance, or by setting it as an attribute on a constructed instance.
The default for every option is None. When calling configure_queue, options set to None are not changed from their current setting. When calling open_queue, options set to None are given default values. These default values are accessible as class attributes on the QueueOptions class.
- Parameters:
max_unconfirmed_messages (Optional[int]) – The maximum number of messages that can be delivered to the queue without confirmation. This limit can be reached if the queue receives messages faster than it confirms received messages. Once this limit is reached, at least one message must be confirmed before the queue will receive any more messages.
max_unconfirmed_bytes (Optional[int]) – The maximum number of bytes that can be delivered to the queue without confirmation. Like max_unconfirmed_messages, this limit can be reached if incoming messages are queued up waiting for already delivered messages to be confirmed.
consumer_priority (Optional[int]) – The precedence of this consumer compared to other consumers of the same queue. The consumer with the highest priority receives the messages. If multiple consumers share the highest priority, messages are delivered to them in a round-robin fashion.
suspends_on_bad_host_health (Optional[bool]) – Whether or not this queue should suspend operation while the host machine is unhealthy. While operation is suspended, a queue opened with read=True will not receive messages and a queue opened with write=True will raise if you try to post a message. By default, queues are not sensitive to the host’s health.
- DEFAULT_MAX_UNCONFIRMED_MESSAGES = 1000
The max_unconfirmed_messages used by open_queue by default.
- DEFAULT_MAX_UNCONFIRMED_BYTES = 33554432
The max_unconfirmed_bytes used by open_queue by default.
- DEFAULT_CONSUMER_PRIORITY = 0
The consumer_priority used by open_queue by default.
- DEFAULT_SUSPENDS_ON_BAD_HOST_HEALTH = False
The suspends_on_bad_host_health used by open_queue by default.
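The different treatment of None by open_queue and configure_queue can be modeled with plain dictionaries. This is an illustrative model of the documented semantics only, not the SDK's implementation; the function names are hypothetical and the default values are the class attributes listed above.

```python
OPEN_DEFAULTS = {
    "max_unconfirmed_messages": 1000,
    "max_unconfirmed_bytes": 33554432,
    "consumer_priority": 0,
    "suspends_on_bad_host_health": False,
}


def options_after_open(requested):
    """Model open_queue: options left as None take the class defaults."""
    return {
        key: default if requested.get(key) is None else requested[key]
        for key, default in OPEN_DEFAULTS.items()
    }


def options_after_configure(current, requested):
    """Model configure_queue: options left as None keep their current value."""
    return {
        key: value if requested.get(key) is None else requested[key]
        for key, value in current.items()
    }
```

For example, opening with only consumer_priority=5 and later configuring with only max_unconfirmed_messages=10 would, in this model, leave consumer_priority at 5.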
Message Classes
- class blazingmq.Message[source]
A class representing a message received from BlazingMQ.
A Message represents a message delivered by BlazingMQ from a producer to this queue. This message can only be received if the queue is opened with ‘read=True’ mode enabled.
- properties
A dictionary of BlazingMQ message properties. The dictionary keys must be str representing the property names and the values must be of type str, bytes, bool or int.
- Type:
- property_types
A mapping of property names to PropertyType types. The dictionary is guaranteed to provide a value for each key already present in Message.properties.
- Type:
- class blazingmq.MessageHandle[source]
Operations that can be performed on a Message.
An instance of this class is received in the on_message callback along with an instance of a Message.
- confirm()[source]
Confirm the message received along with this handle.
See Session.confirm for more details.
- Raises:
Error – If the confirm message request was not successful.
- class blazingmq.Ack[source]
Acknowledgment message
An Ack is a notification from BlazingMQ to the application, specifying that the message has been received. This is valuable for ensuring delivery of messages.
These messages will be received in the optionally provided callback to Session.post().
An Ack is by itself not an indication of success unless it has a status of AckStatus.SUCCESS.
- guid
a globally unique identifier generated by BlazingMQ for the message that was successfully posted. This can be correlated between the producer and consumer to verify the flow of messages.
- Type:
- queue_uri
the queue that this message was routed to. This is useful if you have many queues and you want to route this particular Ack to a particular queue.
- Type:
- status
the AckStatus indicating the result of the post operation. Unless this is of type AckStatus.SUCCESS, the post has failed and potentially needs to be dealt with.
- Type:
Health Monitoring
- class blazingmq.BasicHealthMonitor[source]
Control whether a Session sees the host as healthy or unhealthy.
When a BasicHealthMonitor is passed for the Session constructor’s host_health_monitor parameter, you can control whether the BlazingMQ session sees the host as healthy or unhealthy by calling the set_healthy and set_unhealthy methods. Newly created instances default to the healthy state.
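In a test, it can be convenient to flip the monitor to unhealthy for the duration of a block and restore it afterwards. The context manager below is a sketch that relies only on the set_healthy and set_unhealthy methods named above; host_unhealthy is a hypothetical helper name.

```python
from contextlib import contextmanager


@contextmanager
def host_unhealthy(monitor):
    """Mark the host unhealthy inside the with-block, then healthy again."""
    monitor.set_unhealthy()
    try:
        yield monitor
    finally:
        # Restore the healthy state even if the body raises.
        monitor.set_healthy()
```

The monitor passed in would be the same BasicHealthMonitor instance that was given to the Session as host_health_monitor.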
Enumerations
- class blazingmq.AckStatus[source]
An enum representing the status of an Ack message
An AckStatus is a status of a received Ack message which is the result of an attempted put to some particular queue. Anything other than AckStatus.SUCCESS represents a failure.
- SUCCESS
- CANCELED
- INVALID_ARGUMENT
- LIMIT_BYTES
- LIMIT_MESSAGES
- NOT_CONNECTED
- NOT_READY
- NOT_SUPPORTED
- REFUSED
- TIMEOUT
- UNKNOWN
Exceptions
Session Events
- class blazingmq.session_events.QueueEvent[source]
Base type for session events relating to a single queue.
- class blazingmq.session_events.Connected[source]
Notification of successful connection with the broker
- class blazingmq.session_events.Disconnected[source]
Notification of successful disconnection with the broker
- class blazingmq.session_events.ConnectionLost[source]
Notification of a lost connection with the broker
- class blazingmq.session_events.Reconnected[source]
Notification of a reconnection with the broker after the connection was lost earlier
- class blazingmq.session_events.StateRestored[source]
Notification that the state of the application has been successfully restored to what it was before the connection was lost or disconnected
- class blazingmq.session_events.ConnectionTimeout[source]
Notification that a requested connection could not be initiated within the timeout period
- class blazingmq.session_events.HostUnhealthy[source]
Notification that the host has been marked unhealthy.
This is emitted only if host_health_monitor=None was not provided when the Session was created. This will be emitted whenever the machine becomes unhealthy. It is also emitted if the machine is initially unhealthy when the Session is created.
New in version 0.7.0.
- class blazingmq.session_events.HostHealthRestored[source]
Notification that the host is no longer marked unhealthy.
This is emitted only if host_health_monitor=None was not provided when the Session was created. It will be emitted once the machine becomes healthy after an earlier HostUnhealthy event. Before this event is emitted, you will receive a QueueResumed or QueueResumeFailed for each queue that was suspended due to suspends_on_bad_host_health=True.
New in version 0.7.0.
- class blazingmq.session_events.QueueSuspended[source]
Bases: QueueEvent
A queue that is sensitive to host health has been suspended.
After a HostUnhealthy event is emitted, any queue that was opened with suspends_on_bad_host_health=True will suspend operation. This event will be emitted once for each suspended queue.
Note
If host_health_monitor=None was provided when the Session was created, this event will never be emitted because the host will never be considered unhealthy.
New in version 0.7.0.
- class blazingmq.session_events.QueueSuspendFailed[source]
Bases: QueueEvent
A queue that is sensitive to host health could not be suspended.
Whenever a QueueSuspended event would be expected, this event may be emitted instead if the SDK is unable to suspend the queue as expected.
Note
The BlazingMQ SDK considers the failure to suspend a queue as evidence of an unusually serious problem with the connection to the broker, so if this event occurs the SDK follows it up by dropping the connection to the broker and trying to re-establish it.
New in version 0.7.0.
- class blazingmq.session_events.QueueResumed[source]
Bases: QueueEvent
A queue that is sensitive to host health has been resumed.
Once an unhealthy machine becomes healthy again, the SDK will automatically attempt to resume each queue that was suspended when the machine became unhealthy. This event will be emitted once for each queue that had been suspended, only after which will HostHealthRestored be emitted.
New in version 0.7.0.
- class blazingmq.session_events.QueueResumeFailed[source]
Bases: QueueEvent
A queue that is sensitive to host health could not be resumed.
Whenever a QueueResumed event would be expected, this event may be emitted instead if the SDK is unable to resume the queue as expected.
Note
Unlike when suspending a queue fails, the SDK will not automatically drop the connection to the broker if resuming a queue fails.
New in version 0.7.0.
- class blazingmq.session_events.QueueReopened[source]
Bases: QueueEvent
A queue has been successfully reopened after a connection loss.
If the connection with the broker is lost, ConnectionLost is emitted. Once it is reestablished, Reconnected is emitted, followed by either a QueueReopened or QueueReopenFailed for each queue that was previously open, and finally StateRestored is emitted.
- class blazingmq.session_events.QueueReopenFailed[source]
Bases: QueueEvent
A queue couldn’t be reopened after a connection loss.
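The reconnection sequence described for QueueReopened (ConnectionLost, then Reconnected, then one QueueReopened or QueueReopenFailed per queue, then StateRestored) can be followed by a small tracker used as part of an on_session_event callback. This is a sketch: ReconnectTracker is a hypothetical class, it matches events by class name so that it has no import dependency on the SDK, and it assumes it is created once the session has already connected.

```python
class ReconnectTracker:
    """Track connection state from session events, keyed by event class name."""

    def __init__(self):
        # Assumption: the tracker is created after a successful session start.
        self.connected = True
        self.restored = True
        self.failed_reopens = []

    def __call__(self, event):
        name = type(event).__name__
        if name == "ConnectionLost":
            self.connected = False
            self.restored = False
            self.failed_reopens.clear()
        elif name == "Reconnected":
            self.connected = True
        elif name == "QueueReopenFailed":
            self.failed_reopens.append(event)
        elif name == "StateRestored":
            self.restored = True
```

An application could treat the session as fully usable again only when restored is True and failed_reopens is empty.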
- class blazingmq.session_events.SlowConsumerNormal[source]
Notification that the consumer has resumed acceptable rate of consumption
Helper Functions
- blazingmq.session_events.log_session_event(event)[source]
Log incoming session event as appropriate
A callback that can be used as a default for the on_session_event parameter on the Session object. All Connected, Disconnected, StateRestored, SlowConsumerNormal, and QueueReopened events are logged at INFO level, and any ConnectionLost, SlowConsumerHighWaterMark, and Reconnected events are logged at WARN level, as they may indicate issues with the application. Any other events are most likely an error in the application and are logged at ERROR level.
- Parameters:
event (SessionEvent) – incoming SessionEvent object.
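For applications that want the same level mapping but their own logger or message format, the behavior described above can be approximated as follows. This is a sketch based only on the description, not the library's source; level_for and make_event_logger are hypothetical names, and events are matched by class name.

```python
import logging

INFO_EVENTS = {
    "Connected", "Disconnected", "StateRestored",
    "SlowConsumerNormal", "QueueReopened",
}
WARN_EVENTS = {"ConnectionLost", "SlowConsumerHighWaterMark", "Reconnected"}


def level_for(event):
    """Map a session event to the logging level described for log_session_event."""
    name = type(event).__name__
    if name in INFO_EVENTS:
        return logging.INFO
    if name in WARN_EVENTS:
        return logging.WARNING
    return logging.ERROR


def make_event_logger(logger):
    """Build an on_session_event callback that logs to the given logger."""
    def on_session_event(event):
        logger.log(level_for(event), "session event: %r", event)
    return on_session_event
```

The returned callback can be passed as on_session_event in place of log_session_event.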
Testing Utilities
The blazingmq.testing module provides testing utilities.
Nothing in this module should ever be used outside of the context of performing integration tests on your subscribers and publishers.
- blazingmq.testing.HostHealth
This is an alias of BasicHealthMonitor.
This was originally exposed only for testing purposes, but is now supported for general use. The original name has been retained for backwards compatibility, but new code should prefer to use blazingmq.BasicHealthMonitor directly.
Helper Types
- blazingmq.PropertyTypeDict
alias of Dict[str, PropertyType]