zoukankan      html  css  js  c++  java
  • RabbitMQ(五)——发布订阅模式

    RabbitMQ系列

    RabbitMQ(一)——简介

    RabbitMQ(二)——模式类型

    RabbitMQ(三)——简单模式

    RabbitMQ(四)——工作队列模式

    RabbitMQ(五)——发布订阅模式

    RabbitMQ(六)——路由模式

    RabbitMQ(七)——主题模式

    RabbitMQ(八)——消息确认

    RabbitMQ(九)——消息持久化

    RabbitMQ(十)——消息优先级

    前言

      上一章的工作队列模式中,生产者发布的一堆消息进入队列,消费者接收队列中的消息,每条消息只能发给一个消费者。

      本章要做的是吧一条消息发送给多个消费者,这种模式就是Fanout Exchange(扇形交换机)“发布/订阅模式”,它会将消息路由给绑定到它身上的所有队列。

      注意:该模式定义队列时durable:false没有存储消息功能,如果消息发送到没有绑定消费队列的交换器,消息丢失,也就是说,当消息生产者发送消息时,消费者还没有绑定此交换器,则没有接收到消息,且接收不到了;durable:true只要没被消费就一直存在队列中。

      交换器:

      在RabbitMQ中完整的消息传递并不是生产者直接将消息发送到队列,生产者甚至不不知道消息是否会进入队列。正确的模型是生产者把消息发送给交换器(Exchange),交换器会将接受到的消息转发到队列中,交换器必须确定将消息发送到哪些队列。  

      

      交换器类型(详情):

        • Fanout     
        • Direct
        • Topic
        • Header

      

      fanout类型即发布订阅模式,它会把收到的消息广播到它绑定的队列中。

    channel.ExchangeDeclare("exchange", "fanout");
    

      

      之前我们发布的做法是这样的:

    channel.BasicPublish("", "simple", null, Encoding.UTF8.GetBytes(msg));
    

      

      前面我们并没有声明交换器,之所以还能把消息发送到队列是因为用了“”空的字符串标识了默认或匿名交换器。BasicPublish方法第一个参数就是交换器,第二个参数是routekey,消息通过本交换器进入到routekey队列。

     

      发送消息到指定交换器:

    channel.BasicPublish("exchange", "simple", null, Encoding.UTF8.GetBytes(msg)); 
    

      

      创建好fanout交换器后,定义多个队列,然后将交换器和队列绑定:

    channel.QueueBind("simple", "exchange", "");
    

      

        

    代码

      生产者:

    static void Main(string[] args)
            {
                Console.WriteLine("FanoutServer发布服务器启动...");
    
                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var connection = factory.CreateConnection())
                //3.创建通道
                using (var channel = connection.CreateModel())
                {
                    //4.创建交换器
                    channel.ExchangeDeclare("exchange", "fanout");
    
                    string msg = "";
    
                    for (int i = 0; i < 100; i++)
                    {
                        msg = $"发布消息{i}";
                        var body = Encoding.UTF8.GetBytes(msg);
                        channel.BasicPublish("exchange", "", null, body);
                        Console.WriteLine($"发布成功:{msg}");
                        Thread.Sleep(600);
                    }
                    Console.ReadKey();
                }
            }
    View Code

       消费者:

    static void Main(string[] args)
            {
                Console.WriteLine("FanoutClient接收客户端启动...");
                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var connection = factory.CreateConnection())
                {
                    //3.创建通道
                    using (var channel = connection.CreateModel())
                    {
                        //4.定义交换器
                        channel.ExchangeDeclare("exchange", "fanout");
                        //5.创建匿名队列,绑定交换器
                        //var queueName = channel.QueueDeclare("simple");
                        var queueName = channel.QueueDeclare().QueueName;
                        channel.QueueBind(queueName, "exchange", "");
    
                        //6.创建消费者
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                          {
                              //接收消息
                              var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                              Console.WriteLine($"接收消息:{body.ToString()}");
                          };
                        //7.消费消息
                        channel.BasicConsume(queueName, true, consumer);
    
                        Console.ReadKey();
    
                    }
                }
            }
    View Code

     生产者创建完通道后,创建一个交换器,给它命名“exchange”以及设定“fanout”类型,最后发布消息到此交换器,与前面的模式不同的是不用设定队列参数。

                                                                                                                                                                                                      

    消费者创建完通道后,创建一个交换器,与生产者的交换器一致,定义一个队列(可以指定,或者使用匿名),然后将此队列与交换器绑定,消息会在此队列中消费,最后创建消费者进行消费。

    效果

        先启动2个消费者,再启动生产者,消息完全接受到。

        

        先启动生产者,再分别启动消费者,只能接受到消费者启动之后生产发送的消息

    发布订阅模式就先到这,有什么不对的地方望斧正~

    附上Demo地址:https://github.com/1164887865/RabbitMQDemo

  • 相关阅读:
    mac 10.15.7 修改PATH
    oc 属性类型一般用法
    ubuntu解压zip文件名乱码
    telnet 退出
    docker 根据容器创建镜像
    mac android adb device 没有显示设备
    Yii2 查看所有的别名 alias
    Yii2 App Advanced 添加 .gitignore
    ubuntu 18.04 搜狗突然就提示乱码
    An error occured while deploying the file. This probably means that the app contains ARM native code and your Genymotion device cannot run ARM instructions. You should either build your native code to
  • 原文地址:https://www.cnblogs.com/zousc/p/12725685.html
Copyright © 2011-2022 走看看