zoukankan      html  css  js  c++  java
  • RabbitMQ入门:发布/订阅(Publish/Subscribe)

    在前面的两篇博客中

    遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息模型分别为(P代表生产者,C代表消费者,红色代表队列):

    这次我们来看下将一个消息发送给多个消费者(工作者),这种模式一般被称为“发布/订阅”模式。其工作模型为(P代表生产者,X代表Exchange(路由器/交换机),C代表消费者,红色代表队列):

    我们发现,工作模型中首次出现路由器,并且每个消费者有单独的队列。生产者生成消息后将其发送给路由器,然后路由器转送到队列,消费者各自到自己的队列里面获取消息进行消费。在实际的应用场景中,生产者一般不会直接将消息发送给队列,而是发送给路由器进行中转,Exchange必须清楚的知道怎么处理收到的消息:是将消息发送到一个特定队列还是多有队列,或者直接废弃消息。这种才符合RabbitMQ消息模型的核心思想

    接下来我们详细展开今天的话题:

    一、Exchange

    Exchange在我们的工作模型中首次出现,因此需要详细介绍下。

    Exchange分为4种类型:

    Direct:完全根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
    Topic:对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
    Fanout:不需要key,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
    Headers:我们可以不考虑它。

    今天我们的实例采用fanout类型的exchange。

    尽管首次出现,但是其实我们前面的案例中也有用到exchange,只是我们没有给他名字,用的是RabbitMQ默认的,比如下面这段代码,我们将路由器名这个参数传入了“”,如果我们需要自己声明exchange的话,这个就不能传入“”了,而是传入自己定义好的值。

    二、临时队列

    前面两篇博客中,我们都在使用队列的时候给出了定义好的名字,这在生产者和消费者共用相同队列的时候很有必要,但是我们有了exchange,生产者不需要知道有哪些队列,因此队列名字可以不用指定了,而是通过RabbitMQ 接口自己去生成临时队列,队列名字也由RabbitMQ自动生成。通过

    可以声明一个非持久的、通道独占的、自动删除的队列,getQueue()方法可以获取随机队列名字。这个名字用来在队列和exchange之间建立binding关系的时候使用:

    三、代码实现

    基于上面exchange和临时队列的知识铺垫,可以展开今天的代码实现了。

    1.  生产者
      public class Product {
          //exchange名字
          public static String EXCHANGE_NAME = "exchange";
      
          public static void main(String[] args) {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("localhost");
              Connection connection = null;
              Channel channel = null;
              try {
                  // 1.创建连接和通道
                  connection = factory.newConnection();
                  channel = connection.createChannel();
      
                  // 2.为通道声明exchange和exchange的类型
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                  
                  String msg = " hello rabbitmq, this is publish/subscribe mode";
                  // 3.发送消息到指定的exchange,队列指定为空,由exchange根据情况判断需要发送到哪些队列
                  channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
                  System.out.println("product send a msg: " + msg);
              } catch (IOException e) {
                  e.printStackTrace();
              } catch (TimeoutException e) {
                  e.printStackTrace();
              } finally {
                  // 4.关闭连接
                  if (channel != null) {
                      try {
                          channel.close();
                      } catch (IOException e) {
                          e.printStackTrace();
                      } catch (TimeoutException e) {
                          e.printStackTrace();
                      }
                  }
      
                  if (connection != null) {
                      try {
                          connection.close();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
              }
          }
      }
    2. 消费者1
      public class Consumer1 {
      
          public static void main(String[] args) {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("localhost");
              Connection connection = null;
              Channel channel = null;
              try {
                  // 1.创建连接和通道
                  connection = factory.newConnection();
                  channel = connection.createChannel();
      
                  // 2.为通道声明exchange以及exchange类型
                  channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
      
                  // 3.创建随机名字的队列
                  String queueName = channel.queueDeclare().getQueue();
      
                  // 4.建立exchange和队列的绑定关系
                  channel.queueBind(queueName, Product.EXCHANGE_NAME, "");
                  System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them");
                  // 5.通过回调生成消费者并进行监听
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope,
                              com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
      
                          // 获取消息内容然后处理
                          String msg = new String(body, "UTF-8");
                          System.out.println("*********** Consumer1" + " get message :[" + msg + "]");
                      }
                  };
                  // 6.消费消息
                  channel.basicConsume(queueName, true, consumer);
              } catch (IOException e) {
                  e.printStackTrace();
              } catch (TimeoutException e) {
                  e.printStackTrace();
              }
          }
      }
    3. 消费者2,核心代码同消费者1一样,只是在日志打印上将"Consumer1"改为"Consumer2"而已。这里不再列出具体代码。
    4. 先运行消费者1和2,然后运行生产者,观察控制台log打印情况:
      生产者:
      product send a msg:  hello rabbitmq, this is publish/subscribe mode
      
      消费者1**** Consumer1 keep alive ,waiting for messages, and then deal them
      *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode]
      
      消费者2:
       **** Consumer2 keep alive ,waiting for messages, and then deal them
      *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]

      可以看到,当生产者发出消息后,两个消费者最终都收到了消息。

    5. 我们去查看RabbitMQ管理页面:

      在Exchanges 标签页里面多了一个名为“exchange”的路由器,他的类型是fanout。点exchange 的link进入详细页面:

      发现在binding项目中有了两条绑定关系,队列的名字也可以看到。将页面切换到Queues标签页:

      出现了两个新的队列,队列名字和绑定关系中的一样,并且队列都是自动删除的、通道独占的。

    6. 然后将消费者1和消费者2都停掉,重新查看管理页面,我们发现exchange还在,binding关系不存在了,临时队列也自动删除了

  • 相关阅读:
    libevent网络编程汇总
    LibEvent代码阅读--多缓冲区和零拷贝技术
    几个第三方库发图片
    Libevent使用例子,从简单到复杂
    CImage得到位图的大小
    从位图数据取得位图句柄
    BMP格式详解
    如何将内存中的位图数据绘制在DC上
    C++加载位图跟SOCKET通信的编写
    11235
  • 原文地址:https://www.cnblogs.com/sam-uncle/p/9208008.html
Copyright © 2011-2022 走看看