Azure Service Bus .NET SDK Deep Dive – Receiving a message

Explains how to receive a message from a queue. For more posts in this series, go to Contents.

In the previous post I showed the basics of sending a message into a queue. But how do we get it out again? The good news is that the SDK makes it pretty simple to fetch a message from a queue. We can register an event handler delegate on a ServiceBusProcessor. The delegate is called automatically whenever a message arrives in the queue we are listening to.

 var processorOptions = new ServiceBusProcessorOptions
 {
     AutoCompleteMessages = false,
     MaxConcurrentCalls = 1,
     MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
     ReceiveMode = ServiceBusReceiveMode.PeekLock,
     PrefetchCount = 10
 };

 await using var receiver = serviceBusClient.CreateProcessor(destination, processorOptions);

 receiver.ProcessMessageAsync += async messageEventArgs =>
 {
     var message = messageEventArgs.Message;
     // Uncomment to simulate a failing handler:
     // throw new InvalidOperationException();
     await messageEventArgs.CompleteMessageAsync(message);
 };

 // A ProcessErrorAsync handler must also be registered before this call,
 // otherwise StartProcessingAsync throws.
 await receiver.StartProcessingAsync();

As we can see, it is possible to influence the behavior of the message pump that fetches messages from the queue by providing appropriate ServiceBusProcessorOptions. For example, by default the SDK auto-completes a message when the delegate returns without throwing an exception. In certain cases we might want to take control of message completion or abandonment. If so, the AutoCompleteMessages flag can be set to false, and it becomes the job of the handler to call CompleteMessageAsync. If an exception is thrown in the handler and CompleteMessageAsync hasn’t been called, the message is redelivered and processing is retried.
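
To make explicit settlement more concrete, here is a minimal sketch of a handler that decides per outcome whether to complete, abandon, or dead-letter a message. The environment variable name, the queue name "queue", and the HandleOrderAsync helper are assumptions made for illustration only and are not part of the original sample.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Assumptions: the connection string lives in an environment variable and
// the queue is called "queue". Replace both with your own values.
var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

await using var client = new ServiceBusClient(connectionString);
await using var processor = client.CreateProcessor("queue", new ServiceBusProcessorOptions
{
    AutoCompleteMessages = false // the handler settles every message itself
});

processor.ProcessMessageAsync += async args =>
{
    try
    {
        await HandleOrderAsync(args.Message);
        // Success: tell the broker the message is done.
        await args.CompleteMessageAsync(args.Message);
    }
    catch (FormatException)
    {
        // Retrying cannot fix a malformed body, so park it in the dead-letter queue.
        await args.DeadLetterMessageAsync(args.Message, "MalformedBody", "Body is not a valid order id.");
    }
    catch (TimeoutException)
    {
        // Possibly transient: give up the lock right away so the message is redelivered.
        await args.AbandonMessageAsync(args.Message);
    }
};

processor.ProcessErrorAsync += args =>
{
    Console.WriteLine(args.Exception);
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();
Console.ReadLine(); // keep receiving until Enter is pressed
await processor.StopProcessingAsync();

// Hypothetical business logic: treats the message body as an integer order id.
static Task HandleOrderAsync(ServiceBusReceivedMessage message)
{
    var orderId = int.Parse(message.Body.ToString()); // throws FormatException on bad input
    Console.WriteLine($"Processing order {orderId}");
    return Task.CompletedTask;
}
```

Exceptions that are not caught in the handler still propagate to the processor, so the retry behavior described above continues to apply to them.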

Furthermore, we can control how much concurrency we allow on the message handler delegate. With the MaxConcurrentCalls property set to one we tell the SDK that we only ever want one delegate to be executed at a time, so essentially we are processing messages sequentially, one by one. The setting can be tweaked according to the limiting resources at play. For example, if your database only ever allows 100 concurrent calls at any given time, it might be advisable to set MaxConcurrentCalls to 100. Even if your queue had more than a hundred messages available, at most a hundred handler invocations would be running concurrently.
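
The effect of such a limit can be illustrated without a Service Bus namespace. The following sketch is only an analogy and not the SDK's implementation: a SemaphoreSlim plays the role of MaxConcurrentCalls, and twenty simulated messages compete for three handler slots. All names in it are made up for the illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for MaxConcurrentCalls; not an SDK type or setting.
const int maxConcurrentCalls = 3;
var gate = new SemaphoreSlim(maxConcurrentCalls);

var sync = new object();
var running = 0; // handlers executing right now
var peak = 0;    // highest number of handlers seen executing at once

async Task HandleAsync(int messageId)
{
    await gate.WaitAsync(); // wait for a free handler slot
    try
    {
        lock (sync)
        {
            running++;
            peak = Math.Max(peak, running);
        }

        await Task.Delay(20); // simulated work, e.g. a database call
    }
    finally
    {
        lock (sync)
        {
            running--;
        }

        gate.Release(); // free the slot for the next message
    }
}

// Twenty "messages" are available, but only three are handled at any time.
await Task.WhenAll(Enumerable.Range(0, 20).Select(HandleAsync));

Console.WriteLine($"Peak concurrency: {peak} (limit {maxConcurrentCalls})");
```

The observed peak never exceeds the limit, regardless of how many messages are waiting, which is the property that protects a downstream resource such as the database mentioned above.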

The PrefetchCount setting controls how many messages are actively prefetched from Azure Service Bus and held locally for future processing. This setting comes in handy to reduce the number of round trips needed to fetch messages from the cloud.

MaxAutoLockRenewalDuration controls for how long the SDK keeps renewing the message lock. By default Azure Service Bus uses a mechanism called Peek Lock: a receiver acquires a lock on a message, and for the duration of the lock the message is not visible to any other consumer on the same queue. The setting allows the receiver to control for how long that lock keeps being extended. In the above case it means that the message handler can take up to ten minutes. If it doesn’t complete within ten minutes, the lock on the message currently being processed is no longer renewed; once it expires, the message becomes available on the input queue again and might be consumed by other competing consumers (essentially processing the same message multiple times). Renewing the lock is a best-effort operation and is not guaranteed.
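
Because renewal is best effort, it can be useful to recognize a lost lock when it happens. The sketch below assumes that a lost lock surfaces as a ServiceBusException whose Reason is MessageLockLost, for example when the handler tries to complete a message after its lock has expired. IsLockLost and onError are hypothetical names introduced for this illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper, not part of the SDK: true when the exception reports a lost message lock.
static bool IsLockLost(Exception exception) =>
    exception is ServiceBusException { Reason: ServiceBusFailureReason.MessageLockLost };

// Error handler that singles out lost locks. Attach it with
// processor.ProcessErrorAsync += onError; on an existing ServiceBusProcessor.
Func<ProcessErrorEventArgs, Task> onError = args =>
{
    if (IsLockLost(args.Exception))
    {
        Console.WriteLine($"Lock lost on {args.EntityPath}; the message may be processed again.");
    }
    else
    {
        Console.WriteLine($"Error from {args.ErrorSource}: {args.Exception}");
    }

    return Task.CompletedTask;
};

// Quick check of the helper without a live namespace.
var lockLost = new ServiceBusException("The lock supplied is invalid.", ServiceBusFailureReason.MessageLockLost);
Console.WriteLine(IsLockLost(lockLost));
Console.WriteLine(IsLockLost(new InvalidOperationException()));
```

Since the same message might be handled more than once after a lost lock, handlers that run close to the renewal limit are safer when they are idempotent.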

Sending and receiving a message in action
 receiver.ProcessErrorAsync += async errorEventArgs =>
 {
     await Out.WriteLineAsync($"Exception: {errorEventArgs.Exception}");
     await Out.WriteLineAsync($"FullyQualifiedNamespace: {errorEventArgs.FullyQualifiedNamespace}");
     await Out.WriteLineAsync($"ErrorSource: {errorEventArgs.ErrorSource}");
     await Out.WriteLineAsync($"EntityPath: {errorEventArgs.EntityPath}");
 };

The above video demonstrates the discussed code in action. Now let’s see what happens when an exception is thrown.

The Azure Service Bus SDK calls the delegate that we registered on the processor's ProcessErrorAsync event with all the details of what went wrong. Delivery of the message is retried until the maximum delivery count configured on the queue is reached, after which Service Bus moves the message to the dead-letter queue.
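
The retries can be observed through the DeliveryCount property of the received message. The following sketch uses a handler that always fails and logs each attempt. As before, the environment variable name and the queue name "queue" are placeholders chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Assumptions: connection string from an environment variable, queue named "queue".
var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

await using var client = new ServiceBusClient(connectionString);
await using var processor = client.CreateProcessor("queue");

processor.ProcessMessageAsync += async args =>
{
    // DeliveryCount increases every time the broker hands the message out again.
    Console.WriteLine($"Attempt {args.Message.DeliveryCount} for message {args.Message.MessageId}");

    await Task.Delay(100, args.CancellationToken); // simulated work
    throw new InvalidOperationException("Simulated failure");
};

processor.ProcessErrorAsync += args =>
{
    // Invoked for every failed attempt with the exception thrown by the handler.
    Console.WriteLine($"Failed: {args.Exception.Message}");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();
Console.ReadLine(); // watch the attempts, then press Enter to stop
await processor.StopProcessingAsync();
```

With a message in the queue, the attempt number should climb on each redelivery until the queue's maximum delivery count is reached.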

Updated: 2021-03-23 to use the new SDK

About the author

Daniel Marbach
