Examples

Producer

A complete example of a program that posts a message and waits for it to be acknowledged by the broker.

Note that on_ack is an optional argument. However, receiving an acknowledgement is the only way to guarantee that your message was received by the broker. In this example, messages are sent in a fully synchronous fashion - the program waits (for up to five seconds) for an acknowledgement before terminating.

# Copyright 2019-2023 Bloomberg Finance L.P.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import functools
import threading

import blazingmq

# URI of the queue that main() opens for writing and posts the message to.
QUEUE_URI = "bmq://bmq.test.mmap.priority/blazingmq-examples"


def on_ack(event: threading.Event, ack: blazingmq.Ack) -> None:
    """Report the outcome of a posted message and wake up the waiting thread.

    Args:
        event: Event that is set once the acknowledgement has been reported,
            whatever its status, so a thread blocked on it can proceed.
        ack: Acknowledgement delivered for the posted message.
    """
    # Any status other than SUCCESS is reported as a negative acknowledgement.
    rejected = ack.status != blazingmq.AckStatus.SUCCESS
    template = "Received NAck: %r" if rejected else "Received Ack: %r"
    print(template % ack)
    event.set()


def main() -> None:
    """Post one message to ``QUEUE_URI`` and wait for it to be acknowledged.

    The queue is opened for writing inside a session used as a context
    manager. A ``threading.Event`` is bound to ``on_ack`` so that this
    function can block until the acknowledgement callback has run. The wait
    is bounded to five seconds; if no acknowledgement arrives in that time a
    message is printed so the failure is not mistaken for a successful post.
    """
    with blazingmq.Session(blazingmq.session_events.log_session_event) as session:
        print("Connected to BlazingMQ broker")
        session.open_queue(QUEUE_URI, write=True)
        event = threading.Event()
        # on_ack takes the event as its first argument; bind it here so the
        # resulting callable only needs the acknowledgement itself.
        on_ack_with_event = functools.partial(on_ack, event)
        print("Posting message")
        session.post(QUEUE_URI, b"\xde\xad\x00\x00\xbe\xef", on_ack=on_ack_with_event)
        print("Waiting for acknowledgement")
        # Event.wait returns False when the timeout elapses before the event
        # is set, i.e. on_ack never ran. Report that instead of exiting
        # silently as if the message had been acknowledged.
        if not event.wait(timeout=5.0):
            print("Timed out waiting for acknowledgement")


# Run the producer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Consumer with threading.Event

A complete example of a program that consumes messages from a BlazingMQ queue. The main thread is blocked waiting for threading.Event to be set upon receiving SIGINT (Ctrl-C) or SIGTERM, while incoming messages are processed in the callback on the BlazingMQ event handler thread.

# Copyright 2019-2023 Bloomberg Finance L.P.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import signal
import threading
from typing import Any

import blazingmq

# URI of the queue that main() opens for reading.
QUEUE_URI = "bmq://bmq.test.mmap.priority/blazingmq-examples"
# Set by on_signal(); main() blocks on it until a shutdown is requested.
EXITING = threading.Event()


def on_message(msg: blazingmq.Message, msg_handle: blazingmq.MessageHandle) -> None:
    """Print a delivered message and confirm it immediately.

    Passed to the session as its ``on_message`` callback in ``main``.

    Args:
        msg: The message that was delivered.
        msg_handle: Handle used to confirm ``msg``.
    """
    print("Confirming: ", msg)
    msg_handle.confirm()


def main() -> None:
    """Consume messages from ``QUEUE_URI`` until a shutdown is requested.

    Message handling is delegated to ``on_message``, which is registered with
    the session. This function only opens the queue for reading, blocks until
    ``EXITING`` is set, and then reconfigures the queue before leaving the
    session's ``with`` block.
    """
    with blazingmq.Session(
        blazingmq.session_events.log_session_event,
        on_message=on_message,
    ) as consumer:
        print("Connected to BlazingMQ broker")
        print("Send SIGTERM to exit")
        consumer.open_queue(QUEUE_URI, read=True)

        # Nothing else to do on this thread: sleep until on_signal() sets the
        # event.
        EXITING.wait()

        print("Waiting to process all outstanding messages")
        # NOTE(review): all-zero options presumably stop the broker from
        # delivering further messages before the session closes - confirm
        # against QueueOptions' positional parameter order.
        drain_options = blazingmq.QueueOptions(0, 0, 0)
        consumer.configure_queue(QUEUE_URI, drain_options)
    print("Session stopped.")


