zoukankan      html  css  js  c++  java
  • 转载RabbitMQ入门(1)--介绍

    前面声明本文都是RabbitMQ的官方指南翻译过来的,由于本人水平有限难免有翻译不当的地方,如发现不对的地方,请联系下我,好及时改正。好了,正文开始:

    RabbitMQ 是一个消息代理。这主要的原理十分简单,就是通过接受和转发消息。你可以把它想象成邮局:当你将一个包裹送到邮局,你会相信邮递员先生最终会将邮件送到接件人手上。RabbitMQ就好比一个邮箱,邮局或邮递员。

    邮局和RabbitMQ两种主要的不同之处在于,RabbitMQ不处理文件,而是接受,并存储和以二进制形式将消息转发。

    RabbitMQ,在消息的传送过程中,我们使用一些标准称呼。

    生产过程就像发送过程,发送消息的程序就是一个生产者,我们使用“P”来描述它。

    producer

    队列是好比邮筒的称呼,它位于RabbitMQ内部,虽然消息流通过RabbitMQ和你的应用程序,但是它们仅仅存储在队列中。一个队列没有范围 限制,你可以想存储多少就存储多少,本质上来说它是无限大的缓存。多个生产者可以通过一个队列发送消息,同样多个消费者也可以通同一个消息队列中接收消 息。队列是画成这样,名字在它的上面:

    queue

    消费过程与接收相似,一个消费者通常是一个等着接受消息的程序,我们使用"C"来描述:

    consumer

    注意,那生产者,消费者和代理者不需要一定在一个机器上,事实上,大多数应用程序中,他们并不在一个机器上。

    "Hello World"

    (使用java客户端)

    在这部分指南中,我们将要使用java写两个程序;一个发送简单消息的生产者和一个接收消息并输出出来的消费者。我们会忽视掉一些Java API的细节,为了开始仅仅精选在这简单的事情上,这是一个"Hello World"消息。

    java-one

    Java 客户端库 RabbitMQ 遵循AMQP协议,那是一个开放的,并且通用的消息协议。在不同语言中有数种AMQP客户端,我们使用由RabbitMQ提供的Java客户端。 下载客户端库包,检验签名,将它解压缩到你的工作路径,从解压到的路径中提取JAR文件:

    $ unzip rabbitmq-java-client-bin-*.zip
    $ cp rabbitmq-java-client-bin-*/*.jar ./
    

    (RabbitMQ Java客户端也存在Maven中央库中,groupIdcom.rabbitmq,artifactIdamqp-client.)

    现在我们已经有了Java客户端和依赖文件,我们可以写一些代码了。

    发送

    sending.png

    我们将会让我们的消息发送者发送消息,我们的接收者接收消息。发送者连接到RabbitMQ上,发送一个简单的消息,然后退出。

    Send.java,我们需要引入一些类:

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    

    建立这个类,为队列命名:

    public class Send {
    
      private final static String QUEUE_NAME = "hello";
    
      public static void main(String[] argv)
          throws java.io.IOException {
          ...
      }
    }
    

    接着,我们创建一个服务器的连接:

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    

    抽象的socket连接,注意协议版本的处理以及授权,诸如此类的事情。 这里我们连接到本地机器上的代理,因此它是localhost。如果我们想连接到不同机器上的代理,只需要说明它的主机名和IP地址。

    接下来我们创建一个通道,获取操作的大多数API都位于这上。

    对于发送,我们必须声明一个发送队列,然后我们把消息发送到这个队列上:

    channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");

    Declaring a queue is idempotent - it will only be created if it doesn't exist already. The message content is a byte array, so you can encode whatever you like there.

    Lastly, we close the channel and the connection; 声明一个队列是幂等的,仅仅在要声明的队列不存在时才创建。消息内容是二进制数组,所以你可以随你喜好编码。

    channel.close(); connection.close();

    Here's the whole Send.java class.

    发送没有起作用

    如果你是第一次使用RabbitMQ并且你没有看到"Sent"消息,你可能抓耳挠腮的想到底是哪里出的问题。可能是代理启动时没有足够空间(默认 它需要至少1Gb 空间),因此拒绝接受消息。通过检查代理的日志文件来确定这个问题,必要情况下可以降低限制大小。配置文件的文档将会告诉你怎样设置disk_free_limit

    接收

    上面代码是构建我们的发送者。我们的接收者是从RabbitMQ中提取消息,所以不像发送者那样发送一个简单的消息,我们需要一直运行监听消息并且输出消息。 receiving

    Recv.java中的代码有与Send中几乎相同的引用:

    import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer;

    这额外的QueueingConsumer类是用来缓存从服务器那里发出来的信息。

    跟创建发送者相同,我们打开一个连接和一个通道,声明一个我们要消费的队列。注意要与发送的队列相匹配。

    public class Recv {
    
      private final static String QUEUE_NAME = "hello";
    
      public static void main(String[] argv)
          throws java.io.IOException,
                 java.lang.InterruptedException {
    
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        ...
        }
    }
    

    注意我们在这里同样声明了一个队列。以为我们可能在发送者之前启动接收者,在我们从中获取消息之前我们想要确定这队列是否真实存在。 我们通知服务器通过此队列给我们发送消息。因此服务器会异步的给我们推送消息,在这里我们提供一个回调对象用来缓存消息,直到我们准备好再使用它们。这就是QueueingConsumer所做的事。

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);
    
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received '" + message + "'");
    }
    

    QueueingConsumer.nextDelivery()在另一个来自服务器的消息到来之前它会一直阻塞着。

    这是整个Recv.java类。

    把所有放在一起

    你可以在RabbitMQ Java客户端的类路径上编译这些文件:

    $ javac -cp rabbitmq-client.jar Send.java Recv.java
    

    为了运行它们,你需要rabbitma-client.jar和它在类路径上的的依赖文件。在一个终端上,运行发送者:

    $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
    

    然后,运行接收者:

    $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
    

    在windows环境中,我们使用分号代替冒号来分隔类路径上的选项。

    接收者将会输出从RabbitMQ中获取到来自发送者的消息。接收者会一直保持运行,等待消息(使用Ctrl-C停止),所以试着用另一个终端运行发送者。 如果你想检验队列,试着使用rabbitmqctl list_queues0

    Hello World!

    时间移动到第二部分,构建一个简单的工作队列。

    提示 为了保存输入,你可以将类路径设置到环境变量中

    $ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
    $ java -cp $CP Send
    

    或者在 Windows环境中:

    > set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
    > java -cp %CP% Send
    
  • 相关阅读:
    eslint 的 env 配置是干嘛使的?
    cookie httpOnly 打勾
    如何定制 antd 的样式(theme)
    剑指 Offer 66. 构建乘积数组
    剑指 Offer 65. 不用加减乘除做加法
    剑指 Offer 62. 圆圈中最后剩下的数字
    剑指 Offer 61. 扑克牌中的顺子
    剑指 Offer 59
    剑指 Offer 58
    剑指 Offer 58
  • 原文地址:https://www.cnblogs.com/shangxiaofei/p/4961567.html
Copyright © 2011-2022 走看看