MultiProducerConcurrentConsumer – Preallocate and reuse

In the last post, I introduced the MultiProducerConcurrentConsumer component. In this post, I’m going to walk through the constructor of the component and explain the preallocation trade-off that was made.

With almost any code that executes on the hot path of your library, framework or system, you need to make trade-offs regarding allocations. In the end, it boils down to roughly two choices:

  • Allocate as much as you need upfront
  • Allocate on-demand when you need it

The benefit of only allocating when needed is that the memory consumption only grows when it is needed. The downside of this approach is that under highly concurrent scenarios allocating structures in a safe and lock-free way can be tricky.

The drawback of allocating upfront is that the memory is reserved and stays reserved even if your application never needs it. On the other hand, if we allocate the potentially needed structures upfront, we never need to allocate during concurrent execution. Unfortunately, preallocating only works when you know the boundary conditions of the problem at hand.

With the MPCC structure, we can make a few assumptions which allow us to pre-allocate internally needed structures.

public MultiProducerConcurrentCompletion(
    int batchSize, TimeSpan pushInterval, int maxConcurrency, int numberOfSlots)
{
    this.maxConcurrency = maxConcurrency;
    this.pushInterval = pushInterval;
    this.batchSize = batchSize;
    this.numberOfSlots = numberOfSlots;

    // One FIFO queue per slot, created ahead of time
    queues = new ConcurrentQueue<TItem>[numberOfSlots];
    for (var i = 0; i < numberOfSlots; i++)
    {
        queues[i] = new ConcurrentQueue<TItem>();
    }

    // Upper bound of operations that can be in flight at any given time
    var maxNumberOfConcurrentOperationsPossible = numberOfSlots * maxConcurrency;
    pushTasks = new List<Task>(maxNumberOfConcurrentOperationsPossible);

    // Reusable batch lists, presized to the batch size
    itemListBuffer = new ConcurrentQueue<List<TItem>>();
    for (var i = 0; i < maxNumberOfConcurrentOperationsPossible; i++)
    {
        itemListBuffer.Enqueue(new List<TItem>(batchSize));
    }
}

Since we know the number of slots up front and we defined that the component needs FIFO semantics, we can use an array of ConcurrentQueue<TItem> (the queues field) and create the queues for all the slots ahead of time (the first for loop).

The maximum concurrency is defined per slot. So we can calculate the maximum number of concurrent operations that are possible at any given time as the number of slots multiplied by the maximum concurrency (maxNumberOfConcurrentOperationsPossible). As soon as we have calculated that number, we can allocate a List<Task> (pushTasks) with exactly that capacity. It holds the concurrently running operations for graceful shutdown purposes.

Last but not least, we make the following trade-off. During runtime, the component will get many items pushed to it and complete items in batches. If we allocated a List<TItem> every time the pump callback is called, we’d be creating a lot of Gen0 garbage, namely up to maxNumberOfConcurrentOperationsPossible List<TItem> instances per push interval. Depending on the push interval settings, this could drastically impact the performance. That’s why the MPCC creates a ConcurrentQueue<List<TItem>> (itemListBuffer) and enqueues List<TItem>(batchSize) instances into it (the second for loop). This queue serves as a List<TItem> buffer. Effectively, we are reusing the lists during the whole lifetime of the component. You might have noticed that we initialize each list with the batch size, which is also known upfront, to prevent unnecessary growing of the lists.
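To make the reuse idea more tangible, here is a minimal sketch of such a list buffer in isolation. The ListBuffer type and its Rent and Return methods are hypothetical names chosen for illustration and are not part of the MPCC source. The sketch also assumes that falling back to a fresh allocation is acceptable when the buffer is unexpectedly empty.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper, not the MPCC implementation: a set of lists that is
// allocated once and then rented and returned for the lifetime of the owner.
public sealed class ListBuffer<TItem>
{
    private readonly ConcurrentQueue<List<TItem>> buffer = new ConcurrentQueue<List<TItem>>();
    private readonly int batchSize;

    public ListBuffer(int numberOfLists, int batchSize)
    {
        this.batchSize = batchSize;
        for (var i = 0; i < numberOfLists; i++)
        {
            buffer.Enqueue(new List<TItem>(batchSize));
        }
    }

    public int Available => buffer.Count;

    // Takes a preallocated list. Allocates only if the buffer is empty,
    // which is an assumption of this sketch, not documented MPCC behavior.
    public List<TItem> Rent()
    {
        return buffer.TryDequeue(out var list) ? list : new List<TItem>(batchSize);
    }

    // Clear() resets the count but keeps the capacity, so the list can be
    // refilled up to the batch size without growing again.
    public void Return(List<TItem> list)
    {
        list.Clear();
        buffer.Enqueue(list);
    }
}
```

A pump callback would rent a list, fill it with up to batchSize items, hand it to the completion logic and return it afterwards, so no new lists are created in the steady state.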

Hope you like this so far. Stay tuned for the next post about MPCC.

Introduction to the MultiProducerConcurrentConsumer for Azure Service Bus message completion

In the last post in the series about Azure Service Bus message completion, I briefly talked about how the message batch completion could benefit from a more reactive approach. Before I dive into the inner workings of the reactive structure that I’m going to outline in the next few posts, I need to briefly recap the functional and non-functional requirements we have for the component.

The component shall

  • Make sure messages are only completed on the receiver they came from
  • Reduce the number of threads used when the number of clients is increased
  • Autoscale up under heavy load
  • Scale down under light load
  • Minimise the contention on the underlying collections used
  • Be completely asynchronous
  • Implement a push-based model from the producer and consumer perspective
  • Respect the maximum batch size defined by the client of the component or a predefined push interval
  • Provide FIFO semantics instead of LIFO
  • Be as lock-free as possible

From a design viewpoint, the MultiProducerConcurrentConsumer (MPCC) component works as follows. The MPCC internally manages N slots that can contain items of type TItem. Whenever the timeout (the push interval) or the batch size is reached, the component fans out the completion callback, up to the provided concurrency per slot.

class MultiProducerConcurrentConsumer<TItem> {
    public MultiProducerConcurrentConsumer(int batchSize, TimeSpan pushInterval,
        int maxConcurrency, int numberOfSlots) { }

    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump) { }

    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state) { }

    public void Push(TItem item, int slotNumber) { }

    public async Task Complete(bool drain = true) { }
}

Shown above is the class outline that we are going to use for the component. The constructor allows specifying the batch size, the push interval, the number of slots and the concurrency per slot. The component has a method called Push to push items to it. As soon as the component is started, the provided pump delegate will be called whenever the batch size is reached or the push interval elapses. The concurrency per slot is probably best explained by giving an example.

Let’s assume we have a batch size of 500 and a push interval of 2 seconds. When producers can produce 1500 items per second the batch size is reached three times per second. MPCC will auto-scale in that case up to the provided concurrency. So in the above example, it will take out 500 items and invoke the callback and then concurrently try to fetch another 500 items and again another 500 items until the slot is empty or the maximum concurrency is reached.

The Complete method is available for the deterministic stopping of the MPCC. It allows you to asynchronously wait for all slots to be drained (the default) or to stop immediately and throw away all items that haven’t been processed.

In the next post, we are going to depict the inner workings of the MultiProducerConcurrentConsumer, hope you’ll enjoy.

Async method without cancellation support, do it my way.

In the last post, I talked about Dennis Doomen’s LiquidProjections project and the challenge they faced with asynchronous APIs that were not cancelable. Dennis came up with the following solution to the infinitely running Task.Delay operations.

public static class TaskExtensions {
    public static async Task<TResult> WithWaitCancellation<TResult>(this Task<TResult> task,
            CancellationToken cancellationToken) {
        using (var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) {
            var delayTask = Task.Delay(Timeout.Infinite, combined.Token);
            Task completedTask = await Task.WhenAny(task, delayTask);
            if (completedTask == task) {
                combined.Cancel(); // cancels the delay task so it does not linger
                return await task;
            }
            cancellationToken.ThrowIfCancellationRequested();
            throw new InvalidOperationException("Infinite delay task completed.");
        }
    }
}

The code above creates a linked token source and an infinitely delayed task that observes the token of the linked token source. When the outer token source is canceled, the linked token source cancels its own token as well. When the task returned by Task.WhenAny is the actual work task, the linked token source is canceled, which automatically cancels the delayed task. The benefit of this implementation is its simplicity. The runtime performance is good, but we can do better: when the extension method is used over and over again, a few optimizations pay off. Here is a possible approach:

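public static Task<TResult> WaitWithCancellation<TResult>(this Task<TResult> task, CancellationToken token = default(CancellationToken))
{
    var tcs = new TaskCompletionSource<TResult>();
    var registration = token.Register(s => {
        var source = (TaskCompletionSource<TResult>) s;
        source.TrySetCanceled();
    }, tcs);

    task.ContinueWith((t, s) => {
        var tcsAndRegistration = (Tuple<TaskCompletionSource<TResult>, CancellationTokenRegistration>) s;

        // Mirror the final state of the antecedent task onto the completion source.
        // The branches are exclusive so that t.Result is only read on success.
        if (t.IsFaulted && t.Exception != null) {
            tcsAndRegistration.Item1.TrySetException(t.Exception.InnerExceptions);
        }
        else if (t.IsCanceled) {
            tcsAndRegistration.Item1.TrySetCanceled();
        }
        else if (t.IsCompleted) {
            tcsAndRegistration.Item1.TrySetResult(t.Result);
        }

        // Dispose the registration so that it does not leak
        tcsAndRegistration.Item2.Dispose();
    }, Tuple.Create(tcs, registration), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    return tcs.Task;
}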

Instead of using a delayed task with an infinite time span, we use a TaskCompletionSource. The TaskCompletionSource represents a task that only completes, cancels or faults when we instruct the source accordingly, so essentially this is an infinitely running task. On the token passed in, we register a callback that is triggered when the token gets canceled and sets the completion source to canceled as well. On the actual task, we register a continuation that sets the state of the task completion source according to the state of the antecedent task (which is the actual worker task). Inside the continuation, we also dispose the token registration so that we don’t leak memory. The continuation is scheduled with the ExecuteSynchronously hint to tell the TPL that the work inside the continuation is short-lived and may run on the thread that completes the antecedent task.

To avoid closure allocations, we use the overloads of the token registration and the continuation that accept a state object. Where we need to pass multiple things into the delegate, we use a tuple to represent those items. Furthermore, we use the TrySet methods to move the task completion source into its final state safely, which avoids exceptions that could otherwise occur due to races.
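The state-object technique can be looked at in isolation. The following is a minimal sketch under a few assumptions: the StateObjectSketch class and the NeverUntilCanceled method are hypothetical names, and the registration is deliberately not disposed to keep the example short (the extension method above does dispose it).

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class StateObjectSketch
{
    // Hypothetical helper: returns a task that only ever completes by being canceled.
    public static Task<int> NeverUntilCanceled(CancellationToken token)
    {
        var tcs = new TaskCompletionSource<int>();

        // The lambda captures nothing. The completion source travels through the
        // state parameter, so no closure object is needed for the callback.
        // TrySetCanceled returns false instead of throwing if the source
        // has already reached a final state.
        token.Register(state => ((TaskCompletionSource<int>)state).TrySetCanceled(), tcs);

        return tcs.Task;
    }
}
```

Canceling the token source that owns the token moves the returned task into the canceled state. Awaiting the task then throws an OperationCanceledException.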

Let’s see what the runtime differences of both code snippets are:

You can try it yourself with BenchmarkDotNet and the tests found in my MicroBenchmark repo. The second snippet is more complex to understand, but it is faster overall and slightly better on memory because it allocates less garbage and avoids generating the asynchronous state machine.

Please don’t get me wrong, this is slightly esoteric. Depending on the application or system you are building, I would usually go for Dennis’ approach because it is simpler to understand and good enough. If you need a bit more performance and want to save allocations, the second approach might come in handy.

Thanks again Dennis for this cool asynchronous challenge.

Async method without cancellation support, oh my!

Sometimes you have to interact with asynchronous APIs that you don’t control. Those APIs might be executing for a very long time but have no way to cancel the request. Dennis Doomen was exactly in such a situation while building his opinionated Event Sourcing projections library for .NET called LiquidProjections.

The library is using NEventStore under the covers. The actual access to the NEventStore is adapted in a class called NEventStoreAdapter. The adapter takes care of loading pages from the store from a given checkpoint onwards. This asynchronous method can potentially run indefinitely because of the underlying NEventStore EventStore client. Let’s look at the code:

await eventStoreClient.GetPageAsync(checkpoint).ConfigureAwait(false);

So in his case, the pretty simple sample code above could execute forever, but it cannot be canceled since there is no overload available which accepts a CancellationToken. Dennis wrote the following extension to be able to write an asynchronous execution which is cancelable:

public static class TaskExtensions {
   public static async Task<TResult> WithWaitCancellation<TResult>(this Task<TResult> task,
              CancellationToken cancellationToken) {
      Task completedTask = await Task.WhenAny(task, Task.Delay(Timeout.Infinite, cancellationToken));
      if (completedTask == task) {
         return await task;
      }
      else {
         // The delay can only complete through cancellation of the token
         cancellationToken.ThrowIfCancellationRequested();
         throw new InvalidOperationException("Infinite delay task completed.");
      }
   }
}

And here is the usage of this extension method:

await eventStoreClient.GetPageAsync(checkpoint).WithWaitCancellation(cancelationToken).ConfigureAwait(false);

The idea of the code was the following:

We package the actual task to be executed together with a Task.Delay(Timeout.Infinite, cancellationToken) into a Task.WhenAny. When the actual task completes, it is returned from WhenAny, and we await the outcome of the task*. If the completed task is not the actual task, then the Task.Delay was canceled because the cancellation token passed into it was canceled. In this case, it is sufficient to throw the OperationCanceledException with ThrowIfCancellationRequested.

That is a nifty idea and a quite clever solution. Unfortunately, Dennis found out that this extension method produced a nasty memory leak when running for a longer period. When the actual task completes the task representing the Task.Delay operation will still be executing and therefore leak memory. Dennis came up with a nice solution with linked cancellation token sources.

His tweet

Note to self: always await or cancel a call to Task.Delay if one wants to avoid memory leaks…

caught my attention and I started working on a more robust and scalable extension method that doesn’t require Task.Delay. I will discuss the extension method I came up with in the next post. Thanks Dennis for this nice asynchronous challenge!

* Remember that awaiting a task materializes its result. If there is a result, it is returned. If the task is faulted, the exception is unwrapped and rethrown.
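The footnote can be illustrated with a small sketch that compares awaiting a faulted task with blocking on it. The AwaitUnwrapSketch class and its method names are made up for this illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class AwaitUnwrapSketch
{
    // An already faulted task, used as the stand-in for a failed operation.
    private static Task<int> Faulted() =>
        Task.FromException<int>(new InvalidOperationException("boom"));

    // Awaiting the task rethrows the original exception.
    public static async Task<string> CaughtByAwait()
    {
        try { await Faulted(); return "no exception"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }

    // Blocking on the task with Wait() wraps the exception in an AggregateException.
    public static string CaughtByWait()
    {
        try { Faulted().Wait(); return "no exception"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }
}
```

CaughtByAwait yields "InvalidOperationException", while CaughtByWait yields "AggregateException". This is why the extension method can simply return await task and let the caller observe the original exception.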

Context Matters

Async/await makes asynchronous code much easier to write because it hides away a lot of the details. Many of these details are captured in the SynchronizationContext which may change the behavior of your async code entirely depending on the environment where you’re executing your code (e.g. WPF, Winforms, Console, or ASP.NET). By ignoring the influence of the SynchronizationContext you may run into deadlocks and race conditions.

The SynchronizationContext controls how and where task continuations are scheduled and there are many different contexts available. If you’re writing a WPF application, building a website, or an API using ASP.NET you’re already using a special SynchronizationContext which you should be aware of.

SynchronizationContext in a console application

To make this less abstract, let’s have a look at some code from a console application:

public class ConsoleApplication
{
    public static void Main()
    {
        Console.WriteLine($"{DateTime.Now:T} - Starting");
        var t1 = ExecuteAsync(() => Library.BlockingOperation());
        var t2 = ExecuteAsync(() => Library.BlockingOperation());
        var t3 = ExecuteAsync(() => Library.BlockingOperation());

        Task.WaitAll(t1, t2, t3);
        Console.WriteLine($"{DateTime.Now:T} - Finished");
    }

    private static async Task ExecuteAsync(Action action)
    {
        // Execute the continuation asynchronously
        await Task.Yield();  // The current thread returns immediately to the caller
                             // of this method and the rest of the code in this method
                             // will be executed asynchronously

        action();

        Console.WriteLine($"{DateTime.Now:T} - Completed task on thread {Thread.CurrentThread.ManagedThreadId}");
    }
}

Here, Library.BlockingOperation() may be a third-party library we’re using that blocks the thread. It can be any blocking operation, but for testing purposes you can use Thread.Sleep(2000) as an implementation, which blocks for two seconds.

When we run the application, the output looks like this:

16:39:15 - Starting
16:39:17 - Completed task on thread 11
16:39:17 - Completed task on thread 10
16:39:17 - Completed task on thread 9
16:39:17 - Finished

In the sample, we create three tasks that block the thread for some period of time. Task.Yield forces a method to be asynchronous by scheduling everything after this statement (called the _continuation_) for execution but immediately returning control to the caller. As you can see from the output, due to Task.Yield all the operations ended up being executed in parallel resulting in a total execution time of just two seconds.

Now let’s port this code over to ASP.NET.


Avoid ThreadStatic, ThreadLocal and AsyncLocal. Float the state instead!

In the article on The dangers of ThreadLocal I explained how the introduction of async/await forces us to unlearn what we perceived to be true in the “old world” when Threads dominated our software lingo. We’re now at a point where we need to re-evaluate how we approach thread safety in our codebases while using the async/await constructs. In today’s post-thread era, we should strive to remove all thread (task-local) state and let the state float into the code which needs it. Let’s take a look at how we can implement the idea of floating state and make our code robust and ready for concurrency.

The dangers of ThreadLocal

Languages and frameworks evolve. We as developers have to learn new things constantly and unlearn already-learned knowledge. Speaking for myself, unlearning is the most difficult part of continuous learning. When I first came into contact with multi-threaded applications in .NET, I stumbled over the ThreadStatic attribute. I made a mental note that this attribute is particularly helpful when you have static fields that should not be shared between threads. At the time that the .NET Framework 4.0 was released, I discovered the ThreadLocal class and how it does a better job assigning default values to thread-specific data. So I unlearned the ThreadStaticAttribute, favoring instead ThreadLocal<T>.

Fast forward to some time later, when I started digging into async/await. I fell victim to a belief that thread-specific data still worked. So I was wrong, again, and had to unlearn, again! If only I had known about AsyncLocal earlier.
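To make the difference concrete, here is a minimal sketch, not taken from the full post. A ThreadLocal value is tied to a thread, so after an await the continuation may run on another thread and see a different value. An AsyncLocal value flows with the execution context instead. The AsyncLocalSketch class, its methods and the "tenant-42" value are hypothetical and only serve the illustration.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLocalSketch
{
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // The value set before the await is still visible afterwards, even if the
    // continuation resumes on a different thread.
    public static async Task<string> ReadAfterAwait()
    {
        Ambient.Value = "tenant-42";
        await Task.Yield();
        return Ambient.Value;
    }

    // The "floating" alternative: pass the state explicitly to the code that needs it.
    public static async Task<string> ReadFloated(string tenant)
    {
        await Task.Yield();
        return tenant;
    }
}
```

Both methods return "tenant-42". The second variant needs no ambient state at all, which makes the dependency visible in the method signature.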

Let’s learn and unlearn together!

If you want to learn more about async/await and why you shouldn’t use ThreadStatic or ThreadLocal with async/await, I suggest you read the full blog post originally posted on the Particular blog.

TransactionScope and Async/Await. Be one with the flow!

I’m doing a series of async/await related blog posts on the Particular blog. This one might also be interesting for you.

You might not know this, but the 4.5.0 version of the .NET Framework contains a serious bug regarding System.Transactions.TransactionScope and how it behaves with  async/await. Because of this bug, a TransactionScope can’t flow through into your asynchronous continuations. This potentially changes the threading context of the transaction, causing exceptions to be thrown when the transaction scope is disposed.

This is a big problem, as it makes writing asynchronous code involving transactions extremely error-prone.

The good news is that as part of the .NET Framework 4.5.1, Microsoft released the fix for that “asynchronous continuation” bug. The thing is that developers like us now need to explicitly opt-in to get this new behavior. Let’s take a look at how to do just that.


  • If you are using TransactionScope and async/await together, you should really upgrade to .NET 4.5.1 right away.
  • A TransactionScope wrapping asynchronous code needs to specify TransactionScopeAsyncFlowOption.Enabled in its constructor.
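The opt-in from the second point can be sketched as follows. This is a minimal illustration that assumes a runtime where System.Transactions is available. The TransactionFlowSketch class and its method are hypothetical names, and the scope contains no real transactional work.

```csharp
using System.Threading.Tasks;
using System.Transactions;

public static class TransactionFlowSketch
{
    // Returns true when the ambient transaction seen after the await is the
    // same one that was visible before it.
    public static async Task<bool> AmbientTransactionSurvivesAwait()
    {
        using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            var before = Transaction.Current?.TransactionInformation.LocalIdentifier;

            await Task.Yield(); // the continuation may resume on another thread

            var after = Transaction.Current?.TransactionInformation.LocalIdentifier;

            scope.Complete();
            return before != null && before == after;
        }
    }
}
```

Without the TransactionScopeAsyncFlowOption.Enabled argument, the scope stays bound to the thread that created it, which is the situation the bug description above warns about.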

If you want to learn more about async/await and how to flow TransactionScopes over continuations, I suggest you read the full blog post originally posted on the Particular blog.

Async/Await: It’s time

I wanted to briefly mention a blog post which I wrote on the Particular blog. It is a product-centric announcement around the Particular Platform about the move towards an async-only API. Although it is product-centric, I believe this post contains a lot of valuable information around async/await and its benefits and caveats. In my biased opinion, it is definitely worth a read.

Async/Await is a language feature introduced in C# 5.0 together with Visual Studio 2012 and the .NET 4.5 runtime. With Visual Studio 2015 almost ready to be shipped to end-users, we can see that async/await has been around for quite some time now. Yet NServiceBus hasn’t been exposing asynchronous APIs to its users. Why the await?

We have been carefully observing the adoption of async/await in the market and weighing its benefits against its perceived complexity. Over the years async/await has matured and proven its worth. Now the time has come to discuss our plans regarding the async/await adoption in NServiceBus and our platform.


  • Future versions of the NServiceBus API will be async only.
  • Synchronous versions of the API will be removed in future versions.
  • Microsoft and the community have made this decision for you, by moving toward making all APIs which involve IO operations async-only.
  • Don’t panic! We will help you through this difficult time.

You’re going to be blown away by the additional benefits that asynchronous NServiceBus APIs will provide. But first, let’s review why asynchronous APIs are such a big deal in the first place.

If you want to learn more about async/await, I suggest you read the full blog post originally posted on the Particular blog.