
MultiProducerConcurrentConsumer – Push in batches

In the last post, I introduced the push method and the timer loop of the MultiProducerConcurrentConsumer. In this post, I’ll focus on the actual PushInBatches method that gets called by the timer loop.

Task PushInBatches() {
    // Nothing was pushed since the last cycle: return a cached completed
    // task and avoid the async state machine entirely.
    if (Interlocked.Read(ref numberOfPushedItems) == 0) {
        return TaskEx.Completed;
    }

    for (var i = 0; i < numberOfSlots; i++) {
        var queue = queues[i];

        PushInBatchesUpToConcurrencyPerQueueForAGivenSlot(queue, i, pushTasks);
    }

    // Once all pushes of this cycle are done, clear the reused task list.
    return Task.WhenAll(pushTasks).ContinueWith((t, s) => {
        var tasks = (List<Task>) s;
        tasks.Clear();
    }, pushTasks, TaskContinuationOptions.ExecuteSynchronously);
}

When the method is entered, it first checks whether there is anything to push. Since numberOfPushedItems is accessed by multiple threads, Interlocked is used to read it and compare the returned value. When there is nothing to push, a completed task is returned. By not using the async keyword here we avoid the state machine the compiler would otherwise generate. Bear in mind that this trick should not be applied blindly.
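TaskEx is not shown in this series; presumably it is a tiny helper that caches a single completed task. A minimal sketch under that assumption could look like this (on .NET 4.6 and later, the built-in Task.CompletedTask serves the same purpose):

static class TaskEx {
    // Allocated once; every fast path hands out the same instance.
    public static readonly Task Completed = Task.FromResult(0);
}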

Then it loops through all the slots. The queue for each slot is taken from the queues array, and PushInBatchesUpToConcurrencyPerQueueForAGivenSlot is called with the queue of the current slot and the pushTasks list. Remember that the push task list and the queues array were previously allocated as members of the class for efficient reuse.

At the end of the method, Task.WhenAll creates a task that completes when all of the tasks contained in pushTasks have completed. A synchronous continuation is scheduled that makes sure the pushTasks list gets cleared at the end of the execution. That continuation task is returned to the caller, which is the timer loop. This means the timer loop will only continue when the pushTasks list has been cleared. So instead of creating a new list of tasks for every loop cycle, we reuse the task list and clear it after each push cycle, which reduces garbage collection pressure.

Here I used another not so well-known trick. Instead of accessing the pushTasks list directly and therefore creating a closure over this, I’m passing the pushTasks list as a state parameter to the ContinueWith method and extracting the state inside the body of the continuation, which saves a closure and delegate allocation per push cycle.
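To make the difference concrete, here is a minimal side-by-side sketch of the two variants (illustrative only):

// Closure variant: capturing pushTasks forces the compiler to allocate
// a closure object plus a fresh delegate instance on every push cycle.
Task.WhenAll(pushTasks).ContinueWith(
    t => pushTasks.Clear(),
    TaskContinuationOptions.ExecuteSynchronously);

// State variant: the list travels through the state parameter, the lambda
// captures nothing, and the compiler can cache a single delegate instance.
Task.WhenAll(pushTasks).ContinueWith(
    (t, s) => ((List<Task>)s).Clear(),
    pushTasks,
    TaskContinuationOptions.ExecuteSynchronously);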

In the next installment, we’ll look into PushInBatchesUpToConcurrencyPerQueueForAGivenSlot since it is complex enough to warrant a dedicated post.

MultiProducerConcurrentConsumer – Push it

In the last post, I introduced the start procedure for the MultiProducerConcurrentConsumer, including a rough outline of the timer loop responsible for pushing items in batches after the push interval has elapsed.

To understand how the batch size affects the push, let me first explain the push method. The push method’s responsibility is primarily to insert the item of type TItem into the right slot. To achieve that, the passed-in slot number is range checked against the available number of slots (of course a lower bound would not hurt either 😉 ). The passed-in slot number is used to pick the concurrent queue that represents the slot, and the item is enqueued into it.

public void Push(TItem item, int slotNumber)
{
    if (slotNumber >= numberOfSlots)
    {
        throw new ArgumentOutOfRangeException(nameof(slotNumber),
          $"Slot number must be between 0 and {numberOfSlots - 1}.");
    }

    queues[slotNumber].Enqueue(item);

    var incrementedCounter = Interlocked.Increment(ref numberOfPushedItems);

    if (incrementedCounter > batchSize)
    {
        syncEvent.Set();
    }
}

The second responsibility of the push method is to count the number of items that were inserted into the MPCC. In my version of the MPCC, I made a simple trade-off. Instead of having a counter per slot, I’ve chosen a global counter called numberOfPushedItems which counts all the pushed items. I think it is a reasonable trade-off and makes the implementation much simpler while still achieving its goal (what do you think?). Since multiple threads could insert data concurrently, I’m using Interlocked.Increment to increment the counter. Whenever the incremented counter is greater than the batch size, the push method signals a synchronization primitive* to unleash the push process. But how does the MPCC know when the batch size is reached?

It does so by observing the synchronization primitive. Let’s expand the previously shown TimerLoop slightly:

async Task TimerLoop()
{
    var token = tokenSource.Token;
    while (!tokenSource.IsCancellationRequested)
    {
        try
        {
            await Task.WhenAny(Task.Delay(pushInterval, token),
               syncEvent.WaitAsync(token)).ConfigureAwait(false);
            await PushInBatches().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // intentionally ignored
        }
    }
}

So instead of only doing a Task.Delay with the push interval, we combine the Task.Delay and syncEvent.WaitAsync with Task.WhenAny. Task.WhenAny returns when any of the tasks passed into the method completes (successfully, faulted or canceled). So the await continues when either the push interval has elapsed (or was canceled), or the synchronization primitive was set (or the token canceled). This code has a slight deficiency: every time we reach the batch size before the push interval has elapsed, the task representing the push interval will “leak” until the interval elapses. But we will ignore this for now and potentially talk about it in another post.

The code shown so far is capable of getting items pushed into the right slot, and when either the push interval elapses or the batch size is reached, the component pushes items in batches to the pump delegate. In the next post we will cover the actual PushInBatches method.

* I’m using an AsyncAutoResetEvent which I will explain in a later post.

MultiProducerConcurrentConsumer – Start it

In the previous installment, we discussed the technique of preallocating and reusing structures to save allocations during runtime. In this post, I will introduce the heart of the MPCC, which is the start method and the timer loop outline.

The MPCC can be started at any time after construction. Once the component is started, it will push items to the pump delegate whenever the batch size is reached or the push interval elapses. This is what the Start method looks like:

public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump)
{
    Start(pump, null);
}

public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state)
{
    if (started)
    {
        throw new InvalidOperationException("Already started");
    }

    // Assign the pump and state before the loop is scheduled so the
    // timer loop never observes partially initialized members.
    this.pump = pump;
    this.state = state;
    tokenSource = new CancellationTokenSource();
    timer = Task.Run(TimerLoop);
    started = true;
}

The MPCC can be started once only. There are two overloads for starting: one that accepts a state object and one that doesn’t. The state-based overload exists to enable scenarios where you want to avoid closure capturing: you can pass in the state required to execute the functionality inside the pump delegate implementation.
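As a hypothetical illustration (the consumer instance, the Connection type and its SendBatchAsync method are made up for this example), the state overload keeps the pump delegate free of captures:

var connection = new Connection(); // hypothetical resource the pump needs

consumer.Start((items, slotNumber, state, token) => {
    // The connection arrives through the state parameter,
    // so the lambda captures nothing and allocates no closure.
    var conn = (Connection) state;
    return conn.SendBatchAsync(items, token);
}, connection);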

Furthermore, the component requires a cancellation token source and a timer loop that executes the interval-based pushing of items to the pump. Let’s have a quick look at the timer loop implementation (a few details are left out since we will be exploring those in future posts):

async Task TimerLoop()
{
    var token = tokenSource.Token;
    while (!tokenSource.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(pushInterval, token).ConfigureAwait(false);
            await PushInBatches().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // intentionally ignored
        }
    }
}

The timer loop is scheduled to be executed on the worker thread pool. At its heart, we have a while loop that runs until the token indicates a cancellation was requested. Inside the loop, the component uses Task.Delay to asynchronously wait until the push interval has elapsed or the token was canceled. After the push interval is over, the method PushInBatches is called. In this implementation, I chose to ignore any exceptions that might happen either in the Task.Delay (e.g. an OperationCanceledException being thrown) or exceptions bubbling up from PushInBatches.

Now we have a simple approach to push items in batches based on the push interval defined in the constructor. But this doesn’t address the batch size based pushing of items. We’ll cover this in the next posts.

MultiProducerConcurrentConsumer – Preallocate and reuse

In the last post, I introduced the MultiProducerConcurrentConsumer component. In this post, I’m going to walk through the constructor of the component and explain the preallocation trade-off that was made.

With almost any code that executes on the hot path of your library, framework or system, you need to make trade-offs regarding allocations. In the end, it boils down to roughly two choices:

  • Allocate as much as you need upfront
  • Allocate on-demand when you need it

The benefit of only allocating when needed is that memory consumption only grows when it is needed. The downside of this approach is that allocating structures in a safe and lock-free way under highly concurrent scenarios can be tricky.

The drawback of allocating upfront is that even though your application might never need it, the memory is preallocated and stays that way. On the other hand, if we allocate the potentially needed structures upfront, we never need to allocate during concurrent execution. Unfortunately, preallocating upfront only works when you know the boundary conditions of the problem at hand.

With the MPCC structure, we can make a few assumptions which allow us to pre-allocate internally needed structures.

public MultiProducerConcurrentConsumer(
    int batchSize, TimeSpan pushInterval, int maxConcurrency, int numberOfSlots)
{
    this.maxConcurrency = maxConcurrency;
    this.pushInterval = pushInterval;
    this.batchSize = batchSize;
    this.numberOfSlots = numberOfSlots;

    queues = new ConcurrentQueue<TItem>[numberOfSlots];
    for (var i = 0; i < numberOfSlots; i++)
    {
        queues[i] = new ConcurrentQueue<TItem>();
    }
    
    var maxNumberOfConcurrentOperationsPossible = numberOfSlots * maxConcurrency;
    pushTasks = new List<Task>(maxNumberOfConcurrentOperationsPossible);

    itemListBuffer = new ConcurrentQueue<List<TItem>>();
    for (var i = 0; i < maxNumberOfConcurrentOperationsPossible; i++)
    {
        itemListBuffer.Enqueue(new List<TItem>(batchSize));
    }
}

Since we know the number of slots upfront and we defined that the component needs FIFO semantics, we can use an array of ConcurrentQueue<TItem> (Line 9) and create the queues for all the slots ahead of time (Lines 10-13).

The maximum concurrency is defined per slot. So the maximum number of concurrent operations possible at any given time is the number of slots multiplied by the maximum concurrency (Line 15). As soon as maxNumberOfConcurrentOperationsPossible is calculated, we can allocate a List<Task> which is going to hold the concurrently happening operations for graceful shutdown purposes (Line 16).

Last but not least, we make the following trade-off. During runtime, the component will get many items pushed to it and complete items in batches. If we allocated a List<TItem> every time the pump callback is called, we’d be creating a lot of Gen0 garbage: up to maxNumberOfConcurrentOperationsPossible List<TItem> instances per push interval. Depending on the push interval settings, this could drastically impact performance. That’s why the MPCC creates a ConcurrentQueue<List<TItem>> and enqueues new List<TItem>(batchSize) instances into it (Lines 18-22). This queue serves as a List<TItem> buffer: effectively we reuse the lists during the whole lifetime of the component. You might have noticed that we initialize each list with the batch size, which is also known upfront, to prevent unnecessary growing of the lists.
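A minimal sketch of how such a buffer might be used during a push cycle (the actual usage follows in later posts; this is my assumption about the pattern, not the final implementation):

List<TItem> batch;
if (!itemListBuffer.TryDequeue(out batch)) {
    // Buffer exhausted; fall back to a fresh allocation.
    batch = new List<TItem>(batchSize);
}

// ... fill 'batch' with up to batchSize items and hand it to the pump ...

batch.Clear();                 // reset, but keep the already-grown capacity
itemListBuffer.Enqueue(batch); // return the list for the next cycle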

Hope you like this so far. Stay tuned for the next post about MPCC.

Introduction to the MultiProducerConcurrentConsumer for Azure Service Bus message completion

In the last post in the series about Azure Service Bus message completion, I briefly talked about how the message batch completion could benefit from a more reactive approach. Before I dive into the inner workings of the reactive structure that I’m going to outline in the next few posts, I need to briefly recap the functional and non-functional requirements for the component.

The component shall

  • Make sure messages are only completed on the receiver they came from
  • Reduce the number of threads used when the number of clients is increased
  • Autoscale up under heavy load
  • Scale down under light load
  • Minimise the contention on the underlying collections used
  • Be completely asynchronous
  • Implement a push-based model from the producer and consumer perspective
  • Respect the maximum batch size defined by the client of the component or a predefined push interval
  • Provide FIFO semantics instead of LIFO
  • Be as lock-free as possible

From a design viewpoint, the MultiProducerConcurrentConsumer (MPCC) component looks like this:

The MPCC internally manages N slots that can contain items of type TItem. Whenever the push interval elapses or the batch size is reached, the component fans out the completion callback per slot, up to the provided concurrency.

class MultiProducerConcurrentConsumer<TItem> {
    public MultiProducerConcurrentConsumer(int batchSize, TimeSpan pushInterval,
        int maxConcurrency, int numberOfSlots) { }

    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump) { }

    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state) { }

    public void Push(TItem item, int slotNumber) { }

    public async Task Complete(bool drain = true) { }
}

Shown above is the class outline that we are going to use for the component. The constructor allows specifying the batch size, the push interval, the number of slots and the concurrency per slot. The component has a Push method for pushing items to it. As soon as the component is started, the provided pump delegate will be called whenever the batch size is reached or the push interval elapses. The concurrency per slot is probably best explained with an example.

Let’s assume we have a batch size of 500 and a push interval of 2 seconds. When producers push 1500 items per second, the batch size is reached three times per second. In that case MPCC will auto-scale up to the provided concurrency: it takes out 500 items and invokes the callback, then concurrently tries to fetch another 500 items, and again another 500, until the slot is empty or the maximum concurrency is reached.

The Complete method is available for deterministically stopping the MPCC. It allows the caller to asynchronously wait for all slots to be drained (the default) or to stop immediately and throw away all items that haven’t been processed.
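For illustration, assuming an MPCC instance named consumer, the two shutdown modes look like this:

// Drain: wait until everything still buffered has been pushed to the pump.
await consumer.Complete().ConfigureAwait(false);

// Stop immediately and discard whatever is still buffered.
await consumer.Complete(drain: false).ConfigureAwait(false);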

In the next post, we are going to dive into the inner workings of the MultiProducerConcurrentConsumer. Hope you’ll enjoy it!

Complete messages where they came from with Azure Service Bus

In the last post, we made a simplistic attempt to speed up batch completion of messages by having multiple dedicated background completion tasks. Unfortunately, this introduced an interesting side effect.


static async Task BatchCompletionLoop() {
   while(!token.IsCancellationRequested) {
      ...
      await receiveClient.CompleteBatchAsync(lockTokens).ConfigureAwait(false);
      ...
      await Task.Delay(TimeSpan.FromSeconds(5), token).ConfigureAwait(false);
   }
}

The code above is using a single receive client to complete message lock tokens in batches. Those lock tokens could come from multiple receive clients, but they would all end up being completed on the same instance. Depending on the transport type you use, this might work, or it might not. When you are using the NetMessaging (also known as SBMP) transport type, the above code will just work. If you switch to AMQP as the transport type, the code will blow up with the following exception:

Microsoft.ServiceBus.Messaging.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue.
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.EndCompleteBatch(IAsyncResult result)

For Azure Service Bus v3 and higher, AMQP is the new standard protocol and SBMP is an opt-in choice.

The proprietary SBMP protocol that is also supported is being phased out in favor of AMQP. see documentation

What does that mean for our message completion? To have robust completion code which works for both transport types, we have to make sure we only complete lock tokens on the receiver the messages came from.

In its simplest form, we need the following moving pieces: a concurrent stack or queue per message receiver, a generic message handling body which receives the concurrent stack as an input parameter, and a similarly parameterized completion loop.

var lockTokensToComplete = new ConcurrentStack<Guid>[numberOfReceivers];
// initialize the concurrent stacks

receiveClient1.OnMessageAsync(message => ReceiveMessage(message, lockTokensToComplete[0]));
...
receiveClientN.OnMessageAsync(message => ReceiveMessage(message, lockTokensToComplete[N-1]));

static async Task ReceiveMessage(BrokeredMessage message, ConcurrentStack<Guid> lockTokensToComplete) {
   await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
   lockTokensToComplete.Push(message.LockToken);
}

and the completion loop

for(int i = 0; i < numberOfReceivers; i++) {
   var index = i; // copy the loop variable so the lambda doesn't observe it after it changed
   completionTasks[index] = Task.Run(() => BatchCompletionLoop(receivers[index], lockTokensToComplete[index]));
}

static async Task BatchCompletionLoop(MessageReceiver receiver, ConcurrentStack<Guid> lockTokensToComplete) {
   while(!token.IsCancellationRequested) {
      var lockTokens = new Guid[100];
      int numberOfItems = lockTokensToComplete.TryPopRange(lockTokens);
      if(numberOfItems > 0) {
         // complete only the tokens that were actually popped
         await receiver.CompleteBatchAsync(lockTokens.Take(numberOfItems)).ConfigureAwait(false);
      }
      await Task.Delay(TimeSpan.FromSeconds(5), token).ConfigureAwait(false);
   }
}

We have reduced the contention since we have a dedicated lock token collection to operate on per receiver. We have a dedicated completion loop which is self-contained and only operates on the matching receiver and lock token collection pair. So far so good. Despite that, we are still wasting a lot of resources. We spun up as many dedicated completion background tasks as there are receivers, and those might sit idle. And we still don’t exactly know whether this code construct scales elastically up and down depending on the number of messages received and the number of clients.

Ideally, we would have a reactive approach which automatically adapts based on parameters we give it. Building such a reactive approach will be the focus of the next few posts. Stay reactive!

Batch completion with multiple receivers on Azure Service Bus

In the last post, we created multiple receivers with multiple factories to speed up our message processing. Unfortunately, this has some consequences for our background completion loop. Since the message processing logic is shared between multiple receivers, all the receivers will try to push lock tokens into the concurrent stack. So the following code will be called concurrently from up to numberOfReceivers * concurrencyPerReceiver operations.

// same as before
await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
lockTokensToComplete.Push(message.LockToken);
// same as before

So, for example, when we use 10 receivers, each with a concurrency setting of 32, we end up pushing lock tokens to the concurrent stack from up to 320 simultaneous operations. Not a big deal, you could say, since the ConcurrentStack implementation is lock-free. Unfortunately, lock-free doesn’t necessarily mean contention-free.

ConcurrentQueue<T> and ConcurrentStack<T> are completely lock-free in this way. They will never take a lock, but they may end up spinning and retrying an operation when faced with contention (when the CAS operations fail). Read more

The concurrent stack implementation internally uses a technique called spinning and retrying (for more information see the excellent Blocking vs. Spinning section from Joe Albahari). Under contention, with a large number of concurrent operations, these operations can fail multiple times and might become less efficient than we assumed.
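A schematic sketch of that compare-and-swap retry pattern (not the actual ConcurrentStack source) shows where the cycles go under contention:

class Node { public Guid Value; public Node Next; }
Node head;

void Push(Node node) {
   var spinner = new SpinWait();
   while (true) {
      var oldHead = head;
      node.Next = oldHead;
      // The CAS succeeds only if no other thread changed 'head' in the
      // meantime; with 320 concurrent pushers it will fail regularly.
      if (Interlocked.CompareExchange(ref head, node, oldHead) == oldHead) {
         return;
      }
      spinner.SpinOnce(); // back off briefly, then retry
   }
}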

In the previous post I made the following statement:

When we receive several hundred messages per second, our randomly chosen “complete every one-hundredth message” and then “sleep for five seconds” approach might turn out to be a suboptimal choice.

With multiple receivers pushing lock tokens into the concurrent stack, we might have made our problem even worse. It is entirely possible that our multiple concurrent receivers fill the concurrent stack with lock tokens faster than our completion loop manages to complete them. Our previously chosen five-second sleep duration might turn out to be way too long under heavy load.

How about we just spin up multiple batch completion tasks like the following?

var completionTasks = new Task[numberOfReceivers];

for(int i = 0; i < numberOfReceivers; i++) { 
   completionTasks[i] = Task.Run(() => BatchCompletionLoop());
}

static async Task BatchCompletionLoop() {
   while(!token.IsCancellationRequested) {
      var lockTokens = new Guid[100];
      int numberOfItems = lockTokensToComplete.TryPopRange(lockTokens);
      if(numberOfItems > 0) {
         // complete only the tokens that were actually popped
         await receiveClient.CompleteBatchAsync(lockTokens.Take(numberOfItems)).ConfigureAwait(false);
      }
      await Task.Delay(TimeSpan.FromSeconds(5), token).ConfigureAwait(false);
   }
}

Now we’ve just made the contention problem on the concurrent stack even worse: multiple background completion operations are competing on the concurrent stack as well. Because all those completion tasks share the same Task.Delay value, there is a high chance we would see a pattern where multiple background completion operations are dispatched on worker threads but only a few succeed and the rest go idle again, effectively wasting a lot of resources in production.

Leaving contention and resource waste aside we’ve now introduced another major flaw in our completion logic. Can you spot it? If not, don’t worry. I’ll pick it up in the next post.

Multiple message receivers with Azure Service Bus

In the first post of this series, we looked at a simple message receive loop. We tried to optimize the receive loop by moving the message completion out into a dedicated background operation. It turns out we can do more to boost the message throughput. Here is what our simple loop looked like:

var receiveClient = QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.PeekLock);
receiveClient.OnMessageAsync(async message =>
{
...
},
..)

We created a queue client with QueueClient.CreateFromConnectionString. If we carefully read the best practices for improvements using Service Bus Messaging, we can see that there are multiple ways of creating queue clients or message receivers. We can also create receivers with a MessagingFactory.

var factory = await MessagingFactory.CreateAsync(address, settings).ConfigureAwait(false);
var receiveClient = await factory.CreateMessageReceiverAsync(queueName, ReceiveMode.PeekLock).ConfigureAwait(false);
receiveClient.OnMessageAsync(async message =>
{
...
},
..)

So far we haven’t achieved much; we’ve just changed the creation of the receiver. But we could now try to create multiple receive clients, as the following snippets illustrate.

var factory = await MessagingFactory.CreateAsync(address, settings).ConfigureAwait(false);
var receiver1 = await factory.CreateMessageReceiverAsync(queueName, ReceiveMode.PeekLock).ConfigureAwait(false);
var receiver2 = await factory.CreateMessageReceiverAsync(queueName, ReceiveMode.PeekLock).ConfigureAwait(false);
receiver1.OnMessageAsync(async message => { ... }, ..);
receiver2.OnMessageAsync(async message => { ... }, ..);

To avoid duplication of the message receive loop logic, we would need to move the logic out into a dedicated method.

receiver1.OnMessageAsync(ReceiveMessage, ..);
receiver2.OnMessageAsync(ReceiveMessage, ..);

static async Task ReceiveMessage(BrokeredMessage message) {
   ...
}

Unfortunately, this will not bring us the desired throughput. The previously mentioned best-practices documentation states:

all clients (senders in addition to receivers) that are created by the same factory share one TCP connection. The maximum message throughput is limited by the number of operations that can go through this TCP connection. The throughput that can be obtained with a single factory varies greatly with TCP round-trip times and message size. To obtain higher throughput rates, you should use multiple messaging factories.

So ideally we would need a one-to-one relationship between factories and receivers. Let’s apply that to our snippet above.

var factory1 = await MessagingFactory.CreateAsync(address, settings).ConfigureAwait(false);
var factory2 = await MessagingFactory.CreateAsync(address, settings).ConfigureAwait(false);
var receiver1 = await factory1.CreateMessageReceiverAsync(queueName, ReceiveMode.PeekLock).ConfigureAwait(false);
var receiver2 = await factory2.CreateMessageReceiverAsync(queueName, ReceiveMode.PeekLock).ConfigureAwait(false);
receiver1.OnMessageAsync(ReceiveMessage, ..);
receiver2.OnMessageAsync(ReceiveMessage, ..);

With this approach, each receiver is created by a dedicated messaging factory. There is no TCP connection sharing involved anymore, and each receiver has a dedicated connection. The concurrency settings can be applied as desired to each receiver. With this approach, we can have as many factories and receivers as needed and concurrently fetch messages from the same queue over multiple dedicated connections. If we combine this with a wisely chosen PrefetchCount and potentially partitioned queues or topics, we can speed up our message receive loop to race car speed.
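For example, prefetch can be enabled per receiver along these lines (the numbers are placeholders to tune, not recommendations):

var factory = await MessagingFactory.CreateAsync(address, settings).ConfigureAwait(false);
var receiver = await factory.CreateMessageReceiverAsync(queueName, ReceiveMode.PeekLock).ConfigureAwait(false);

// Fetch messages ahead of the processing code; tune to message size and processing time.
receiver.PrefetchCount = 100;

receiver.OnMessageAsync(ReceiveMessage, new OnMessageOptions { MaxConcurrentCalls = 32 });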

How this influences the batch completion will be outlined in the next installment.

Async method without cancellation support, do it my way.

In the last post, I talked about Dennis Doomen’s LiquidProjects project and the challenge they faced with asynchronous APIs that were not cancelable. Dennis came up with the following solution to the infinitely running Task.Delay operations.

public static class TaskExtensions {
    public static async Task<TResult> WithWaitCancellation<TResult>(this Task<TResult> task,
            CancellationToken cancellationToken) {
        using (var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))            
        {
            var delayTask = Task.Delay(Timeout.Infinite, combined.Token);
            Task completedTask = await Task.WhenAny(task, delayTask);
            if (completedTask == task)
            {
                combined.Cancel();
                return await task;
            }
            else
            {
                cancellationToken.ThrowIfCancellationRequested();
                throw new InvalidOperationException("Infinite delay task completed.");
            }
        }
    }
}

The code above creates a linked token source and an infinitely delayed task that observes the token of the linked token source. When the outer token is canceled, the linked token source cancels the token it owns. When the task returned by Task.WhenAny is the actual work task, the linked token source is canceled, which automatically cancels the delayed task. The benefit of this implementation is its simplicity. The runtime performance is good, but we can do better; especially when the extension method is used over and over again, a few optimizations pay off. Here is a possible approach:

public static Task<TResult> WaitWithCancellation<TResult>(this Task<TResult> task, CancellationToken token = default(CancellationToken))
{
    var tcs = new TaskCompletionSource<TResult>();
    var registration = token.Register(s => {
        var source = (TaskCompletionSource<TResult>) s;
        source.TrySetCanceled();
    }, tcs);

    task.ContinueWith((t, s) => {
        var tcsAndRegistration = (Tuple<TaskCompletionSource<TResult>, CancellationTokenRegistration>) s;

        if (t.IsFaulted && t.Exception != null) {
            tcsAndRegistration.Item1.TrySetException(t.Exception.GetBaseException());
        }
        else if (t.IsCanceled) {
            tcsAndRegistration.Item1.TrySetCanceled();
        }
        else {
            // only a task that ran to completion reaches this branch,
            // so accessing t.Result cannot throw here
            tcsAndRegistration.Item1.TrySetResult(t.Result);
        }

        tcsAndRegistration.Item2.Dispose();
    }, Tuple.Create(tcs, registration), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    return tcs.Task;
}

Instead of using a delayed task with an infinite time span, we use a TaskCompletionSource. The TaskCompletionSource represents a task which is only completed, canceled or faulted when we instruct the source accordingly; essentially, it is an infinitely running task. On the token passed in, we register a callback that gets triggered when the token is canceled. When that happens, we set the completion source to canceled as well. On the actual task, we register a continuation which sets the state of the task completion source according to the state of the antecedent task (which is the actual worker task). Inside the continuation, we also dispose the token registration so as not to leak memory. The continuation is scheduled with the runtime hint ExecuteSynchronously to tell the TPL that the work inside the continuation is short-lived and may be executed inline while completing the antecedent task.

To avoid closure allocations, we use the overloads of the token registration and the continuation that allow us to pass in a state object. Where we need to pass multiple things into the delegate, we use a tuple to carry those items. Furthermore, we use the TrySet methods to move the task completion source into its final state in a safe manner, avoiding exceptions that could occur due to races.

Let’s look at the runtime differences of the two code snippets. You can try it yourself with BenchmarkDotNet and the tests found on my MicroBenchmark repo. The second snippet is harder to understand, but it is overall faster and slightly better because it allocates less garbage and omits the asynchronous state machine generation.
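A minimal BenchmarkDotNet harness along these lines can reproduce the comparison (the already-completed input task is a simplification, not the exact setup from the repo):

public class WaitWithCancellationBenchmarks {
    readonly CancellationTokenSource tokenSource = new CancellationTokenSource();

    [Benchmark(Baseline = true)]
    public Task<int> WithWaitCancellation()
        => Task.FromResult(42).WithWaitCancellation(tokenSource.Token);

    [Benchmark]
    public Task<int> WaitWithCancellation()
        => Task.FromResult(42).WaitWithCancellation(tokenSource.Token);
}

// in Main: BenchmarkRunner.Run<WaitWithCancellationBenchmarks>();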

Please don’t get me wrong: this is slightly esoteric. Depending on the application or system you are building, I would usually go for Dennis’ approach because it is simpler to understand and good enough. If you need a bit more performance and want to save allocations, the second approach might come in handy.

Thanks again Dennis for this cool asynchronous challenge.