Azure Service Bus .NET SDK Deep Dive – Send Via

With SendVia, it is possible to create an atomic transaction between the incoming message and outgoing messages. For more posts in this series, go to Contents.

In scenarios where processing an incoming message generates outgoing messages and has to be transactional, the incoming and outgoing messages should either all succeed or all roll back. Failing to do so leads either to duplicate processing (the incoming message is not completed) or to ghost messages (the outgoing messages are not reverted).

Think of shipping a package from one point (the source) to another (the destination): the package might go straight to the destination, or it might instead pass through an intermediary. The idea behind SendVia is to create an atomic operation between an incoming message and the outgoing messages.

So instead of sending outgoing messages directly to the destination queues, the messages are sent to a transfer queue. The messages in the transfer queue are associated with the incoming message. Only when the incoming message is successfully acknowledged does Azure Service Bus reliably transfer the messages in the transfer queue to the destination queues. To demonstrate what can occur without the SendVia feature, let’s have a look at the following code:

await kickOffSender.SendMessageAsync(new ServiceBusMessage("Kick off"));

var receiver = serviceBusClient.CreateProcessor(inputQueue);
var sender = serviceBusClient.CreateSender(destinationQueue);

await Prepare.ReportNumberOfMessages(connectionString, destinationQueue);

receiver.ProcessMessageAsync += async processMessageEventArgs =>
{
    var message = processMessageEventArgs.Message;

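    // This send is not tied to the incoming message, so it goes out immediately.
    await sender.SendMessageAsync(new ServiceBusMessage("Will leak"));
    // Simulate a processing failure; the incoming message will be redelivered.
    throw new InvalidOperationException();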
};

If you have been following this series, the code above should be more or less clear. First, I’m sending a message with the content Kick off to the inputQueue. The receiver picks up messages from that inputQueue, and a message sender is used to send messages to the destinationQueue. At the end of the receiver callback, I’ve added a throw statement that raises an InvalidOperationException. The incoming message is therefore not completed and is redelivered until the MaxDeliveryCount configured on the queue is reached. In this demo I set the MaxDeliveryCount to two. But what happens to the send operation?
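For reference, the queue setup that the demo relies on could look roughly like the following. This is a minimal sketch and not the original demo code: the Prepare helper used above is not shown in this post, so QueueSetup, EnsureQueueAsync and ReportNumberOfMessagesAsync are hypothetical names, and I’m assuming the Azure.Messaging.ServiceBus.Administration client is used to create and inspect the queue.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper, standing in for the demo's setup code that is not shown.
public static class QueueSetup
{
    // Creates the queue with the given MaxDeliveryCount if it does not exist yet.
    public static async Task EnsureQueueAsync(string connectionString, string queueName, int maxDeliveryCount = 2)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);

        bool exists = (await admin.QueueExistsAsync(queueName)).Value;
        if (!exists)
        {
            await admin.CreateQueueAsync(new CreateQueueOptions(queueName)
            {
                // After this many failed deliveries the message is dead-lettered.
                MaxDeliveryCount = maxDeliveryCount
            });
        }
    }

    // Prints how many messages currently sit in the queue.
    public static async Task ReportNumberOfMessagesAsync(string connectionString, string queueName)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);

        QueueRuntimeProperties properties = (await admin.GetQueueRuntimePropertiesAsync(queueName)).Value;
        Console.WriteLine(
            $"{queueName}: {properties.ActiveMessageCount} active, {properties.DeadLetterMessageCount} dead-lettered");
    }
}
```

Calling ReportNumberOfMessagesAsync for the destinationQueue before and after a run is one way to observe whether any sends made it through.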

As the content of the message already indicates, the message that is sent in the callback function of the receiver will be sent to the destination queue even when the receive operation failed. Because the MaxDeliveryCount in this example is set to two, two messages will be leaked to the destinationQueue. They are immediately available for processing there, which means that partial updates can be observed in the system even though the actual business transaction has not completed yet. Translated to business code, this means that how you structure your code and when you send out messages has large implications for the rest of the system: every component participating in a complex business process needs to be able to deal with duplicates as well as partial updates. Let’s see how we can solve this.

await using var transactionalClient = new ServiceBusClient(connectionString, new ServiceBusClientOptions
{
    EnableCrossEntityTransactions = true,
});
receiver = transactionalClient.CreateProcessor(inputQueue);
sender = transactionalClient.CreateSender(destinationQueue);

receiver.ProcessMessageAsync += async processMessageEventArgs =>
{
    var message = processMessageEventArgs.Message;
    // Sends inside the scope are only released if the scope completes.
    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        await sender.SendMessageAsync(new ServiceBusMessage("Will not leak"));

        // Messages without the "Win" property fail here, so the scope is disposed without Complete().
        if (!message.ApplicationProperties.ContainsKey("Win")) throw new InvalidOperationException();

        await sender.SendMessageAsync(new ServiceBusMessage("Will not leak"));

        scope.Complete();
    }
};

There are two important changes:

  • The receiver and the sender need to be created from the same ServiceBusClient, which has EnableCrossEntityTransactions set to true.
  • The ServiceBusSender operations need to be wrapped in a TransactionScope(TransactionScopeAsyncFlowOption.Enabled).

That’s all. With these changes in place, consider sending the following messages:

await kickOffSender.SendMessageAsync(new ServiceBusMessage("Fail"));
var winMessage = new ServiceBusMessage("Win");
winMessage.ApplicationProperties.Add("Win", true);
await kickOffSender.SendMessageAsync(winMessage);

Sends during failures of incoming messages no longer leak out.
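The snippets above leave out the wiring needed to actually run the processor. The following is a minimal sketch of how the pieces might fit together, under a few assumptions of mine that are not part of the original demo: TransactionalForwarder and RunAsync are hypothetical names, the ten-second delay is an arbitrary way to keep the processor alive, and auto-completion is switched off so that completing the incoming message happens explicitly inside the same TransactionScope as the send. As far as I understand the SDK, the first entity an operation is performed on (here the input queue) becomes the entity the sends are routed through.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;
using Azure.Messaging.ServiceBus;

// Hypothetical wrapper around the demo code; names are illustrative only.
public static class TransactionalForwarder
{
    public static async Task RunAsync(string connectionString, string inputQueue, string destinationQueue)
    {
        await using var client = new ServiceBusClient(connectionString, new ServiceBusClientOptions
        {
            EnableCrossEntityTransactions = true
        });

        // Assumption: disable auto-completion so the handler completes the
        // incoming message itself, inside the transaction scope.
        await using var processor = client.CreateProcessor(inputQueue, new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxConcurrentCalls = 1
        });
        await using var sender = client.CreateSender(destinationQueue);

        processor.ProcessMessageAsync += async args =>
        {
            // Disposed at the end of the handler; rolls back unless Complete() was called.
            using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

            await sender.SendMessageAsync(new ServiceBusMessage("Will not leak"));
            await args.CompleteMessageAsync(args.Message);

            scope.Complete();
        };

        // The processor requires an error handler before it can be started.
        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine($"Processing failed: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        await Task.Delay(TimeSpan.FromSeconds(10)); // arbitrary demo duration
        await processor.StopProcessingAsync();
    }
}
```

If the handler throws before scope.Complete() is reached, neither the send nor the completion should take effect, and the incoming message is redelivered according to the MaxDeliveryCount of the queue.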

Be careful to apply this principle only where it makes sense. The connection sharing, the transaction scope processing behind the scenes, and the additional transfer on the broker can have a visible impact on the throughput of your system. But if you need transactional processing, SendVia is a very powerful feature that saves you from having to worry about messages leaking out to receivers when they shouldn’t.

If you are curious to hear a bit more about message processing, transactions, and partial updates/failures, have a look at the Life beyond transaction part of my DIY Async Message Pumps talk, which I gave at BuildStuff Lithuania in 2019.

Updated: 2021-03-26 to use the new SDK

About the author

Daniel Marbach