Defer message completion on Azure Service Bus

In the last post, we started sketching code around the idea of deferring the actual message completion out of the message receive callback. A simple way to achieve our goals would be to kick off the completion task without await it like the following code shows

receiveClient.OnMessageAsync(async message =>
{
   try {
      await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
      message.CompleteAsync();
   catch(Exception) {
      // in case of an exception make it available again immediately
      await message.AbandonAsync().ConfigureAwait(false);
   }
},..)

This is inelegant in multiple ways. Since the code is contained in a delegate which is marked as async, the compiler would warn us with CS4014 that the call is not awaited. Of course, we could work around that by suppressing the compiler warning or by writing an empty extension method which “consumes the task returned” *1. With this approach, the message completion would become a fire & forget operation.

The fire & forget nature of this approach imposes yet another problem. We don’t know what the outcome of the operation is. The operation might fail for various reasons like when there is a connection interruption to the broker or when the client is shutting down, and the receiver is closed. Furthermore, when we’d like to shut down in a clean way, we could have up to 100 concurrent message completions (remember MaxConcurrentCall was set to 100) which we might want to await to complete or fail. We could track the tasks somehow like the following:

var runningTasks = new ConcurrentDictionary<Task, Task>();

receiveClient.OnMessageAsync(async message =>
{
   try {
      await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
      var completionTask = message.CompleteAsync();
      runningTasks.TryAdd(completionTask, completionTask);
      completionTask.ContinueWith(t => { 
         Task toBeRemoved;
         runningTasks.TryRemove(t, out toBeRemoved);
      }, TaskContinuationOptions.ExecuteSynchronously)).Ignore();
   catch(Exception) {
      // in case of an exception make it available again immediately
      await message.AbandonAsync().ConfigureAwait(false);
   }
},..)

In the above sample code, we use a ConcurrentDictionary to track the tasks. The trick here is that we rely on the Task object having a proper object identity. We use a dictionary since we want to remove a specific Task which completed. *2 We add the completion task to the runningTasks dictionary. After that, we schedule a continuation which removes the completed task from the dictionary for a proper housekeeping. The continuation is scheduled to be executed synchronously for efficiency reasons.

When we shutdown we first close the receive client to no longer pump messages to us and then we await all pending runningTasks until they completed or failed. *3

public async Task Stop() {
   await receiveClient.CloseAsync().ConfigureAwait(false);
   await Task.WhenAll(runningTasks.Values).ConfigureAwait(false);
   runningTasks.Clear();
}

By using the fire & forget approach we were able to defer the completion of the message into the background and out of the receive loop. We were able to achieve our goals to remove the latency of the completion call from the actual message receive. We are still completing messages one by one which means each completion is still a remote call. Can we do even better? Yes we can by leveraging batch completion capabilities of the Azure SDK. But that is a topic for the next post!
*1

public static class TaskExtensions {
    public static void Ignore(this Task task) { // intentionally ignored }
}

*2 Since we know the maximum number of concurrent requests we could also preallocate an array of Tasks, keep track of them with an index which is incremented in a thread safe way and kept inside the boundaries of the array as well as passing that current index to the delegate which is scheduled as a continuation of the completion task (exercise left to the readers of this post πŸ˜‰ ).

*3Β This solution would indefinitely hang when one of the running tasks hangs, feel free to improve the code (again an exercise left to the readers πŸ˜‰ ).

About the author

Daniel Marbach

3 comments

By Daniel Marbach

Recent Posts