【.NET 6】RabbitMQ延迟消费指南( 二 )

orders.notification_dlx,用于接收转发过来延迟消息,同时将该队列的死信交换机设置为orders.notification;消费消息时,为了消息是不是已经延迟过,可以在消息头里添加一个自定义参数biz-delayed , 在将需要延迟处理的消息发送到orders.notification_dlx交换机之前,除了设置过期时间,也同时将biz-delayed设置为1,后续再消费该消息时,读取该值,不至于陷入死循环 。完整代码如下
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dlxExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dlx", ExchangeType.Direct);var dlxQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification_dlx", configure => configure.WithDeadLetterExchange(sourceExchange));await bus.Advanced.BindAsync(dlxExchange, dlxQueue, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 开始消费 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers.Add("biz-delayed", 1);message.Properties.Expiration = TimeSpan.FromHours(1);await bus.Advanced.PublishAsync(dlxExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延迟消费");}else{//假装在消费Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消费");}}上述代码中,EasyNetQ设置队列死信交换机的API为WithDeadLetterExchange,设置消息过期时间的API为Properties.Expiration
运行DLXConsumer项目,可以看到Type为1的消息被延迟 , 其它则被正常消费

【.NET 6】RabbitMQ延迟消费指南

文章插图
打开RabbitMQ后台确认,原本orders.notification里的消息已经被消费掉了,同时多了一个orders.notification_dlx队列,并且orders.notification_delay队列相比orders.notification多了一个DLX标签,Type为1的消息就是被转移该队列 。
【.NET 6】RabbitMQ延迟消费指南

文章插图
进入orders.notification_delay队列,交换机与队列正常绑定,x-dead-letter-exchange也已被设置
【.NET 6】RabbitMQ延迟消费指南

文章插图
检查队列中的消息 , 可以看到Properties里的expiration: 3600000headers:biz-delayed: 1
【.NET 6】RabbitMQ延迟消费指南

文章插图
再过3600000毫秒,orders.notification_dlx队列就会被投递到orders.notification交换机,队列orders.notification也就会收到这些信息 , 这时因为消息头里有biz-delayed,消费者会正常将其消费 。
使用延迟交换机实现使用延迟交换机,需要RabbitMQ服务器安装rabbitmq_delayed_message_exchange插件 , 原理是投递到延迟交换机的消息,会延迟指定时间(x-delay参数设置)后,自动投递到该交换机绑定的另一交换机上 。直接看代码 。
docker环境安装rabbitmq_delayed_message_exchange插件这里介绍下docker环境如何安装rabbitmq_delayed_message_exchange插件 , 首先在github https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 下载与你RabbitMQ服务器大版本匹配的Release,将文件复制到RabbitMQ的/plugins目录下,命令如下
docker cp {rabbitmq_delayed_message_exchange文件路径} {rabbitmq容器id}:/pluginsdocker exec -it {rabbitmq容器id} rabbitmq-plugins enable rabbitmq_delayed_message_exchange以我本机为例 , 插件启用成功 。
【.NET 6】RabbitMQ延迟消费指南

文章插图
下面给解决方法添加一个DMConsumer项目 。
cd srcdotnet new console -n DMConsumercd DMConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DMConsumer/DMConsumer.csproj

推荐阅读