zoukankan      html  css  js  c++  java
  • 基于Masstransit实现Eventbus的功能

    Masstransit 是一个非常优秀的基于消息进行通信的分布式应用程序框架,详情参考官网

    在介绍AA.ServiceBus开源地址:https://github.com/ChengLab/AAFrameWork  之前,先介绍下几个概念.

    分布式

    分布式系统如何定义?这里引用一下Distributed Systems Concepts and Design(Third Edition)中的一句话:"A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messages"(分布式系统是指位于联网计算机上的组件仅通过传递消息来通信和协调其操作的系统)。从这句话里面我们可以看到几个重点:
    1、组件分布在网络计算机上
    2、组件之间仅仅通过消息传递来通信并协调行动
    严格讲,同一个分布式系统中的计算机在空间部署上是可以随意分布的,这些计算机可能被放在不同的机柜上,也可能在不同的机房中,甚至分布在不同的城市。

     中间件

    中间件是介于操作系统和在其上运行的应用程序之间的软件。中间件实质上充当隐藏转换层,实现了分布式应用程序的通信和数据管理。它有时被称为管道,因为它将两个应用程序连接在一起,使数据和数据库可在“管道”间轻松传递。参考Azure

    常见的中间件比如:远程过程调用中间件,消息中间件,数据库访问中间。 

    消息中间件

    Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems. 

    面向消息的中间件(MOM)是支持在分布式系统之间发送和接收消息的软件或硬件基础设施 

    AA.ServiceBus 介绍

    AA.ServiceBus 是基于MassTransit的消息中间件,提供点对点和发布订阅的通信方式。这两个之间的区别:

    •   端点对端点通信 该消息仅处理一次 并且被一个消费者处理。

     例如命名模式 命令告诉服务做某事,推荐动词-名词顺序的命名风格:如提交订单命令(SubmitOrder)

    •  发布订阅通信 可以被多个订阅者进行消费处理。

    例如事件驱动模式 事件意味着某事已经发生了,推荐以名词-动词(过去时态)顺序的命名风格,表明发生了某事。示例订单提交过了事件 OrderSubmitted

     目前实现消息中间件有多种方式,参考微服务.NET:容器化应用架构指南 如图

    AA.ServiceBus 快速开始 

       实例我们创建两个控制台程序生产者、消费者分别命名ServiceBus.ProducersServiceBus.Consumers,然后在创建一个消息契约类库命名为ServiceBus.MsgContract ,分别被生产者和消费者引用。     

    1.在消息契约类库中创建两个消息 分别是 提交订单 SubmitOrder 和 订单已提交OrderSubmitted代码如下

     public interface OrderSubmitted
        {
            long Id { get; set; }
            decimal OrderPrice { get; set; }
        }
    
       public interface SubmitOrder
        {
            long Id { get; set; }
            Decimal OrderPrice { get; set; }
        }

    2.在生产者控制台项目中安装Install-Package AA.ServiceBus -Version 1.0.0,生产者主要对消息的构造然后进行发送或发布;

    public class Producer
        {
            public static void TestProducer()
            {
                //rabbitmq 配置
                string rabbitMqUri = "rabbitmq://localhost:5672";
                string rabbitMqUserName = "your";
                string rabbitMqPassword = "your";
    
    
                PulishEvent(rabbitMqUri, rabbitMqUserName, rabbitMqPassword);
                SendCommand(rabbitMqUri, rabbitMqUserName, rabbitMqPassword);
            }
    
            /// <summary>
            /// 发布事件
            /// </summary>
            /// <param name="rabbitMqUri"></param>
            /// <param name="rabbitMqUserName"></param>
            /// <param name="rabbitMqPassword"></param>
            private static void PulishEvent(string rabbitMqUri, string rabbitMqUserName, string rabbitMqPassword)
            {
                IBusControl busControl = ServiceBusManager.Instance.UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
                             .BuildEventProducer();
    
                TaskUtil.Await(busControl.Publish<OrderSubmitted>(new
                {
                    Id = 1010,
                    OrderPrice = 1024
                }));
            }
            /// <summary>
            /// 发送命令
            /// </summary>
            /// <param name="rabbitMqUri"></param>
            /// <param name="rabbitMqUserName"></param>
            /// <param name="rabbitMqPassword"></param>
            private static void SendCommand(string rabbitMqUri, string rabbitMqUserName, string rabbitMqPassword)
            {
                string queueName = "submitorder.queue";
    
                ISendEndpoint busControl = ServiceBusManager.Instance.UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
                             .BuildCommandProducer(queueName);
    
                TaskUtil.Await(busControl.Send<SubmitOrder>(new
                {
                    Id = 1010,
                    OrderPrice=1024
                }));
            }
        }

         

     3.在消费者控制台项目中安装Install-Package AA.ServiceBus -Version 1.0.0,生产者需要创建对应的消费者进行处理消息,只需要继承IConsumer接口即可

     

    public class Consumer
        {
            public static void TestConsumer()
            {
                //rabbitmq 配置
                string rabbitMqUri = "rabbitmq://localhost:5672";
                string rabbitMqUserName = "your";
                string rabbitMqPassword = "your";
                string queueName = "submitorder.queue";
    
                var busControl = ServiceBusManager.Instance.UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
                 .RegisterConsumer<SubmitOrderCommandConsumer>(queueName)//注册提交订单命令消费者
                 .RegisterConsumer<OrderSubmittedEventConsumer>(null)   //注册订单已创建事件消费者
                 .Build();
                busControl.Start();
            }
        }
        /// <summary>
        ///订单已经提交了  事件消费者
        /// </summary>
        public class OrderSubmittedEventConsumer : IConsumer<OrderSubmitted>
        {
            public async Task Consume(ConsumeContext<OrderSubmitted> context)
            {
                var @event = context.Message;
                Console.WriteLine($"接收到订单创建了事件消息单价:{@event.OrderPrice}");
                //do somethings...
            }
        }
    
        /// <summary>
        /// 提交订单 命令消费者
        /// </summary>
        public class SubmitOrderCommandConsumer : IConsumer<SubmitOrder>
        {
            public async Task Consume(ConsumeContext<SubmitOrder> context)
            {
                var command = context.Message;
                Console.WriteLine($"接收到了创建订单命令消息单价:{command.OrderPrice}");
                //do somethings...
            }
        }

    运行消费者和生产者控制台 输出如下:

     

     

  • 相关阅读:
    ReentrantLock与synchronized的差别
    读TIJ -1 对象入门
    wikioi 2573 大顶堆与小顶堆并用
    开源 免费 java CMS
    UVA10972
    springboot5
    spring-boot4
    spring-boot3
    spring-boot2
    spring-boot1
  • 原文地址:https://www.cnblogs.com/chengtian/p/11239216.html
Copyright © 2011-2022 走看看