MassTransit | .NET 分布式应用框架( 三 )

发布事件

  1. 通过IBus发布:
private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//do somethingawait _bus.Send(request);}
  1. 通过IPublishEndpoint发布:
private readonly IPublishEndpoint _publishEndpoint;public async Task Post(CreateOrderRequest request){//do somethingvar order = CreateOrder(request);await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}
  1. 通过ConsumeContext发布:
【MassTransit | .NET 分布式应用框架】public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){、var order = CreateOrder(conext.Message);await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}}ConsumerConsumer,消费者 , 即用于消费消息 。MassTransit 包括多种消费者类型,主要分为无状态和有状态两种消费者类型 。
无状态消费者无状态消费者 , 即消费者无状态,消息消费完毕,消费者就释放 。主要的消费者类型有:IConsumer<TMessage>JobConsumerIActivityRoutingSlip等 。其中IConsumer<TMessage>已经在上面的快速体验部分举例说明 。而JobConsumer<TMessage>主要是对IConsumer<TMessage>的补充,其主要应用场景在于执行耗时任务 。而对于IActivityRoutingSlip则是MassTransit Courier的核心对象 , 主要用于实现Saga模式的分布式事务 。MassTransit Courier 实现了Routing Slip模式,通过按需有序组合一系列的Activity,得到一个用来限定消息处理顺序的Routing Slip 。而每个Activity的具体抽象就是IActivityIExecuteActivity 。二者的差别在于IActivity定义了ExecuteCompensate两个方法,而IExecuteActivitiy仅定义了Execute方法 。其中Execute代表正向操作 , Compensate代表反向补偿操作 。用一个简单的下单流程:创建订单->扣减库存->支付订单举例而言,其示意图如下所示 。而对于具体实现,可参阅文章:AspNetCore&MassTransit Courier实现分布式事务
MassTransit | .NET 分布式应用框架

文章插图
有状态消费者有状态消费者,即消费者有状态,其状态会持久化,代表的消费者类型为MassTransitStateMachineMassTransitStateMachineMassTransit Automatonymous 库定义的,Automatonymous 是一个.NET 状态机库,用于定义状态机,包括状态、事件和行为 。MassTransitStateMachine就是状态机的具体抽象,可以用其编排一系列事件来实现状态的流转 , 也可以用来实现Saga模式的分布式事务 。并支持与EF Core和Dapper集成将状态持久化到关系型数据库,也支持将状态持久化到MongoDB、Redis等数据库 。MassTransitStateMachine对于Saga模式分布式事务的实现方式与RoutingSlip不同 , 还是以简单的下单流程:创建订单->扣减库存->支付订单举例而言,其示意图如下所示 。基于MassTransitStateMachine 实现分布式事务详参后续文章 。
MassTransit | .NET 分布式应用框架

文章插图
从上图可知 , 通过MassTransitStateMachine可以将事件的执行顺序逻辑编排在一个集中的状态机中,通过发送命令和订阅事件来推动状态流转,而这也正是Saga编排模式的实现 。
应用场景了解完MassTransit的核心概念,接下来再来看下MassTransit的核心特性以及应用场景:
  1. 基于消息的请求响应模式:可用于同步通信
  2. Mediator模式:中间者模式的实现,类似MediatR , 但功能更完善
  3. 计划任务:可用于执行定时任务
  4. Routing Slip 模式:可用于实现Saga模式的分布式事务
  5. Saga 状态机:可用于实现Saga模式的分布式事务
  6. 本地消息表:类似DotNetCore.Cap,用于实现最终一致性
总体而言,MassTransit是一款优秀的分布式应用框架,可作为分布式应用的消息总线 , 也可以用作单体应用的事件总线 。感兴趣的朋友不妨一观 。

推荐阅读