Examples
Producer
A complete example of a program that posts a message and waits for it to be acknowledged by the broker.
Note that `on_ack` is an optional argument. However, receiving an acknowledgement is the only
way to guarantee that your message was received by the broker. In this example, messages are sent
in a fully synchronous fashion - the program waits (for up to 5 seconds) for an acknowledgement before terminating.
# Copyright 2019-2023 Bloomberg Finance L.P.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import functools
import threading
import blazingmq
# URI of the BlazingMQ queue this example opens for writing and posts to.
QUEUE_URI = "bmq://bmq.test.mmap.priority/blazingmq-examples"
def on_ack(event: threading.Event, ack: blazingmq.Ack) -> None:
    """Print the outcome of a posted message, then signal ``event``.

    Args:
        event: Set unconditionally after the outcome has been printed, so
            that a thread waiting on it can proceed.
        ack: Acknowledgement for the posted message. A ``status`` equal to
            ``blazingmq.AckStatus.SUCCESS`` is reported as an Ack; any other
            status is reported as a NAck.
    """
    delivered = ack.status == blazingmq.AckStatus.SUCCESS
    if delivered:
        print("Received Ack: %r" % ack)
    else:
        print("Received NAck: %r" % ack)
    event.set()
def main() -> None:
    """Post one message to ``QUEUE_URI`` and wait for its acknowledgement.

    Opens the queue for writing, posts a single binary payload with
    ``on_ack`` (bound to a fresh ``threading.Event``) as the acknowledgement
    callback, and then blocks for up to five seconds until the callback
    sets the event. If no acknowledgement arrives in time, that is reported
    instead of being silently ignored.
    """
    with blazingmq.Session(blazingmq.session_events.log_session_event) as session:
        print("Connected to BlazingMQ broker")
        session.open_queue(QUEUE_URI, write=True)
        event = threading.Event()
        # Bind the event so the callback only needs the Ack as its argument.
        on_ack_with_event = functools.partial(on_ack, event)
        print("Posting message")
        session.post(QUEUE_URI, b"\xde\xad\x00\x00\xbe\xef", on_ack=on_ack_with_event)
        print("Waiting for acknowledgement")
        # Event.wait() returns False when the timeout elapses before the
        # event is set; surface that so a missing ack is not mistaken for
        # a successful delivery.
        if not event.wait(timeout=5.0):
            print("Timed out waiting for acknowledgement")


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
Consumer with threading.Event
A complete example of a program that consumes messages from a BlazingMQ
queue. The main thread blocks waiting for a `threading.Event` to be set upon
receiving `SIGTERM` (or `SIGINT`, i.e. CTRL-C), while incoming messages are processed in the callback on
the BlazingMQ event handler thread.
# Copyright 2019-2023 Bloomberg Finance L.P.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import signal
import threading
from typing import Any
import blazingmq
# URI of the BlazingMQ queue this example opens for reading.
QUEUE_URI = "bmq://bmq.test.mmap.priority/blazingmq-examples"
# Set by on_signal(); main() blocks on it until shutdown is requested.
EXITING = threading.Event()
def on_message(msg: blazingmq.Message, msg_handle: blazingmq.MessageHandle) -> None:
    """Print an incoming message, then confirm it through its handle.

    Args:
        msg: The message that was delivered.
        msg_handle: Handle whose ``confirm()`` is invoked for this message.
    """
    print("Confirming: ", msg)
    confirm = msg_handle.confirm
    confirm()
def main() -> None:
    """Consume messages from ``QUEUE_URI`` until ``EXITING`` is set.

    Starts a session with ``on_message`` as the message callback, opens the
    queue for reading, and blocks on ``EXITING``. Once the event is set, the
    queue is reconfigured with all-zero options before the session block is
    left.
    """
    with blazingmq.Session(
        blazingmq.session_events.log_session_event,
        on_message=on_message,
    ) as session:
        print("Connected to BlazingMQ broker")
        print("Send SIGTERM to exit")
        session.open_queue(QUEUE_URI, read=True)

        # Messages are handled in on_message(); this thread just waits for
        # the shutdown request.
        EXITING.wait()

        print("Waiting to process all outstanding messages")
        # NOTE(review): zeroed options presumably stop further deliveries
        # while outstanding messages finish - confirm against the SDK docs.
        zeroed_options = blazingmq.QueueOptions(0, 0, 0)
        session.configure_queue(QUEUE_URI, zeroed_options)
    print("Session stopped.")
