
Subscriptions

Introduction

Subscriptions provide consumer applications a powerful mechanism to only receive the messages from a queue that match a specified expression. In essence, subscriptions allow the user to achieve topic-based message filtering and/or message routing.

In the absence of subscriptions, a consumer attached to a queue can receive and should be able to process any and all messages posted on the queue. In other words, the queue is viewed as a logical stream of homogeneous data. While this may work in some or most cases, there are scenarios where this is limiting.

For example, a user may prefer one set of consumers to handle messages of a certain type, and another set of consumers to handle messages of a certain other type. Or, a user may have a queue of messages that should all be processed by some consumer applications, but certain applications may only be interested in a certain subset of messages and want to ignore messages of a certain type. This is where subscriptions come in – they enable consumer applications to “subscribe” to messages of a certain type and enable users to filter out messages for certain applications but not others, thereby logically converting a queue into a stream of heterogeneous data.

Concretely speaking, producer applications can put any interesting message attributes in the message properties section of the message (message properties are a list of key/value pairs that a producer can associate with a message), and consumers can request BlazingMQ to filter messages using one or more of those message properties.

For example, if a message contains these three properties:

  • CustomerId = 1234
  • DestinationId = "ABC"
  • OrderType = EXPRESS

A consumer can provide a filter (“subscription expression”) like so to “match” the above message:

  • CustomerId == 1234 && OrderType == EXPRESS

In this case, a message having the properties as shown above will be routed to the consumer with the above filter (note that if a property is not specified in the subscription expression, it is considered to be a wildcard).
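The matching above can be sketched in Python. This is an illustration only, not BlazingMQ client code: message properties are modeled as a plain dict, the subscription expression is hand-translated into a Python predicate, `OrderType` is assumed to be a string property, and the name `matches` is hypothetical.

```python
# Message properties modeled as a plain dict (illustrative only).
message_properties = {
    "CustomerId": 1234,
    "DestinationId": "ABC",
    "OrderType": "EXPRESS",  # assumed to be a string property
}

def matches(props):
    """Hand-translated form of: CustomerId == 1234 && OrderType == EXPRESS."""
    return props["CustomerId"] == 1234 and props["OrderType"] == "EXPRESS"

# DestinationId is not mentioned in the expression, so any value is accepted.
other_destination = dict(message_properties, DestinationId="XYZ")

# A different OrderType fails the second half of the expression.
standard_order = dict(message_properties, OrderType="STANDARD")
```

Here `matches(message_properties)` and `matches(other_destination)` both hold, while `matches(standard_order)` does not.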

Similarly, users can spin up any number of consumers, each with different filters. Users have to ensure that every message can be processed by at least one consumer.

Detailed Overview

This section provides an in-depth overview of subscriptions – motivation, high-level design, selected implementation details, etc. It assumes that the reader is familiar with the various routing strategies (aka ‘queue modes’) as well as general BlazingMQ terminology like PUT, PUSH, ACK, CONFIRM messages, etc.

Subscription Types

BlazingMQ provides two types of subscriptions:

  • Application Subscriptions (message filtering)
  • Consumer Subscriptions (message routing)

Users can leverage either or both types of subscriptions to achieve the desired behavior. The two types of subscriptions are described below.

Application Subscriptions

Application Subscriptions provide the ability to filter out messages from an application’s queue in the BlazingMQ broker.

When a message is produced to a queue, BlazingMQ will evaluate all Application Subscriptions and auto-confirm the message on behalf of an application if the message does not match the application’s subscription expression. Since BlazingMQ only routes unconfirmed messages to consumers, consumers will only receive messages that match the configured Application Subscription.

Application Subscriptions are specified in the domain’s configuration:

  • Application Subscriptions are configured and evaluated per-AppId for fanout queues.
    • Note the BlazingMQ broker will still store each message until it is confirmed by all AppIds, either via auto-confirm or a consumer.
  • Application Subscriptions are configured with an empty AppId (i.e. appId="") for priority and broadcast queues. Auto-confirms apply to all consumers of these queues.
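The auto-confirm rule can be sketched as a small Python model. It is not broker code: the AppIds `billing` and `audit`, the function name, and the use of Python predicates in place of configured expressions are all assumptions made for illustration.

```python
def auto_confirmed_app_ids(props, app_subscriptions):
    """Return the AppIds for which the message would be auto-confirmed.

    app_subscriptions maps AppId -> list of predicates. An empty list models
    an application without a subscription, which receives every message.
    """
    confirmed = []
    for app_id, predicates in app_subscriptions.items():
        # Auto-confirm only when a subscription exists and nothing matches.
        if predicates and not any(pred(props) for pred in predicates):
            confirmed.append(app_id)
    return confirmed

# Hypothetical fanout queue with two applications.
app_subscriptions = {
    "billing": [lambda p: p["OrderType"] == "EXPRESS"],
    "audit": [],  # no subscription: receives all messages
}
```

With this setup, a message whose `OrderType` is `"STANDARD"` is auto-confirmed for `billing` only, so it still has to be confirmed by a consumer of `audit` before it can leave storage.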

Consumer Subscriptions

Consumer subscriptions allow each consumer instance to express the messages it is capable of processing when it attaches to the queue. This allows users to define the subset of consumers that BlazingMQ can route any given message to.

When a message is produced to a queue, BlazingMQ will evaluate all Application Subscriptions (as described above), and then evaluate Consumer Subscriptions to determine which consumers are capable of processing the message. Then, all standard routing logic (i.e. consumer priorities, round-robin, respecting maxUnconfirmed* configurations) is used to deliver the message to a consumer.

Notes:

  • BlazingMQ will only route a message to a consumer if the message matches that consumer’s subscription. If a consumer has no subscription, BlazingMQ can route any message to it.

  • If there is no matching consumer subscription for a message, the message will remain in the queue, unconfirmed, until a consumer configures a subscription matching the message. The message will count against the configured queue/domain quota limits until it is confirmed or expires due to TTL.

  • Each consumer instance can specify a different subscription.

  • Users have to ensure that every message can be processed by at least one consumer.
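The notes above can be modeled with a few lines of Python. This is a sketch under simplifying assumptions: consumer names are hypothetical, subscriptions are Python predicates, and priorities, round-robin, and flow control are left out.

```python
def eligible_consumers(props, consumers):
    """Return names of consumers whose subscription accepts the message.

    consumers maps a consumer name -> predicate, or None when the consumer
    has no subscription (in which case any message can be routed to it).
    """
    return [name for name, pred in consumers.items()
            if pred is None or pred(props)]

consumers = {
    "c1": lambda p: p["CustomerId"] == 1000,
    "c2": lambda p: p["CustomerId"] == 2000,
}

pending = []  # messages that stay in the queue, unconfirmed

for msg in ({"CustomerId": 1000}, {"CustomerId": 3000}):
    if not eligible_consumers(msg, consumers):
        # No matching subscription: the message waits and keeps counting
        # against quota until it is confirmed or expires.
        pending.append(msg)
```

The message with `CustomerId` 3000 ends up in `pending` because neither consumer subscribed to it, which is why every message should be covered by at least one subscription.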

Background

  1. Consider a scenario where multiple consumers are attached to a queue in work queue or priority mode. Without subscriptions, there is no way for consumers to specify any restrictions on the messages that BlazingMQ will route to them. All consumers must be in a position to process any message posted on the queue. As an example, let’s assume that all messages being posted to the queue have an attribute called CustomerId with various possible values representing different customers. There could be a case where some consumers can process only those messages where CustomerId == 1000 while others can process messages only where CustomerId == 2000 and so on.

  2. Consider a slight variation of the above scenario: although every consumer can process a message with any CustomerId, the owners of the consumer application want to dedicate a set of consumers to each customer in order to provide better isolation and QoS. So M consumers are dedicated to CustomerId == 1000, N consumers to CustomerId == 2000, etc.

  3. Consider another scenario where one or more consumers are attached to a queue, but none of them are ever interested in messages where CustomerId is different from 1000 or 2000. Such a scenario can occur where one team is producing on the queue and other teams are consuming from it, and the latter don’t control the types of messages being posted on the queue.

Without subscriptions, BlazingMQ gives users no direct way to implement a solution for these scenarios. Some workarounds exist. Let’s look at them:

  • Scenarios (1) and (2) can be solved by creating one BlazingMQ queue for each CustomerId and having producer(s) post a message on the appropriate queue. This solution can work as long as the total number of CustomerIds (and thus, queues) is limited to a few hundred (a large number of queues impacts applications’ and BlazingMQ cluster’s startup and failover times). Moreover, producer and consumer applications would have to agree on the naming convention of the queues.

  • Scenario (3) can be implemented by consumer applications by simply confirming those messages which they are not interested in. However, such messages are still routed to the consumers, which leads to wasted network bandwidth and CPU.

Design

Scenarios described in the previous section can be solved by introducing the notion of subscriptions in BlazingMQ, and routing a message to consumer(s) with matching subscription(s). Here’s how subscriptions work at a high level:

  • Producers add any ‘interesting’ attributes of the message in its message properties.

  • Users specify one or more Application Subscriptions in the domain configuration for one or more AppIds. Each AppId can have one or more boolean expressions, each containing one or more message properties. If there is no subscription for an AppId, the application will receive all messages.

  • Consumers specify one or more boolean expressions when opening the queue. Each expression can contain one or more message properties. As an example, an expression can look like:

    • CustomerId == 1000
    • CustomerId == 2000 && DestinationId == "ABC"
    • CustomerId >= 1000 && CustomerId <= 2000

    where CustomerId and DestinationId are message properties. Every such expression will be a subscription and we will be using these two terms interchangeably. Of course, subscriptions are optional and if a consumer does not specify any subscriptions, it will receive all of the messages posted on the queue.

  • In addition to specifying subscriptions when opening the queue (using the openQueue API), consumers will also be able to specify them when configuring the queue (configureQueue API). Consumers will be able to tweak existing subscriptions, remove an existing subscription or add a new subscription using the configureQueue API. In that regard, subscriptions will be completely dynamic.

  • In addition, consumers will also be able to specify certain options like priority and flow-control parameters for each subscription. Recall that currently, these options – bmqt::QueueOptions – are specified for the entire queue. However, after the introduction of subscriptions, consumers will be able to specify different values for each subscription. This means that priorities will now be applicable at the subscription level. For example, a consumer application can advertise priority 10 for one subscription, and priority 5 for another. These priorities will be taken into consideration by BlazingMQ when routing messages (as is currently the case). In addition, it will also help BlazingMQ provide some determinism when routing a message in a queue having multiple subscriptions (see Order of Evaluation bullet in Implementation Details section below).

  • Existing APIs will continue to work and consumer applications which do not use subscriptions will not need to make any changes.

  • In the BlazingMQ back-end, upon the arrival of a new message, the BlazingMQ primary node will first check each Application Subscription. The message will be auto-confirmed for each application that does not have a matching subscription. If there is a matching Application Subscription, BlazingMQ will then try to match the message with a consumer’s subscription and route the message to the corresponding consumer instance. See the Implementation Details section below for more info.

  • Multiple expressions can be provided when using Application and/or Consumer Subscriptions. The BlazingMQ primary node will check if a message matches each provided expression, resulting in an implicit “OR” between expressions.
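The design points about per-subscription options and the implicit “OR” can be pictured with a small data model. This sketch does not use the BlazingMQ SDK: the `Subscription` and `Consumer` classes, their fields, and the use of a dict keyed by expression text are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict

@dataclass
class Subscription:
    predicate: Callable[[dict], bool]  # stands in for the expression
    priority: int = 0                  # per subscription, not per queue

@dataclass
class Consumer:
    # Keyed by expression text so entries can be added, changed, or removed.
    subscriptions: Dict[str, Subscription] = field(default_factory=dict)

    def accepts(self, props):
        # No subscriptions: the consumer receives every message.
        if not self.subscriptions:
            return True
        # Several expressions behave like an implicit OR.
        return any(s.predicate(props) for s in self.subscriptions.values())

consumer = Consumer()
consumer.subscriptions["CustomerId == 1000"] = Subscription(
    lambda p: p["CustomerId"] == 1000, priority=10)
consumer.subscriptions["CustomerId == 2000"] = Subscription(
    lambda p: p["CustomerId"] == 2000, priority=5)
```

Removing an entry from `consumer.subscriptions` models dropping a subscription at reconfiguration time; from then on the consumer no longer accepts messages that only that expression matched.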

Implementation Details

The Design section above gives a high level overview of the feature. There are, however, some additional details which are worth specifying.

  1. Overlapping Consumer Subscriptions: if consumers specify overlapping subscriptions (e.g., CustomerId == 0 and CustomerId >= 0 ), BlazingMQ will not make any attempt to merge those subscriptions, and the two subscriptions will be treated independently of each other. NOTE: While overlapping subscriptions are supported as described in this and next bullet, having overlapping subscriptions is questionable and can be confusing, and generally speaking, should be avoided.

  2. Order of Evaluation: A very common scenario would be a queue having consumers with different subscriptions. When a message arrives in a queue, the BlazingMQ primary node takes a greedy approach and routes the message to the first matching subscription. A natural question is: in what order will the primary node evaluate subscriptions? The primary node will attempt to order subscriptions by priority and evaluate them from highest to lowest priority. This provides some determinism in the order of evaluation and can also help users implement interesting scenarios. As an example, taking the case of overlapping subscriptions in the previous bullet, if the CustomerId == 0 subscription has priority 10 and the CustomerId >= 0 subscription has priority 5, BlazingMQ will always try to route a message with the CustomerId = 0 attribute to the CustomerId == 0 subscription.

  3. Order of Message Delivery: Messages will be evaluated for matching subscriptions in the order that they arrive on the queue. So if a matching subscription exists for every message, messages will be dispatched to consumer(s) in order. Of course, if there are multiple consumers, each with different subscriptions, messages could be processed out of order. This behavior is the same as in priority and fan-out modes. Order of delivery is also not guaranteed when messages are re-routed due to consumer crashes, again as in priority and fan-out modes.

  4. Spillover to Lower Priority Consumers: If multiple consumers attach to a queue with the same subscription but with different priorities, BlazingMQ will route messages only to the consumer with highest priority. If there are multiple consumers with highest priority, BlazingMQ will round-robin messages across them. More importantly, if highest priority consumers reach capacity (flow-controlled), messages will not be spilled to lower priority consumers. This behavior is similar to the existing logic in various queue modes in BlazingMQ.

  5. Merging of Subscriptions: In case multiple consumers specify the same subscription, the BlazingMQ back-end will seamlessly merge them by combining the options advertised by the consumers for that subscription. For example, if consumer A subscribes to CustomerId == 1000 with priority 10 and capacity X, and consumer B subscribes to CustomerId == 1000 with priority 10 and capacity Y, the two subscriptions will be merged by an intermediate hop in the BlazingMQ back-end and advertised to the primary node as CustomerId == 1000 with priority 10 and capacity X + Y. Two subscriptions are considered the same if their expressions compare equal lexicographically. Merging of subscriptions ensures that load-balancing across consumers having the same subscription works seamlessly.

  6. Evaluating Expressions: BlazingMQ primary node will match subscriptions by reading corresponding message properties from the message and evaluating the expression. For example, while evaluating the CustomerId == 1000 expression, primary node will read CustomerId property from the message and compare its value against 1000 to determine a match. It is worth noting that BlazingMQ back-end can read a property in a message extremely efficiently (in O(1) time) – we essentially build a hashtable on the wire.
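Two of the points above, priority-ordered greedy evaluation and merging of identical subscriptions, can be sketched as follows. This is a simplified model rather than the broker's algorithm: function names and tuple layouts are hypothetical, flow control and round-robin are not modeled, and merging only entries that share both expression text and priority is an assumption (the example above only covers equal priorities).

```python
def first_match(props, subscriptions):
    """Greedy routing: evaluate subscriptions from highest to lowest priority.

    subscriptions is a list of (expression_text, priority, predicate).
    """
    ordered = sorted(subscriptions, key=lambda s: s[1], reverse=True)
    for expression, _priority, predicate in ordered:
        if predicate(props):
            return expression
    return None  # no matching subscription: the message stays in the queue

def merge_subscriptions(advertised):
    """Merge (expression_text, priority, capacity) entries.

    Entries with identical expression text and priority are combined and
    their capacities are summed.
    """
    merged = {}
    for expression, priority, capacity in advertised:
        key = (expression, priority)
        merged[key] = merged.get(key, 0) + capacity
    return [(e, p, c) for (e, p), c in merged.items()]

# Overlapping subscriptions from the first bullet, with different priorities.
subscriptions = [
    ("CustomerId >= 0", 5, lambda p: p["CustomerId"] >= 0),
    ("CustomerId == 0", 10, lambda p: p["CustomerId"] == 0),
]
```

A message with `CustomerId` 0 is routed to the `CustomerId == 0` subscription because it is evaluated first, while a message with `CustomerId` 7 falls through to `CustomerId >= 0`. Because expressions are compared as text, `CustomerId==1000` and `CustomerId == 1000` would not be merged in this model.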

Expression Language

The expression language for subscriptions implements basic string and integer manipulation, as a tiny subset of the C programming language.

Expression

  • An expression consists of a combination of:
    • Identifiers
    • Integer, string and boolean literals
    • Arithmetic, relational and boolean operators
    • Parentheses
  • Spaces, tabs and line feeds are ignored
  • The language has three types: integer, string, and boolean
  • The final result of an expression must be a boolean
  • Limited to 128 characters in length

Identifiers

  • Identifiers consist of a combination of upper and lower-case alphabetical letters, digits, and underscores.

  • The first character must be a letter; it may not be a digit or an underscore.

  • Property names are case sensitive. Example of valid identifiers are: ask, Ask, askPrice, ask_price, sp500; note that ask and Ask are different identifiers.

  • Examples of invalid identifiers are:
    • _ask (begins with underscore)
    • 1Y (begins with digit)
    • a+b (would be parsed as three tokens: a, +, and b).
  • When an identifier is evaluated, the value of the eponymous property, in the current message, is returned.

  • Note that the identifiers in an expression are not necessarily all evaluated (see boolean operators below).
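The identifier rules can be checked with a regular expression. The pattern below is inferred from the valid and invalid examples listed above (a letter followed by letters, digits, or underscores); it is not taken from the actual tokenizer rules, and `is_identifier` is a hypothetical helper.

```python
import re

# Assumed pattern: one letter, then any mix of letters, digits, underscores.
IDENTIFIER = re.compile(r"[A-Za-z][A-Za-z0-9_]*")

def is_identifier(text):
    """Return True if the whole string is a single valid identifier."""
    return IDENTIFIER.fullmatch(text) is not None

valid = ["ask", "Ask", "askPrice", "ask_price", "sp500"]
invalid = ["_ask", "1Y", "a+b"]
```

Every entry in `valid` passes the check and every entry in `invalid` fails it.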

Integer Literals

  • Integer literals consist of a sequence of digits, possibly preceded by the minus sign.

String Literals

  • String literals consist of a sequence of characters, enclosed in double quotes.

  • To insert a double quote or a backslash in the sequence, prefix it with a backslash (thus: "a\"b" is the string a"b, and "a\\b" is the string a\b).

  • No character other than " or \ is allowed after a \. This is a restricted form of the string syntax of the C language, which accepts additional escape sequences.
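The escaping rules can be expressed as a small decoder. This is a sketch of the rules as stated above, not the tokenizer BlazingMQ uses; the function name and the error messages are made up for illustration.

```python
def parse_string_literal(token):
    """Decode a double-quoted literal.

    Only an escaped double quote and an escaped backslash are accepted.
    """
    if len(token) < 2 or token[0] != '"' or token[-1] != '"':
        raise ValueError("literal must be enclosed in double quotes")
    body = token[1:-1]
    out = []
    i = 0
    while i < len(body):
        ch = body[i]
        if ch == "\\":
            # A backslash must be followed by a double quote or a backslash.
            if i + 1 >= len(body) or body[i + 1] not in ('"', "\\"):
                raise ValueError("invalid escape sequence")
            out.append(body[i + 1])
            i += 2
        elif ch == '"':
            raise ValueError("unescaped double quote inside literal")
        else:
            out.append(ch)
            i += 1
    return "".join(out)
```

Under these rules the six-character token `"a\"b"` decodes to the three-character string `a"b`, and a token containing `\n` is rejected.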

Boolean Literals

  • Boolean literals are true and false.

Arithmetic Operators

  • Arithmetic operators are:
    • +
    • -
    • *
    • /
    • % (modulus)
  • They require two integer arguments.

Relational Operators

  • Relational operators are:
    • ==
    • !=
    • <
    • <=
    • >
    • >=
  • They require two arguments of the same type.

Boolean Operators

  • Boolean operators are:
    • &&
    • || (junctions)
    • ! (negation).
  • Negation requires one argument
  • Junctions require two boolean arguments
  • Junctions are short-circuiting: if the left side of && is false, or if the left side of || is true, the right side is not evaluated.
    • As a consequence, identifiers in an expression do not all need to have a corresponding property of the right type.
    • For example, type == "i" && shares > 1000 || shares == "all" is valid as long as the value of type and the type of shares are properly correlated. Similarly, order == "limit" && limit == 0 can be used to detect messages that represent a Limit Order with a silly limit value and, e.g., log an error message.
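Short-circuit evaluation can be observed by recording which properties are actually read. The sketch below hand-translates an expression testing `order` against "limit" and `limit` against 0 into Python, whose `and` operator short-circuits in the same way; the `Props` class and its `get` method are hypothetical and only model a lazy, type-checked property lookup.

```python
class Props:
    """Looks up properties lazily and records which ones were read."""

    def __init__(self, values):
        self.values = values
        self.read = []

    def get(self, name, expected_type):
        self.read.append(name)
        value = self.values[name]  # KeyError if the property is absent
        if not isinstance(value, expected_type):
            raise TypeError(f"{name} is not of type {expected_type.__name__}")
        return value

def silly_limit_order(props):
    # The right-hand side is only evaluated when the left-hand side is true.
    return props.get("order", str) == "limit" and props.get("limit", int) == 0

market = Props({"order": "market"})            # has no "limit" property
limit_zero = Props({"order": "limit", "limit": 0})
```

Evaluating `silly_limit_order(market)` returns `False` without ever reading `limit`, so the missing property causes no error; for `limit_zero` both properties are read and the result is `True`.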

Operator Precedence

  • Operator precedence and associativity are the same as in C; from high precedence to low:
    Operator       Associativity
    !              Right
    * / %          Left
    + -            Left
    < <= > >=      Left
    == !=          Left
    &&             Left
    ||             Left
  • Expressions between parentheses are evaluated first, starting with the innermost.

  • Operations with higher precedence are evaluated before operations with lower precedence. For example, a*x+b>0 is evaluated as ((a*x)+b)>0.

  • When two operators have the same precedence, they are evaluated from left to right, except for ! which is evaluated right to left. For example, a-b-c is evaluated as (a-b)-c, and !!ok is evaluated as !(!ok).
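The precedence and associativity rules can be made visible with a small precedence-climbing parser that adds explicit parentheses around every operation. This is an illustrative sketch, not the Flex/Bison implementation: it handles only identifiers, unsigned integer literals, parentheses, and the operators in the table, and all names in it are hypothetical.

```python
import re

# Binary operators; a higher number binds tighter. All are left-associative.
PRECEDENCE = {"||": 1, "&&": 2, "==": 3, "!=": 3,
              "<": 4, "<=": 4, ">": 4, ">=": 4,
              "+": 5, "-": 5, "*": 6, "/": 6, "%": 6}
TOKEN = re.compile(r"\s*(\d+|[A-Za-z]\w*|\|\||&&|[=!<>]=|[-+*/%<>!()])")

def tokenize(text):
    tokens, pos = [], 0
    text = text.rstrip()
    while pos < len(text):
        m = TOKEN.match(text, pos)
        if not m:
            raise ValueError(f"bad token at offset {pos}")
        tokens.append(m.group(1))
        pos = m.end()
    return tokens

def to_parenthesized(text):
    """Return the expression with every operation explicitly parenthesized."""
    tokens = tokenize(text)

    def primary():
        tok = tokens.pop(0)
        if tok == "!":
            # Unary negation binds tightest and nests right to left.
            return f"(!{primary()})"
        if tok == "(":
            inner = expression(1)
            tokens.pop(0)  # consume the closing parenthesis
            return inner
        return tok  # identifier or integer literal

    def expression(min_prec):
        lhs = primary()
        while tokens and PRECEDENCE.get(tokens[0], 0) >= min_prec:
            op = tokens.pop(0)
            # Left associativity: the right operand may only contain
            # operators that bind tighter than op.
            rhs = expression(PRECEDENCE[op] + 1)
            lhs = f"({lhs}{op}{rhs})"
        return lhs

    return expression(1)
```

For instance, `to_parenthesized("a*x+b>0")` yields `(((a*x)+b)>0)` and `to_parenthesized("a-b-c")` yields `((a-b)-c)`, matching the examples above.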

For a formal specification, see Flex rules for the tokenizer and the Bison grammar.