
MultiProducerConcurrentConsumer – Asynchronous synchronisation

In the last post I explained how the push in batches up to the concurrency per slot works. This post covers the asynchronous synchronisation approach I used to unleash the push in batches when the batch size is reached.

In the code shown in the Push It post I simplified the synchronisation primitive and called it syncEvent. In reality, the code uses a TaskCompletionSource for synchronisation.

The MPCC declares a field of type TaskCompletionSource<bool>. For the MPCC the TResult type of the TaskCompletionSource doesn’t matter. It could be anything.

TaskCompletionSource<bool> batchSizeReached = new TaskCompletionSource<bool>();

The Push method looks like this:

public void Push(TItem item, int slotNumber) {
    // as before

    if (incrementedCounter > batchSize)
    {
        batchSizeReached.TrySetResult(true);
    }
}

Whenever the batch size is reached, the component tries to set the task completion source into the completed state. It is crucial to use the TrySetResult method since the timer loop might try to complete it at the same time, due to the push interval being reached. Using SetResult in Push would, in that case, result in exceptions bubbling up to the caller of Push.
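To see this behaviour in isolation, here is a tiny standalone example (not part of the component):

var tcs = new TaskCompletionSource<bool>();

var first = tcs.TrySetResult(true);  // true: this call completed the task
var second = tcs.TrySetResult(true); // false: already completed, no exception

Console.WriteLine($"{first} / {second}"); // prints: True / False
// tcs.SetResult(true); // would throw an InvalidOperationException instead

With that out of the way, here is the timer loop: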

async Task TimerLoop() {
    var token = tokenSource.Token;
    while (!tokenSource.IsCancellationRequested) {
        try
        {
            // wake up when either the push interval elapses or the batch size was reached
            await Task.WhenAny(Task.Delay(pushInterval, token), batchSizeReached.Task).ConfigureAwait(false);

            // complete unconditionally; it might have been the delay task that won the race
            batchSizeReached.TrySetResult(true);

            // a TaskCompletionSource cannot be reused, so swap in a fresh one
            batchSizeReached = new TaskCompletionSource<bool>();
            await PushInBatches().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // intentionally ignored
        }
    }
}

The timer loop awaits, with Task.WhenAny, either the push interval delay task or the batchSizeReached task represented by the TaskCompletionSource. When the WhenAny task returns, the batchSizeReached task completion source is completed with TrySetResult because it might have been the push interval task that finished. Then the field is reset to a new TaskCompletionSource because a TaskCompletionSource cannot be reused. The code doesn't use locking around setting the TaskCompletionSource. The assumption is that the field exchange is atomic and there is only one timer loop that resets the field. What could happen, though, is the following:

Concurrent operations might do a TrySetResult in the push method on the previous task completion source. In that case, the next push will automatically lead to unleashing the task completion source since the incrementedCounter hasn't been reset. In the worst case, if no further push happens, the MPCC will only push when the next push interval is due, but in my opinion this is a reasonable tradeoff compared to the performance overhead that locking around the task completion source would incur.
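For comparison, this is roughly what the rejected locking variant would look like (a sketch; the syncRoot field is hypothetical and not part of the component):

// in the timer loop: completing and resetting becomes one atomic step...
lock (syncRoot)
{
    batchSizeReached.TrySetResult(true);
    batchSizeReached = new TaskCompletionSource<bool>();
}

// ...but every producer in Push now has to take the very same lock
lock (syncRoot)
{
    batchSizeReached.TrySetResult(true);
}

Every Push call past the batch size would contend on that lock, which is exactly the contention the lock-free design avoids.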

In the next instalment, I'm going to talk about the completion method of the MPCC.

MultiProducerConcurrentConsumer – Push in batches up to concurrency per slot

In the previous post I introduced the PushInBatches method whose responsibility it is to start the PushInBatchesUpToConcurrencyPerQueueForAGivenSlot process. In this post I will focus on that mouthful of a method.

As the name already suggests, its responsibility is to push items for a slot in batches to the pump delegate while respecting the per-slot concurrency settings provided by the user, as explained in the introduction post.

Without further ado, here is the whole code.

void PushInBatchesUpToConcurrencyPerQueueForAGivenSlot(ConcurrentQueue<TItem> queue,
    int currentSlotNumber, List<Task> tasks)  {

    int numberOfItems;
    var concurrency = 1;
    do {
        numberOfItems = 0;
        List<TItem> items = null;
        // dequeue up to batchSize items from the queue of this slot
        for (var i = 0; i < batchSize; i++) {
            TItem item;
            if (!queue.TryDequeue(out item)) {
                break;
            }

            // lazily borrow a list from the buffer pool; fall back to a fresh one
            if (items == null && !itemListBuffer.TryDequeue(out items)) {
                items = new List<TItem>(batchSize);
            }

            items.Add(item);
            numberOfItems++;
        }

        if (numberOfItems <= 0) {
            return;
        }

        Interlocked.Add(ref numberOfPushedItems, -numberOfItems);
        concurrency++;
        // dispatch the pump without awaiting so batches run concurrently
        var task = pump(items, currentSlotNumber, state, tokenSource.Token)
        .ContinueWith((t, taskState) => {

            // return the borrowed list to the buffer pool for reuse
            var itemListAndListBuffer =
                (Tuple<List<TItem>, ConcurrentQueue<List<TItem>>>) taskState;
            itemListAndListBuffer.Item1.Clear();
            itemListAndListBuffer.Item2.Enqueue(itemListAndListBuffer.Item1);

        }, Tuple.Create(items, itemListBuffer), TaskContinuationOptions.ExecuteSynchronously);
        tasks.Add(task);
    } while (numberOfItems == batchSize && concurrency <= maxConcurrency);
}

The method sets the initial concurrency to one and starts a do-while loop. The loop continues as long as the numberOfItems picked from the slot is equal to the batch size and the concurrency is less than or equal to the maximum concurrency allowed per slot. Inside the loop there are essentially two parts. The first part is a for loop that gets items from the queue; the second part concurrently dispatches the pump tasks together with a cleanup continuation which clears the borrowed item buffer and returns it so it can be reused again.

The for loop runs from zero to the batch size and tries to dequeue items from the queue of the slot. When there is nothing to dequeue, the loop exits early. When there is something to dequeue and the items buffer has not yet been acquired, the method tries to borrow a buffer from the buffer pool. After that, the item is added to the items list and numberOfItems is incremented by one. This process continues until the queue has no more items or the batch size is reached. When numberOfItems is zero, we return because there is nothing ready to be pushed to the pump delegate.

When there are items to be pushed, we first decrease numberOfPushedItems by numberOfItems in a thread-safe manner (using Interlocked.Add). Then we increase the current concurrency. After that, the pump delegate is executed without awaiting the task. Since we want to execute the pump delegate concurrently, the task cannot be awaited; if it were, the code would execute sequentially instead of concurrently (the sketch below illustrates the difference). To the pump task a continuation is attached that avoids closure allocations by passing the state the lambda body needs into the state parameter of the ContinueWith method. The lambda extracts the state when it is executed, clears the buffer list and returns it to the buffer pool for further reuse. The continuation uses TaskContinuationOptions.ExecuteSynchronously to hint to the TPL runtime that the lambda doesn't need to be rescheduled (it might still be, depending on runtime circumstances, but I'm getting off topic here). The continuation task is then added to the task list so that the PushInBatches method can await all those tasks concurrently.
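The difference between awaiting inside the loop and collecting the tasks deserves a small illustration (a generic sketch with a hypothetical batches collection, not code from the component):

// Sequential dispatch: total time is roughly the sum of all batch durations.
foreach (var batch in batches)
{
    await pump(batch, slotNumber, state, token);
}

// Concurrent dispatch (what the method does): total time is roughly the
// duration of the slowest batch, bounded by the concurrency limit.
var inFlight = new List<Task>();
foreach (var batch in batches)
{
    inFlight.Add(pump(batch, slotNumber, state, token));
}
await Task.WhenAll(inFlight);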

In the next instalment, I’m going to shed some light on the asynchronous synchronisation primitive I quickly described in the Push It post. I hope you are enjoying this deep dive series into the MPCC component so far. Feel free to leave comments, likes, criticism and more over the usual channels.

Running integration tests inside Service Fabric

I’m currently diving into Service Fabric. One of the difficult things you’ll be faced with when you write code against the Service Fabric APIs is how to integration test those components. Infrastructure like the reliable state manager is only available inside Service Fabric, so tests that exercise components depending on such infrastructure need to run inside the cluster. In this post, I show you an approach which allows running integration tests written in NUnit inside Service Fabric. The full version of the code can be found in my ServiceFabric.Testing GitHub repository.

The idea is simple:

  1. Create an application with a stateful service that hosts the integration tests
  2. Run the tests inside the stateful service hosted in the cluster
  3. Report the progress and the results back to the build server (I’ll be using TeamCity as an example)

Let’s build this thing. For simplicity, I only show how you can get access to the reliable state manager. All tests that need access to the reliable state manager can inherit from StatefulServiceContextAwareBase. The base class extracts the reliable state manager from the TestContext provided by NUnit.

public abstract class StatefulServiceContextAwareBase
{
    public IReliableStateManager StateManager { get; set; } =
        TestContext.CurrentContext.Test.Properties.Get("ReliableStateManager") as IReliableStateManager;
}
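A test that needs the reliable state manager can then simply inherit from the base class. Here is a hypothetical example (the fixture and dictionary names are made up for illustration):

[TestFixture]
public class ReliableDictionaryTests : StatefulServiceContextAwareBase
{
    [Test]
    public async Task Can_write_and_commit_state()
    {
        // the state manager was injected through the NUnit test properties
        var counts = await StateManager
            .GetOrAddAsync<IReliableDictionary<string, int>>("counts");

        using (var tx = StateManager.CreateTransaction())
        {
            await counts.AddOrUpdateAsync(tx, "visits", 1, (key, value) => value + 1);
            await tx.CommitAsync();
        }
    }
}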

But somehow the state manager needs to get into the test context. This magic happens in the RunAsync method of the stateful service that hosts the tests.

ConcurrentQueue<string> output = new ConcurrentQueue<string>();

protected override async Task RunAsync(CancellationToken cancellationToken) {
    
    var runner = new NUnitTestAssemblyRunner(new DefaultTestAssemblyBuilder());
    runner.Load(GetType().Assembly, new Dictionary<string, object>());
    runner.RunAsync(new CompositeListener(
        new ContextAwareTestListener(StateManager),
        new TeamCityEventListener(new TextWriterConcurrenctQueueDecorator(output))), TestFilter.Empty);

    using (cancellationToken.Register(() => runner.StopRun(force: false)))
    {
        // rest omitted
    }
}

As a package dependency, I used NUnitLite. It has everything you need to host tests inside a library, a console application or, as we do here, inside a Service Fabric stateful service. The assumption of the sample shown here is that all tests are written in the same assembly that contains the stateful service definition. With that assumption / convention, I can create an NUnitTestAssemblyRunner and load the current assembly as a test assembly into the runner. To the RunAsync method of the test runner, we can provide implementations of the ITestListener interface. Test listeners can report state changes of tests or interact with the test definitions. In my case, I used a composite listener which manages a series of listeners. The responsibility of the ContextAwareTestListener is to fill the reliable state manager into the test properties so that the StatefulServiceContextAwareBase can read it again. Let’s see what it looks like:

class ContextAwareTestListener : ITestListener {
    private IReliableStateManager statefulStateManager;

    public ContextAwareTestListener(IReliableStateManager stateManager) {
        statefulStateManager = stateManager;
    }

    public void TestStarted(ITest test) {
        test.Properties.Add("ReliableStateManager", statefulStateManager);
    }

    // ...
}
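The CompositeListener itself is not spelled out in this post; a minimal sketch could look like this (assuming the three ITestListener members of the NUnit version used at the time; newer versions add more):

class CompositeListener : ITestListener
{
    private readonly ITestListener[] listeners;

    public CompositeListener(params ITestListener[] listeners)
    {
        this.listeners = listeners;
    }

    public void TestStarted(ITest test)
    {
        foreach (var listener in listeners) listener.TestStarted(test);
    }

    public void TestFinished(ITestResult result)
    {
        foreach (var listener in listeners) listener.TestFinished(result);
    }

    public void TestOutput(TestOutput output)
    {
        foreach (var listener in listeners) listener.TestOutput(output);
    }
}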

Since I’m running the tests inside TeamCity, I add the TeamCityEventListener defined in the NUnitLite package. The listener writes messages to a TextWriter which can be interpreted by TeamCity to visualise the state and the outcome of test runs. In my example, I used a ConcurrentQueue<string> to insert the lines and read them again. So I wrote a StringWriter adapter which forwards WriteLine calls into the concurrent queue.

class TextWriterConcurrenctQueueDecorator : StringWriter {
    private ConcurrentQueue<string> output;

    public TextWriterConcurrenctQueueDecorator(ConcurrentQueue<string> output) {
        this.output = output;
    }

    public override void WriteLine(string format, object arg0) {
        output.Enqueue(string.Format(format, arg0));
    }

    // ...
}

To get the test output from inside the cluster to TeamCity, I’ve created a communication listener which exposes a web listener inside Service Fabric.

private async Task ProcessInternalRequest(HttpListenerContext context, CancellationToken cancelRequest) {
    try
    {
        if (output.Count == 0)
        {
            return;
        }

        using (HttpListenerResponse response = context.Response)
        using(var streamWriter = new StreamWriter(response.OutputStream))
        {
            response.ContentType = "text/plain";
            string line;
            while (output.TryDequeue(out line))
            {
                await streamWriter.WriteLineAsync(line).ConfigureAwait(false);
            }
            await streamWriter.FlushAsync().ConfigureAwait(false);
            streamWriter.Close();
            response.Close();
        }
    }
    catch (Exception)
    {
        // stream closed etc.
    }
}

Every time a web request comes in, the content of the concurrent queue is written to the output stream. For the endpoint to work properly, we need to declare it in the ServiceManifest.xml like this:

  <Resources>
    <Endpoints>
      <Endpoint Name="Web" Protocol="http" UriScheme="http" Port="8089" />
...
    </Endpoints>
  </Resources>

The client code becomes dead simple. Since Service Fabric has a built-in reverse proxy, we can call the proxy URI according to the following schema: http://{fqdn}:{port}/{Application}/{Service}/{Endpoint}. Since we are running on localhost with TestApplication and TestRunner as the service name, the URI is http://localhost:19081/TestApplication/TestRunner/Web.

var httpClient = new HttpClient();

var content = await httpClient.GetAsync("http://localhost:19081/TestApplication/TestRunner/Web");
var stringContent = await content.Content.ReadAsStringAsync();
if (!string.IsNullOrEmpty(stringContent))
{
    Console.Write(stringContent);
}

The code above can be executed in a loop until the end of the test suite is seen in the HTTP content.
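Such a polling loop could look like the following sketch (the termination marker is an assumption; in practice you would watch for whatever your suite emits last, e.g. the TeamCity testSuiteFinished service message):

var httpClient = new HttpClient();
var done = false;
while (!done)
{
    var response = await httpClient.GetAsync("http://localhost:19081/TestApplication/TestRunner/Web");
    var stringContent = await response.Content.ReadAsStringAsync();
    if (!string.IsNullOrEmpty(stringContent))
    {
        Console.Write(stringContent);
        // assumed terminating condition: the last suite reports as finished
        done = stringContent.Contains("##teamcity[testSuiteFinished");
    }
    await Task.Delay(TimeSpan.FromSeconds(1));
}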

The final part is to set up the TeamCity build steps.

And here is what the report looks like with live updates.

I hope that helps to get started.

My friend Szymon Kulec came up with an even more elegant way that also works from inside Visual Studio or the ReSharper test runner. Make sure you follow his blog!

MultiProducerConcurrentConsumer – Push in batches

In the last post, I introduced the push method and the timer loop of the MultiProducerConcurrentConsumer. In this post, I’ll focus on the actual PushInBatches method that gets called by the timer loop.

Task PushInBatches() {
    if (Interlocked.Read(ref numberOfPushedItems) == 0) {
        return TaskEx.Completed;
    }

    for (var i = 0; i < numberOfSlots; i++) { 
       var queue = queues[i]; 

       PushInBatchesUpToConcurrencyPerQueueForAGivenSlot(queue, i, pushTasks); 
    } 

    return Task.WhenAll(pushTasks).ContinueWith((t, s) => {
        var tasks = (List<Task>) s;
        tasks.Clear();
    }, pushTasks, TaskContinuationOptions.ExecuteSynchronously);
}

When the method is entered, it first checks whether there is anything to push. Since numberOfPushedItems is accessed by multiple threads, it uses Interlocked to read it and compare the returned value. When there is nothing to push, a completed task is returned. By not using the async keyword here, we avoid the state machine generated by the compiler. Bear in mind this trick should not be blindly applied.

Then it loops through all the slots. The queue per slot is taken from the queues array, and PushInBatchesUpToConcurrencyPerQueueForAGivenSlot is called with the queue of the current slot, the slot number and the pushTasks list. Remember that the push task list and the queues array were previously allocated as members of the class for efficient reuse.

At the end of the method, Task.WhenAll creates a task that completes when all of the tasks contained in pushTasks have completed. A synchronous continuation is scheduled that makes sure the pushTasks list gets cleared at the end of the execution. That continuation task is returned to the caller, which is the timer loop. This means the timer loop will only continue when the pushTasks list has been cleared. So instead of creating a new list of tasks for every loop cycle, we reuse the task list and clear it after each push cycle, which reduces garbage collection pressure.

Here I used another not-so-well-known trick. Instead of accessing the pushTasks list directly, and therefore creating a closure over this, I’m passing the pushTasks list as a state parameter to the ContinueWith method and extracting the state inside the body of the continuation. This saves an Action delegate allocation per push cycle, as the side-by-side sketch below shows.
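Here is the same continuation written both ways (a side-by-side sketch for illustration, not code from the component):

// Capturing version: the lambda closes over 'this' to reach the pushTasks
// field, so a new delegate instance is allocated on every push cycle.
Task.WhenAll(pushTasks).ContinueWith(t => pushTasks.Clear());

// State version (what the method does): the lambda captures nothing, so the
// compiler can cache a single delegate instance and reuse it on every cycle.
Task.WhenAll(pushTasks).ContinueWith((t, s) => ((List<Task>) s).Clear(),
    pushTasks, TaskContinuationOptions.ExecuteSynchronously);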

In the next instalment, we’ll look into PushInBatchesUpToConcurrencyPerQueueForAGivenSlot since it is complex enough to warrant a dedicated post.

MultiProducerConcurrentConsumer – Push it

In the last post, I introduced the start procedure for the MultiProducerConcurrentConsumer, including a rough outline of the timer loop responsible for pushing items in batches after the push interval has elapsed.

To understand how the batch size affects the push, let me first explain the push method. The push method’s primary responsibility is to insert the item of type TItem into the right slot. To achieve that, the passed-in slot number is range checked against the available number of slots (of course, a lower bound check would not hurt either 😉 ). The slot number is used to pick the concurrent queue that represents the slot, and the item is enqueued into it.

public void Push(TItem item, int slotNumber)
{
    if (slotNumber >= numberOfSlots)
    {
        throw new ArgumentOutOfRangeException(nameof(slotNumber),
          $"Slot number must be between 0 and {numberOfSlots - 1}.");
    }

    queues[slotNumber].Enqueue(item);

    var incrementedCounter = Interlocked.Increment(ref numberOfPushedItems);

    if (incrementedCounter > batchSize)
    {
        syncEvent.Set();
    }
}
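Before looking at the counting, a quick note on why the code above uses Interlocked.Increment rather than a plain increment: ++ is a read-modify-write operation and loses updates when threads interleave. A toy demonstration (not part of the component):

long unsafeCounter = 0;
long safeCounter = 0;
Parallel.For(0, 1000000, i =>
{
    unsafeCounter++;                         // typically ends up below 1000000
    Interlocked.Increment(ref safeCounter);  // always ends up at exactly 1000000
});
Console.WriteLine($"{unsafeCounter} vs {safeCounter}");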

The second responsibility of the push method is to count the number of items that were inserted into the MPCC. In my version of the MPCC, I made a simple tradeoff: instead of having a counter per slot, I’ve chosen a global counter called numberOfPushedItems which counts all the pushed items. I think it is a reasonable tradeoff that makes the implementation much simpler but still achieves its goal (what do you think?). Since multiple threads can insert data concurrently, I’m using Interlocked.Increment to increment the counter. Whenever the incremented counter is greater than the batch size, the push method signals a synchronisation primitive* to unleash the push process. But how does the MPCC know when the batch size is reached?

It does so by observing the synchronisation primitive. Let’s expand the previously shown TimerLoop slightly:

async Task TimerLoop()
{
    var token = tokenSource.Token;
    while (!tokenSource.IsCancellationRequested)
    {
        try
        {
            await Task.WhenAny(Task.Delay(pushInterval, token),
               syncEvent.WaitAsync(token)).ConfigureAwait(false);
            await PushInBatches().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // intentionally ignored
        }
    }
}

So instead of only doing a Task.Delay with the push interval, we combine the Task.Delay and the syncEvent.WaitAsync into a Task.WhenAny. What this means is the following: Task.WhenAny returns when any of the tasks passed into the method completes (successfully, faulted or cancelled). So the await continues when either the push interval has elapsed (or was cancelled), or the synchronisation primitive was set (or the token was cancelled). This code has a slight deficiency: every time we reach the batch size before the push interval has elapsed, the task representing the push interval will “leak” until the interval elapses. But we will ignore this for now and potentially talk about it in another post.
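For the curious, one common remedy (a sketch of an alternative, not what the component does) is to link a cancellation token source to the delay and cancel it once WhenAny returns:

using (var delayCts = CancellationTokenSource.CreateLinkedTokenSource(token))
{
    var delayTask = Task.Delay(pushInterval, delayCts.Token);
    await Task.WhenAny(delayTask, syncEvent.WaitAsync(token)).ConfigureAwait(false);
    delayCts.Cancel(); // releases the timer early when the sync event wins the race
    await PushInBatches().ConfigureAwait(false);
}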

The code shown so far is now capable of getting items pushed into the right slot, and when either the push interval elapses or the batch size is reached, the component pushes items in batches to the pump delegate. In the next post, we will cover the actual PushInBatches method.

* I’m using an AsyncAutoResetEvent which I will explain in a later post.

MultiProducerConcurrentConsumer – Start it

In the previous instalment, we discussed the technique of preallocation and reuse to save allocations at runtime. In this post, I will introduce the heart of the MPCC: the start method and the timer loop outline.

The MPCC can be started at any time after construction. Once started, it will push items to the pump delegate whenever the batch size is reached or the push interval elapses. This is what the Start method looks like:

public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump)
{
    Start(pump, null);
}

public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state)
{
    if (started)
    {
        throw new InvalidOperationException("Already started");
    }

    // assign the pump, the state and the token source before the loop starts
    // so the timer loop never observes partially initialised fields
    this.pump = pump;
    this.state = state;
    tokenSource = new CancellationTokenSource();
    timer = Task.Run(TimerLoop);
    started = true;
}

The MPCC can only be started once. There are two variations for starting: a state-based overload and one which doesn’t provide state. The state-based overload exists to enable scenarios where you want to avoid closure capturing: you can pass in the state that is required to execute the functionality inside the pump delegate implementation, as the sketch after this paragraph shows.
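A hedged usage sketch (it assumes the old Microsoft.ServiceBus.Messaging client, where lock tokens are Guids, and a single receiver for brevity): the receiver travels through the state parameter, so the pump lambda captures nothing.

var consumer = new MultiProducerConcurrentConsumer<Guid>(
    batchSize: 500, pushInterval: TimeSpan.FromSeconds(2),
    maxConcurrency: 3, numberOfSlots: 1);

consumer.Start((lockTokens, slotNumber, state, token) =>
{
    // the receiver is recovered from the state parameter instead of being captured
    var receiver = (MessageReceiver) state;
    return receiver.CompleteBatchAsync(lockTokens);
}, receiver);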

Furthermore, the component requires a cancellation token source and a timer loop that executes the interval-based pushing of items to the pump. Let’s have a quick look at the timer loop implementation (a few details are left out since we will explore those in future posts):

async Task TimerLoop()
{
    var token = tokenSource.Token;
    while (!tokenSource.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(pushInterval, token).ConfigureAwait(false);
            await PushInBatches().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // intentionally ignored
        }
    }
}

The timer loop is scheduled on the worker thread pool. At its heart, we have a while loop that runs until the token indicates that cancellation was requested. Inside the loop, the component uses Task.Delay to asynchronously wait until the push interval has elapsed or the token is cancelled. After the push interval is over, the method PushInBatches is called. In this implementation, I chose to ignore any exceptions that might happen either in the Task.Delay (e.g. an OperationCanceledException when the token is cancelled) or bubbling up from PushInBatches.

Now we have a simple approach to push items in batches based on the push interval defined in the constructor. But this doesn’t address the batch size based pushing of items. We’ll cover this in the next posts.

MultiProducerConcurrentConsumer – Preallocate and reuse

In the last post, I introduced the MultiProducerConcurrentConsumer component. In this post, I’m going to walk through the constructor of the component and explain the preallocation trade-off that was made.

With almost any code that executes on the hot path of your library, framework or system, you need to make tradeoffs regarding allocations. In the end, it boils down to roughly two choices:

  • Allocate as much as you need upfront
  • Allocate on-demand when you need it

The benefit of only allocating when needed is that the memory consumption only grows when it is needed. The downside of this approach is that under highly concurrent scenarios allocating structures in a safe and lock-free way can be tricky.

The drawback of allocating upfront is that even though your application might never need it, the memory is preallocated and will stay that way. On the other hand, if we allocate the potentially needed structures upfront we never need to allocate during concurrent execution. Unfortunately preallocating upfront only works when you know the boundary conditions of the problem at hand.

With the MPCC structure, we can make a few assumptions which allow us to pre-allocate internally needed structures.

public MultiProducerConcurrentConsumer(
    int batchSize, TimeSpan pushInterval, int maxConcurrency, int numberOfSlots)
{
    this.maxConcurrency = maxConcurrency;
    this.pushInterval = pushInterval;
    this.batchSize = batchSize;
    this.numberOfSlots = numberOfSlots;

    queues = new ConcurrentQueue<TItem>[numberOfSlots];
    for (var i = 0; i < numberOfSlots; i++)
    {
        queues[i] = new ConcurrentQueue<TItem>();
    }
    
    var maxNumberOfConcurrentOperationsPossible = numberOfSlots*maxConcurrency;
    pushTasks = new List<Task>(maxNumberOfConcurrentOperationsPossible);

    itemListBuffer = new ConcurrentQueue<List<TItem>>();
    for (var i = 0; i < maxNumberOfConcurrentOperationsPossible; i++)
    {
        itemListBuffer.Enqueue(new List<TItem>(batchSize));
    }
}

Since we know the number of slots up front, and we defined that the component needs FIFO semantics, we can use an array of ConcurrentQueue<TItem> (Line 9) and create all the queues for all the slots ahead of time (Lines 10-13).

The maximum concurrency is defined per slot. So we can calculate the maximum number of concurrent operations possible at any given time to be the number of slots multiplied by the maximum concurrency (Line 15). As soon as maxNumberOfConcurrentOperationsPossible is calculated, we can allocate a List<Task> which is going to hold the concurrently happening operations for graceful shutdown purposes (Line 16).

Last but not least, we make the following tradeoff. During runtime, the component will get many items pushed to it and complete items in batches. If we allocated a List<TItem> every time the pump callback is called, we’d be creating a lot of Gen0 garbage: up to maxNumberOfConcurrentOperationsPossible List<TItem> instances per push interval. Depending on the push interval settings, this could drastically impact performance. That’s why the MPCC creates a ConcurrentQueue<List<TItem>> and enqueues new List<TItem>(batchSize) instances into it (Lines 18-22). This queue serves as a List<TItem> buffer. Effectively, we are reusing the lists during the whole lifetime of the component. You might have noticed that we initialise each list with the batch size, which is also known upfront, to prevent unnecessary growing of the lists.
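The rent/return cycle in isolation looks like this (a sketch of the pattern using the fields from the constructor above; in the component the two halves live in different methods):

// renting: borrow a list from the pool, fall back to a fresh allocation
List<TItem> items;
if (!itemListBuffer.TryDequeue(out items))
{
    items = new List<TItem>(batchSize);
}

// ... fill 'items' and hand the batch to the pump delegate ...

// returning: reset the list but keep its capacity, then recycle it
items.Clear();
itemListBuffer.Enqueue(items);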

Hope you like this so far. Stay tuned for the next post about MPCC.

Introduction to the MultiProducerConcurrentConsumer for Azure Service Bus message completion

In the last post in the series about Azure Service Bus message completion, I briefly talked about how the message batch completion could benefit from a more reactive approach. Before I dive into the inner workings of the reactive structure that I’m going to outline in the next few posts, I need to briefly recap the functional and non-functional requirements for the component.

The component shall

  • Make sure messages are only completed on the receiver they came from
  • Reduce the number of threads used when the number of clients is increased
  • Autoscale up under heavy load
  • Scale down under light load
  • Minimise the contention on the underlying collections used
  • Be completely asynchronous
  • Implement a push-based model from the producer and consumer perspective
  • Respect the maximum batch size defined by the client of the component or a predefined push interval
  • Provide FIFO semantics instead of LIFO
  • Be as lock-free as possible

From a design viewpoint, the MultiProducerConcurrentConsumer (MPCC) component will look like this:

The MPCC internally manages N slots that can contain items of type TItem. Whenever the push interval or the batch size is reached, the component fans out the completion callback, with up to the provided concurrency per slot.

class MultiProducerConcurrentConsumer<TItem> {
    public MultiProducerConcurrentConsumer(int batchSize, TimeSpan pushInterval,
        int maxConcurrency, int numberOfSlots) { }

    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump) { }

    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state) { }

    public void Push(TItem item, int slotNumber) { }

    public async Task Complete(bool drain = true) { }
}

Shown above is the class outline that we are going to use for the component. The constructor allows specifying the batch size, the push interval, the number of slots and the concurrency per slot. The component has a method to push items to it called Push. As soon as the component is started, the provided pump delegate will be called whenever the batch size is reached or the push interval elapses. The concurrency per slot is probably best explained with an example.

Let’s assume we have a batch size of 500 and a push interval of 2 seconds. When producers produce 1500 items per second, the batch size is reached three times per second. In that case, MPCC will auto-scale up to the provided concurrency: it will take out 500 items and invoke the callback, concurrently try to fetch another 500 items, and again another 500, until the slot is empty or the maximum concurrency is reached.

The Complete method is available for deterministically stopping the MPCC. It allows asynchronously waiting for all slots to be drained (the default) or immediately stopping and throwing away all items that haven’t been processed yet, as the sketch below shows.
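A minimal end-to-end sketch of the lifecycle (the item values and the trivial pump are illustrative only):

var mpcc = new MultiProducerConcurrentConsumer<string>(
    batchSize: 500, pushInterval: TimeSpan.FromSeconds(2),
    maxConcurrency: 3, numberOfSlots: 2);

mpcc.Start((items, slotNumber, state, token) =>
{
    Console.WriteLine($"Pushing {items.Count} items for slot {slotNumber}");
    return Task.CompletedTask;
});

mpcc.Push("item-1", slotNumber: 0);
mpcc.Push("item-2", slotNumber: 1);

await mpcc.Complete();        // drains the remaining items first
// await mpcc.Complete(false); // or stop immediately, discarding leftovers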

In the next post, we are going to depict the inner workings of the MultiProducerConcurrentConsumer. I hope you’ll enjoy it.