DMConsumer
完整实现如下
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct, durable: true, autoDelete: false);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dmExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dm", configure => configure.AsDelayedExchange(ExchangeType.Direct));//两个交换机绑定await bus.Advanced.BindAsync(dmExchange, sourceExchange, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 开始消费 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers["biz-delayed"] = 1;message.WithDelay(TimeSpan.FromHours(1));await bus.Advanced.PublishAsync(dmExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延迟消费");}else{//假装在消费//Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消费");}}
相比于DLXConsumer
,DMConsumer
里,我们不需要额外的队列,只需要创建orders.notification_dm
交换机即可,同时直接将交换机绑定到orders.notification
交换机 , EasyNetQ
里使用AsDelayedExchange
指示该交换机为延迟交换机,使用WithDelay
设置消息延迟时间通过查看EasyNetQ
源码 https://github.com/EasyNetQ/EasyNetQ/blob/master/Source/EasyNetQ/DelayedExchangeExtensions.cs , 它封装延迟交换机的设置

文章插图
启动
Producer
再生成一些数据,然后运行DLXConsumer
看效果,和DLXConsumer
一样【【.NET 6】RabbitMQ延迟消费指南】

文章插图
打开RabbitMQ后台,可以看到多了一个类型为
x-delayed-message
的orders.notification_dm
交换机,带有DM
和Args
两个标签
文章插图
进入交换机 , 可以看到里面已经存储了13条消息 。

文章插图
总结自此,利用队列的死信交换机策略和利用
rabbitmq_delayed_message_exchange
插件实现RabbitMQ消息延迟已经介绍完毕 , 下面是.NET6 demo完整的项目结构
文章插图
其实除了这两种,
EasyNetQ
也有一个调度器(Scheduler)可以实现延迟消息,但似乎需要依赖数据库,不是主流的做法不推荐使用 。如有任何问题或者意见,欢迎评论 。
推荐阅读
- 王者荣耀9月26日微信每日一题答案是什么
- 简读《ASP.NET Core技术内幕与项目实战》之3:配置
- 英雄联盟战利品钥匙获取方法是什么
- 进击的巨人最终季大结局_进击的巨人最终季的结局
- 苹果12有几款机型_苹果12有几个型号
- 华容道“过五关”怎么玩(华容道在线免费玩)
- 十一 【Kubernetes】K8s笔记:Ingress 集群进出流量总管
- 弹壳特攻队推图选择哪些技能
- 苹果13promax详细参数_参数配置表
- 如何调整屏幕分辨率(分辨率1920x1080怎么设置)