zoukankan      html  css  js  c++  java
  • RabbitMQ学习总结 第四篇:发布/订阅 Publish/Subscribe

    目录

    RabbitMQ学习总结 第一篇:理论篇
    RabbitMQ学习总结 第二篇:快速入门HelloWorld

    RabbitMQ学习总结 第三篇:工作队列Work Queue

    RabbitMQ学习总结 第四篇:发布/订阅 Publish/Subscribe

    RabbitMQ学习总结 第五篇:路由Routing

    RabbitMQ学习总结 第六篇:Topic类型的exchange

    RabbitMQ学习总结 第七篇:RCP(远程过程调用协议)

    上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者。本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅)。

    为了说明这个模式,我们将会构建一个简单的日志系统。这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们。

    在我们的日志系统里,每个运行的消费者程序都能接收到消息。这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者来把日志消息打印到屏幕上。

    从本质上来说,是把日志消息推送到了所有的消费者端。

    1、消息交换机

    上篇中我们往Queue里发送消息,并从Queue里取出消息。现在我们来介绍RabbitMQ的完全消息模型。

    我们来快速回顾一下之前博文中的内容:

    • 一个生产者者应用程序发送消息;
    • 一个消息队列用来存储和缓存消息;
    • 一个消费者程序接收消息

    RabbitMQ的消息发送模型核心思想是生产者不直接把消息发送到消息队列中。事实上,生产者不知道自己的消息将会被缓存到哪个队列中。

    其实生产者者可以把消息发送到exchange(消息交换机)上。exchange是一个很简单的事情,它一边接收生产者的消息,另一边再把消息推送到消息队列中。Exchange必须知道在它接收到一条消息时应该怎么去处理。应该把这条消息推送到指定的消息队列中?还是把消息推送到所有的队列中?或是把消息丢掉?这些规则都可以用exchange类型来定义。

    有一些可用的exchange类型:direct, topic, headers和fanout。这里我们主要看最后一个:fanout,这里我们创建一个名字为logs、类型为fanout的exchange:

    channel.exchangeDeclare("logs", "fanout");

    fanout类型的exchange是很简单的。就是它把它能接收到的所有消息广播到它知道的所有队列中。这正是我们的日志系统所需要的。

    列出exchange

    可以在服务器上使用rabbitmqctl命令来列出RabbitMQ服务器上的所有消息exchange:

    $ sudo rabbitmqctl list_exchanges
    Listing exchanges ...
            direct
    amq.direct      direct
    amq.fanout      fanout
    amq.headers     headers
    amq.match       headers
    amq.rabbitmq.log        topic
    amq.rabbitmq.trace      topic
    amq.topic       topic
    logs    fanout
    ...done.

    在这个列表中有一些形如amp.*的exchange,还有默认(未命名)的交换机。这些都是被默认创建的,但这些已经被默认创建的都不是你现在需要用到的。

    没有名字的exchange

    在之前的博文里没有使用都exchange的相关知识,但是任然能够发送消息。之所以能发送成功是因为我们使用一个默认exchange,我们使用(””)来标识的。

    channel.basicPublish("", "hello", null, message.getBytes());

    第一个参数就是exchange的名字。空字符串的符号指的是默认的或没有命名的exchange:消息会根据routingKey被路由到指定的消息队列中。

    现在我们来吧消息推送到已命名的exchange上:

    channel.basicPublish( "logs", "", null, message.getBytes());

    2、临时队列

    如果你看过之前几篇的博文,应该会发现我们都是使用了一个指定名字的消息队列(hello和task_queue)。对应的生产者和消费者之间都要使用相同的消息队列名称,这在很重要的。

    但是在我们的log系统中却不是这样,我们希望能够接收到所有的log消息,不只是其中的一部分。我们只要处理当前的log消息,不用管过去的历史log。为了实现,我们需要做以下两步:

    1. 无论什么时候我们和RabbitMQ建立连接时,我们都要刷新、清空Queue。为了达到这一的目的,我们可以用一个随机的名字(随机性可由自己来定义)来创建Queue,也可以让服务器来自动建立一个随见的Queue。
    2. 当消费者断开连接时,Queue能自动被删除。

    使用Java客户端时,我们使用无参数的queueDeclare方法,就可以创建一个已经生成名字的、排他性的且会自动删除的Queue:

    String queueName = channel.queueDeclare().getQueue();

    这是就拿到了一个随机名字的queue,形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg

    3、绑定(bindings)

    我们已经创建了一个fanout类型的exchange和一个队列。现在我们需要让exchange向我们的queue里发送消息。Exchange和queue之间关系被称为binding(绑定)。

    channel.queueBind(queueName, "logs", "");

    现在开始,名字为logs的exchange就会忘我们的queue里退消息了。

    查看binding列表:

    使用rabbitmqctl list_bindings命令来看已经存在的所有的binding。

    4、最终实现

    发送日志消息的生产者程序和之前的程序没有太多的差别。最大的区别就是我们把消息推送到一个命名的exchange上,而不是之前未命名的默认exchange。在我们发送消息时需要提供一个routingKey,但对于fanout类型的exchange可以忽略。下边是生产者的代码EmitLog.java:

    import java.io.IOException;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    
    public class EmitLog {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv)
                      throws java.io.IOException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明exchange名字以及类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            
            // getMessage的实现请见上篇博文
            String message = getMessage(argv);
    
            //指定exchange的名字
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
        //...
    }

    正如你所见,在建立连接后我们声明了exchange。这一步是必须的,因为禁止向一个不存在的exchange推送消息。

    如果没有向exchange负责的queue,那么消息将会被丢失,这是没有问题的;如果没有消费者监听的话,我们会安全的丢掉这些消息。

    ReceiveLogs.java的代码如下:

    import java.io.IOException;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class ReceiveLogs {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明消息路由的名称和类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            //声明一个随机消息队列
            String queueName = channel.queueDeclare().getQueue();
            
            //绑定消息队列和消息路由
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            QueueingConsumer consumer = new QueueingConsumer(channel);
            
            //启动一个消费者
            channel.basicConsume(queueName, true, consumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
    
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }
    • 编译文件:

    javac -cp .:amqp-client-3.3.5.jar ReceiveLogs.java EmitLog.java

    • 把日志存到文件里:

    java -cp .:amqp-client-3.3.5.jar ReceiveLogs > logs_from_rabbit.log

    然后监听该日志文件:

    tail -10f logs_from_rabbit.log

    • 往屏幕上打印日志消息:

    java -cp .:amqp-client-3.3.5.jar ReceiveLogs

    • 启动生产者:

    java -cp .:amqp-client-3.3.5.jar EmitLog

    日志输出到文件中:

    日志消息打印到了屏幕上:

    在运行ReceiveLogs的时候,使用rabbitmqctl list_bindings命令来查看RabbitMQ中的exchange

    leo@leocook:~$ sudo rabbitmqctl list_bindings
    Listing bindings ...
            exchange        amq.gen-1Zuyn_44c8IWsdJWrI42Og  queue   amq.gen-1Zuyn_44c8IWsdJWrI42Og  []
            exchange        amq.gen-rSrGSPWLNTuq1dfXipPfAA  queue   amq.gen-rSrGSPWLNTuq1dfXipPfAA  []
            exchange        task_queue      queue   task_queue      []
    logs    exchange        amq.gen-1Zuyn_44c8IWsdJWrI42Og  queue           []
    logs    exchange        amq.gen-rSrGSPWLNTuq1dfXipPfAA  queue           []
    ...done.

    总结:

    1、在生产者和消费者的信道中声明exchange名字以及类型

    2、在生产者的信道中指定发送目标的exchange

    3、在消费者端的信道中声明一个随机的消息队列,并拿到这个队列名称;然后在信道上绑定该消息队列和消息路由

    下篇咱们来讨论,消费者端怎么才能拿到生产者发送消息中的部分消息。

    参考链接:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

  • 相关阅读:
    DataAnnotations
    使用BizTalk实现RosettaNet B2B So Easy
    biztalk rosettanet 自定义 pip code
    Debatching(Splitting) XML Message in Orchestration using DefaultPipeline
    Modifying namespace in XML document programmatically
    IIS各个版本中你需要知道的那些事儿
    关于IHttpModule的相关知识总结
    开发设计的一些思想总结
    《ASP.NET SignalR系列》第五课 在MVC中使用SignalR
    《ASP.NET SignalR系列》第四课 SignalR自托管(不用IIS)
  • 原文地址:https://www.cnblogs.com/leocook/p/mq_rabbitmq_3.html
Copyright © 2011-2022 走看看