zoukankan      html  css  js  c++  java
  • .net 环境使用 RabbitMQ ,由浅入深 【一】

    最近因为先开发的项目需要用到消息队列,因此捣鼓了一下市面上开源的消息队列。

    原本听闻Rocketmq ,一开始用的是 RocketMQ,各种集群搭建完毕,消息发送什么的测试后,,但是结果因为 RocketMQ没有官方支持的.net客户端(第三方的不太好用,比如不支持轨迹),并且我发现消息好像还不能单条确认?

    后来又捣鼓RabbitMQ,这个RabbitMq有官方.net 客户端,光这一条我就选了RabbitMQ(我的渣渣系统对性能还没有这么高的要求,只要是社区活跃度可以的我相信都够支撑我的系统,好用才是最关键)

    这篇文章会写一个RabbitMQ的小系列,从最简单的上手到后面结合实际业务做东西。

    先描述一下我消息队列的使用场景,我系统需要监听当订单下达后,商品库存减少的事件后,做出一定事情。比如:

    当商品库存发生变动,通知到各个消费者,包括但不限于以下消费者:

    1、当库存变动后,根据情况调价,根据当前剩余库存和销售率,对商品价格做出一定调整。

    2、当库存变动后,调整该商品对应的第三方仓储的库存数量。

    这是典型一个发布与订阅模式,大概就是2个消费者订阅同一种消息,消费者之间的业务互相不影响,只要事件发生后,通知到消费者即可。这两个步骤如果放到订单下单的流程同步做,估计要被老板打死吧

    现在,用RabbitMQ来实现发布一条消息,2个消费者收到。

    因为这篇文章是由浅入深,所以,这篇主要是发出消息,2个消费者收到消息,不说任何关于MQ的概念,那些什么交换机,什么路由键,都不解释,解释多了,反倒容易吓退新手,我现在也是新手,当时我从RocketMQ转RabbitMQ被名词绕晕,诚然理解各种概念非常重要,但是我想让使用者先看到效果,然后由效果返回去理解概念,可能会更容易接受。因此我会尽量用最少的代码发出一条消息并且被订阅消费。

    第一步是搭建环境,这个百度一搜一大堆,不是我这篇文章的重点,可以参考这篇文章 https://www.cnblogs.com/zhuwenjoyce/p/13236839.html ,非常简单。

    如果按上面文章搞完了以后,访问 http://127.0.0.1:15672/ 将会出现一个登录页面,用户名和密码都是 guest。登录后如图,这个我先称为 消息控制台

     接下来,需要对RabbitMQ做一定配置(网上基本上所有文章都是代码配置交换机、队列(先别管这个名词),我觉得会扰乱新手,给新手造成一定的困境),所以我会先让大家用消息控制台来配置。

    比如下图是我截的其他教程的文章。相信很多人看了会疑惑,一下子bind一下daclare的,新手看了又要开始各种百度这个代码是干嘛的了。所以我不用代码配置这些,尽量用两三行代码就能收消息

    以下步骤,有不明白的名词先别去想,先跟着做,能发消息能收消息再来思考,后面的文章会详细介绍概念,这篇文章的重点是先看到效果,后面再返回去思考。

    第一步:到消息控制台配置队列,点击消息控制台的 Queues ,然后 Add a new queue 有输入框,在name处输入 stockChange.priceSet,其他保持默认,左下角点击“Add queue”

    然后再继续添加一个   stockChange.stockUpdate ,这两个队列,代表2个消费者,一个消费者负责库存变动后改价,一个消费者负责库存变动后修改第三方仓储的库存。

    第二步:到消息控制台配置交换机,点击消息控制台上的 "Exchange"标签 可以看到列表下面有一个 Add a new exchange,在 name处输入 bookErp.stockChange.franout ,type 改为 fanout

    其他保持默认,点击左下角的 Add exchange 按钮,这个时候页面上应该出现和我类似的列表了,确保有我画红色框的那个(忽略我列表上其他的,因为写这个文章的时候我懒得去删除多余的了)

     

     第三步:绑定交换机与队列的关系。点击刚刚列表上的  bookErp.stockChange.franout ,出现另外一个界面 ,在 Add bingding from this exchange 中的 To Queue 输入框中输入:

    stockChange.priceSet,然后点击 "Bind" 按钮,添加完了一条绑定记录,然后继续添加一条为 stockChange.stockUpdate 的记录,添加完了以后你将看到如下:

    以上便完成了 “库存变更” 这个事件对应 2个消费者的使用场景。现在写代码,写2个消费者代码。因为现在不涉及业务逻辑,就打印消息而已,我就不写2份代码,写一份,启动2次,以不同字符串来区分是两个不同消费者

    打开vs2019,新建一个 .net framework 的控制台程序。打开nuget搜 RabbitMQ.client ,安装第一个。安装完毕后

    在 Program.cs 的类中加入如下两个方法

         /// <summary>
            /// 连接配置
            /// </summary>
            private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest",
                Port = 5672,
                VirtualHost = "/"
            };
    

      

      

      /// <summary>
            /// 基于事件的,当消息到达时触发事件,获取数据
            /// </summary>
            public static void DirectAcceptExchangeEvent(string queueName)
            {
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        Console.WriteLine("开始监听队列:" + queueName);
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var msgBody = Encoding.UTF8.GetString(ea.Body.ToArray());
                            var msgProps = ea.BasicProperties;
                            Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
    
                            //deliveryTag 传递标签,ulong 类型.它的范围隶属于每个信道.因此必须在收到消息的相同信道上确认.不同的信道将导致“未知的传递标签”协议异常并关闭通道.
                            //multiple 确认一条消息还是多条.false 表示只确认 e.DelivertTag 这条消息,true表示确认 小于等于 e.DelivertTag 的所有消息 
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    
                        };
                        channel.BasicConsume(queueName, autoAck: false, consumer: consumer);
                        Console.WriteLine("按任意值,退出程序");
                        Console.ReadKey();
                    }
                }
            }
    

      

     接下来在Main方法中执行如下代码 

          static void Main(string[] args)
            {
                Console.WriteLine("请输入要监听的队列名");
                string queueName = Console.ReadLine();
    
                DirectAcceptExchangeEvent(queueName);
    
                Console.ReadLine();
            }
    

      

    编译一下,去找到 debug目录下的 exe 文件运行,提示你输入队列名字 stockChange.priceSet ,按回车。然后别动,再运行一个 exe ,输入 stockChange.stockUpdate,按回车。

    这个时候,你作为2个不同消费者,监听了库存变更这个事件,可以执行不同的业务逻辑。

    现在去我们web控制台发消息,就是刚刚你配置MQ的地方。打开exchange,出现列表,点击 bookErp.stockChange.franout ,页面刷新后,在 Publish message 下面的 Payload 输入框里面输入 123 ,点击“Publish message” 按钮,这个时候去看你刚刚运行的控制台,将发现消息被打印在控制台了。如下图:

    如果放到现实场景,你可能就是post一个json字符串,里面包含商品的skuId、当前库存等信息,然后消费者拿到后自己去做自己的事

    以上就实现了一个事件,2个消费者一起监听,自己各自做自己的业务逻辑。

    实际使用的话就是把发消息从web控制台上改为代码发消息即可了。

    相比网上其他教程,我将RabbitMQ配置交换机、消息队列、路由键等操作放在了web控制台,代码只做最简单的监听而不配置MQ,便于大家理解。

    实际使用也是可以先手动配置好,代码只监听和发消息。

    下一篇,我开始结合这一篇涉及的名词,来解释它到底是怎么运行的。

  • 相关阅读:
    怎样在caffe中添加layer以及caffe中triplet loss layer的实现
    【java】为数组全部元素赋同样的值 以及 数组之间的复制
    POJ 2386 Lake Counting 搜索题解
    android中实现内容搜索
    JNI之——在cmd命令行下编译执行C/C++源文件
    怎样编写高效android代码
    http headers
    matlab 工具函数、matlab toolbox(工具箱)
    数列收敛与数列极限
    数列收敛与数列极限
  • 原文地址:https://www.cnblogs.com/Jerseyblog/p/14129751.html
Copyright © 2011-2022 走看看