def on_signal(signum: int, _frame: Any) -> None:
    """Signal handler: announce the signal and request shutdown.

    Args:
        signum: Number of the signal that was received.
        _frame: Unused; present because signal handlers receive two arguments.
    """
    notice = f"Received signal: {signum}. Exiting..."
    print(notice)
    # Releases main(), which is blocked in EXITING.wait().
    EXITING.set()


if __name__ == "__main__":
    # Route both CTRL-C (SIGINT) and SIGTERM to on_signal so that either one
    # requests a shutdown, then start consuming.
    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, on_signal)
    main()

Consumer with queue.Queue

Correct synchronization may be difficult to implement. It sometimes helps to use the Python standard library queue.Queue. The following example consumes messages from a BlazingMQ queue and uses queue.Queue for synchronization.

The main thread is blocked in a Queue.get while all new messages are immediately added to the in-process queue. There will be no more than max_unconfirmed_messages in the in-process queue at any given time (unless more than max_unconfirmed_bytes was received first) because the broker will pause delivery once this value has been reached. Once SIGINT (Ctrl-C) or SIGTERM is received, a sentinel object is added to the in-process queue; all BlazingMQ messages received after the signal will be ignored.

Also note that, in this example, we provide suspends_on_bad_host_health=True when we open the queue. This stops the queue from receiving messages if the machine is marked unhealthy, so that we don’t unintentionally process a message on an unhealthy machine.

# Copyright 2019-2023 Bloomberg Finance L.P.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import queue
import signal
from typing import Any
from typing import Optional

import blazingmq

# URI of the queue that main() opens for reading.
QUEUE_URI = "bmq://bmq.test.mmap.priority/blazingmq-examples"

# In-process hand-off between on_message() (producer side) and the loop in
# main() (consumer side). A None entry is the sentinel put by on_signal() to
# tell main() to stop.
MESSAGES: queue.Queue[Optional[blazingmq.Message]] = queue.Queue()


def on_message(msg: blazingmq.Message, _msg_handle: blazingmq.MessageHandle) -> None:
    """Hand a delivered message over to the main thread via ``MESSAGES``.

    Passed to the session as its ``on_message`` callback in ``main``. The
    message is not confirmed here; ``main`` confirms it after taking it off
    the in-process queue.

    Args:
        msg: The message that was delivered.
        _msg_handle: Unused.
    """
    MESSAGES.put(msg)


def main() -> None:
    """Consume and confirm messages from ``QUEUE_URI`` until asked to stop.

    Messages arrive through ``on_message``, which puts them on the in-process
    ``MESSAGES`` queue. This function takes them off that queue one at a
    time, prints and confirms each, and stops when it finds the ``None``
    sentinel that ``on_signal`` puts there. It then reconfigures the queue
    before leaving the session's ``with`` block.
    """
    print("Starting consumer2")
    print("Send SIGTERM to exit.")
    with blazingmq.Session(
        blazingmq.session_events.log_session_event,
        on_message=on_message,
    ) as session:
        print("Connected to BlazingMQ broker")
        session.open_queue(
            QUEUE_URI,
            read=True,
            options=blazingmq.QueueOptions(
                max_unconfirmed_messages=100,
                # Must be True for this example: it is what stops the queue
                # from receiving messages while the host is marked unhealthy.
                suspends_on_bad_host_health=True,
            ),
        )

        while True:
            # Blocks until on_message() or on_signal() puts something.
            msg = MESSAGES.get()
            if msg is None:
                # Sentinel from on_signal(): stop consuming. Anything still
                # queued behind it is left unconfirmed.
                break
            print("Confirming: ", msg)
            session.confirm(msg)
        print("Waiting to receive all outstanding messages")
        # NOTE(review): all-zero options presumably stop the broker from
        # delivering further messages before the session closes - confirm
        # against QueueOptions' positional parameter order.
        session.configure_queue(QUEUE_URI, blazingmq.QueueOptions(0, 0, 0))

    print("Session stopped.")


def on_signal(signum: int, _frame: Any) -> None:
    """Signal handler: announce the signal and tell the main loop to stop.

    Args:
        signum: Number of the signal that was received.
        _frame: Unused; present because signal handlers receive two arguments.
    """
    notice = f"Received signal: {signum}. Exiting..."
    print(notice)
    # None is the sentinel main() checks for to break out of its loop.
    # NOTE(review): queue.Queue.put takes the queue's internal lock, and the
    # Python docs only describe SimpleQueue.put as reentrant. Confirm this
    # cannot deadlock if the handler interrupts main() inside MESSAGES.get().
    MESSAGES.put(None)


if __name__ == "__main__":
    # Route both CTRL-C (SIGINT) and SIGTERM to on_signal so that either one
    # requests a shutdown, then start consuming.
    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, on_signal)
    main()