zoukankan      html  css  js  c++  java
  • MassTransit中Request&Response基本使用

    MassTransit 是一个自由、开源、轻量级的消息总线基于.Net框架, 用于创建分布式应用程序。方便搭建基于消息的松耦合异步通信的应用程序和服务。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。

    官网地址:http://masstransit-project.com/

    发布订阅模式

    这种场景十分常见,发送一个消息(或事件)到消息队列中,有一个或是多个订阅方对预期的消息接收处理。

    图片

    基于需要搭建了两个WebApi程序,用于模拟发送方和订阅方,其中的RabbitMQ已预先搭建好了,只在程序中引用包配置下即可。

    <PackageReference Include="MassTransit" Version="7.2.0" />
    <PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" />
    <PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />
    

    发布端配置

    在Startup中增加MassTransit需要的服务及初始化配置。

    • 对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。
    • 对发送方需要发送的消息初始化一个请求客户端,配置请求信息及推送到MQ的地址。
    services.AddMassTransit(x =>
    {
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
            {
                h.Username(Configuration["RabbitmqConfig:Username"]);
                h.Password(Configuration["RabbitmqConfig:Password"]);
            });
        });
        x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered")));
    });
    services.AddMassTransitHostedService();
    

    为了快速了解,使用Controller在Action中发起对MQ的消息推送

    [ApiController]
    [Route("[controller]")]
    public class ValueController : ControllerBase
    {
        readonly IPublishEndpoint _publishEndpoint;
        public ValueController(IPublishEndpoint publishEndpoint)
        {
            _publishEndpoint = publishEndpoint;
        }
        [HttpPost]
        public async Task<ActionResult> Post(string value)
        {
            await _publishEndpoint.Publish<ValueEntered>(new
            {
                Value = value
            });
            return Ok();
        }
    }
    

    订阅端配置

    订阅端也创建一个WebApi应用,在Startup中增加MassTransit的服务,使用到的Nuget包和发布端一样。

    • 对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。
    • 为订阅端增加一个订阅处理的Handler,即如下的ValueEnteredEventConsumer
    • 增加一个接受点,指定队列名称,即发送端发送的队列名称,设置该队列消费处理的Consumer,即ValueEnteredEventConsumer
    services.AddMassTransit(x =>
    {
        x.AddConsumer<ValueEnteredEventConsumer>();
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
            {
                h.Username(Configuration["RabbitmqConfig:Username"]);
                h.Password(Configuration["RabbitmqConfig:Password"]);
            });
            cfg.ReceiveEndpoint("events-valueentered", e =>
            {
                e.ConfigureConsumer<ValueEnteredEventConsumer>(context);
            });
        });
    });
    services.AddMassTransitHostedService();
    

    如此一来,通过Postman发送一个请求,经发布端发布一个消息到RabbitMQ,订阅端侦听消息,处理消息,一切都很熟悉。
    图片

    请求响应模式

    在发布订阅的基础上,改变以往的习惯,当发布一个消息后,等待订阅方的处理,并将消息推送回RabbitMQ,发送方接受到处理后的消息继续执行。

    图片

    请求端

    在Startup中新加上一个用于发送消息(CheckOrderStatus)的请求客户端及指定消息队列名称(为每一个消息创建一个单独的队列)。

    x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));
    

    增加一个Controller及Action,来请求及获取处理结果(OrderStatusResult)。

    [ApiController]
    [Route("[controller]")]
    public class OrderController : ControllerBase
    {
        private readonly IRequestClient<CheckOrderStatus> _client;
        public OrderController(IRequestClient<CheckOrderStatus> client)
        {
            _client = client;
        }
        public async Task<OrderStatusResult> Get(string id)
        {
            var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });
            return response.Message;
        }
    }
    

    响应端

    同样在响应端Startup中对新的消息设置下消息侦听队列以及相应的Handler如下的ValueEnteredEventConsumer去消费消息并返回处理结果。

    x.AddConsumer<CheckOrderStatusConsumer >();
    x.UsingRabbitMq((context, cfg) =>
    {
        // ...
        cfg.ReceiveEndpoint("events-checkorderstatus", e =>
        {
        e.ConfigureConsumer<CheckOrderStatusConsumer >(context);
        });
    });
    

    Consumer中获取请求参数,执行请求,返回执行结果。

    public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
    {
        public async Task Consume(ConsumeContext<CheckOrderStatus> context)
        {
            if (context.Message.OrderId == "9527")
            {
                throw new InvalidOperationException("Order not found");
            }
            Console.WriteLine($"OrderId:{context.Message.OrderId}");
            await context.RespondAsync<OrderStatusResult>(new
            {
                OrderId = context.Message.OrderId,
                Timestamp = Guid.NewGuid().ToString(),
                StatusCode = "1",
                StatusText = "Close"
            });
        }
    }
    

    这样一来,当请求端发起一个消息(事件)到RabbitMQ,响应端侦听并处理完毕返回处理结果到RabbitMQ,请求端依照响应结果继续执行后续请求。
    图片

    HTTP方式差异

    与以往的Http请求方式有所不同,通过httpClient.PostAsync发送请求,接收端处理并返回结果,而走requestClient发送请求到RabbitMQ,再由RabbitMQ推送到侦听节点消费并返回结果,如下第一二部分结构。

    图片

    2021-06-29,望技术有成后能回来看见自己的脚步

  • 相关阅读:
    在linux下的使用复制命令cp,不让出现“overwrite”(文件覆盖)提示的方法。【转】
    Java 学习 day05
    Java 学习 day04
    Java 学习 day03
    Java 学习 day02
    Java 学习 day01
    学习TensorFlow,TensorBoard可视化网络结构和参数
    自编码器及相关变种算法简介
    自编码器(autoencoder)
    卷积神经网络
  • 原文地址:https://www.cnblogs.com/CKExp/p/14952378.html
Copyright © 2011-2022 走看看