MassTransit | .NET 分布式应用框架( 二 )

  • Consumer:消费者,用于消息消费 。

  • MassTransit | .NET 分布式应用框架

    文章插图
    从上图可知 , 本质上还是发布订阅模式的实现 , 接下来就核心概念进行详解 。
    MessageMessage:消息,可以使用class、interface、struct和record来创建 , 消息作为一个契约,需确保创建后不能篡改,因此应只保留只读属性且不应包含方法和行为 。MassTransit使用的是包含命名空间的完全限定名即typeof(T).FullName来表示特定的消息类型 。因此若在另外的项目中消费同名的消息类型,需确保消息的命名空间相同 。另外需注意消息不应继承,以避免发送基类消息类型造成的不可预期的结果 。为避免此类情况 , 官方建议使用接口来定义消息 。在MassTransit中,消息主要分为两种类型:
    1. Command:命令,用于告诉服务做什么,命令被发送到指定端点 , 仅被一个服务接收并执行 。一般以动名词结构命名,如:UpdateAddress、CancelOrder 。
    2. Event:事件,用于告诉服务什么发生了,事件被发布到多个端点,可以被多个服务消费 。一般以过去式结构命名 , 如:AddressUpdated,OrderCanceled 。
    经过MassTransit发送的消息,会使用信封包装,包含一些附加信息,数据结构举例如下:
    {"messageId": "6c600000-873b-00ff-9a8f-08da8da85542","requestId": null,"correlationId": null,"conversationId": "6c600000-873b-00ff-9526-08da8da85544","initiatorId": null,"sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true","destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent","responseAddress": null,"faultAddress": null,"messageType": ["urn:message:MassTransit.Demo:OrderCreatedEvent"],"message": {"orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"},"expirationTime": null,"sentTime": "2022-09-03T12:32:15.0796943Z","headers": {},"host": {"machineName": "THINKPAD","processName": "MassTransit.Demo","processId": 24684,"assembly": "MassTransit.Demo","assemblyVersion": "1.0.0.0","frameworkVersion": "6.0.5","massTransitVersion": "8.0.6.0","operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"}}从以上消息实例中可以看出一个包装后的消息包含以下核心属性:
    1. messageId:全局唯一的消息ID
    2. messageType:消息类型
    3. message:消息体,也就是具体的消息实例
    4. sourceAddress:消息来源地址
    5. destinationAddress:消息目标地址
    6. responseAddress:响应地址,在请求响应模式中使用
    7. faultAddress:消息异常发送地址 , 用于存储异常消费消息
    8. headers:消息头,允许应用自定义扩展信息
    9. correlationId:关联Id , 在Saga状态机中会用到 , 用来关联系列事件
    10. host:宿主,消息来源应用的宿主信息
    ProducerProducer,生产者,即用于生产消息 。在MassTransit主要借助以下对象进行命令的发送和事件的发布 。
    MassTransit | .NET 分布式应用框架

    文章插图
    从以上类图可以看出,消息的发送主要核心依赖于两个接口:
    1. ISendEndpoint:提供了Send方法,用于发送命令 。
    2. IPublishEndpoint:提供了Publish方法,用于发布事件 。
    但基于上图的继承体系,可以看出通过IBusISendEndpointProviderConsumeContext进行命令的发送;通过IBusIPublishEndpointProvider进行事件的发布 。具体举例如下:
    发送命令
    1. 通过IBus发送:
    private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//通过以下方式配置对应消息类型的目标地址EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));await _bus.Send(request);}
    1. 通过ISendEndpointProvider发送:
    private readonly ISendEndpointProvider_sendEndpointProvider;public async Task Post(CreateOrderRequest request){var serviceAddress = new Uri("queue:create-order");var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);await endpoint.Send(request);}
    1. 通过ConsumeContext发送:
    public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){//do something elsevar destinationAddress = new Uri("queue:lock-stock");var command = new LockStockRequest(context.Message.OrderId);await context.Send<LockStockRequest>(destinationAddress, command);// 也可以通过获取`SendEndpoint`发送命令// var endpoint = await context.GetSendEndpoint(destinationAddress);// await endpoint.Send<LockStockRequest>(command);}}

    推荐阅读