
文章插图
从上图可知 , 本质上还是发布订阅模式的实现 , 接下来就核心概念进行详解 。
MessageMessage:消息,可以使用class、interface、struct和record来创建 , 消息作为一个契约,需确保创建后不能篡改,因此应只保留只读属性且不应包含方法和行为 。MassTransit使用的是包含命名空间的完全限定名即
typeof(T).FullName
来表示特定的消息类型 。因此若在另外的项目中消费同名的消息类型,需确保消息的命名空间相同 。另外需注意消息不应继承,以避免发送基类消息类型造成的不可预期的结果 。为避免此类情况 , 官方建议使用接口来定义消息 。在MassTransit中,消息主要分为两种类型:- Command:命令,用于告诉服务做什么,命令被发送到指定端点 , 仅被一个服务接收并执行 。一般以动名词结构命名,如:UpdateAddress、CancelOrder 。
- Event:事件,用于告诉服务什么发生了,事件被发布到多个端点,可以被多个服务消费 。一般以过去式结构命名 , 如:AddressUpdated,OrderCanceled 。
{"messageId": "6c600000-873b-00ff-9a8f-08da8da85542","requestId": null,"correlationId": null,"conversationId": "6c600000-873b-00ff-9526-08da8da85544","initiatorId": null,"sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true","destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent","responseAddress": null,"faultAddress": null,"messageType": ["urn:message:MassTransit.Demo:OrderCreatedEvent"],"message": {"orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"},"expirationTime": null,"sentTime": "2022-09-03T12:32:15.0796943Z","headers": {},"host": {"machineName": "THINKPAD","processName": "MassTransit.Demo","processId": 24684,"assembly": "MassTransit.Demo","assemblyVersion": "1.0.0.0","frameworkVersion": "6.0.5","massTransitVersion": "8.0.6.0","operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"}}
从以上消息实例中可以看出一个包装后的消息包含以下核心属性:- messageId:全局唯一的消息ID
- messageType:消息类型
- message:消息体,也就是具体的消息实例
- sourceAddress:消息来源地址
- destinationAddress:消息目标地址
- responseAddress:响应地址,在请求响应模式中使用
- faultAddress:消息异常发送地址 , 用于存储异常消费消息
- headers:消息头,允许应用自定义扩展信息
- correlationId:关联Id , 在Saga状态机中会用到 , 用来关联系列事件
- host:宿主,消息来源应用的宿主信息

文章插图
从以上类图可以看出,消息的发送主要核心依赖于两个接口:
ISendEndpoint
:提供了Send
方法,用于发送命令 。IPublishEndpoint
:提供了Publish
方法,用于发布事件 。
IBus
、ISendEndpointProvider
和ConsumeContext
进行命令的发送;通过IBus
和IPublishEndpointProvider
进行事件的发布 。具体举例如下:发送命令
- 通过
IBus
发送:
private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//通过以下方式配置对应消息类型的目标地址EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));await _bus.Send(request);}
- 通过
ISendEndpointProvider
发送:
private readonly ISendEndpointProvider_sendEndpointProvider;public async Task Post(CreateOrderRequest request){var serviceAddress = new Uri("queue:create-order");var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);await endpoint.Send(request);}
- 通过
ConsumeContext
发送:
public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){//do something elsevar destinationAddress = new Uri("queue:lock-stock");var command = new LockStockRequest(context.Message.OrderId);await context.Send<LockStockRequest>(destinationAddress, command);// 也可以通过获取`SendEndpoint`发送命令// var endpoint = await context.GetSendEndpoint(destinationAddress);// await endpoint.Send<LockStockRequest>(command);}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 4 .NET 6学习笔记——如何在.NET 6的Desktop App中使用Windows Runtime API
- 学习ASP.NET Core Blazor编程系列八——数据校验
- 【.NET 6】RabbitMQ延迟消费指南
- 简读《ASP.NET Core技术内幕与项目实战》之3:配置
- .net core-利用PdfSharpCore和SkiaSharp.QrCode 添加PDF二维码页眉
- 云原生分布式 PostgreSQL+Citus 集群在 Sentry 后端的实践
- .net core -利用 BsonDocumentProjectionDefinition 和Lookup 进行 join 关联 MongoDB 查询
- 微服务系列之分布式日志 ELK
- .net lambda表达式合并
- .NET Core C#系列之XiaoFeng.Threading.JobScheduler作业调度