zoukankan      html  css  js  c++  java
  • RabbitMQ(五)——发布订阅模式

    RabbitMQ系列

    RabbitMQ(一)——简介

    RabbitMQ(二)——模式类型

    RabbitMQ(三)——简单模式

    RabbitMQ(四)——工作队列模式

    RabbitMQ(五)——发布订阅模式

    RabbitMQ(六)——路由模式

    RabbitMQ(七)——主题模式

    RabbitMQ(八)——消息确认

    RabbitMQ(九)——消息持久化

    RabbitMQ(十)——消息优先级

    前言

      上一章的工作队列模式中,生产者发布的一堆消息进入队列,消费者接收队列中的消息,每条消息只能发给一个消费者。

      本章要做的是吧一条消息发送给多个消费者,这种模式就是Fanout Exchange(扇形交换机)“发布/订阅模式”,它会将消息路由给绑定到它身上的所有队列。

      注意:该模式定义队列时durable:false没有存储消息功能,如果消息发送到没有绑定消费队列的交换器,消息丢失,也就是说,当消息生产者发送消息时,消费者还没有绑定此交换器,则没有接收到消息,且接收不到了;durable:true只要没被消费就一直存在队列中。

      交换器:

      在RabbitMQ中完整的消息传递并不是生产者直接将消息发送到队列,生产者甚至不不知道消息是否会进入队列。正确的模型是生产者把消息发送给交换器(Exchange),交换器会将接受到的消息转发到队列中,交换器必须确定将消息发送到哪些队列。  

      

      交换器类型(详情):

        • Fanout     
        • Direct
        • Topic
        • Header

      

      fanout类型即发布订阅模式,它会把收到的消息广播到它绑定的队列中。

    channel.ExchangeDeclare("exchange", "fanout");
    

      

      之前我们发布的做法是这样的:

    channel.BasicPublish("", "simple", null, Encoding.UTF8.GetBytes(msg));
    

      

      前面我们并没有声明交换器,之所以还能把消息发送到队列是因为用了“”空的字符串标识了默认或匿名交换器。BasicPublish方法第一个参数就是交换器,第二个参数是routekey,消息通过本交换器进入到routekey队列。

     

      发送消息到指定交换器:

    channel.BasicPublish("exchange", "simple", null, Encoding.UTF8.GetBytes(msg)); 
    

      

      创建好fanout交换器后,定义多个队列,然后将交换器和队列绑定:

    channel.QueueBind("simple", "exchange", "");
    

      

        

    代码

      生产者:

    static void Main(string[] args)
            {
                Console.WriteLine("FanoutServer发布服务器启动...");
    
                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var connection = factory.CreateConnection())
                //3.创建通道
                using (var channel = connection.CreateModel())
                {
                    //4.创建交换器
                    channel.ExchangeDeclare("exchange", "fanout");
    
                    string msg = "";
    
                    for (int i = 0; i < 100; i++)
                    {
                        msg = $"发布消息{i}";
                        var body = Encoding.UTF8.GetBytes(msg);
                        channel.BasicPublish("exchange", "", null, body);
                        Console.WriteLine($"发布成功:{msg}");
                        Thread.Sleep(600);
                    }
                    Console.ReadKey();
                }
            }
    View Code

       消费者:

    static void Main(string[] args)
            {
                Console.WriteLine("FanoutClient接收客户端启动...");
                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var connection = factory.CreateConnection())
                {
                    //3.创建通道
                    using (var channel = connection.CreateModel())
                    {
                        //4.定义交换器
                        channel.ExchangeDeclare("exchange", "fanout");
                        //5.创建匿名队列,绑定交换器
                        //var queueName = channel.QueueDeclare("simple");
                        var queueName = channel.QueueDeclare().QueueName;
                        channel.QueueBind(queueName, "exchange", "");
    
                        //6.创建消费者
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                          {
                              //接收消息
                              var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                              Console.WriteLine($"接收消息:{body.ToString()}");
                          };
                        //7.消费消息
                        channel.BasicConsume(queueName, true, consumer);
    
                        Console.ReadKey();
    
                    }
                }
            }
    View Code

     生产者创建完通道后,创建一个交换器,给它命名“exchange”以及设定“fanout”类型,最后发布消息到此交换器,与前面的模式不同的是不用设定队列参数。

                                                                                                                                                                                                      

    消费者创建完通道后,创建一个交换器,与生产者的交换器一致,定义一个队列(可以指定,或者使用匿名),然后将此队列与交换器绑定,消息会在此队列中消费,最后创建消费者进行消费。

    效果

        先启动2个消费者,再启动生产者,消息完全接受到。

        

        先启动生产者,再分别启动消费者,只能接受到消费者启动之后生产发送的消息

    发布订阅模式就先到这,有什么不对的地方望斧正~

    附上Demo地址:https://github.com/1164887865/RabbitMQDemo

  • 相关阅读:
    poi api工具
    利用pwdx查看Linux程序的工作目录
    安装loadrunner11的时候提示'命令行选项语法错误。键入命令 / ?’ 怎么办
    vi 替换字符串
    在linux上用jmeter压测时出现很多异常java.net.NoRouteToHostException: Cannot assign requested address.
    在linux上用jmeter压测时出现很多异常java.net.NoRouteToHostException: Cannot assign requested address.
    jmeter:清除本地指定目录下的所有类型文件
    word中迅速将表格一分为二 拆分表格快捷键ctrl+shift+enter 重复上一个命令快捷键f4
    Jmeter中Bean shell脚本格式修改为utf-8
    在Discuz中增加创始人
  • 原文地址:https://www.cnblogs.com/zousc/p/12725685.html
Copyright © 2011-2022 走看看