zoukankan      html  css  js  c++  java
  • 柯南君:看大数据时代下的IT架构(6)消息队列之RabbitMQ--案例(Publish/Subscribe起航)

    一、回想

    让我们回想一下,在上几章里都讲了什么?总结例如以下:

    二、Publish/Subscribe(公布/订阅)(using the Java Client)

    	在前面的教程中,我们创建了一个work Queue(工作队列)

    工作队列背后的如果是每一个任务是交付给一个工作者(worker) 也就是均匀分给每一个消费者。在本部分,我们将做一些全然不同的事情,我们将提供一个消息到多个消费者。

    这样的模式被称为“公布/订阅”。

    	为了说明这个模式,我们将构建一个简单的日志系统。它将包含两个项目:
    1. 第一个将发出日志消息
    2. 第二个将接收并打印它们。
    	在我们的日志系统,每执行一次,接收器项目将得到消息的副本。这样我们能够执行一个接收机而且能够直接记录到磁盘,同一时候我们能够执行还有一个接收器,看到屏幕上的日志。

    注:从本质上讲,发表日志消息广播给全部的接收者。

    	以下让我们脑中带几个问题,让我们一步一步去解决:
    • 假设我把消息分配给全部的消费者,我们将怎么做呢?

    三、Exchanges(交换机)

    在前部分的教程中,我们从一个队列发送和接收消息。如今是时候让Rabbit推出完整的消息模型。

    让我们高速复习我们前面的教程::
    • 生产者是一个用户发送消息的应用程序。
    • 一个队列是存储消息的缓冲区。

    • 消费者是一个用户应用程序接收消息。


    RabbitMQ的消息模型的核心思想是,生产者从未直接向队列发送不论什么消息。

    实际上,常常生产者甚至不知道消息是否会被运送到不论什么队列。
    相反,生产者仅仅能发送Exchanges(消息交换区)。交换是一个很easy的事情。一方面它从生产者那收到消息并推他们到还有一边队列。交换区必须知道怎样处理它收到一条消息:

    1. 它应该被加到一个特定的队列吗?

    2. 它应该被加到多队列?
    3. 或者它应该丢弃吗?
    交换的规则定义的类型。

    如上图所看到的:X表示Exchange(交换机);
    有一些可用的交换类型directtopicheaders and fanout我们将专注于最后一个——fanout。

    让我们创建一个这样的类型的交换,称之为日志:

    channel.exchangeDeclare("logs", "fanout");
    fanout交换很easy。你大概能够猜到的名字,仅仅是广播全部的消息接收队列它知道。

    而这正是我们须要为我们的记录器。

    问题:
    exchange list 列出全部 (交换机)列表
    $ sudo rabbitmqctl list_exchanges
    Listing exchanges ...
            direct
    amq.direct      direct
    amq.fanout      fanout
    amq.headers     headers
    amq.match       headers
    amq.rabbitmq.log        topic
    amq.rabbitmq.trace      topic
    amq.topic       topic
    logs    fanout
    ...done.
    在此列表中有一些amq* 交换器 与默认(匿名)交换。

    这些都是默认创建的,但可能你不须要使用它们。

    ② 缺省名字的 exchange(交换机)
    在前部分的教程中我们对exchange 一无所知,,但仍然可以将消息发送到队列。这是可能的,由于我们是使用一个默认的交换,我们确定的空字符串(" ")
    记得之前我们公布一个消息:
        channel.basicPublish("", "hello", null, message.getBytes());
    第一个參数是该交换区的名称。空字符串表示默认或无名的交换。:假设routingKey存在的话。消息路由到指定的队列的名称。
    如今,我们能够公布我们的交换器:
        channel.basicPublish( "logs", "", null, message.getBytes());

    四、Temporary queues(暂时队列)

    	你可能记得曾经我们使用的队列都是指定名称的(还记得hello和task_queue吗?)。对我们来说命名一个队列是至关重要的,
    	当你想在生产者和消费者中分享队列的时候,给一个队列的名称是必须的。	
        可是那些都不是日志记录系统所须要的。我们希望可以获得全部的日志信息,而不仅仅是当中的一部分。并且我们仅仅对当前正在传递的信息感兴趣,
        对旧的日志信息不感兴趣,要解决这些问题,我们须要分两个步骤:
    • 首先当我们链接到RabbitMQserver的时候,须要一个新的、空的队列,为了做到这点。能够创建一个随机名的队列。
    或者更好的方法就是让server选择一个随机的队列名。
    • 其次,当断开与队列的连接时。消费者应该被自己主动删除掉
    在Javaclient,我们通过一个无參数的queueDeclare()方法为我们创建一个非持久的、唯一的、能自己主动删除的队列与队列名称
     String queueName = channel.queueDeclare().getQueue();
    在这点上。queueName包括了一个随机队列名称。

    比如它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

    五、Bindings(绑定)

    我们已经创建了一个fanout exchange和一个队列。如今我们须要告诉exchange去发送消息到队列中,exchange和队列之间的关系被称为一个绑定(binding)

    	channel.queueBind(queueName, "logs", "");
    注意:从如今開始我们从logs exchange将被加入消息到队列中,使用rabbitmqctl list_bingdins能列出全部的绑定。

    六、Putting it all together(公布者/订阅者 实现)

    生产者代码和之前的发送消息的代码并没有太大的差别,最重要的变化是,我们如今要将公布的消息传递给logs exchange来取代无名的exchange(之前的是"")
    在发送消息时须要提供一个routingKey。它对于fanout exchange是很重要的,不能被忽视的,这里的EmitLog.java代码例如以下
    </pre><pre name="code" class="java">import java.io.IOException;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    
    public class EmitLog {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv)
                      throws java.io.IOException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            String message = getMessage(argv);
    
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
        //...
    }

    接收端:
    import java.io.IOException;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class ReceiveLogs {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
    
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }


    像曾经一样,我们開始做编译
    $ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java
    假设你想将日志保存到一个文件,打开一个控制台并执行
    $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
    假设你想看到日志在你的屏幕上,产生一个新的终端并执行:
    $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs
    公布日志类型:
    $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog
    使用rabbitmqctl list_bindings实际上您能够验证绑定和队列的代码是否是我们想要的? 有两个ReceiveLogs。
    $ sudo rabbitmqctl list_bindings
    Listing bindings ...
    logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
    logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
    ...done.



  • 相关阅读:
    Treap 树堆 容易实现的平衡树
    (转)Maven实战(二)构建简单Maven项目
    (转)Maven实战(一)安装与配置
    根据请求头跳转判断Android&iOS
    (转)苹果消息推送服务器 php 证书生成
    (转)How to renew your Apple Push Notification Push SSL Certificate
    (转)How to build an Apple Push Notification provider server (tutorial)
    (转)pem, cer, p12 and the pains of iOS Push Notifications encryption
    (转)Apple Push Notification Services in iOS 6 Tutorial: Part 2/2
    (转)Apple Push Notification Services in iOS 6 Tutorial: Part 1/2
  • 原文地址:https://www.cnblogs.com/gcczhongduan/p/5139985.html
Copyright © 2011-2022 走看看