
Another attempt to batch complete with Azure Service Bus

In the last post, we tried to be smart about batch completion by completing every one-hundredth message. Unfortunately, this way of doing completion might not work well with only a few messages in the queue. We decided to go back to the drawing board to see if we could come up with a better approach that copes with high as well as small loads of messages. The heart of the code we wrote in the last post (omitting the exception handling part) looks like

var lockTokensToComplete = new ConcurrentStack<Guid>();
 
receiveClient.OnMessageAsync(async message =>
{
   // same as before
   await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
   lockTokensToComplete.Push(message.LockToken);
   // same as before
},..)

Instead of completing messages in batches of hundred lock tokens we can try to move out the batch completion into a dedicated batch completion task. Hopefully that is not too hard. Let’s see

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var batchCompletionTask = Task.Run(async () => {
   while(!token.IsCancellationRequested) {
      var lockTokens = new Guid[100];
      var numberOfItems = lockTokensToComplete.TryPopRange(lockTokens);
      if(numberOfItems > 0) {
         // complete only the tokens that were actually popped (requires System.Linq)
         await receiveClient.CompleteBatchAsync(lockTokens.Take(numberOfItems)).ConfigureAwait(false);
      }
      await Task.Delay(TimeSpan.FromSeconds(5), token).ConfigureAwait(false);
   }
});

We schedule a task on the worker thread pool to offload the actual completion loop from the thread that starts the task. We loop until cancellation is requested on the cancellation token source, whose token is passed as a closure into the completion loop. We then try to pop a range of lock tokens from the ConcurrentStack. If we popped more than zero items, we complete those lock tokens on the receiveClient. At the end of the loop, we asynchronously sleep until either we shut down or five seconds have passed.

This is beautiful and straightforward. We have a dedicated completion circuit, and the contention on the concurrent stack is in a controllable range. Under small load, we complete lock tokens in batches of one up to a maximum of one hundred tokens. If we receive only a limited number of messages, the loop might complete messages with their tokens one by one (for example, when we receive a message every six seconds). But what happens when the load increases?

When we receive several hundred messages per second, our randomly chosen “pop up to one hundred tokens” and “sleep for five seconds” values might turn out to be a suboptimal choice. We will dive deeper into this topic in upcoming posts. Could we optimize a few things in this simple loop? Feel free to exercise your brain or just wait for the next post 😉
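One loose end worth sketching is shutdown: when we cancel the loop, lock tokens may still be sitting on the stack. A minimal drain on shutdown could look like the following (my own sketch, not part of the original code; it assumes the same receiveClient, lockTokensToComplete, tokenSource, and batchCompletionTask as above):

var shutdown = new Func<Task>(async () => {
   // stop the completion loop
   tokenSource.Cancel();
   try {
      await batchCompletionTask.ConfigureAwait(false);
   }
   catch (OperationCanceledException) {
      // expected when the Task.Delay inside the loop is canceled
   }

   // flush whatever is still pending before closing the client
   var leftovers = new Guid[100];
   int popped;
   while ((popped = lockTokensToComplete.TryPopRange(leftovers)) > 0) {
      await receiveClient.CompleteBatchAsync(leftovers.Take(popped)).ConfigureAwait(false);
   }
});

This way no successfully processed message has to wait for its peek lock to expire just because we shut down at an inconvenient moment.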

 

Async method without cancellation support, oh my!

Sometimes you have to interact with asynchronous APIs that you don’t control. Those APIs might be executing for a very long time but have no way to cancel the request. Dennis Doomen was exactly in such a situation while building his opinionated Event Sourcing projections library for .NET called LiquidProjections.

The library uses NEventStore under the covers. The actual access to NEventStore is adapted in a class called NEventStoreAdapter. The adapter takes care of loading pages from the store from a given checkpoint onwards. This asynchronous method can potentially run indefinitely because of the underlying NEventStore client. Let’s look at the code:

await eventStoreClient.GetPageAsync(checkpoint).ConfigureAwait(false);

So in his case, the pretty simple sample code above could execute forever, but it cannot be canceled since there is no overload available that accepts a CancellationToken. Dennis wrote the following extension method to be able to write an asynchronous execution that is cancelable:

public static class TaskExtensions {
   public static async Task<TResult> WithWaitCancellation<TResult>(this Task<TResult> task,
              CancellationToken cancellationToken) {
      Task completedTask = await Task.WhenAny(task, Task.Delay(Timeout.Infinite, cancellationToken));
      if (completedTask == task) {
         return await task;
      }
      else {
         cancellationToken.ThrowIfCancellationRequested();
         throw new InvalidOperationException("Infinite delay task completed.");
      }
   }
}

and here the usage of this extension method

await eventStoreClient.GetPageAsync(checkpoint).WithWaitCancellation(cancelationToken).ConfigureAwait(false);

The idea of the code was the following:

We package the actual task to be executed together with a Task.Delay(Timeout.Infinite) into a Task.WhenAny. When the actual task completes, it is returned from WhenAny, and we await the outcome of the task*. If the completed task is not the actual task, then we ran into the case where the Task.Delay task was canceled because the cancellation token passed into it was canceled. In this case, it is sufficient to throw an OperationCanceledException with ThrowIfCancellationRequested.

That is a nifty idea and a quite clever solution. Unfortunately, Dennis found out that this extension method produced a nasty memory leak when running for a longer period of time. When the actual task completes, the task representing the Task.Delay operation is still executing and therefore leaks memory. Dennis came up with a nice solution using linked cancellation token sources.
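For illustration, here is a sketch of what such a linked-token variant could look like (my reconstruction of the idea, not necessarily Dennis’s exact code): when the actual task wins the race, we cancel the linked source, which also completes the Task.Delay so it cannot linger.

public static class TaskExtensions {
   public static async Task<TResult> WithWaitCancellation<TResult>(this Task<TResult> task,
              CancellationToken cancellationToken) {
      using (var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) {
         var delayTask = Task.Delay(Timeout.Infinite, linkedSource.Token);
         var completedTask = await Task.WhenAny(task, delayTask).ConfigureAwait(false);
         if (completedTask == task) {
            linkedSource.Cancel(); // cancels the delay so it no longer leaks
            return await task.ConfigureAwait(false);
         }
         cancellationToken.ThrowIfCancellationRequested();
         throw new InvalidOperationException("Infinite delay task completed.");
      }
   }
}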

His tweet

Note to self: always await or cancel a call to Task.Delay if one wants to avoid memory leaks…

caught my attention and I started working on a more robust and scalable extension method that doesn’t require Task.Delay. I will discuss the extension method I came up with in the next post. Thanks Dennis for this nice asynchronous challenge!

* Remember: awaiting a task materializes its result. If there is a result, it is returned; if the task is faulted, the exception is unpacked and rethrown.

Let’s try batch completion of messages on Azure Service Bus

In the last post, we deferred the message completion task into the background to remove the additional latency from the message receive callback. This time we want to save even more latency by batching multiple message completions together into a single Azure Service Bus broker call. It turns out this is possible with the Azure SDK.

The MessageReceiver has a method called CompleteBatchAsync which accepts an IEnumerable<Guid> called lockTokens. But where do we get these lock tokens from? It turns out the BrokeredMessage type that is passed into the OnMessageAsync callback contains a GUID property called LockToken. The lock token is a GUID generated by the service bus which uniquely identifies a message on the broker. So if we manage to track the lock tokens for the messages we receive, we can execute CompleteBatchAsync at a later stage. Let’s see how this would change our message receive logic.

var lockTokensToComplete = new ConcurrentStack<Guid>();
 
receiveClient.OnMessageAsync(async message =>
{
   try {
      await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
      lockTokensToComplete.Push(message.LockToken);
   }
   catch(Exception) {
      // in case of an exception make it available again immediately
      await message.AbandonAsync().ConfigureAwait(false);
   }
},..)

With this small change, we push the lock tokens of messages to be completed onto a ConcurrentStack. Only if a message fails do we directly abandon the message asynchronously inside the message receive callback. The assumption we are making here is that the contention on the ConcurrentStack will be far less than the latency of the remote call to complete a message. But at some point, we have to complete the messages. Can we do it directly in the callback, as illustrated with the following code?


receiveClient.OnMessageAsync(async message =>
{
   // same as before
   var lockTokens = new Guid[100];
   int numberOfItems;
   if(lockTokensToComplete.Count > 100 && (numberOfItems = lockTokensToComplete.TryPopRange(lockTokens)) > 0) {
      // complete only the tokens that were actually popped (requires System.Linq)
      await receiveClient.CompleteBatchAsync(lockTokens.Take(numberOfItems)).ConfigureAwait(false);
   }
   // same as before
},..)

We’ve chosen a concurrent stack here in order to be able to pop ranges of lock tokens. This might seem a bit strange, since we’d be completing messages in a different order than we received them. Normally that is not a big deal, since in concurrency scenarios we should not make any assumptions about ordering anyway.

With a lot of messages in the queue, and when the queue constantly has a certain load of messages coming in, this code improves the latency. We only do a remote call every hundred messages, so we theoretically save 99% of the latency we’d incur when completing every single message. What a success! But wait, there is a problem in this code: what if we only receive one message every ten seconds?

Assuming a peek lock duration of roughly thirty seconds, we would almost always run into the expiration of the peek lock, and therefore the messages would be processed over and over again until they’d get dead-lettered by Azure Service Bus. That’s some bad hat, Harry!

So how can we make this troublemaker better? Well, I think you know it already: that’s a topic for another post.

Defer message completion on Azure Service Bus

In the last post, we started sketching code around the idea of deferring the actual message completion out of the message receive callback. A simple way to achieve our goal would be to kick off the completion task without awaiting it, as the following code shows

receiveClient.OnMessageAsync(async message =>
{
   try {
      await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
      message.CompleteAsync();
   }
   catch(Exception) {
      // in case of an exception make it available again immediately
      await message.AbandonAsync().ConfigureAwait(false);
   }
},..)

This is inelegant in multiple ways. Since the code is contained in a delegate which is marked as async, the compiler would warn us with CS4014 that the call is not awaited. Of course, we could work around that by suppressing the compiler warning or by writing an empty extension method which “consumes the task returned” *1. With this approach, the message completion would become a fire & forget operation.

The fire & forget nature of this approach imposes yet another problem. We don’t know what the outcome of the operation is. The operation might fail for various reasons, for example when there is a connection interruption to the broker or when the client is shutting down and the receiver is closed. Furthermore, when we’d like to shut down in a clean way, we could have up to 100 concurrent message completions (remember, MaxConcurrentCalls was set to 100) which we might want to await until they complete or fail. We could track the tasks somehow like the following:

var runningTasks = new ConcurrentDictionary<Task, Task>();

receiveClient.OnMessageAsync(async message =>
{
   try {
      await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
      var completionTask = message.CompleteAsync();
      runningTasks.TryAdd(completionTask, completionTask);
      completionTask.ContinueWith(t => {
         Task toBeRemoved;
         runningTasks.TryRemove(t, out toBeRemoved);
      }, TaskContinuationOptions.ExecuteSynchronously).Ignore();
   }
   catch(Exception) {
      // in case of an exception make it available again immediately
      await message.AbandonAsync().ConfigureAwait(false);
   }
},..)

In the above sample code, we use a ConcurrentDictionary to track the tasks. The trick here is that we rely on the Task object having a proper object identity. We use a dictionary since we want to remove a specific Task that completed. *2 We add the completion task to the runningTasks dictionary. After that, we schedule a continuation which removes the completed task from the dictionary for proper housekeeping. The continuation is scheduled to execute synchronously for efficiency reasons.

When we shut down, we first close the receive client so it no longer pumps messages to us, and then we await all pending runningTasks until they have completed or failed. *3

public async Task Stop() {
   await receiveClient.CloseAsync().ConfigureAwait(false);
   await Task.WhenAll(runningTasks.Values).ConfigureAwait(false);
   runningTasks.Clear();
}

By using the fire & forget approach we were able to defer the completion of the message into the background and out of the receive loop. We were able to achieve our goals to remove the latency of the completion call from the actual message receive. We are still completing messages one by one which means each completion is still a remote call. Can we do even better? Yes we can by leveraging batch completion capabilities of the Azure SDK. But that is a topic for the next post!
*1

public static class TaskExtensions {
    public static void Ignore(this Task task) {
        // intentionally ignored
    }
}

*2 Since we know the maximum number of concurrent requests, we could also preallocate an array of Tasks and keep track of them with an index which is incremented in a thread-safe way and kept inside the boundaries of the array, passing the current index to the delegate which is scheduled as a continuation of the completion task (exercise left to the readers of this post 😉 ).

*3 This solution would indefinitely hang when one of the running tasks hangs, feel free to improve the code (again an exercise left to the readers 😉 ).
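Picking up the exercise from footnote *3, one defensive variant is to race the pending tasks against a timeout. This is a sketch under the assumption that a timeout parameter is acceptable, not the code from the post:

public async Task Stop(TimeSpan timeout) {
   await receiveClient.CloseAsync().ConfigureAwait(false);
   var allPending = Task.WhenAll(runningTasks.Values);
   var finished = await Task.WhenAny(allPending, Task.Delay(timeout)).ConfigureAwait(false);
   if (finished != allPending) {
      // some completions did not finish in time; their messages will simply
      // reappear after the PeekLock duration expires and be processed again
   }
   runningTasks.Clear();
}

The trade-off is that a hung completion no longer blocks shutdown forever; at worst the affected messages are redelivered.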

Latency to Azure Service Bus matters

In the last post, we optimized the throughput on the client by increasing the concurrency of the message receive loop. What else could we try to improve? Let’s have a closer look at the lambda body of the OnMessageAsync declaration.

receiveClient.OnMessageAsync(async message =>
{
   // do something with the message
   await message.CompleteAsync().ConfigureAwait(false);
},..)

As we can see, we have technically two operations inside the callback delegate. The first operation is the code we want to execute when a message is pushed to us (here represented with comment). The second operation is the completion or abandon of the message we received depending on the outcome of the business logic.

For the business logic, there is an important aspect we need to take into consideration. If the business logic happens to call into I/O-bound resources, the whole call stack of the business logic on the I/O-bound path needs to be asynchronous and return a Task or Task<T>.

receiveClient.OnMessageAsync(async message =>
{
   await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
   await message.CompleteAsync().ConfigureAwait(false);
},..)

This makes the callback delegate fully asynchronous and therefore allows us to use the resources provided by the Azure SDK more efficiently, since we never block on the I/O-bound path. Naturally, we would also make sure that the implementation of DoSomethingWithTheMessageAsync is as efficient as possible (for example, following best practices for async ADO.NET calls when we call into a SQL Azure database, and more).

Could we save more? How about the second operation? Can we just leave it out? Not really: if we left it out, the message would never be completed and would therefore always reappear after the PeekLock duration is over. We’d be processing the same Thanksgiving order over and over again. So at some point in time, we need to complete the message. But what if we could complete it at a later stage? It turns out we can, by making a few trade-offs:

receiveClient.OnMessageAsync(async message =>
{
   try {
      await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
      // omit the completion of the message
   }
   catch(Exception) {
      // in case of an exception make it available again immediately
      await message.AbandonAsync().ConfigureAwait(false);
   }
},..)

Assuming exceptions are, like the name suggests, “exceptional”, we abandon the message immediately when an exception occurs in our message processing logic, making the message accessible again without needing to wait for the PeekLock duration to expire. For the good case, we omit the completion of the BrokeredMessage and delay the completion slightly.

Why is that useful? The CompleteAsync operation on the brokered message calls into Azure Service Bus. So essentially it is a remote call which can take a few dozen milliseconds, depending on the latency to the datacenter you’re connecting to. While I’m writing this post, the latency from my computer to Azure is roughly 40-50 ms (usually it averages between 20-30 ms)*. By omitting the complete call, we save 40 ms per message received in my example. Saving 40 ms per message with a load of 120K messages means we save 4.8 million milliseconds (4800 seconds) on the entire batch of messages.

By not hammering Azure Service Bus for each message received, we gain more throughput because the overall execution time of the actual message receive is reduced. So instead of bobby car speed, we are probably now at rusty old car speed.

But hey, I’m totally cheating now! At some point in time we need to complete the messages so they don’t reappear. We also have to make sure we complete the messages within the PeekLock duration of a received message. But that is a topic for another post!

* Of course by placing your application into the data center the latency would decrease. But there will always be latency. Remember the speed of light is a constant ;)?

Let’s save our Thanksgiving sales with Azure Service Bus

As we saw in the last post, naive receiving of messages with Azure Service Bus destroys our Thanksgiving sales. But how can we do better to satisfy our bosses and first and foremost our precious customers? As Sean Feldman rightfully pointed out in the comments section, our sample was using MaxConcurrentCalls set to one.

How silly! Who wrote that… Oh, it was me 😉

So how about we correct my mistake and set it to 100, like the following:

receiveClient.OnMessageAsync(async message => {...},
new OnMessageOptions { AutoComplete = false, MaxConcurrentCalls = 100 })

With this tiny change, we can suddenly receive 100 messages concurrently, as well as complete and abandon (if necessary) them concurrently. Let’s do the math again. The 120K orders we received can now be processed in 20 minutes instead of 33 hours. On a lucky Thanksgiving rush, we’d be able to process the 1.2 million orders we received in about 200 minutes, or 3 hours and 20 minutes. That is a significant improvement for such a tiny change!

The hard part is to pick a decent concurrency number. From an Azure SDK perspective, setting MaxConcurrentCalls to a higher number doesn’t necessarily mean the SDK will use significantly more resources such as threads. Internally, the SDK is almost completely asynchronous, using the old APM style. The concurrency is limited by an asynchronous semaphore* which is acquired before each asynchronous receive and released as soon as the callback delegate has executed (that is a bit of a simplification, but let’s stick with that analogy). So potentially we could set it to a much higher number.

But it depends on what we execute inside the callback delegate. For example, if we execute operations against a database like SQL Azure, for argument’s sake, we need a dedicated connection for each concurrent access to the database. SqlConnections are pooled, and by default the pool size is set to 100. So if we set the concurrency to a higher number, we would eventually exhaust the connection pool, and the throughput would suffer significantly. So let’s stick with 100 for MaxConcurrentCalls for now.
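If we did decide to raise MaxConcurrentCalls beyond 100, the SqlConnection pool size could be aligned via the connection string. The server, database, and credentials below are purely illustrative placeholders:

// Hypothetical connection string; only Max Pool Size is the point here.
var connectionString =
   "Server=tcp:myserver.database.windows.net,1433;Database=orders;" +
   "User ID=someUser;Password=somePassword;Max Pool Size=200;";

Whether that actually helps still depends on what the database itself can sustain, so measuring beats guessing here.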

We went from snail orders to orders that are processed at least as fast as my son can drive his bobby car. So I think our Thanksgiving sales are almost saved and we can enjoy our turkey. Or can we do even better? Of course we can! But that is a topic for another post.

 

* A semaphore is like a nightclub: it has a certain capacity, enforced by a bouncer. Once it’s full, no more people can enter, and a queue builds up outside. Then, for each person that leaves, one person enters from the head of the queue. […] Albahari
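The nightclub analogy maps nicely onto SemaphoreSlim, which supports asynchronous waiting. Here is a simplified sketch of how such a concurrency-limited receive loop might work; this is an illustration of the technique, not the actual SDK internals, and ReceiveAndProcessAsync is a made-up placeholder for “receive one message and run the callback”:

var semaphore = new SemaphoreSlim(100); // the bouncer lets 100 guests inside

while (!cancellationToken.IsCancellationRequested) {
   // queue up at the door until there is capacity
   await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
   var processing = ReceiveAndProcessAsync();
   // when one guest leaves, the next one from the queue may enter
   processing.ContinueWith(_ => semaphore.Release(),
      TaskContinuationOptions.ExecuteSynchronously).Ignore();
}

Note that the loop itself never blocks a thread; the semaphore only limits how many receive/process operations are in flight at once.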

Don’t spoil your Thanksgiving sales with naive Azure Service Bus client code

Edit: The initial title was “Naive receiving of messages on Azure Service Bus destroys Thanksgiving sales”. I changed it because I don’t want to imply that Azure Service Bus as a service is not behaving correctly. Sorry if this caused any inconvenience.

The Azure Service Bus SDK provides a lot of very smart functionality to build robust libraries, frameworks and application code on top of Azure Service Bus. In the most simplistic scenario, you write code like

var receiveClient = QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.PeekLock);
receiveClient.OnMessageAsync(async message =>
{
   // do something with the message
   await message.CompleteAsync().ConfigureAwait(false);
},
new OnMessageOptions { AutoComplete = false, MaxConcurrentCalls = 1 });

This code instructs the SDK to push messages to the delegate passed into OnMessageAsync as soon as they are available, with a maximum concurrency of one, meaning that at most one invocation of your delegate runs at a time. So if more messages are available in the queue, the OnMessageAsync delegate will only be called when the previous push of a message is over. Since AutoComplete is set to false, it is the responsibility of the delegate implementation to call CompleteAsync (completes the message) or AbandonAsync (puts the message back into the queue) on the message.

The code sample shown here uses the PeekLock mode. In PeekLock mode, the client sends a request to the server to receive a message and sends another request to complete the message. During the lock duration (which can be configured), the message is not visible to other clients connected to the same queue. It might become visible again if a client fails to complete or abandon the message within the given PeekLock duration.

For our simplistic sample above, we already need to know quite a bit. Assuming the delegate took one second to execute, the code above could only handle a message every second. This means we have a throughput of one message per second (1 msg/s).

Imagine the receive client consumed our Thanksgiving special sales order queue and our customers sent in 2000 orders per second (yeah, our imaginary products are that amazing 😉 ). In one minute of our special Thanksgiving sale, we would receive 120,000 orders, but we would be able to consume only 60 of them. It would take us roughly 33 hours to catch up with the orders we received in that single minute! Now imagine we had a lucky Thanksgiving rush and actually sold products at that rate for ten minutes. We would need almost 14 days to process those 1.2 million orders. I’m sure this code wouldn’t please our bosses and certainly not our imaginary customers!

With our snail orders, we’d probably lose all of them and get really bad reviews online. We can do better for sure, but that is a topic for the next post.

Context Matters

Async/await makes asynchronous code much easier to write because it hides away a lot of the details. Many of these details are captured in the SynchronizationContext which may change the behavior of your async code entirely depending on the environment where you’re executing your code (e.g. WPF, Winforms, Console, or ASP.NET). By ignoring the influence of the SynchronizationContext you may run into deadlocks and race conditions.

The SynchronizationContext controls how and where task continuations are scheduled and there are many different contexts available. If you’re writing a WPF application, building a website, or an API using ASP.NET you’re already using a special SynchronizationContext which you should be aware of.

SynchronizationContext in a console application

To make this less abstract, let’s have a look at some code from a console application:

public class ConsoleApplication
{
    public static void Main()
    {
        Console.WriteLine($"{DateTime.Now.ToString("T")} - Starting");
        var t1 = ExecuteAsync(() => Library.BlockingOperation());
        var t2 = ExecuteAsync(() => Library.BlockingOperation());
        var t3 = ExecuteAsync(() => Library.BlockingOperation());

        Task.WaitAll(t1, t2, t3);
        Console.WriteLine($"{DateTime.Now.ToString("T")} - Finished");
        Console.ReadKey();
    }

    private static async Task ExecuteAsync(Action action)
    {
        // Execute the continuation asynchronously
        await Task.Yield();  // The current thread returns immediately to the caller
                             // of this method and the rest of the code in this method
                             // will be executed asynchronously

        action();

        Console.WriteLine($"{DateTime.Now.ToString("T")} - Completed task on thread {Thread.CurrentThread.ManagedThreadId}");
    }
}

Here, Library.BlockingOperation() may be a third-party library we’re using that blocks the thread. It can be any blocking operation, but for testing purposes you can use Thread.Sleep(2000) as an implementation.

When we run the application, the output looks like this:

16:39:15 - Starting
16:39:17 - Completed task on thread 11
16:39:17 - Completed task on thread 10
16:39:17 - Completed task on thread 9
16:39:17 - Finished

In the sample, we create three tasks that block the thread for some period of time. Task.Yield forces a method to be asynchronous by scheduling everything after this statement (called the continuation) for execution but immediately returning control to the caller. As you can see from the output, due to Task.Yield all the operations ended up being executed in parallel, resulting in a total execution time of just two seconds.
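Part of why this works is that a console application has no special SynchronizationContext, so the continuations after Task.Yield are scheduled on the thread pool rather than marshaled back to a single thread. You can verify this yourself with a small check (a sketch; the comments describe the default behavior of each environment as I understand it):

// In a console application's Main method there is no special context:
Console.WriteLine(SynchronizationContext.Current == null); // prints True

// In WPF you'd get a DispatcherSynchronizationContext, in WinForms a
// WindowsFormsSynchronizationContext, and in classic ASP.NET an
// AspNetSynchronizationContext instead.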

Now let’s port this code over to ASP.NET.


Flexible work days as a remote worker

I enjoy remote working! I no longer have to commute, and I can freely adjust my work day schedule as I see fit. For example, the warmer it gets during summer time, the earlier I can start in the morning and finish in the afternoon. If I have a doctor’s appointment, I’m free to schedule it anytime during the day. Going to the gym? No problem! I just plan my gym hours whenever they fit into the day, thereby bypassing the rush hours. That’s great! But flexibility requires responsibility. You have to make sure other things don’t distract you from getting your work done. Or, even worse, that you don’t end up scheduling so many other unrelated activities that you start working later and later, thereby delaying, moving, or shortening your regular sleep hours. What helps for me is constant self-reflection, good workmates who call you out when you “just sneak in”, my wife who is my best voice of reason, and, controversially, a regular(-ish) schedule that helps me predict and plan my work week.

Sounds weird, doesn’t it? I have all the freedom I want, and I end up working roughly according to a fixed plan. Well, I still have more freedom than before, since the self-imposed agenda is more a guideline than a schedule.

What challenges do you face with your flexible work-hours?