zoukankan      html  css  js  c++  java
  • RabbitMQ的发布订阅模式(Publish/Subscribe)(三)

    一、发布/订阅(Publish/Subscribe)模式

    发布订阅是我们经常会用到的一种模式,生产者生产消息后,所有订阅者都可以收到。RabbitMQ的发布/订阅模型图如下:

    1、该模式下生产者并不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列;
    2、该模式必须声明交换机,并且设置模式: channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);  
    fanout指分发模式(将每一条消息都发送到与交换机绑定的队)。
    3、队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

    二、发布消息

    消息生产者向交换机(exchange)发送消息,代码如下:

       // 定义交换机名称
            static string EXCHANGE_NAME = "ps_exchange_fanout";
            public static void PublishMessage()
            {
                try
                {
                    var conn = RabbitMQHelper.GetConnection();
                    var channel = conn.CreateModel();
                    // 定义exchange
                    channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
                    string msg = "hello ps!";
                    var body = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish(EXCHANGE_NAME, "", null, body);
                    Console.WriteLine("send msg:" + msg);
                    channel.Close();
                    conn.Close();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    

    消息发送成功,截图如下:

    三、订阅消息

    在这里需要两个消费者,消息发送后,所有的订阅者都可以收到消息;

    3.1 消费者1

    和轮询分发以及公平分发不同的是,消费者需要将队列绑定到交换机,来订阅消息;实现代码如下:

            static string EXCHANGE_NAME = "ps_exchange_fanout";
            static string QUEUE_NAME = "ps_queue_sub1";
            /// <summary>
            /// 订阅消费者1
            /// </summary>
            static void SubscribeConsumer1()
            {
                var conn = RabbitMQHelper.GetConnection();
                var channel = conn.CreateModel();
                // 定义exchange
                channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
                // 绑定queue
                channel.QueueDeclare(queue: QUEUE_NAME);
                channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "");
                // 定义Consumer
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model,ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine($"SubscribeConsumer1 收到消息: {message},时间:{DateTime.Now}");
                };
                //启动消费者 设置为手动应答消息
                channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
                Console.WriteLine("Subscribe Consumer1 消费者已启动");
                Console.ReadKey();
                channel.Dispose();
                conn.Close();
            }
    

    3.2 消费者2

    消费者2的代码和1的基本相同,大家可以将1的修改一下,就可以使用,在此就不重复贴出了;

    3.3 接收消息结果

    消费者1接收消息截图:

    消费者2接收消息截图:

    通过上图,我们可以看到,发布者发布消息后,订阅者1、2均受到了相同的消息,至此功能已经完成;

    四、小结

    4.1 订阅者代码的主要流程

    根据消费者的代码,我们可以提炼流程如下
    (1)创建连接
    (2)声明exchange
    (3)绑定队列到exchange
    (4)声明消费者
    (5)绑定消费者到channel,监听处理消息
    (6)关闭连接

    4.2 订阅成功后,我们打开mq的管理地址可以看到,有两个queue绑定到exchange上了:


    注意:如果我们的exchange没有消费者订阅,发布的消息将不会被保存到任何队列,直接丢失了;
    参考链接:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

  • 相关阅读:
    打开Fiddle,提示“Machine-wide Progress Telerik Fiddler installation has been found at ...Please, use that one or uninstall it ...”
    Type Target runtime Apache Tomcat v8.0 is not defined.
    Loadrunner的安装注意事项
    11.java并发编程的艺术-java并发编程实践
    10.java并发编程的艺术-Executor框架
    9.java并发编程的艺术-java中的线程池
    8.java并发编程的艺术-java中的并发工具类
    Java并发编程的艺术-java中的13个原子操作类
    6.java并发容器和框架——Fork/Join框架
    6.java并发容器和框架——Java中的阻塞队列
  • 原文地址:https://www.cnblogs.com/zqllove/p/12858659.html
Copyright © 2011-2022 走看看