发布事件
- 通过
IBus
发布:
private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//do somethingawait _bus.Send(request);}
- 通过
IPublishEndpoint
发布:
private readonly IPublishEndpoint _publishEndpoint;public async Task Post(CreateOrderRequest request){//do somethingvar order = CreateOrder(request);await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}
- 通过
ConsumeContext
发布:
public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){、var order = CreateOrder(conext.Message);await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}}
ConsumerConsumer,消费者 , 即用于消费消息 。MassTransit 包括多种消费者类型,主要分为无状态和有状态两种消费者类型 。无状态消费者无状态消费者 , 即消费者无状态,消息消费完毕,消费者就释放 。主要的消费者类型有:
IConsumer<TMessage>
、JobConsumer
、IActivity
和RoutingSlip
等 。其中IConsumer<TMessage>
已经在上面的快速体验
部分举例说明 。而JobConsumer<TMessage>
主要是对IConsumer<TMessage>
的补充,其主要应用场景在于执行耗时任务 。而对于IActivity
和RoutingSlip
则是MassTransit Courier
的核心对象 , 主要用于实现Saga模式的分布式事务 。MassTransit Courier 实现了Routing Slip模式,通过按需有序组合一系列的Activity,得到一个用来限定消息处理顺序的Routing Slip 。而每个Activity的具体抽象就是IActivity
和IExecuteActivity
。二者的差别在于IActivity
定义了Execute
和Compensate
两个方法,而IExecuteActivitiy
仅定义了Execute
方法 。其中Execute
代表正向操作 , Compensate
代表反向补偿操作 。用一个简单的下单流程:创建订单->扣减库存->支付订单举例而言,其示意图如下所示 。而对于具体实现,可参阅文章:AspNetCore&MassTransit Courier实现分布式事务
文章插图
有状态消费者有状态消费者,即消费者有状态,其状态会持久化,代表的消费者类型为
MassTransitStateMachine
。MassTransitStateMachine
是MassTransit Automatonymous
库定义的,Automatonymous
是一个.NET 状态机库,用于定义状态机,包括状态、事件和行为 。MassTransitStateMachine
就是状态机的具体抽象,可以用其编排一系列事件来实现状态的流转 , 也可以用来实现Saga模式的分布式事务 。并支持与EF Core和Dapper集成将状态持久化到关系型数据库,也支持将状态持久化到MongoDB、Redis等数据库 。MassTransitStateMachine
对于Saga模式分布式事务的实现方式与RoutingSlip
不同 , 还是以简单的下单流程:创建订单->扣减库存->支付订单举例而言,其示意图如下所示 。基于MassTransitStateMachine
实现分布式事务详参后续文章 。
文章插图
从上图可知 , 通过
MassTransitStateMachine
可以将事件的执行顺序逻辑编排在一个集中的状态机中,通过发送命令和订阅事件来推动状态流转,而这也正是Saga编排模式的实现 。应用场景了解完MassTransit的核心概念,接下来再来看下MassTransit的核心特性以及应用场景:
- 基于消息的请求响应模式:可用于同步通信
- Mediator模式:中间者模式的实现,类似MediatR , 但功能更完善
- 计划任务:可用于执行定时任务
- Routing Slip 模式:可用于实现Saga模式的分布式事务
- Saga 状态机:可用于实现Saga模式的分布式事务
- 本地消息表:类似DotNetCore.Cap,用于实现最终一致性
推荐阅读
- 4 .NET 6学习笔记——如何在.NET 6的Desktop App中使用Windows Runtime API
- 学习ASP.NET Core Blazor编程系列八——数据校验
- 【.NET 6】RabbitMQ延迟消费指南
- 简读《ASP.NET Core技术内幕与项目实战》之3:配置
- .net core-利用PdfSharpCore和SkiaSharp.QrCode 添加PDF二维码页眉
- 云原生分布式 PostgreSQL+Citus 集群在 Sentry 后端的实践
- .net core -利用 BsonDocumentProjectionDefinition 和Lookup 进行 join 关联 MongoDB 查询
- 微服务系列之分布式日志 ELK
- .net lambda表达式合并
- .NET Core C#系列之XiaoFeng.Threading.JobScheduler作业调度