API Reference

Session

class blazingmq.Session(on_session_event, *, on_message=None, broker='tcp://localhost:30114', message_compression_algorithm=CompressionAlgorithmType.NONE, timeout=..., host_health_monitor=None, num_processing_threads=None, blob_buffer_size=None, channel_high_watermark=None, event_queue_watermarks=None, stats_dump_interval=None)[source]

Represents a connection with the BlazingMQ broker.

The session represents a connection to the broker. This object can be manipulated to modify the state of the application from the point of view of the broker, including opening queues, and starting or stopping the connection with the broker.

Parameters:
  • on_session_event (Callable[[SessionEvent], None]) – a required callback to process SessionEvent events received by the session.

  • on_message (Optional[Callable[[Message, MessageHandle], None]]) – an optional callback to process Message objects received by the session.

  • broker (str) – TCP address of the broker (default: ‘tcp://localhost:30114’). If the environment variable BMQ_BROKER_URI is set, its value will override whatever broker address is passed via this argument.

  • message_compression_algorithm (CompressionAlgorithmType) – the type of compression to apply to messages being posted via this session object.

  • timeout (Union[Timeouts, float]) – maximum number of seconds to wait for requests on this session. If not provided, reasonable defaults are used. This argument may either be a simple float, which sets the same timeout for each operation, or an instance of the Timeouts class, which allows setting the timeout for each operation independently.

  • host_health_monitor (Union[BasicHealthMonitor, None]) – A BasicHealthMonitor is used by default, so your tests can control whether the session sees the machine as healthy or not by calling set_healthy and set_unhealthy on that instance. If you instead pass None, the session will always see the machine as healthy, HostUnhealthy and HostHealthRestored events will never be emitted, and the suspends_on_bad_host_health option of QueueOptions cannot be used.

  • num_processing_threads (Optional[int]) – The number of threads for the SDK to use for processing events. This defaults to 1.

  • blob_buffer_size (Optional[int]) – The size (in bytes) of the blob buffers to use. This defaults to 4k.

  • channel_high_watermark (Optional[int]) – The size (in bytes) to use for the write cache high watermark on the channel. The default value is 128MB. Note that BlazingMQ reserves 4MB of this value for control messages, so the actual watermark for data published is channel_high_watermark - 4MB.

  • event_queue_watermarks (Optional[tuple[int, int]]) – A tuple containing the low and high notification watermark thresholds for the buffer containing all incoming messages from the broker, respectively. A warning SlowConsumerHighWaterMark is emitted when the buffer reaches the high watermark value, and a notification SlowConsumerNormal is emitted when the buffer is back to the low watermark.

  • stats_dump_interval (Optional[float]) – The interval (in seconds) at which to dump stats into the logs. If 0, disable the recurring dump of stats (final stats are always dumped at the end of the session). The default is 5min; the value must be a multiple of 30s, in the range [0s - 60min].

Raises:
  • Error – If the session start request was not successful.

  • BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.

  • ValueError – If any of the timeouts are provided and not > 0.0, or if the stats_dump_interval is provided and is < 0.0.

close_queue(queue_uri, *, timeout=...)[source]

Close an opened queue at the specified queue_uri.

Close the queue at the specified queue_uri. After this method successfully returns, the queue_uri will no longer correspond to valid queue to do any operations.

Note

Invoking this method from the on_message or on_session_event of the Session or the on_ack callback of a posted message will cause a deadlock.

Parameters:
  • queue_uri (str) – unique resource identifier for the queue to be closed.

  • timeout (float) – maximum number of seconds to wait for this request. If not provided, the timeout provided to the Session when it was created it used. If that was not provided either, a reasonable default is used.

Raises:
  • Error – If the close queue request was not successful.

  • BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.

  • ValueError – If timeout is not > 0.0.

configure_queue(queue_uri, options, *, timeout=...)[source]

Modify the options on an opened queue at the specified queue_uri.

Modify the options of the queue at the specified queue_uri. After this method successfully returns, the queue_uri will be configured with the specified options.

Note

Invoking this method from the on_message or on_session_event of the Session or the on_ack callback of a posted message will cause a deadlock.

Parameters:
  • queue_uri (str) – unique resource identifier for the queue to be configured.

  • options (QueueOptions) – options to configure the queue with

  • timeout (float) – maximum number of seconds to wait for this request. If not provided, the timeout provided to the Session when it was created it used. If that was not provided either, a reasonable default is used.

Raises:
  • Error – If the configure queue request was not successful.

  • BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.

  • ValueError – If timeout is not > 0.0.

confirm(message)[source]

Confirm the specified message from this queue.

Mark the specified message, as confirmed. If successful, this will indicate to the BlazingMQ framework that the processing for this message has been completed and that the message must not be re-delivered, if this queue is opened again by any consumer.

It’s often more convenient to use MessageHandle.confirm instead. An instance of MessageHandle is received with every new Message delivered via the on_message callback.

Parameters:

message (Message) – message to be confirmed.

Raises:

Error – If the confirm message request was not successful.

get_queue_options(queue_uri)[source]

Get configured options of an opened queue.

Get the previously set options of an opened queue at the specified queue_uri.

Parameters:

queue_uri (str) – unique resource identifier for the queue to be configured.

Returns:

The queue’s configured options.

Return type:

QueueOptions

Raises:

Error – If the queue with the given URI is not open, or its options cannot be retrieved.

Note

Options that only affect message consumption, including consumer_priority, max_unconfirmed_messages, and max_unconfirmed_bytes, are ignored when opening or configuring a write-only queue, so any attempt to set those options on a write-only queue won’t be reflected in the QueueOptions returned by a later call to get_queue_options.

open_queue(queue_uri, *, read=False, write=False, options=QueueOptions(), timeout=...)[source]

Open a queue with the specified parameters

Open a queue at the specified queue_uri. The queue_uri is the identifier for any future interactions with this opened queue.

Note

Invoking this method from the on_message or on_session_event of the Session or the on_ack callback of a posted message will cause a deadlock.

Note

It’s possible to override the default tier by adding an optional tier to the queue URI. See URI format.

Parameters:
  • queue_uri (str) – unique resource identifier for the queue to be opened.

  • read (bool) – open the queue for reading, enabling the Session to receive Message objects for this queue.

  • write (bool) – open the queue for writing, allowing posting to this queue.

  • options (QueueOptions) – options to configure the queue with

  • timeout (float) – maximum number of seconds to wait for this request. If not provided, the timeout provided to the Session when it was created it used. If that was not provided either, a reasonable default is used.

Raises:
  • Error – If the open queue request was not successful.

  • BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.

  • ValueError – If timeout is not > 0.0.

post(queue_uri, message, *, properties=None, property_type_overrides=None, on_ack=None)[source]

Post a message to an opened queue specified by queue_uri.

Post the payload and optional properties and overrides to the opened queue specified by queue_uri. Optionally take an on_ack callback that is invoked with the incoming acknowledgment for the message posted.

Parameters:
  • queue_uri (str) – unique resource identifier for the queue to posted to.

  • message (bytes) – the payload of the message.

  • properties (Optional[PropertyValueDict]) – optionally provided properties to be associated with the message.

  • property_type_overrides (Optional[PropertyTypeDict]) – optionally provided type overrides for the properties.

  • on_ack (Optional[Callable[[Ack], None]]) – optionally specified callback which is invoked with the acknowledgment status of the message being posted.

Raises:

Error – If the post request was not successful.

stop()[source]

Teardown the broker connection

Stop the session with the connected BlazingMQ broker which implies tearing down the connection. This session cannot be used to execute any actions after this method returns.

Note

Invoking this method from the on_message or on_session_event of the Session or the on_ack callback of a posted message will cause a deadlock.

classmethod with_options(on_session_event, on_message=None, broker='tcp://localhost:30114', session_options=SessionOptions())[source]

Construct a Session instance using SessionOptions.

This is the recommended way to construct a new session, as the SessionOptions class provides an easier to use interface for configuring only those options you need.

Parameters:
Raises:
  • Error – If the session start request was not successful.

  • BrokerTimeoutError – If the broker didn’t respond to the request within a reasonable amount of time.

  • ValueError – If any of the timeouts are provided and not > 0.0, or if the stats_dump_interval is provided and is < 0.0.

class blazingmq.Timeouts(connect_timeout=None, disconnect_timeout=None, open_queue_timeout=None, configure_queue_timeout=None, close_queue_timeout=None)[source]

A value semantic type representing session timeouts.

Each option can be set either by passing it as a keyword argument when constructing a Timeouts instance, or by setting it as an attribute on a constructed instance.

The default for every option is None. When constructing a Session, either directly or using SessionOptions, options set to None are given reasonable default values.

Parameters:
  • connect_timeout (Optional[float]) – The maximum number of seconds to wait for connection requests on this session.

  • disconnect_timeout (Optional[float]) – The maximum number of seconds to wait for disconnection requests on this session.

  • open_queue_timeout (Optional[float]) – The maximum number of seconds to wait for open queue requests on this session.

  • configure_queue_timeout (Optional[float]) – The maximum number of seconds to wait for configure queue requests on this session.

  • close_queue_timeout (Optional[float]) – The maximum number of seconds to wait for close queue requests on this session.

class blazingmq.SessionOptions(message_compression_algorithm=None, timeouts=None, host_health_monitor=None, num_processing_threads=None, blob_buffer_size=None, channel_high_watermark=None, event_queue_watermarks=None, stats_dump_interval=None)[source]

A value semantic type representing session options.

Each option can be set either by passing it as a keyword argument when constructing a SessionOptions instance, or by setting it as an attribute on a constructed instance.

The default for every option is None. When constructing a Session, options set to None are given reasonable default values.

Parameters:
  • message_compression_algorithm (Optional[CompressionAlgorithmType]) – The type of compression to apply to messages being posted via the session this object is configuring.

  • timeouts (Optional[Timeouts]) – The maximum number of seconds to wait for requests for each operation on this session. If not provided, reasonable defaults are used.

  • host_health_monitor (Union[BasicHealthMonitor, None]) – A BasicHealthMonitor is used by default, so your tests can control whether the session sees the machine as healthy or not by calling set_healthy and set_unhealthy on that instance. If you instead pass None, the session will always see the machine as healthy, HostUnhealthy and HostHealthRestored events with never be emitted, and the suspends_on_bad_host_health option of QueueOptions cannot be used.

  • num_processing_threads (Optional[int]) – The number of threads for the SDK to use for processing events. This defaults to 1.

  • blob_buffer_size (Optional[int]) – The size (in bytes) of the blob buffers to use. This defaults to 4k.

  • channel_high_watermark (Optional[int]) – The size (in bytes) to use for the write cache high watermark on the channel. The default value is 128MB. Note that BlazingMQ reserves 4MB of this value for control messages, so the actual watermark for data published is channel_high_watermark - 4MB.

  • event_queue_watermarks (Optional[tuple[int, int]]) – A tuple containing the low and high notification watermark thresholds for the buffer containing all incoming messages from the broker, respectively. A warning SlowConsumerHighWaterMark is emitted when the buffer reaches the high watermark value, and a notification SlowConsumerNormal is emitted when the buffer is back to the low watermark.

  • stats_dump_interval (Optional[float]) – The interval (in seconds) at which to dump stats into the logs. If 0, disable the recurring dump of stats (final stats are always dumped at the end of the session). The default is 5min; the value must be a multiple of 30s, in the range [0s - 60min].

class blazingmq.QueueOptions(max_unconfirmed_messages=None, max_unconfirmed_bytes=None, consumer_priority=None, suspends_on_bad_host_health=None)[source]

A value semantic type representing the settings for a queue.

Each option can be set either by passing it as a keyword argument when constructing a QueueOptions instance, or by setting it as an attribute on a constructed instance.

The default for every option is None. When calling configure_queue, options set to None are not changed from their current setting. When calling open_queue, options set to None are given default values. These default values are accessible as class attributes on the QueueOptions class.

Parameters:
  • max_unconfirmed_messages (Optional[int]) – The maximum number of messages that can be delivered to the queue without confirmation. This limit can be reached if the queue receives messages faster than it confirms received messages. Once this limit is reached, at least one message must be confirmed before the queue will receive any more messages.

  • max_unconfirmed_bytes (Optional[int]) – The maximum number of bytes that can be delivered to the queue without confirmation. Like max_unconfirmed_messages, this limit can be reached if incoming messages are queued up waiting for already delivered messages to be confirmed.

  • consumer_priority (Optional[int]) – The precedence of this consumer compared to other consumers of the same queue. The consumer with the highest priority receives the messages. If multiple consumers share the highest priority, messages are delivered to them in a round-robin fashion.

  • suspends_on_bad_host_health (Optional[bool]) – Whether or not this queue should suspend operation while the host machine is unhealthy. While operation is suspended, a queue opened with read=True will not receive messages and a queue opened with write=True will raise if you try to post a message. By default, queues are not sensitive to the host’s health.

DEFAULT_MAX_UNCONFIRMED_MESSAGES = 1000

The max_unconfirmed_messages used by open_queue by default.

DEFAULT_MAX_UNCONFIRMED_BYTES = 33554432

The max_unconfirmed_bytes used by open_queue by default.

DEFAULT_CONSUMER_PRIORITY = 0

The consumer_priority used by open_queue by default.

DEFAULT_SUSPENDS_ON_BAD_HOST_HEALTH = False

The suspends_on_bad_host_health used by open_queue by default.

Message Classes

class blazingmq.Message[source]

A class representing a message received from BlazingMQ.

A Message represents a message delivered by BlazingMQ from a producer to this queue. This message can only be received if the queue is opened with ‘read=True’ mode enabled.

data

Payload for the message received from BlazingMQ.

Type:

bytes

guid

Globally unique id for this message.

Type:

bytes

queue_uri

Queue URI this message is for.

Type:

str

properties

A dictionary of BlazingMQ message properties. The dictionary keys must be str representing the property names and the values must be of type str, bytes, bool or int.

Type:

dict

property_types

A mapping of property names to PropertyType types. The dictionary is guaranteed to provide a value for each key already present in Message.properties

Type:

dict

class blazingmq.MessageHandle[source]

Operations that can be performed on a Message.

An instance of this class is received in the on_message callback along with an instance of a Message.

confirm()[source]

Confirm the message received along with this handle.

See Session.confirm for more details.

Raises:

Error – If the confirm message request was not successful.

class blazingmq.Ack[source]

Acknowledgment message

An Ack is a notification from BlazingMQ to the application, specifying that the message has been received. This is valuable for ensuring delivery of messages.

These messages will be received in the optionally provided callback to Session.post().

An Ack is by itself not an indication of success unless it has a status of AckStatus.SUCCESS.

guid

a globally unique identifier generated by BlazingMQ for the message that was successfully posted. This can be correlated between the producer and consumer to verify the flow of messages.

Type:

bytes

queue_uri

the queue that this message was routed to. This is useful if you have many queues and you want to route this particular Ack to a particular queue.

Type:

str

status

the AckStatus indicating the result of the post operation. Unless this is of type AckStatus.SUCCESS, the post has failed and potentially needs to be dealt with.

Type:

AckStatus

Health Monitoring

class blazingmq.BasicHealthMonitor[source]

Control whether a Session sees the host as healthy or unhealthy.

When a BasicHealthMonitor is passed for the Session constructor’s host_health_monitor parameter, you can control whether the BlazingMQ session sees the host as healthy or unhealthy by calling the set_healthy and set_unhealthy methods. Newly created instances default to the healthy state.

set_healthy()[source]

Tell any associated Session that the host is healthy.

set_unhealthy()[source]

Tell any associated Session that the host is unhealthy.

Enumerations

class blazingmq.AckStatus[source]

An enum representing the status of an Ack message

An AckStatus is a status of a received Ack message which is the result of an attempted put to some particular queue. Anything other than AckStatus.SUCCESS represents a failure.

SUCCESS
CANCELED
INVALID_ARGUMENT
LIMIT_BYTES
LIMIT_MESSAGES
NOT_CONNECTED
NOT_READY
NOT_SUPPORTED
REFUSED
TIMEOUT
UNKNOWN
UNRECOGNIZED

The AckStatus was not recognized by the binding layer

class blazingmq.CompressionAlgorithmType[source]

An enum representing compression algorithm used by a producer

NONE
ZLIB
class blazingmq.PropertyType[source]

An enum representing various data types understood by BlazingMQ

BOOL
CHAR
SHORT
INT32
INT64
STRING
BINARY

Exceptions

exception blazingmq.Error[source]

Generic BlazingMQ exception type

exception blazingmq.exceptions.BrokerTimeoutError[source]

Provided time to wait has expired

Session Events

class blazingmq.session_events.SessionEvent[source]

Base session event type

class blazingmq.session_events.QueueEvent[source]

Base type for session events relating to a single queue.

queue_uri

Queue URI this event is associated with.

Type:

str

class blazingmq.session_events.Connected[source]

Notification of successful connection with the broker

class blazingmq.session_events.Disconnected[source]

Notification of successful disconnection with the broker

class blazingmq.session_events.ConnectionLost[source]

Notification of a lost connection with the broker

class blazingmq.session_events.Reconnected[source]

Notification of a re-connection with the broker in case connection was lost earlier

class blazingmq.session_events.StateRestored[source]

Notification of successfully restoring state of application as it was before lost connection or disconnection

class blazingmq.session_events.ConnectionTimeout[source]

Notification that a requested connection could not be initiated in within the timeout period

class blazingmq.session_events.HostUnhealthy[source]

Notification that the host has been marked unhealthy.

This is emitted only if host_health_monitor=None was not provided when the Session was created. This will be emitted whenever the machine becomes unhealthy. It is also emitted if the machine is initially unhealthy when the Session is created.

New in version 0.7.0.

class blazingmq.session_events.HostHealthRestored[source]

Notification that the host is no longer marked unhealthy.

This is emitted only if host_health_monitor=None was not provided when the Session was created. It will be emitted once the machine is becomes healthy after an earlier HostUnhealthy event. Before this event is emitted, you will receive a QueueResumed or QueueResumeFailed for each queue that was suspended due to suspends_on_bad_host_health=True.

New in version 0.7.0.

class blazingmq.session_events.QueueSuspended[source]

Bases: QueueEvent

A queue that is sensitive to host health has been suspended.

After a HostUnhealthy event is emitted, any queue that was opened with suspend_on_bad_host_health=True will suspend operation. This event will be emitted once for each suspended queue.

Note

If host_health_monitor=None was provided when the Session was created, this event will never be emitted because the host will never be considered unhealthy.

queue_uri

URI of the queue that has been successfully suspended.

Type:

str

New in version 0.7.0.

class blazingmq.session_events.QueueSuspendFailed[source]

Bases: QueueEvent

A queue that is sensitive to host health could not be suspended.

Whenever a QueueSuspended event would be expected, this event may be emitted instead if the SDK is unable to suspend the queue as expected.

Note

The BlazingMQ SDK considers the failure to suspend a queue as evidence of an unusually serious problem with the connection to the broker, so if this event occurs the SDK follows it up by dropping the connection to the broker and trying to re-establish it.

queue_uri

URI of the queue that could not be suspended.

Type:

str

New in version 0.7.0.

class blazingmq.session_events.QueueResumed[source]

Bases: QueueEvent

A queue that is sensitive to host health has been resumed.

Once an unhealthy machine becomes healthy again, the SDK will automatically attempt to resume each queue that was suspended when the machine became unhealthy. This event will be emitted once for each queue that had been suspended, only after which will HostHealthRestored be emitted.

queue_uri

URI of the queue that has been successfully resumed.

Type:

str

New in version 0.7.0.

class blazingmq.session_events.QueueResumeFailed[source]

Bases: QueueEvent

A queue that is sensitive to host health could not be resumed.

Whenever a QueueResumed event would be expected, this event may be emitted instead if the SDK is unable to resume the queue as expected.

Note

Unlike if suspending a queue fails, the SDK will not automatically drop the connection to the broker if resuming a queue fails.

queue_uri

URI of the queue that could not be resumed.

Type:

str

New in version 0.7.0.

class blazingmq.session_events.QueueReopened[source]

Bases: QueueEvent

A queue has been successfully reopened after a connection loss.

If the connection with the broker is lost, ConnectionLost is emitted. Once it is reestablished, Reconnected is emitted, followed by either a QueueReopened or QueueReopenFailed for each queue that was previously open, and finally StateRestored is emitted.

queue_uri

URI of the queue that has been successfully reopened.

Type:

str

class blazingmq.session_events.QueueReopenFailed[source]

Bases: QueueEvent

A queue couldn’t be reopened after a connection loss.

queue_uri

URI of the queue that could not be reopened.

Type:

str

class blazingmq.session_events.SlowConsumerNormal[source]

Notification that the consumer has resumed acceptable rate of consumption

class blazingmq.session_events.SlowConsumerHighWaterMark[source]

Notification that the consumer is consuming at the lowest rate acceptable

class blazingmq.session_events.Error[source]

Notification of a miscellaneous error

class blazingmq.session_events.InterfaceError[source]

The BlazingMQ SDK behaved in an unexpected way.

Helper Functions

blazingmq.session_events.log_session_event(event)[source]

Log incoming session event as appropriate

A callback that can be used as a default for the on_session_event parameter on the Session object. All Connected, Disconnected, StateRestored, SlowConsumerNormal, and QueueReopened events are logged at INFO level, and any ConnectionLost, SlowConsumerHighWaterMark, and Reconnected events are logged at WARN level, as they may indicate issues with the application. Any other events are most likely an error in the application and are logged at ERROR level.

Parameters:

event (SessionEvent) – incoming SessionEvent object.

Testing Utilities

The blazingmq.testing module provides testing utilities.

Nothing in this module should ever be used outside of the context of performing integration tests on your subscribers and publishers.

blazingmq.testing.HostHealth

This is an alias of BasicHealthMonitor.

This was originally exposed only for testing purposes, but is now supported for general use. The original name has been retained for backwards compatibility, but new code should prefer to use blazingmq.BasicHealthMonitor directly.

Helper Types

blazingmq.PropertyTypeDict

alias of Dict[str, PropertyType]

blazingmq.PropertyValueDict

alias of Dict[str, Union[int, bytes, str]]