def on_signal(signum: int, _frame: Any) -> None:
    """Signal handler: announce the signal and request shutdown.

    Args:
        signum: Number of the signal that was received; echoed in the output.
        _frame: Unused.
    """
    print(f"Received signal: {signum}. Exiting...")
    # Wakes up main(), which is blocked in EXITING.wait().
    EXITING.set()
if __name__ == "__main__":
    # SIGINT covers CTRL-C; SIGTERM is the exit signal the example advertises.
    for exit_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(exit_signal, on_signal)
    main()
Consumer with queue.Queue
Correct synchronization may be difficult to implement. It sometimes helps to use the
Python standard library `queue.Queue`. The following example consumes messages from
a BlazingMQ queue and uses `queue.Queue` for synchronization.
The main thread is blocked in a `Queue.get` call while all new messages are immediately
added to the in-process queue. There will be no more than `max_unconfirmed_messages` messages
in the in-process queue at any given time (or fewer, if the `max_unconfirmed_bytes` limit is reached first)
because the broker will pause delivery once either limit has been reached. Once `SIGTERM` (or `SIGINT`)
is received, a sentinel object is added to
the in-process queue; all BlazingMQ messages received after the signal will be ignored.
Also note that, in this example, we provide `suspends_on_bad_host_health=True`
when we open the queue. This stops the queue from receiving messages if the
machine is marked unhealthy, so that we don’t unintentionally process a message
on an unhealthy machine.
# Copyright 2019-2023 Bloomberg Finance L.P.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import queue
import signal
from typing import Any
from typing import Optional
import blazingmq
# URI of the BlazingMQ queue this example opens for reading.
QUEUE_URI = "bmq://bmq.test.mmap.priority/blazingmq-examples"
# In-process hand-off queue: on_message() puts received messages, on_signal()
# puts None as the shutdown sentinel, and main() consumes both.
MESSAGES: queue.Queue[Optional[blazingmq.Message]] = queue.Queue()
def on_message(msg: blazingmq.Message, _msg_handle: blazingmq.MessageHandle) -> None:
    """Hand a received message over to the in-process ``MESSAGES`` queue.

    The message is not confirmed here; it is only enqueued so that the
    consumer of ``MESSAGES`` can process it.

    Args:
        msg: The message that was delivered.
        _msg_handle: Unused.
    """
    MESSAGES.put(msg)
def main() -> None:
    """Consume messages from ``QUEUE_URI`` through the ``MESSAGES`` queue.

    Starts a session with ``on_message`` as the message callback and opens
    the queue for reading with at most 100 unconfirmed messages and with
    ``suspends_on_bad_host_health=True``. It then confirms every message
    taken from ``MESSAGES`` until the ``None`` sentinel arrives. Finally the
    queue is reconfigured with all-zero options before the session block is
    left.
    """
    print("Starting consumer2")
    print("Send SIGTERM to exit.")
    with blazingmq.Session(
        blazingmq.session_events.log_session_event,
        on_message=on_message,
    ) as session:
        print("Connected to BlazingMQ broker")
        session.open_queue(
            QUEUE_URI,
            read=True,
            options=blazingmq.QueueOptions(
                max_unconfirmed_messages=100,
                # Must be True: the example is meant to stop receiving
                # messages while the host is marked unhealthy, so that no
                # message is processed on an unhealthy machine.
                suspends_on_bad_host_health=True,
            ),
        )

        while True:
            # Blocks until on_message() enqueues a message or on_signal()
            # enqueues the None sentinel.
            msg = MESSAGES.get()
            if msg is None:
                break
            print("Confirming: ", msg)
            session.confirm(msg)

        print("Waiting to receive all outstanding messages")
        # NOTE(review): zeroed options presumably stop further deliveries
        # before the session is stopped - confirm against the SDK docs.
        session.configure_queue(QUEUE_URI, blazingmq.QueueOptions(0, 0, 0))

    print("Session stopped.")
def on_signal(signum: int, _frame: Any) -> None:
    """Signal handler: announce the signal and enqueue the shutdown sentinel.

    Args:
        signum: Number of the signal that was received; echoed in the output.
        _frame: Unused.
    """
    print(f"Received signal: {signum}. Exiting...")
    # None is the sentinel that makes main() leave its consume loop.
    MESSAGES.put(None)
if __name__ == "__main__":
    # SIGINT covers CTRL-C; SIGTERM is the exit signal the example advertises.
    for exit_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(exit_signal, on_signal)
    main()