zoukankan      html  css  js  c++  java
  • 消息队列--RabbitMQ(二)

    1.常用的几种队列简介

    RabbitMQ有五种常用的队列,分别是:简单队列、work模式、发布订阅模式、路由模式、主题(Topic)模式。其实发布订阅、路由、主题这三种模式都从属于与routingkey相关的模式,所以从性质上来说可以说是属于同一类。接下来,我们就以简述与代码的形式,来分别解释一下这几种模式。

    2.相关名词解释

       Publish:发布者(消息的生产方)

       Consumer:使用者(消息的消耗方)

       Exchange:交换机(消息的中转站)--后面与routingkey相关的模式会做出相应的解释

       VirtrulHost:虚拟主机(在消息系统内开辟的一个放置或者是寄存队列的一个区域,相当于数据库概念)

       Queue:消息队列(相当于数据库中的表)

       Channel:通道(构建消费者与生产者消息沟通的渠道)

     3.几种队列模式的简述及编码实现

          (1)简单模式:

               所谓简单模式就是由一个生产者、一个消费者,外加一个渠道构建的最为简单的生产消费模式。生产者通过渠道完成消息的推送,消费者接受消息并使用,如此简单。

               eg. Client

                //1.构建一个连接工厂

                var factory = new ConnectionFactory() {

                    HostName="localhost",//主机名

                    Port=5672,   //端口号

                    VirtualHost="/lsh-blog",  //要使用的虚拟主机名

                    UserName="lsh",  //账号

                    Password="lsh"   //密码

                   

                 };

                Console.WriteLine("接下来要发10条消息。。。");

                //2.构建一个连接

                using (var connection = factory.CreateConnection())

                {

                    //3。构建一个通道

                    using (var channel = connection.CreateModel())

                    {

                        channel.QueueDeclare("lko",false,false,false,null);

                        //推送消息

                        string msg = "我来发布一条最简单的消息";

                         //推送消息

                        channel.BasicPublish("",routingKey: "",

                                                 basicProperties: null,

                                                body: Encoding.UTF8.GetBytes(msg));

                    }

                }

                Console.ReadKey();

       Consumer:

                           /*

                 consumer  

                 */

                Console.WriteLine("接收消息中。。。");

                var factory = new ConnectionFactory()

                {

                    HostName = "localhost",

                    Port = 5672,

                    VirtualHost = "/lsh-blog",

                    UserName = "lsh",

                    Password = "lsh",

                };

                using (var connection = factory.CreateConnection())

                using (var channel = connection.CreateModel())

                {

                    channel.QueueDeclare(queue: "lko",

                                            durable: false,

                                            exclusive: false,

                                            autoDelete: true,

                                            arguments: null);

     

                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, ea) =>

                    {

                        var body = ea.Body;

                        var message = Encoding.UTF8.GetString(body);

                        Console.WriteLine(" [Consumer] Received {0}", message);

                        Thread.Sleep(2000);

                        channel.BasicAck(ea.DeliveryTag, false);

                    };

                    channel.BasicConsume(queue: "lko",

                                         noAck: false,

                                         consumer: consumer);

                    Console.ReadLine();

    (2)Work模式

       由一个生产者、多个消费者以及驱动组成。RabbitMQ队列有一个消息竞争原则,就是同时消费同一条队列的几个消费者,对于队列的消息都是谁先获取就被谁消费,没有重复或者不被消费这一说。因而,该模式只是一种特性,所以就不用代码说明啦。

    (3) 发布订阅模式

       发布订阅就是生产方发布一个入口,使用方就可以接入这个入口,从而做到数据的流通。这样的话,所有的使用方都可以接入这个入口,从而得到数据信息。该模式所需要借助的,便是交换机(Exchange)这个概念,以交换机作为入口,消费使用者进行绑定接入。

    eg. Publish

       //1.构建一个连接工厂

                var factory = new ConnectionFactory() {

                    HostName="localhost",//主机名

                    Port=5672,   //端口号

                    VirtualHost="/lsh-blog",  //要使用的虚拟主机名

                    UserName="lsh",  //账号

                    Password="lsh"   //密码

                   

                 };

                Console.WriteLine("接下来要发10条消息。。。");

                //2.构建一个连接

                using (var connection = factory.CreateConnection())

                {

                    //3。构建一个通道

                    using (var channel = connection.CreateModel())

                    {

     

                        //定义交换机(fanout为交换机类型)

                        channel.ExchangeDeclare("search_direct", "fanout");

                        //推送消息

                        string msg = "我来发布一条最简单的消息";

                         //推送消息

                        channel.BasicPublish("search_direct", routingKey: "",

                                                 basicProperties: null,

                                                body: Encoding.UTF8.GetBytes(msg));

                       

                    }

                }

                Console.ReadKey();

      

      Consumer:

         /*

                 consumer  

                 */

                Console.WriteLine("接收消息中。。。");

                var factory = new ConnectionFactory()

                {

                    HostName = "localhost",

                    Port = 5672,

                    VirtualHost = "/lsh-blog",

                    UserName = "lsh",

                    Password = "lsh",

                };

                using (var connection = factory.CreateConnection())

                using (var channel = connection.CreateModel())

                {

                    channel.QueueDeclare(queue: "lko",

                                            durable: false,

                                            exclusive: false,

                                            autoDelete: true,

                                            arguments: null);

                    //将队列lko与交换机search_direct进行绑定接入

                    channel.QueueBind("lko", "search_direct","");

                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, ea) =>

                    {

                        var body = ea.Body;

                        var message = Encoding.UTF8.GetString(body);

                        Console.WriteLine(" [Consumer] Received {0}", message);

                        Thread.Sleep(2000);

                        channel.BasicAck(ea.DeliveryTag, false);

                    };

                    channel.BasicConsume(queue: "lko",

                                         noAck: false,

                                         consumer: consumer);

                    Console.ReadLine();

    (4) 路由模式

         路由模式就是以交换机为开发接口,rotingkey作为唯一的过滤筛选标识来实现消息 的发布与使用。这样的话,消费者就可以只消费绑定当前交换机发过来的某一个或某一些rotingkey的消息进行选择性处理啦。发布方,我们只需在上面说过的发布订阅模式的交换及类型fanout改为direct(即:channel.ExchangeDeclare("search_direct", "fanout"), channel.BasicPublish("search_direct", routingKey: "",basicProperties: null,body: Encoding.UTF8.GetBytes(msg));),消费使用方绑定自己队列想要接收的某一些routngkey的消息即可(即: channel.QueueBind("lko", "search_direct","item.del");channel.QueueBind("lko", "search_direct","item.add");…).

       (5)Topic话题模式

          可以说是带有模糊匹配消费性质的路由模式,也就是说,发布方的交换机类型变成topic,其余的可以保持原封不变,而接收方可以消费一个消费区间或者也可以说是符合某条件的routingkey集合的进行消费使用(即:消费方channel.QueueBind("lko", "search_direct","item.#"),来匹配所有以item开头的所有roukingkey的消息进行消费使用)

    4.总结

       至此,RabbitMQ常用的几种消费队列模式就简单的介绍完啦,可能有不是很完善的介绍说明,还请诸位多多谅解,后期会不断的完善。下一篇,我将谈一下我所了解的RabbitMQ的使用场景及基于C#的RabbitMQ.dll客户端相关封装的一款组件—EasyNetMQ进行简单的使用分析,欢迎各位大佬指点批评~

    力争写最通俗易懂的文章,不添加任何防腐剂~~~
  • 相关阅读:
    web.xml中openEntityManagerInViewFilter的作用(转)
    JNDI解读(转)
    Java读取大文件的高效率实现
    快速入门react
    谈一谈我所了解的https
    漫谈JWT
    Java 中的几种线程池这么用才是对的
    用Vue来实现图片上传多种方式
    一个页面从输入URL到页面加载显示完成,这个过程都发生什么?
    “===”与“==”的区别
  • 原文地址:https://www.cnblogs.com/diligent-lsh/p/9741625.html
Copyright © 2011-2022 走看看