Explains how message sessions can be used to de-multiplex interleaved message streams and guarantee ordered delivery. For more posts in this series, go to Contents.
Usually, when someone asks what the defining characteristic of a queue is, most people would say first in, first out. With regular Azure Service Bus queues, though, that is not guaranteed. You might run concurrent receive operations on the same queue, or a message that failed processing might be handled later due to retries while other messages continue to be processed. Kevin Sookocheff has a great article that goes into more detail about FIFO by looking at AWS SQS FIFO queues.
In most scenarios, it is worth relaxing constraints on ordering because it allows us to scale better. In addition, many business cases where we think we require order turn out not to require strict ordering, or the ordering can be achieved by introducing a saga. But as we all know, there is usually no definite answer like “you will never need strict ordering”. Sometimes it comes in handy, for example when you need true first in, first out, or when you want to implement a request-response pattern that streams messages in strict order back to the initiator of the request.
For such scenarios, Azure Service Bus has the concept of message sessions. Sessions enable joint and ordered handling of unbounded sequences of related messages. For example, once a queue has session support enabled,
var client = new ServiceBusAdministrationClient(connectionString);
var queueDescription = new CreateQueueOptions(destination)
{
    RequiresSession = true
};
await client.CreateQueueAsync(queueDescription);
a SessionId can be used to group messages that are related together into the same session. All messages that belong to the same session id will be strictly ordered. A receiver accepting a message session of a certain SessionId holds an exclusive lock on that session until it releases the lock (either explicitly or implicitly due to a lock expiration).
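As a minimal sketch of the sending side, the snippet below publishes an interleaved stream of messages and tags each one with a SessionId. It assumes the queue was created with RequiresSession = true as shown earlier. The environment variable name, the queue name, and the exact interleaving of the messages are illustrative assumptions, not something prescribed by Azure Service Bus.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Assumption: hypothetical environment variable and queue name, adjust to your setup.
string connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");
string destination = "queue";

await using var serviceBusClient = new ServiceBusClient(connectionString);
await using var sender = serviceBusClient.CreateSender(destination);

// Illustrative interleaved stream: messages of different sessions are mixed together.
var messages = new[]
{
    ("Orange", 1), ("Green", 1), ("Blue", 1), ("Orange", 2),
    ("Purple", 1), ("Green", 2), ("Orange", 3), ("Orange", 4)
};

foreach (var (color, number) in messages)
{
    var message = new ServiceBusMessage($"{color} {number}")
    {
        // The SessionId decides which session the message belongs to.
        SessionId = color
    };
    await sender.SendMessageAsync(message);
}
```

On a session-enabled queue every message needs a SessionId, so the property is set on each message before it is sent.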
The above list of messages sent by the client contains multiple messages that belong to different SessionIds. For convenience, colors are used to identify the SessionId. In reality, it can be anything, for example, a tenant id, an aggregate id, or something else that has a meaning in your domain. Conceptually, a session is like a sub-queue within the queue that has sessions enabled. Per sub-queue, only one receiver can receive messages at a time, and all messages must be handled in order. So for the above example, we can imagine the destination queue of the sender having the following sub-queues:
- Orange
- Green
- Blue
- Purple
The order of messages in the orange sub-queue would be Orange 1, Orange 2, Orange 3 and Orange 4. If a receiver consumes the Orange SessionId and, for example, fails to process the message with content Orange 2, the broker will redeliver Orange 2 in the next receive operation.
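The sub-queue idea can be sketched with a small in-memory model. This is purely conceptual and does not use the Service Bus SDK: the class and method names are hypothetical, and real broker behavior such as locks, delivery counts, and dead-lettering is left out on purpose.

```csharp
using System;
using System.Collections.Generic;

public static class SessionDemo
{
    // Splits an interleaved stream into one sub-queue per session id,
    // preserving the arrival order within each session.
    public static Dictionary<string, Queue<string>> Demultiplex(
        IEnumerable<(string SessionId, string Body)> stream)
    {
        var subQueues = new Dictionary<string, Queue<string>>();
        foreach (var (sessionId, body) in stream)
        {
            if (!subQueues.TryGetValue(sessionId, out var queue))
            {
                queue = new Queue<string>();
                subQueues[sessionId] = queue;
            }
            queue.Enqueue(body);
        }
        return subQueues;
    }

    // Hands the head of a sub-queue to the handler. On failure the message
    // stays at the head, so the next call sees the same message again.
    public static bool TryProcessNext(Queue<string> subQueue, Func<string, bool> handler)
    {
        if (subQueue.Count == 0)
        {
            return false;
        }

        var head = subQueue.Peek();
        if (!handler(head))
        {
            return false;
        }

        subQueue.Dequeue();
        return true;
    }
}
```

In this model, a handler that fails on Orange 2 leaves Orange 2 at the head of the orange sub-queue, so Orange 3 is never processed before it. The other sub-queues are unaffected and can be drained independently.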
Now comes, in my mind, the fascinating part of sessions. Sessions can store state up to the maximum message size allowed on the tier that you are on. So on the Service Bus Standard tier the session state can contain up to 256 KB, and on the Service Bus Premium tier up to 1 MB. The state remains associated with the session until it is cleared. This allows binary data to be stored reliably on Azure Service Bus per session, which can be very handy if additional metadata needs to be stored and associated with the session. For example, a session consumer can store processing information in the session state. Should the receiver fail and the same session be picked up by another processor, the previously stored session state is immediately available to the new processor. Effectively, the session state is a highly available state store for session receivers that is bound to a session id, meaning the state of one session cannot leak into another.
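A hedged sketch of how a session handler might use the session state is shown below. The handler reads a counter from the state, increments it, and writes it back. The class name, the counter format, and the "last" subject used as an end-of-session marker are assumptions made for illustration only. The handler is meant to be assigned to the ProcessMessageAsync event of a session processor.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class SessionStateHandler
{
    // Hypothetical handler for ServiceBusSessionProcessor.ProcessMessageAsync.
    public static async Task HandleAsync(ProcessSessionMessageEventArgs args)
    {
        // Read whatever a previous receiver stored for this session, if anything.
        BinaryData state = await args.GetSessionStateAsync(args.CancellationToken);

        int processed = 0;
        if (state is not null && !state.ToMemory().IsEmpty)
        {
            int.TryParse(state.ToString(), out processed);
        }

        processed++;

        // Store the progress so another processor could continue from here.
        await args.SetSessionStateAsync(
            BinaryData.FromString(processed.ToString()),
            args.CancellationToken);

        // Assumption: the sender marks the final message of a session with the subject "last".
        if (args.Message.Subject == "last")
        {
            // Setting the state to null clears it.
            await args.SetSessionStateAsync(null, args.CancellationToken);
        }
    }
}
```

Because the state is not removed automatically, clearing it once a session is logically finished keeps old state from lingering on the broker.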
To handle messages in a session, we need to register a session handler as shown below.
await using var receiver = serviceBusClient.CreateSessionProcessor(destination, new ServiceBusSessionProcessorOptions
{
    AutoCompleteMessages = true,
    MaxConcurrentSessions = 1,
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
    SessionIdleTimeout = TimeSpan.FromSeconds(2)
});
receiver.ProcessMessageAsync += async processMessageEventArgs =>
{
    var message = processMessageEventArgs.Message;
    await Console.Error.WriteLineAsync(
        $"Received message on session '{processMessageEventArgs.SessionId}' with '{message.MessageId}' and content '{Encoding.UTF8.GetString(message.Body)}'");
};
// An error handler is required; the processor refuses to start without one.
receiver.ProcessErrorAsync += async processErrorEventArgs =>
{
    await Console.Error.WriteLineAsync(
        $"Error from '{processErrorEventArgs.ErrorSource}': {processErrorEventArgs.Exception.Message}");
};
await receiver.StartProcessingAsync();
The session options allow us to control, for example, how many sessions are processed concurrently. Let’s see them in action.
Updated: 2021-03-26 to use the new SDK
Hi Daniel, this is great information.
One piece I am trying to understand better is related to the maximum size of the state that can be stored. You stated that this is between 256 KB and 1 MB. Is this per session or overall? (I assume the former, but wanted to check). Also, how long does this state stay around? We are using this for a system that will have thousands of orders come through the system every day. We are concerned about the storage that might be needed if it stays around a long time. It really would not have to hang around for more than a couple of hours for how we are using it, but I don’t see that we have any control over it being cleaned up.
Thanks!
Hi Wayne
Per session (see https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions#message-session-state), and the limit depends on the tier you are on.
> Session state remains as long as it isn’t cleared up (returning null), even if all messages in a session are consumed.
Hope that helps
Daniel