Azure Service Bus .NET SDK Deep Dive – Publish / Subscribe with Topics

Shows how to publish and subscribe with topics, for more posts in this series go to Contents.

In contrast to queues, in which a single consumer processes each message, topics and subscriptions provide a one-to-many form of communication, in a publish/subscribe pattern. The pattern is useful for scaling to large numbers of recipients, and each published message is made available to each subscription registered with the topic. The main benefits are:

  • The publisher only knows Topics
  • Receivers only know about Subscriptions (a way of subscribing to the topic)
  • Subscriptions allow having complex filter rules and actions associated with it (subscriptions can have many rules)
  • Rules are evaluated using OR logic

Messages are sent to a topic and delivered to one or more associated subscriptions, depending on filter rules that can be set on a per-subscription basis. The subscriptions can use additional filters to restrict the messages that they want to receive. Messages are sent to a topic in the same way they are sent to a queue, but messages are not received from the topic directly. Instead, they are received from subscriptions. A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic. Messages are received from a subscription identically to the way they are received from a queue.

In the above example, when the sender or publisher publishes a message to the topic two copies of the message get created. A copy of the message is then sent to each subscription. If the rule matches the message goes into the subscription, if the rule doesn’t match, the message is “discarded”.

Before we dive deeper into subscriptions and rules, let’s first explore how we can create a topic that allows us to decouple the publisher from the subscribers.

var topicName = "topic"
var client = new ManagementClient(connectionString);
var topicDescription = new TopicDescription(topicName);
await client.CreateTopicAsync(topicDescription);

Let’s suppose we want to create two subscriptions. One is called alwaysInRush and the other one is called maybeRich. The subscription alwaysInRush should only get messages that have a rush label while the maybeRich subscription should only match messages that have a user property called currency set to CHF (Swiss francs). Let’s create those subscriptions and rules associated with it.

var rushSubscription = "alwaysInRush";
var subscriptionDescription = new SubscriptionDescription(topicName, rushSubscription);
await client.CreateSubscriptionAsync(subscriptionDescription);

var currencySubscription= "maybeRich";
subscriptionDescription = new SubscriptionDescription(topicName, currencySubscription);
await client.CreateSubscriptionAsync(subscriptionDescription);

After the subscriptions have been created it is possible to attach rules to it. But what is a rule and what types of rules can be created?

A rule has a name, is attached to a subscription, has a filter and an action.

  • Boolean Filter: Provides a TrueFilter and a FalseFilter that either causes all arriving messages to be accepted (true) or none of the arriving messages (false) to be accepted for the subscription.
  • SQL Filter: It can hold a SQL-like conditional expression that is evaluated on the broker against all the messages. It is possible to inspect user-defined and system properties. The SQL-language subset for filter conditions tests for the existence of properties (EXISTS), as well as for null-values (IS NULL), logical NOT/AND/OR, relational operators, simple numeric arithmetic, and simple text pattern matching with LIKE.
  • Correlation Filter: A CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message’s user and system properties. They match when a user-defined or a system property is equal to the value specified in the correlation filter. String comparison is case sensitive. When specifying multiple match properties, the filter combines them as a logical AND condition, meaning for the filter to match, all conditions must match.
  • Actions: Can be combined with SQL Filter. Actions allow for annotating the message matching a filter by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.

All filters evaluate message properties. Filters cannot evaluate the message body. Complex filter rules require processing capacity. In particular, the use of SQL filter rules results in lower overall message throughput at the subscription, topic, and namespace level. Whenever possible, applications should choose correlation filters over SQL-like filters, since they are much more efficient in processing and therefore have less impact on throughput. Nonetheless, let’s see what fancy things we can do with rules.

var ruleDescription = new RuleDescription
{
    Name = "MessagesWithRushlabel",
    Filter = new CorrelationFilter
    {
        Label = "rush"
    },
    Action = null
};
await client.CreateRuleAsync(topicName, rushSubscription, ruleDescription);

ruleDescription = new RuleDescription
{
    Name = "MessagesWithCurrencyCHF",
    Filter = new SqlFilter("currency = 'CHF'"),
    Action = new SqlRuleAction("SET currency = 'Złoty'")
};
await client.CreateRuleAsync(topicName, currencySubscription, ruleDescription);

The rule MessageWithRushLabel creates a CorrelationFilter that matches when the label on the message is set to rush. The rule MessageWithCurrencyCHF is a bit more complex. It adds a SqlFilter that matches the user property currency = CHF and just for fun adds a SqlRuleAction that overrides that property to Złoty (Polish currency). Let’s send some messages to the topic.

var client = new MessageSender(connectionString, topicName);
var message = new Message();
message.Body = Encoding.UTF8.GetBytes("Damn I have not time!");
message.Label = "rush";
await client.SendAsync(message);

message = new Message();
message.Body = Encoding.UTF8.GetBytes("I'm rich! I have 1000");
message.UserProperties.Add("currency", "CHF");
await client.SendAsync(message);

Let’s explore the alwaysInRush subscription

and last but not least the maybeRich subscription.

About the author

Daniel Marbach

2 comments

By Daniel Marbach

Recent Posts