zoukankan      html  css  js  c++  java
  • 第三篇:发布/订阅

    Publish/Subscribe

    1. 交换器 
      1. 列出交换器
      2. nameless交换器
    2. 临时队列
    3. 绑定
    4. 整合
    5. 代码演示
      1. 运行结果分析

    之前的教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只被传递给一个工作人员。在这一部分,我们将做一些完全不同的事情 - 我们会向多个消费者传递信息。这种模式被称为“发布/订阅”。

    为了说明这种模式,我们将建立一个简单的日志系统。它将包含两个程序 - 第一个将发出日志消息,第二个将接收并打印它们。

    在我们的日志系统中,接收程序的每个运行副本都会收到消息。这样我们就可以运行一个接收器并将日志指向磁盘; 同时我们将能够运行另一个接收器并在屏幕上查看日志。

    基本上,发布的日志消息将被广播给所有的接收者。

    交换器

    在本教程的前几部分中,我们发送消息并从队列中接收消息。现在是时候在rabbitmq中引入完整的消息传递模型

    让我们快速回顾一下前面教程中的内容:

    • 生产者    是发送消息的应用程序。
    • 队列        是存储消息的缓冲器。
    • 消费者     是接收消息的应用程序。

    RabbitMQ中的消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。实际上,生产者通常甚至不知道消息是否会被传送到任何队列中。

    相反,生产者只能发送消息给交换器交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方则推动他们去排队。交易所必须知道如何处理收到的消息。是否应该附加到特定队列?它应该附加到多个队列中吗?或者它应该被丢弃。这些规则由交换类型定义 

     

     有几种可用的交换类型:directtopic,headerfanout我们将关注最后一个 - fanout。让我们创建一个这种类型的交换器,并将其命名为logs

    channel.exchangeDeclare("logs", "fanout");

    fanout交换器非常简单。它只是将收到的所有消息广播到它所知道的所有队列中。

    列出rabbitmq-server中的交换器

    [root@bogon ~]# sudo rabbitmqctl list_exchanges
    Listing exchanges ...
                        direct                                 #默认(未命名)交换
    amq.direct            direct
    amq.fanout    fanout
    amq.headers    headers
    amq.match    headers
    amq.rabbitmq.log    topic
    amq.rabbitmq.trace    topic
    amq.topic    topic
    ...done.

     nameless的交换器

     在本教程的以前部分,我们对交换一无所知,但仍能够将消息发送到队列。因为我们使用了一个默认的交换器,我们用空字符串(“”)来标识

    channel.basicPublish("","hello",null,message.getBytes());

     第一个参数是交换器的名称。空字符串表示默认或无名的交换器:如果交换器字符串存在,消息会通过routingKey指定的名称路由到队列。

     现在,我们可以将消息发布到我们的指定交换器:

    channel.basicPublish( "logs", "", null, message.getBytes());

    临时队列

     您可能还记得,我们先去使用的队列有一个指定的名称(hello  和 task_queue)。命名队列对我们至关重要 - 我们需要将worker指向同一队列。当你想在生产者和消费者之间分享队列时,给队列一个名字是很重要的。

    但是,我们的记录器并非如此。我们希望接收到所有日志消息,而不仅仅是其中的一部分。我们也只对目前流动的消息感兴趣,而不是旧消息要解决这个问题,我们需要两件事。

      首先,每当我们连接到rabbitmq-server,我们需要一个新的,空的队列。要做到这一点,我们可以创建一个随机名称的队列,或者甚至更好的方案 - 让服务器为我们选择一个随机队列名称。

      其次,一旦我们断开消费者,队列应该被自动删除。

     在Java客户端中,当我们不向queueDeclare()提供参数时,我们会使用rabbitmq生成的队列名称创建一个非持久的,独占的,自动删除队列

    String queueName = channel.queueDeclare().getQueue();       #queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

    绑定

     

     我们已经创建了一个fanout交换器和一个队列。现在我们需要告诉交换器将消息发送到我们的队列。交换器和队列之间的关系称为绑定

    channel.queueBind(queueName, "logs", "");

     从现在起,日志交换器会将消息附加到我们绑定的队列中。

    列出绑定

    [root@bogon ~]# rabbitmqctl list_bindings
    Listing bindings ...
        exchange    amq.gen--djOWalcrvkqh19W5b_rPw    queue    amq.gen--djOWalcrvkqh19W5b_rPw    []
        exchange    amq.gen-RNxtcGP0giBH0E6LADKVzw    queue    amq.gen-RNxtcGP0giBH0E6LADKVzw    []
        exchange    amq.gen-hPHJyg0QP40clgVEmQUP8A    queue    amq.gen-hPHJyg0QP40clgVEmQUP8A    []
        exchange    task_queue    queue    task_queue    []
    logs    exchange    amq.gen--djOWalcrvkqh19W5b_rPw    queue        []
    logs    exchange    amq.gen-RNxtcGP0giBH0E6LADKVzw    queue        []
    logs    exchange    amq.gen-hPHJyg0QP40clgVEmQUP8A    queue        []
    ...done.

     

    整合

    我们现在想发布消息到我们的日志交换器,而不是默认的nameless exchange发送时我们需要提供一个routingKey ,但是对于fanout交换器,它的值将被忽视

     

    代码演示:

    生产者:EmitLog.java

    package com.rabbitmq.tutorials.publishsubscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLog {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv)
                      throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.0.103");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            int messageCount = 1;
            while(messageCount<=10) {
                String message = "Message "+ messageCount;
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                System.out.println("[x] Sent'" + message + "'");
                messageCount +=1;
            }
            channel.close();
            connection.close();
        }
        //...
    }

    消费者:ReceiveLogs.java

    package com.rabbitmq.tutorials.publishsubscribe;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogs {
      private static final String EXCHANGE_NAME = "logs";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.103");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//声明交换器
        String queueName = channel.queueDeclare().getQueue(); //随机生成queueName。queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
        channel.queueBind(queueName, EXCHANGE_NAME, ""); //将交换器和队列绑定
    
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
    }

    运行结果分析

     执行方案1:

    1. 先运行EmitLog.java  

    2. 再运行ReceiveLogs.java

       结果:ReceiveLogs.java没有接收到EmitLog.java发送的消息 

       分析:ReceiveLogs.java中的队列尚未申明,没有绑定到logs交换器

      执行方案2:

    1. 先运行ReceiveLogs.java 两次   

    2. 查看交换器绑定关系列表,有两个队列已经绑定到了logs交换器
      [root@bogon ~]# rabbitmqctl list_bindings
      Listing bindings ...
          exchange    amq.gen-F0w3pUADFsnfytz0_Ia8-A    queue    amq.gen-F0w3pUADFsnfytz0_Ia8-A    []
          exchange    amq.gen-r8WYvKOOSY4vQHfjCZ4W6g    queue    amq.gen-r8WYvKOOSY4vQHfjCZ4W6g    []
          exchange    task_queue    queue    task_queue    []
      logs    exchange    amq.gen-F0w3pUADFsnfytz0_Ia8-A    queue        []
      logs    exchange    amq.gen-r8WYvKOOSY4vQHfjCZ4W6g    queue        []
      ...done.
    3. 再运行EmitLog.java,ReceiveLogs.java接收到消息
    4. 关闭两个ReceiveLogs.java程序后,再查看交换器绑定列表,绑定关系被删除

      [root@bogon ~]# rabbitmqctl list_bindings
      Listing bindings ...
          exchange    task_queue    queue    task_queue    []
      ...done.
  • 相关阅读:
    cygwin补充安装gcc/g++的方法
    JS中获取request的值,非常好用的JS代码
    登录页面跳出框架的JS
    asp.net DataTable转JSON
    ASP.NET文件下载的实用方法
    史上最牛X到的身份证号码验证,测试误差为0
    nopcommerce插件深度剖析
    C# foreach,linq,delegate集合查询的性能比较
    jquery tab插件精简版
    建议博客园成立中国的开源项目组织,同意的顶起
  • 原文地址:https://www.cnblogs.com/jimboi/p/8455846.html
Copyright © 2011-2022 走看看