In my last post, I introduced the topic of cleaning up queues and exchanges in RabbitMQ. In this post, I’m going to walk you through the cleanup code. Let us first explore the main cleanup logic, which requires the following usings:

    using System;
    using System.Collections.Generic;
    using System.Globalization;
    using System.Linq;
    using System.Net;
    using System.Net.Http;
    using System.Net.Http.Headers;
    using System.Threading.Tasks;
    using Newtonsoft.Json;

and the Newtonsoft.Json NuGet package.

    public void Cleanup()
    {
        var username = "guest";
        var password = "guest";
        var host = "localhost";
        // "/" is the name of the default virtual host; it is sent URL-encoded as %2F
        var virtualHost = Uri.EscapeDataString("/");

        var handler = new HttpClientHandler
        {
            Credentials = new NetworkCredential(username, password),
        };
        var httpClient = new HttpClient(handler)
        {
            BaseAddress = new Uri(string.Format(CultureInfo.InvariantCulture, "http://{0}:15672/", host))
        };
        httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

        var exchangeDeletionTasks = DeleteExchanges(httpClient, virtualHost);
        var queueDeletionTasks = DeleteQueues(httpClient, virtualHost);
        var cleanupTasks = exchangeDeletionTasks.Concat(queueDeletionTasks).ToArray();

        // Block until every DELETE request has completed
        Task.WhenAll(cleanupTasks).Wait();

        foreach (var cleanup in cleanupTasks)
        {
            var responseMessage = cleanup.Result;
            try
            {
                responseMessage.EnsureSuccessStatusCode();
            }
            catch (HttpRequestException)
            {
                var requestMessage = responseMessage.RequestMessage;
                Console.WriteLine("Cleanup task failed for {0} {1}", requestMessage.Method, requestMessage.RequestUri);
            }
        }
    }

To connect to the broker's HTTP API, we need the appropriate credentials. In this post we use the default credentials of the guest user and assume the broker is running on localhost. The default virtual host of RabbitMQ is named /. To clean up queues on this default virtual host, we need to send its URL-encoded form, %2F, to the server. We then create a new HttpClient with a handler that uses the right NetworkCredential to connect to the broker. The BaseAddress of the client is, in our example, http://localhost:15672/. We configure the HttpClient to send application/json in the Accept request header by adding MediaTypeWithQualityHeaderValue("application/json") to the DefaultRequestHeaders.Accept collection.
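To make the encoding step concrete, here is a minimal sketch of how the relative API paths could be built. The ManagementApiPaths class and its method names are hypothetical helpers introduced for illustration and are not part of the cleanup code; only Uri.EscapeDataString and string.Format come from the framework.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of the original cleanup code.
public static class ManagementApiPaths
{
    // Builds the relative path that lists all exchanges of a virtual host.
    // The virtual host name is passed unencoded, e.g. "/".
    public static string ForExchanges(string virtualHostName)
    {
        return string.Format(
            CultureInfo.InvariantCulture,
            "api/exchanges/{0}",
            Uri.EscapeDataString(virtualHostName));
    }

    // Builds the relative path of a single queue. Both segments are escaped
    // so that characters such as "/" or spaces cannot break the URL.
    public static string ForQueue(string virtualHostName, string queueName)
    {
        return string.Format(
            CultureInfo.InvariantCulture,
            "api/queues/{0}/{1}",
            Uri.EscapeDataString(virtualHostName),
            Uri.EscapeDataString(queueName));
    }
}
```

With this sketch, ManagementApiPaths.ForExchanges("/") yields api/exchanges/%2F, and ManagementApiPaths.ForQueue("/", "my queue") yields api/queues/%2F/my%20queue.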
When the client is properly configured, we can start deleting the exchanges.

    static IEnumerable<Task<HttpResponseMessage>> DeleteExchanges(HttpClient httpClient, string virtualHost)
    {
        // Fetch all exchanges of the virtual host
        var exchangeResult = httpClient.GetAsync(string.Format(CultureInfo.InvariantCulture, "api/exchanges/{0}", virtualHost)).Result;
        exchangeResult.EnsureSuccessStatusCode();
        var allExchanges = JsonConvert.DeserializeObject<List<Exchange>>(exchangeResult.Content.ReadAsStringAsync().Result);
        var exchanges = FilterAllExchangesByExcludingInternalTheDefaultAndAmq(allExchanges);

        // Delete exchanges
        var exchangeDeletionTasks = new List<Task<HttpResponseMessage>>(exchanges.Count);
        exchangeDeletionTasks.AddRange(exchanges.Select(exchange =>
        {
            // The name is escaped so that special characters cannot break the URL
            var deletionTask = httpClient.DeleteAsync(string.Format(CultureInfo.InvariantCulture, "/api/exchanges/{0}/{1}", virtualHost, Uri.EscapeDataString(exchange.Name)));
            deletionTask.ContinueWith((t, o) => { Console.WriteLine("Deleted exchange {0}.", exchange.Name); }, null, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
            deletionTask.ContinueWith((t, o) => { Console.WriteLine("Failed to delete exchange {0}.", exchange.Name); }, null, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
            return deletionTask;
        }));
        return exchangeDeletionTasks;
    }

    static List<Exchange> FilterAllExchangesByExcludingInternalTheDefaultAndAmq(IEnumerable<Exchange> allExchanges)
    {
        return (from exchange in allExchanges
                let isInternal = exchange.Internal
                let name = exchange.Name.ToLowerInvariant()
                where !isInternal // we should never delete rabbits internal exchanges
                where !name.StartsWith("amq.") // amq.* we shouldn't remove
                where name.Length > 0 // the default exchange which can't be deleted has a Name=string.Empty
                select exchange)
            .ToList();
    }

    private class Exchange
    {
        public string Name { get; set; }
        public bool Internal { get; set; }
    }

One important filter method when deleting the exchanges is FilterAllExchangesByExcludingInternalTheDefaultAndAmq. This filter method ensures that we never delete internal exchanges, amq.* exchanges or the default exchange, which has an empty name. Deleting one of these exchanges could have severe side effects, and the default exchange can in fact never be deleted. Deleting the remaining exchanges is then fairly trivial. We first acquire all exchanges by issuing a GET request to /api/exchanges/%2F and ensure that this request returned a successful status code. We materialize the response payload into a List<Exchange>, filter it with FilterAllExchangesByExcludingInternalTheDefaultAndAmq and create a list of deletion tasks. To delete a given exchange, we send a DELETE request to /api/exchanges/%2F/[exchangeName]. To get feedback on each delete request, we add two task continuations to the deletion task. Both are executed synchronously by specifying TaskContinuationOptions.ExecuteSynchronously. One only runs when the task ran to completion (TaskContinuationOptions.OnlyOnRanToCompletion), the other only when the task faulted (TaskContinuationOptions.OnlyOnFaulted). Note that a response with an error status code still completes the task without faulting, which is why Cleanup additionally checks every response with EnsureSuccessStatusCode. Finally, we return the deletionTask itself and not the continuations.
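The difference between the two continuations can be tried out without a broker. The following is a minimal sketch; the ContinuationDemo class and its Observe method are hypothetical names introduced for illustration, and the tasks are created by hand instead of coming from a real DeleteAsync call.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original cleanup code.
public static class ContinuationDemo
{
    // Attaches the same two kinds of continuations as the deletion code
    // and reports which of them actually ran.
    public static string Observe(Task<HttpResponseMessage> task)
    {
        string outcome = null;

        var onCompleted = task.ContinueWith(
            t => { outcome = "ran to completion with status " + (int)t.Result.StatusCode; },
            TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);

        var onFaulted = task.ContinueWith(
            t => { outcome = "faulted: " + t.Exception.InnerException.Message; },
            TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);

        try
        {
            // Wait until both continuations have reached a final state.
            Task.WaitAll(onCompleted, onFaulted);
        }
        catch (AggregateException)
        {
            // Expected: the continuation whose condition did not match is canceled.
        }

        return outcome;
    }
}
```

Passing Task.FromResult(new HttpResponseMessage(System.Net.HttpStatusCode.NotFound)) to Observe returns "ran to completion with status 404", because an error status does not fault the task. Passing the task of a TaskCompletionSource<HttpResponseMessage> on which SetException(new HttpRequestException("connection refused")) was called returns "faulted: connection refused".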
Once you grasp how the deletion of exchanges works, the deletion of queues becomes straightforward.

    static IEnumerable<Task<HttpResponseMessage>> DeleteQueues(HttpClient httpClient, string virtualHost)
    {
        // Fetch all queues of the virtual host
        var queueResult = httpClient.GetAsync(string.Format(CultureInfo.InvariantCulture, "api/queues/{0}", virtualHost)).Result;
        queueResult.EnsureSuccessStatusCode();
        var queues = JsonConvert.DeserializeObject<List<Queue>>(queueResult.Content.ReadAsStringAsync().Result);

        // Delete queues
        var queueDeletionTasks = new List<Task<HttpResponseMessage>>(queues.Count);
        queueDeletionTasks.AddRange(queues.Select(queue =>
        {
            // The name is escaped so that special characters cannot break the URL
            var deletionTask = httpClient.DeleteAsync(string.Format(CultureInfo.InvariantCulture, "/api/queues/{0}/{1}", virtualHost, Uri.EscapeDataString(queue.Name)));
            deletionTask.ContinueWith((t, o) => { Console.WriteLine("Deleted queue {0}.", queue.Name); }, null, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
            deletionTask.ContinueWith((t, o) => { Console.WriteLine("Failed to delete queue {0}.", queue.Name); }, null, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
            return deletionTask;
        }));
        return queueDeletionTasks;
    }

    private class Queue
    {
        public string Name { get; set; }
    }

The principle is the same, although this time we send the GET request to /api/queues/%2F and the individual DELETE requests to /api/queues/%2F/[queueName].
In the end, we need to concatenate the exchange deletion tasks with the queue deletion tasks and wait for their completion. When this is done, we can check the status code of each response if desired.
Remark: I decided to show the synchronous, blocking version. You could also change the code to public Task CleanupAsync() or something similar and then use the amazing async/await feature of C#. Happy cleaning!
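For readers who want to try the asynchronous route, here is a minimal sketch of what the deletion part could look like with async/await. The AsyncCleanupSketch class and the DeleteAllAsync method are hypothetical names, and the method assumes the relative paths (for example api/queues/%2F/[queueName]) have already been built and escaped by the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical sketch, not part of the original cleanup code.
public static class AsyncCleanupSketch
{
    // Sends a DELETE request for every relative path concurrently and
    // returns the paths whose response did not have a success status code.
    // Transport-level failures surface as exceptions from the awaited task.
    public static async Task<IReadOnlyList<string>> DeleteAllAsync(
        HttpClient httpClient,
        IEnumerable<string> relativePaths)
    {
        var deletions = relativePaths
            .Select(async path =>
            {
                using (var response = await httpClient.DeleteAsync(path).ConfigureAwait(false))
                {
                    return new { Path = path, Succeeded = response.IsSuccessStatusCode };
                }
            })
            .ToArray(); // ToArray starts all requests before we await any of them

        // Asynchronously wait for all requests instead of blocking with Wait()
        var results = await Task.WhenAll(deletions).ConfigureAwait(false);

        return results.Where(r => !r.Succeeded).Select(r => r.Path).ToList();
    }
}
```

A caller would then write var failedPaths = await AsyncCleanupSketch.DeleteAllAsync(httpClient, paths); and log every entry of failedPaths, which replaces the blocking Task.WhenAll(...).Wait() and the EnsureSuccessStatusCode loop of the synchronous version.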