【.NET 6】RabbitMQ延迟消费指南( 三 )

DMConsumer完整实现如下
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct, durable: true, autoDelete: false);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dmExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dm", configure => configure.AsDelayedExchange(ExchangeType.Direct));//两个交换机绑定await bus.Advanced.BindAsync(dmExchange, sourceExchange, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 开始消费 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers["biz-delayed"] = 1;message.WithDelay(TimeSpan.FromHours(1));await bus.Advanced.PublishAsync(dmExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延迟消费");}else{//假装在消费//Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消费");}}相比于DLXConsumerDMConsumer里,我们不需要额外的队列,只需要创建orders.notification_dm交换机即可,同时直接将交换机绑定到orders.notification交换机 , EasyNetQ里使用AsDelayedExchange指示该交换机为延迟交换机,使用WithDelay设置消息延迟时间通过查看EasyNetQ源码 https://github.com/EasyNetQ/EasyNetQ/blob/master/Source/EasyNetQ/DelayedExchangeExtensions.cs , 它封装延迟交换机的设置

【.NET 6】RabbitMQ延迟消费指南

文章插图
启动Producer再生成一些数据,然后运行DLXConsumer看效果,和DLXConsumer一样
【【.NET 6】RabbitMQ延迟消费指南】
【.NET 6】RabbitMQ延迟消费指南

文章插图
打开RabbitMQ后台,可以看到多了一个类型为x-delayed-messageorders.notification_dm交换机,带有DMArgs两个标签
【.NET 6】RabbitMQ延迟消费指南

文章插图
进入交换机 , 可以看到里面已经存储了13条消息 。
【.NET 6】RabbitMQ延迟消费指南

文章插图
总结自此,利用队列的死信交换机策略和利用rabbitmq_delayed_message_exchange插件实现RabbitMQ消息延迟已经介绍完毕 , 下面是.NET6 demo完整的项目结构
【.NET 6】RabbitMQ延迟消费指南

文章插图
其实除了这两种,EasyNetQ也有一个调度器(Scheduler)可以实现延迟消息,但似乎需要依赖数据库,不是主流的做法不推荐使用 。
如有任何问题或者意见,欢迎评论 。

推荐阅读