zoukankan      html  css  js  c++  java
  • RabbitMQ-(1)几种模式

    官网

    https://www.rabbitmq.com/getstarted.html

    RabbitMQ模式有如下几种:

     

     

    目前大家常用且熟知的可能是前五种,即简单模式、Work模式、发布/订阅模式、路由模型、Topic模式(通配符模式)


    1-简单模式

    即消息的生产者将消息发送到队列,消费者消费队列里面的消息

     

    我们新建一个.Net Core控制台项目,引入

    RabbitMQ.Client

    发送方代码,如下

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "192.16.17.17" ,UserName="guest",Password="guest"};
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            for (int i = 1; i <= 10; i++)
            {
                string message = $"{i}-Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
    
                channel.BasicPublish(exchange: "",
                                     routingKey: "hello",
                                     basicProperties: null,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
            
        }
    
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    接收方代码,如下

    public static void Main()
    {
    var factory = new ConnectionFactory() { HostName = "192.16.17.17" ,UserName="guest",Password="guest"};
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
    
    var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Receive1 [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello",
                                 autoAck: true,
                                 consumer: consumer);
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
     

    但是,如果消费者有多个,会是怎样的情况呢,他们会均分消息的数据量,比如有10条消息,消费者1通常会消费第1,3,5,7,9条消息,消费者2会消费2,4,6,8,10条消息,反正是均分,具体消费哪条数据是看谁先消费第一条数据。我们可以编写2个消费者客户端,打印的时候区分下是谁消费的,比如这里在接收消息的地方前面加个Receive1和Receive2来区分,效果如下

     

    那如果消费者1的服务器能力比较强,我们希望它根据自己的消费能力,性能好的消费消息多,性能低的少消费一些,可以认为是能者多劳,那就引出了我们下面的模式,work模式。

    2-Work模式

     

    如何设置能者多劳模式呢,只需要添加如下一行代码

    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    具体可以在如下地方设置

    channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
    //只需要在各消费方添加如下代码即可    
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    以上2种模式,都是1条消息只能被一个消费者消费,另一个消费者就不会收到该条消息

    那如何做到一条消息被多个消费者消费呢,如下的模式可以做到。

     

    3-发布/订阅模式

    该模式就是,一个生产者生产一条消息,发送到交换机Exchange,消费者可以设置各自的队列接受该交换机里面的消息,有多少消费者的队列连接了该Exchange,该Exchange里面的每一条消息都会往各个队列里面分别发送该条相同的消息。

     

    但注意的是,Exchange不会持久化消息,当没有消费者时,存入到Exchange的消息将消失,后面有消费者连接该Exchange时,之前在Exchange中从未消费过的消息是不会收到的。

    发送方,代码如下

    public static void Main(string[] args)
      {
          var factory = new ConnectionFactory() { HostName = "192.16.17.17" ,UserName="guest",Password="guest"};
          using (var connection = factory.CreateConnection())
          using (var channel = connection.CreateModel())
          {
              channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
    
              for (int i = 1; i < 11; i++)
              {
                  var message = $"{i}-Hello World!";
                  var body = Encoding.UTF8.GetBytes(message);
                  channel.BasicPublish(exchange: "logs",
                                       routingKey: "",
                                       basicProperties: null,
                                       body: body);
                  Console.WriteLine(" [x] Sent {0}", message);
              }
              
          }
    
          Console.WriteLine(" Press [enter] to exit.");
          Console.ReadLine();
      }

    消费方,代码如下

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "192.16.17.17" ,UserName="guest",Password="guest"};
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
    
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");
    
            Console.WriteLine(" [*] Waiting for logs.");
    
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("PubSubReceived1 [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
     

    ,这里要注意启动的先后顺序,刚也说了Exchange是不会存储消息的,当你先启动消息的生产者把消息发送到Exchange时,而消费者后启动,在消费者启动之前发送到Exchange里面的消息消费者是接受不到的。

    其实到这里,RabbitMQ的主要模式思路差不多讲完了,因为后面的2种模式跟发布订阅的差不多,只是细化了一下发布订阅的功能,在Exchange和Queue之间新增了“路由”和“路由的通配符”

     

    4-路由模型

     

    5-Topic模式

     

    具体如何实现,我相信大家更希望我是如何知道的,我说完自己的学习渠道后,大家自己去了解以上最后2种模式,

    方法:

    1-官网,访问https://www.rabbitmq.com/getstarted.html

    2-选择各自的语言,如Java或C#,里面就有各语言的实现方式和模式的讲解

    更多分享请关注我的公众号

  • 相关阅读:
    babel初学教程
    手机cs端改变跳转方式
    web.xml 中的listener、 filter、servlet 加载顺序及其详解
    Linux下cp直接覆盖不提示的方法!
    JAVA中用CALENDAR类计算周和周的起始日期(转)
    [android反编译小结]apktool/ AXMLPrinter2.jar/ dex2jar.bat/ jdgui/
    jquery 新手学习常见问题解决方法
    Linux系统中gb2312与utf8相互切换
    xml解析循环参数实例
    java 计算时间差
  • 原文地址:https://www.cnblogs.com/wangoublog/p/15353077.html
Copyright © 2011-2022 走看看