zoukankan      html  css  js  c++  java
  • .NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 详解)--学习笔记

    2.6.7 RabbitMQ -- Masstransit 详解

    • Consumer 消费者
    • Producer 生产者
    • Request-Response 请求-响应

    Consumer 消费者

    在 MassTransit 中,一个消费者可以消费一种或多种消息

    消费者的类型包括:普通消费者,saga,saga 状态机,路由活动(分布式追踪),处理器 handlers,工作消费者 job comsumers

    • Consumer
    • Instance
    • Handler
    • Others

    Consumer

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    e.Consumer<SubmitOrderConsumer>();
                });
            });
        }
    }
    

    继承 IConsumer,实现 Consume 方法

    class SubmitOrderConsumer :
        IConsumer<SubmitOrder>
    {
        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            await context.Publish<OrderSubmitted>(new
            {
                context.Message.OrderId
            });
        }
    }
    

    三个原则:

    • 拥抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."
    • Consume 方法是一个被等待的方法,在执行中时其他消费者无法接收到这个消息,当这个方法完成的时候,消息被 ack,并且从队列中移除
    • Task 方法异常会导致消息触发 retry,如果没有配置重试,消息将被投递到失败队列

    Instance

    public class Program
    {
        public static async Task Main()
        {
            var submitOrderConsumer = new SubmitOrderConsumer();
    
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    e.Instance(submitOrderConsumer);
                });
            });
        }
    }
    

    所有接收到的消息都由一个消费者来实例来处理(请确保这个消费者类是线程安全)

    Consumer 每次接收到消息都会 new 一个实例

    Handler

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    e.Handler<SubmitOrder>(async context =>
                    {
                        await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
                    });
                });
            });
        }
    }
    

    通过一个委托 Lambda 方法,来消费消息

    Others

    • Saga<>
    • StateMachineSaga<>

    Producer 生产者

    消息的生产可以通过两种方式产生:发送和发布

    发送的时候需要指定一个具体的地址 DestinationAddress,发布的时候消息会被广播给所有订阅了这个消息类型的消费者

    基于这两种规则,消息被定义为:命令 command 和事件 event

    • send
    • publish

    send

    可以调用以下对象的 send 方法来发送 command:

    • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
    • ISendEndpointProvider(可以从 DI 中获取)
    • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

    ConsumeContext

    public class SubmitOrderConsumer : 
        IConsumer<SubmitOrder>
    {
        private readonly IOrderSubmitter _orderSubmitter;
    
        public SubmitOrderConsumer(IOrderSubmitter submitter)
            => _orderSubmitter = submitter;
    
        public async Task Consume(IConsumeContext<SubmitOrder> context)
        {
            await _orderSubmitter.Process(context.Message);
    
            await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
        }
    }
    

    ISendEndpointProvider

    public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
    {
        var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);
    
        await endpoint.Send(new SubmitOrder { OrderId = "123" });
    }
    

    publish

    • 发送地址
    • 短地址
    • Convention Map

    发送地址

    • rabbitmq://localhost/input-queue
    • rabbitmq://localhost/input-queue?durable=false

    短地址

    • GetSendEndpoint(new Uri("queue:input-queue"))

    Convention Map

    在配置文件中指定 map 规则

    EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));
    

    直接发送

    public class SubmitOrderConsumer : 
        IConsumer<SubmitOrder>
    {
        private readonly IOrderSubmitter _orderSubmitter;
    
        public SubmitOrderConsumer(IOrderSubmitter submitter)
            => _orderSubmitter = submitter;
    
        public async Task Consume(IConsumeContext<SubmitOrder> context)
        {
            await _orderSubmitter.Process(context.Message);
    
            await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
        }
    }
    

    可以调用以下对象的 publish 方法来发送 event:

    • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
    • IPublishEndpoint(可以从 DI 中获取)
    • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

    IPublishEndpoint

    public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
    {
        await publishEndpoint.Publish<OrderSubmitted>(new
        {
            OrderId = "27",
            OrderDate = DateTime.UtcNow,
        });
    }
    

    Request-Response 请求-响应

    Request-Response 模式让应用程序之间解耦之后,依然采用同步的方式

    • Consumer
    • IClientFactory
    • IRequestClient
    • Send a request

    Consumer

    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        var order = await _orderRepository.Get(context.Message.OrderId);
        if (order == null)
            throw new InvalidOperationException("Order not found");
        
        await context.RespondAsync<OrderStatusResult>(new 
        {
            OrderId = order.Id,
            order.Timestamp,
            order.StatusCode,
            order.StatusText
        });
    }
    

    需要处理返回类型 OrderStatusResult,异步方式模拟同步,实际上同样有消息队列,消费者处理过程

    IClientFactory

    public interface IClientFactory 
    {
        IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);
    
        IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);
    
        RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
    
        RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
    }
    

    通过 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

    IRequestClient

    public interface IRequestClient<TRequest>
        where TRequest : class
    {
        RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
    
        Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
    }
    

    RequestClient 可以创建请求,或者直接获得响应

    Send a request

    var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
    var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
    
    var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});
    

    知识共享许可协议

    本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

    欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。

    如有任何疑问,请与我联系 (MingsonZheng@outlook.com) 。

  • 相关阅读:
    RUST实践.md
    redis.md
    opencvrust.md
    aws rds can't connect to mysql server on 'xx'
    Foundation ActionScript 3.0 With Flash CS3 And Flex
    Foundation Flash Applications for Mobile Devices
    Flash Mobile Developing Android and iOS Applications
    Flash Game Development by Example
    Actionscript 3.0 迁移指南
    在SWT中非UI线程控制界面
  • 原文地址:https://www.cnblogs.com/MingsonZheng/p/14274903.html
Copyright © 2011-2022 走看看