orders.notification_dlx
,用于接收转发过来延迟消息,同时将该队列的死信交换机设置为orders.notification
;消费消息时,为了消息是不是已经延迟过,可以在消息头里添加一个自定义参数biz-delayed
, 在将需要延迟处理的消息发送到orders.notification_dlx
交换机之前,除了设置过期时间,也同时将biz-delayed
设置为1,后续再消费该消息时,读取该值,不至于陷入死循环 。完整代码如下
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dlxExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dlx", ExchangeType.Direct);var dlxQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification_dlx", configure => configure.WithDeadLetterExchange(sourceExchange));await bus.Advanced.BindAsync(dlxExchange, dlxQueue, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 开始消费 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers.Add("biz-delayed", 1);message.Properties.Expiration = TimeSpan.FromHours(1);await bus.Advanced.PublishAsync(dlxExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延迟消费");}else{//假装在消费Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消费");}}
上述代码中,EasyNetQ
设置队列死信交换机的API为WithDeadLetterExchange
,设置消息过期时间的API为Properties.Expiration
。
运行DLXConsumer
项目,可以看到Type
为1的消息被延迟 , 其它则被正常消费

文章插图
打开RabbitMQ后台确认,原本
orders.notification
里的消息已经被消费掉了,同时多了一个orders.notification_dlx
队列,并且orders.notification_delay
队列相比orders.notification
多了一个DLX
标签,Type
为1的消息就是被转移该队列 。
文章插图
进入
orders.notification_delay
队列,交换机与队列正常绑定,x-dead-letter-exchange
也已被设置
文章插图
检查队列中的消息 , 可以看到
Properties
里的expiration: 3600000
和headers:biz-delayed: 1

文章插图
再过3600000毫秒,
orders.notification_dlx
队列就会被投递到orders.notification
交换机,队列orders.notification
也就会收到这些信息 , 这时因为消息头里有biz-delayed
,消费者会正常将其消费 。使用延迟交换机实现使用延迟交换机,需要RabbitMQ服务器安装
rabbitmq_delayed_message_exchange
插件 , 原理是投递到延迟交换机的消息,会延迟指定时间(x-delay
参数设置)后,自动投递到该交换机绑定的另一交换机上 。直接看代码 。docker环境安装
rabbitmq_delayed_message_exchange
插件这里介绍下docker环境如何安装rabbitmq_delayed_message_exchange
插件 , 首先在github https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 下载与你RabbitMQ服务器大版本匹配的Release,将文件复制到RabbitMQ的/plugins
目录下,命令如下docker cp {rabbitmq_delayed_message_exchange文件路径} {rabbitmq容器id}:/pluginsdocker exec -it {rabbitmq容器id} rabbitmq-plugins enable rabbitmq_delayed_message_exchange
以我本机为例 , 插件启用成功 。
文章插图
下面给解决方法添加一个
DMConsumer
项目 。cd srcdotnet new console -n DMConsumercd DMConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DMConsumer/DMConsumer.csproj
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 王者荣耀9月26日微信每日一题答案是什么
- 简读《ASP.NET Core技术内幕与项目实战》之3:配置
- 英雄联盟战利品钥匙获取方法是什么
- 进击的巨人最终季大结局_进击的巨人最终季的结局
- 苹果12有几款机型_苹果12有几个型号
- 华容道“过五关”怎么玩(华容道在线免费玩)
- 十一 【Kubernetes】K8s笔记:Ingress 集群进出流量总管
- 弹壳特攻队推图选择哪些技能
- 苹果13promax详细参数_参数配置表
- 如何调整屏幕分辨率(分辨率1920x1080怎么设置)