Cleanup code for Cleaning up queues and exchanges on RabbitMQ

In my last post, I introduced the topic of cleaning up queues and exchanges in RabbitMQ. In this post, I’m going to walk you through the cleanup code. Let us first explore the main cleanup logic which requires the following usings

    using System;
    using System.Collections.Generic;
    using System.Globalization;
    using System.Linq;
    using System.Net;
    using System.Net.Http;
    using System.Net.Http.Headers;
    using System.Threading.Tasks;
    using Newtonsoft.Json;

and the Newtonsoft.Json nuget package.

       public void Cleanup()
       {
           var username = "guest";
           var password = "guest";
           var host = "localhost"
           var virtualHost = Uri.EscapeDataString("/"); // %2F is the name of the default virtual host

           var handler = new HttpClientHandler
           {
               Credentials = new NetworkCredential(username, password),
           };
           var httpClient = new HttpClient(handler)
           {
               BaseAddress = new Uri(string.Format(CultureInfo.InvariantCulture, "http://{0}:15672/", host))
           };
           httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

           var exchangeDeletionTasks = DeleteExchanges(httpClient, virtualHost);
           var queueDeletionTasks = DeleteQueues(httpClient, virtualHost);
           var cleanupTasks = exchangeDeletionTasks.Concat(queueDeletionTasks).ToArray();
           Task.WhenAll(cleanupTasks).Wait();

           foreach (var cleanup in cleanupTasks)
           {
               var responseMessage = cleanup.Result;
               try
               {
                   responseMessage.EnsureSuccessStatusCode();
               }
               catch (HttpRequestException)
               {
                   var requestMessage = responseMessage.RequestMessage;
                   Console.WriteLine("Cleanup task failed for {0} {1}", requestMessage.Method, requestMessage.RequestUri);
               }
           }
       }

To be able to connect to the broker API we need the appropriate credentials. We are using in this post the default credentials for the guest user. Furthermore, we assume the broker is running on  localhost. The default virtual host for RabbitMQ is empty. To clean up queues on this default virtual host, we need to send  %2F to the server. We then create a new  HttpClient with a handler that uses the right  NetworkCredential to connect to the broker. The  BaseAddress for the is in our example http://localhost:15672. We need to configure the  HttpClient to send  application/json on the request header by adding  MediaTypeWithQualityHeaderValue(“application/json”) to the  DefaultRequestHeaders.Accept collection.

When the client is properly configured, we can start deleting the exchanges.

static IEnumerable<Task<HttpResponseMessage>> DeleteExchanges(HttpClient httpClient, string virtualHost)
{
    // Delete exchanges
    var exchangeResult = httpClient.GetAsync(string.Format(CultureInfo.InvariantCulture, "api/exchanges/{0}", virtualHost)).Result;
    exchangeResult.EnsureSuccessStatusCode();
    var allExchanges = JsonConvert.DeserializeObject<List<Exchange>>(exchangeResult.Content.ReadAsStringAsync().Result);
    var exchanges = FilterAllExchangesByExcludingInternalTheDefaultAndAmq(allExchanges);

    var exchangeDeletionTasks = new List<Task<HttpResponseMessage>>(exchanges.Count);
    exchangeDeletionTasks.AddRange(exchanges.Select(exchange =>
    {
        var deletionTask = httpClient.DeleteAsync(string.Format(CultureInfo.InvariantCulture, "/api/exchanges/{0}/{1}", virtualHost, exchange.Name));
        deletionTask.ContinueWith((t, o) => { Console.WriteLine("Deleted exchange {0}.", exchange.Name); }, null, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
        deletionTask.ContinueWith((t, o) => { Console.WriteLine("Failed to delete exchange {0}.", exchange.Name); }, null, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
        return deletionTask;
    }));
    return exchangeDeletionTasks;
}

static List<Exchange> FilterAllExchangesByExcludingInternalTheDefaultAndAmq(IEnumerable<Exchange> allExchanges)
{
    return (from exchange in allExchanges
        let isInternal = exchange.Internal
        let name = exchange.Name.ToLowerInvariant()
        where !isInternal  // we should never delete rabbits internal exchanges
        where !name.StartsWith("amq.") // amq.* we shouldn't remove
        where name.Length > 0 // the default exchange which can't be deleted has a Name=string.Empty
        select exchange)
        .ToList();
}

private class Exchange
{
    public string Name { get; set; }
    public bool Internal { get; set; }
}

One important filter method when deleting the exchanges is FilterAllExchangesByExcludingInternalTheDefaultAndAmq. This filter method ensures that we never delete internal exchanges, amq.* exchanges or the default exchange that has an empty name. Deleting one of these exchanges could have severe side effects, and the default exchange can in fact never be deleted. Deleting the remaining exchanges is then fairly trivial. We first acquire all exchanges by issuing a GET request to /api/exchanges/%2F  and ensure that this request returned a successful status code. We materialize the response payload into a  List<Exchange>, filter them with  FilterAllExchangesByExcludingInternalTheDefaultAndAmq and create a list of deletion tasks. For deleting a given exchange we send a DELETE request to  /api/exchanges/%2F/[exchangeName] . To be able to know whether a delete request was successful or not, we add two task continuations to the deletion task. Both are executed synchronously by specifying  TaskContinuationOptions.ExecuteSynchronously, one is only run when the task execution was successful with   TaskContinuationOptions.OnlyOnRanToCompletion, the other only when the task was faulted with  TaskContinuationOptions.OnlyOnFaulted. Finally we return the deletionTask itself and not the continuations.

After you grasp how the deletion of exchanges works, the deletion of queues becomes straight forward.

       static IEnumerable<Task<HttpResponseMessage>> DeleteQueues(HttpClient httpClient, string virtualHost)
       {
           var queueResult = httpClient.GetAsync(string.Format(CultureInfo.InvariantCulture, "api/queues/{0}", virtualHost)).Result;
           queueResult.EnsureSuccessStatusCode();
           var queues = JsonConvert.DeserializeObject<List<Queue>>(queueResult.Content.ReadAsStringAsync().Result);

           var queueDeletionTasks = new List<Task<HttpResponseMessage>>(queues.Count);
           queueDeletionTasks.AddRange(queues.Select(queue =>
           {
               var deletionTask = httpClient.DeleteAsync(string.Format(CultureInfo.InvariantCulture, "/api/queues/{0}/{1}", virtualHost, queue.Name));
               deletionTask.ContinueWith((t, o) => { Console.WriteLine("Deleted queue {0}.", queue.Name); }, null, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
               deletionTask.ContinueWith((t, o) => { Console.WriteLine("Failed to delete queue {0}.", queue.Name); }, null, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
               return deletionTask;
           }));
           return queueDeletionTasks;
       }

       private class Queue
       {
           public string Name { get; set; }
       }

The principle is the same although this time we send the GET request to  /api/queues/%2F   and the individual DELETE requests to  /api/queues/[queueName].

In the end we need to union the exchange deletion tasks with the queue deletion tasks and wait for its completion. When this is done we can check the status codes of each response if desired.

Remark: I decided to show the synchronous blocking version. You could as change the code to  public Task CleanupAsync()  or something alike and then use the amazing  async/await  feature of C#. Happy cleaning!

About the author

Daniel Marbach

Add comment

By Daniel Marbach

Recent Posts