zoukankan      html  css  js  c++  java
  • .NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 详解)--学习笔记

    2.6.7 RabbitMQ -- Masstransit 详解

    • Consumer 消费者
    • Producer 生产者
    • Request-Response 请求-响应

    Consumer 消费者

    在 MassTransit 中,一个消费者可以消费一种或多种消息

    消费者的类型包括:普通消费者,saga,saga 状态机,路由活动(分布式追踪),处理器 handlers,工作消费者 job comsumers

    • Consumer
    • Instance
    • Handler
    • Others

    Consumer

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    e.Consumer<SubmitOrderConsumer>();
                });
            });
        }
    }
    

    继承 IConsumer,实现 Consume 方法

    class SubmitOrderConsumer :
        IConsumer<SubmitOrder>
    {
        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            await context.Publish<OrderSubmitted>(new
            {
                context.Message.OrderId
            });
        }
    }
    

    三个原则:

    • 拥抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."
    • Consume 方法是一个被等待的方法,在执行中时其他消费者无法接收到这个消息,当这个方法完成的时候,消息被 ack,并且从队列中移除
    • Task 方法异常会导致消息触发 retry,如果没有配置重试,消息将被投递到失败队列

    Instance

    public class Program
    {
        public static async Task Main()
        {
            var submitOrderConsumer = new SubmitOrderConsumer();
    
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    e.Instance(submitOrderConsumer);
                });
            });
        }
    }
    

    所有接收到的消息都由一个消费者来实例来处理(请确保这个消费者类是线程安全)

    Consumer 每次接收到消息都会 new 一个实例

    Handler

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    e.Handler<SubmitOrder>(async context =>
                    {
                        await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
                    });
                });
            });
        }
    }
    

    通过一个委托 Lambda 方法,来消费消息

    Others

    • Saga<>
    • StateMachineSaga<>

    Producer 生产者

    消息的生产可以通过两种方式产生:发送和发布

    发送的时候需要指定一个具体的地址 DestinationAddress,发布的时候消息会被广播给所有订阅了这个消息类型的消费者

    基于这两种规则,消息被定义为:命令 command 和事件 event

    • send
    • publish

    send

    可以调用以下对象的 send 方法来发送 command:

    • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
    • ISendEndpointProvider(可以从 DI 中获取)
    • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

    ConsumeContext

    public class SubmitOrderConsumer : 
        IConsumer<SubmitOrder>
    {
        private readonly IOrderSubmitter _orderSubmitter;
    
        public SubmitOrderConsumer(IOrderSubmitter submitter)
            => _orderSubmitter = submitter;
    
        public async Task Consume(IConsumeContext<SubmitOrder> context)
        {
            await _orderSubmitter.Process(context.Message);
    
            await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
        }
    }
    

    ISendEndpointProvider

    public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
    {
        var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);
    
        await endpoint.Send(new SubmitOrder { OrderId = "123" });
    }
    

    publish

    • 发送地址
    • 短地址
    • Convention Map

    发送地址

    • rabbitmq://localhost/input-queue
    • rabbitmq://localhost/input-queue?durable=false

    短地址

    • GetSendEndpoint(new Uri("queue:input-queue"))

    Convention Map

    在配置文件中指定 map 规则

    EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));
    

    直接发送

    public class SubmitOrderConsumer : 
        IConsumer<SubmitOrder>
    {
        private readonly IOrderSubmitter _orderSubmitter;
    
        public SubmitOrderConsumer(IOrderSubmitter submitter)
            => _orderSubmitter = submitter;
    
        public async Task Consume(IConsumeContext<SubmitOrder> context)
        {
            await _orderSubmitter.Process(context.Message);
    
            await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
        }
    }
    

    可以调用以下对象的 publish 方法来发送 event:

    • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
    • IPublishEndpoint(可以从 DI 中获取)
    • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

    IPublishEndpoint

    public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
    {
        await publishEndpoint.Publish<OrderSubmitted>(new
        {
            OrderId = "27",
            OrderDate = DateTime.UtcNow,
        });
    }
    

    Request-Response 请求-响应

    Request-Response 模式让应用程序之间解耦之后,依然采用同步的方式

    • Consumer
    • IClientFactory
    • IRequestClient
    • Send a request

    Consumer

    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        var order = await _orderRepository.Get(context.Message.OrderId);
        if (order == null)
            throw new InvalidOperationException("Order not found");
        
        await context.RespondAsync<OrderStatusResult>(new 
        {
            OrderId = order.Id,
            order.Timestamp,
            order.StatusCode,
            order.StatusText
        });
    }
    

    需要处理返回类型 OrderStatusResult,异步方式模拟同步,实际上同样有消息队列,消费者处理过程

    IClientFactory

    public interface IClientFactory 
    {
        IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);
    
        IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);
    
        RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
    
        RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
    }
    

    通过 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

    IRequestClient

    public interface IRequestClient<TRequest>
        where TRequest : class
    {
        RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
    
        Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
    }
    

    RequestClient 可以创建请求,或者直接获得响应

    Send a request

    var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
    var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
    
    var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});
    

    知识共享许可协议

    本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

    欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。

    如有任何疑问,请与我联系 (MingsonZheng@outlook.com) 。

  • 相关阅读:
    IE6-9中tbody的innerHTML不能赋值bug
    matchesSelector及低版本IE中对该方法的实现
    JavaScript日期组件的实现
    IE6/7/8中parseInt第一个参数为非法八进制字符串且第二个参数不传时返回值为0
    子程序设计原则
    仅IE6中链接A的href为javascript协议时不能在当前页面跳转
    JavaScript获取图片的原始尺寸
    JavaScript判断图片是否加载完成的三种方式
    Mac OS X 快捷键
    IE6-8中Date不支持toISOString方法
  • 原文地址:https://www.cnblogs.com/MingsonZheng/p/14274903.html
Copyright © 2011-2022 走看看