Azure Service Bus .NET SDK Deep Dive – Sessions

Explains how message sessions can be used to de-multiplex interleaved message streams and guaranteed ordered delivery, for more posts in this series go to Contents.

Usually, when someone asks what the characteristics of a queue is most people would say First-in, first out. The thing is though with regular Azure Service Bus queues that is not guaranteed. Simply because you might do concurrent receive operations on the same queue or a message that failed processing will be handled later due to retries. In contrast, other messages are continued to be processed. Kevin Sookocheff has a great article that goes into more detail about FIFO by looking at AWS SQS FIFO queues.

In most scenarios, it is worth relaxing constraints on ordering because it allows us to better scale. In addition to that, many business cases where we think we might require order turn out to not really require strict ordering or can be achieved by introducing a saga. But as we all know there is usually no definite answer like “you will never need strict ordering”, sometimes it might come in handy when you need true first in first out or want to implement a request-response pattern for streaming messages in a strict order back to the initiator of the request.

In such scenarios, Azure Service Bus has the concept of Message sessions. Sessions enable joint and ordered handling of unbounded sequences of related messages. For example, once a queue has session support enabled,

var client = new ManagementClient(connectionString);
var queueDescription = new QueueDescription(destination)
{
    RequiresSession = true
};
await client.CreateQueueAsync(queueDescription);

a SessionId can be used to group messages that are related together into the same session. All messages that belong to the same session id will be strictly ordered. A receiver accepting a message session of a certain SessionId holds an exclusive lock on that session until it releases the lock (either explicitly or implicitly due to a lock expiration).

The above list of messages sent by the client contains multiple messages that all belong to a different SessionId. For convenience, colors are used to identify the SessionId. In reality, it can be anything, for example, a tenant id, an aggregate id, or something else that has a meaning in your domain. Conceptually a session is like a sub-queue within the queue that has sessions enabled. Per sub-queue, only one receiver can receive a message, and all messages must be handled in order. So for the above example, we can imagine the destination queue of the sender having the following sub-queues:

  • Orange
  • Green
  • Blue
  • Purple

The order of messages in the orange sub-queue would be Orange 1, Orange 2, Orange 3 and Orange 4. If a receiver consumes the Orange SessionId and for example fails to process message with content Orange 2 the broker will redeliver Orange 2 in the next receive operation.

Now comes in my mind the fascinating part of sessions. Sessions can store state up to the maximum message size allowed on the tier that you are on. So for Service Bus Standard the session state can contain up to 256 KB and 1 MB for the Service Premium tier state that is associated with the session until it is cleared. This allows to exchange binary data that is stored reliably on Azure Service Bus per session. This can be very handy if additional metadata needs to be stored and associated with the session. For example, session consumer can store processing information in the session state, should the receiver fail, and the same session would be handled by another processor, the previously stored session state is instantly available on the new processor. Effectively the session state is a highly available state store for session receivers that is bound to a session id meaning the state of one session cannot leak into the other.

To handle messages in a session we need to register a session handler like shown below.

var client = new QueueClient(connectionString, destination);
client.RegisterSessionHandler(
    (session, message, token) =>
    {
        Console.WriteLine(
            $"Received message on session '{session.SessionId}' with '{message.MessageId}' and content '{Encoding.UTF8.GetString(message.Body)}'");
        return Task.CompletedTask;
    },
    new SessionHandlerOptions(
        exception =>
        {
            Console.WriteLine($"Exception: {exception.Exception}");
            Console.WriteLine($"Action: {exception.ExceptionReceivedContext.Action}");
            Console.WriteLine($"ClientId: {exception.ExceptionReceivedContext.ClientId}");
            Console.WriteLine($"Endpoint: {exception.ExceptionReceivedContext.Endpoint}");
            Console.WriteLine($"EntityPath: {exception.ExceptionReceivedContext.EntityPath}");
            return Task.CompletedTask;
        })
    {
        AutoComplete = true,
        MaxConcurrentSessions = 1,
        MessageWaitTimeout = TimeSpan.FromSeconds(2),
        MaxAutoRenewDuration = TimeSpan.FromMinutes(10)
    }
);

The session options allow us to control how many concurrent session we allow to execute concurrently, for example. Let’s see them in action

About the author

Daniel Marbach

Add comment

Recent Posts