zoukankan      html  css  js  c++  java
  • rabbitMQ_Publish/Subscribe(三)

    发布/订阅

     生产者发布信息,多个订阅者可以同时接收到信息。

    转发器

    现在是时候在RabbitMQ中引入完整的消息传递模式了。

    让我们快速了解我们在以前的教程中介绍的内容:

    • 生产者是一个发送消息的应用程序。
    • 队列是存储消息的缓冲器。
    • 消费者是接收消息的应用程序。

    RabbitMQ中的消息传递模型的核心思想是,生产者从不将任何消息直接发送到队列。实际上,生产者通常甚至不知道是否将消息传递到某个队列。

    相反,生产者只能将信息发送到转发器。转发是一件非常简单的事情。一方面,它收到来自生产者的消息,另一方将它们推送到队列。转发器必须准确知道接收到的消息如何处理。应该转发到特定队列吗?应该转发到多个队列吗?或者应该丢弃。其规则由转发器类型来决定 。

     

    有几种转发类型可用:direct, topic, headers 和 fanout.。我们将重点关注最后一个 - fanout。让我们创建一个这种类型的转发器,并将其称为logs转发器:

    channel.exchangeDeclare(“logs”,“fanout”);
    

    fanout类型的转发非常简单。它只是将所有收到的消息广播到所有绑定到这个转发器的队列上。

    罗列转发器

    要列出服务器上的转发器,您可以运行rabbitmqctl:

    sudo rabbitmqctl list_exchanges

    在这个列表中会有一些名为amq.*这样前缀的转发器和默认(未命名)转发器。这些是默认创建的,但是不太可能需要使用它们。

    匿名转发器

    在前面部分,我们没有定义任何转发器,但仍然能够将消息发送到队列上。这是可能的,因为我们使用了默认的转发器,我们通过空字符串(“”)标识。

    回想一下我们之前发布的消息:

    channel.basicPublish(“”,“hello”,null,message.getBytes());
    

    第一个参数是转发器的名称。空字符串表示默认或匿名转发器:消息通过routekey(第二个参数)路由到指定的队列。

    现在,我们可以发布到我们定义的转发器logs:

    channel.basicPublish(“logs”,“”,null,message.getBytes());
    

    临时队列

    我们之前使用的是具有指定名称的队列(hello ,task_queue这类指定了名称的队列)。当您想要在生产者和消费者之间共享队列时,给队列一个名字很重要。

    但是本例中,我们不想自己去创建一个特定名字的队列,我们希望一个消费者连接到rabbitMQ就自动创建一个具有随机名称的队列,然后当这个消费者断开连接的时候

    就自动将这个队列删除。

    在java中当我们没有为queueDeclare()提供参数时, 我们创建了一个具有生成随机名称的非持久性的,排他的,自动删除的队列:

    String queueName = channel.queueDeclare().getQueue();
    

    此时,queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

    绑定

    我们已经创建了一个fanout转发器和随机名称队列。现在我们需要告诉转发器发送消息到这些随机名称队列中,所以我们需要将这些队列绑定到转发器上,它知道要发送消息给哪些队列。

    channel.queueBind(queueName,“logs”,“”);
    

    从现在开始,logs转发器将把消息转发到我们的队列中。

    列出绑定

    你可以列出现有的绑定

    rabbitmqctl list_bindings
    

     

     

    生产者EmitLogl.java

     1 package com.rabbitMQ;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import com.rabbitmq.client.ConnectionFactory;
     6 
     7 public class EmitLog {
     8 
     9     private static final String EXCHANGE_NAME = "logs";//转发器
    10 
    11     public static void main(String[] argv)
    12                   throws java.io.IOException, Exception {
    13 
    14         ConnectionFactory factory = new ConnectionFactory();
    15         factory.setHost("localhost");
    16         Connection connection = factory.newConnection();
    17         Channel channel = connection.createChannel();
    18 
    19         //定义一个转发器,转发器名为logs,转发器类型为fanout,他可以对信息进行广播到所有绑定了的队列中
    20         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    21 
    22         String message = getMessage(argv);
    23         //由于是广播到所有绑定了这个转发器的队列,那么可以不写routeKey
    24         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    25         System.out.println(" [x] Sent '" + message + "'");
    26 
    27         channel.close();
    28         connection.close();
    29     }
    30     private static String getMessage(String[] strings) {
    31         if (strings.length < 1)
    32             return "Hello World!";
    33         return joinStrings(strings, " ");
    34     }
    35 
    36     private static String joinStrings(String[] strings, String delimiter) {
    37         int length = strings.length;
    38         if (length == 0)
    39             return "";
    40         StringBuilder words = new StringBuilder(strings[0]);
    41         for (int i = 1; i < length; i++) {
    42             words.append(delimiter).append(strings[i]);
    43         }
    44         return words.toString();
    45     }
    46 }

    消费者ReceiveLogs.java

     1 package com.rabbitMQ;
     2 
     3 import com.rabbitmq.client.*;
     4 
     5 import java.io.IOException;
     6 
     7 public class ReceiveLogs {
     8     //转发器名称
     9   private static final String EXCHANGE_NAME = "logs";
    10 
    11   public static void main(String[] argv) throws Exception {
    12     ConnectionFactory factory = new ConnectionFactory();
    13     factory.setHost("localhost");
    14     Connection connection = factory.newConnection();
    15     Channel channel = connection.createChannel();
    16     //可以使用rabbitmqctl list_exchanges查看有哪些转发器
    17     channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    18     //定义一个排他的,自动删除的,非持久性的队列,名称是随机的
    19     String queueName = channel.queueDeclare().getQueue();
    20     //转发器绑定队列
    21     //可使用rabbitmqctl list_bindings查看绑定列表
    22     channel.queueBind(queueName, EXCHANGE_NAME, "");
    23 
    24     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    25 
    26     Consumer consumer = new DefaultConsumer(channel) {
    27       @Override
    28       public void handleDelivery(String consumerTag, Envelope envelope,
    29                                  AMQP.BasicProperties properties, byte[] body) throws IOException {
    30         String message = new String(body, "UTF-8");
    31         System.out.println(" [x] Received '" + message + "'");
    32       }
    33     };
    34     boolean autoAck = true;
    35     channel.basicConsume(queueName, autoAck, consumer);
    36   }
    37 }

    启动两个ReceiveLogs.java实例,然后启动EmitLog.java,两个ReceiveLogs.java将同时收到消息

  • 相关阅读:
    openwrt 相关文章
    负载均衡相关文章
    Today's Progress
    Rodrigues formula is beautiful, but uneven to sine and cosine. (zz Berkeley's Page)
    Camera Calibration in detail
    Fundamental Matrix in Epipolar
    Camera Calibration's fx and fy do Cares in SLAM
    FilterEngine::apply
    FilterEngine 类解析——OpenCV图像滤波核心引擎(zz)
    gaussBlur
  • 原文地址:https://www.cnblogs.com/honger/p/6963234.html
Copyright © 2011-2022 走看看