zoukankan      html  css  js  c++  java
  • RabbitMQ入门之Hello World

    RabbitMQ简介

      在介绍RabbitMQ之前,我们需要了解一些最基础的概念,相信使用过或者听说过RabbitMQ的人都不会陌生,但笔者还是不厌其烦地在这里讲述,因为笔者的理念是self contained。

    • Queue: 队列。计算机数据结构中的一种基本类型,遵循“先入先出”(FIFO)的原则,比如我们日常生活中常见的排队时的队伍就是一个队列。
    • Message Queue: 消息队列,简称MQ。消息队列本质上也是队列,只不过队列中的元素为Message(消息),而消息则是服务之间最常见的通信方式。流行的MQ框架主要有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。
    • AMQP:Advanced Message Queuing Protocol,是一个提供统一消息服务的应用层标准高级消息队列协议,简单来说,它就是一个消息列队的协议,其标准高,要求严。
    • Erlang:Erlang是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。
    • RabbitMQ:RabbitMQ是一个实现了AMQP高级消息队列协议的消息队列服务,用Erlang语言实现。RabbitMQ的运行原理如下图(后续我们会解释其中的含义,现阶段只作为浏览):

    RabbitMQ运行原理

    以上是我们对RabbitMQ的最初认识。接下来我们还需要了解RabbitMQ的下载与安装,如下:

    1. RabbitMQ的下载页面:https://www.rabbitmq.com/download.html
    2. RabbitMQ的安装过程:https://www.rabbitmq.com/download.html#installation-guides
    3. RabbitMQ入门教程: https://www.rabbitmq.com/getstarted.html

      说了这么多,我们为什么要选择RabbitMQ,也就是说它的优势又是什么呢?RabbitMQ的强大之处在于:

    • 可靠性:RabbitMQ使用一些机制来保证可靠性,如持久化、传输确认及发布确认等。
    • 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
    • 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
    • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队仍然可用。
    • 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
    • 多语言客户端:RabbitMQ几乎支持所有常用语言,比如Java、Python、Ruby、PHP、C#、JavaScript等。
    • 管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
    • 插件机制:RabbitMQ提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。

      带着对RabbitMQ的初次见面,我们不妨再了解下如何简单地使用RabbitMQ。

    RabbitMQ入门之Hello World

      在计算机领域中,每次学习一个新事物的惊喜,往往都是伴随着Hello World。在编程语言中,会有输出“Hello World”;在大数据中,“Hello World”就是统计单词的词频;在Docker中,就是使用“Hello World”镜像;在RabbitMQ,这次的“Hello World”就是生产者发送“Hello World”,而消费者输出“Hello World”
      RabbitMQ就是消息代理,它接受并推动消息流动。你可以把它想象成一个邮局:当你把一封信塞进邮箱,你需要确保它能送到收信人的手里。而RabbitMQ就是一个邮箱,邮局,邮递员。不同于真实的邮局(处理信件),RabbitMQ处理接受、存储、推动消息。
      在RabbitMQ,或者消息队列领域中,有如下术语。

    • 生产者(Producer):生产者仅产生消息,也就说一个产生消息的程序就是生产者。对应于邮局的例子,生产者就是寄信人,因为他们产生信件。

    生产者

    • 队列(Queue): 一个队列就是RabbitMQ中的邮箱。尽管消息会在RabbitMQ和应用程序之间流动,但是它们只会在队列中存储。一个队列仅受限于硬盘和内存大小,它是一个大的消息缓存区。许多生产者产生消息后会进入一个队列,许多消费者也会从同一个队列中获取消息。以下是我们如何表示一个队列:

    队列

    • 消费者(Consumer):消费消息与接收消息的意思是一致的。一个消费者往往会等待接收消息。在邮局的例子中,消费者也许就是收信人。

    消费者

      介绍完生产者、队列、消费者后,我们将会来学习RabbitMQ中的Hello World。
      我们使用Python的Pika模块来操作RabbitMQ。在本文中,我们将会编写两个小程序:一个生产者(Producer)发送一条消息,而一个消费者(Consumer)将会接收这个消息并将它输出。这就是消息通信的“Hello World”。
      在下图中,P代表生产者,C代表消费者,中间的盒子代表队列——消息缓存区。我们总的设计图如下:

    总设计图
    生产者会将消息发送至“hello”队列,消费者从从该队列中获取消息。

    发送消息

      在这一部分中,我们将会让生产者来发送消息。

    发送消息
      我们的第一个程序send.py将会发送一个消息至队列。首先我们要做的是建立与RabbitMQ Server的连接。

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    

    我们连接到了本地机器(localhost)的一个代理。如果我们需要连接不同机器的代理,我们只需要声明机器名称以及IP地址即可。
      接着,在我们发送消息之前,我们需要确认队列是否存在。如果我们发送消息到一个不存在的地方,RabbitMQ将会丢失这条消息。因此,我们需要创建一个hello队列,这里将是消息传递的地方。

    channel.queue_declare(queue='hello')
    

    我们已经准备好发送消息了。我们的第一条消息是字符串“Hello World!”,我们将它发送至hello队列。
      在RabbitMQ中,消息不会被直接发送至队列,它需要通过exchange才能做到。在这里我们不需要了解exchange的原理,我们只需要知道,空字符串就代表默认的exchange。该exchange很特殊——它规定了我们的消息往哪个队列走。队列名称需要用routing_key这个参数来声明:

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    

    在退出程序前,我们需要确保网络缓存被清空并且我们的消息确实被传送至RabbitMQ。一般我们通过关闭连接来实现。

    connection.close()
    

    接受消息

      在这一部分中,我们将会让消费者来接受消息。

    接受消息

    我们的第二个程序receive.py将会从队列中接受消息,并把其输出出来。
      同样地,第一步是连接到RabbitMQ Server。这部分的代码与之前的部分相同。
      下一步,更之前一样,需要确保队列存在。使用queue_declare来创建队列是幂等的(idempoten) —— 我们可以运行这条命令很多次,但只会创建一个队列。

    channel.queue_declare(queue='hello')
    

    也许你会好奇我们为什么要再一次声明这个列队,明明我们在之前的代码中已经声明过了。这里这么做主要是为了确保队列已经存在。举例来说,这边是先运行send.py,但我们不能确定哪一个程序会先运行。因此在这样的情况下,在两个程序中反复声明列队是不错的方式。
      从队列中接受消息更加复杂。他需要通过callback函数与列队关联。无论什么时候我们接受到消息,这个callback函数都被会Pika模块调用。在我们的例子中,这个函数将会输出消息的内容。

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    

      下一步,我们需要告诉RabbitMQ,在hello队列中,这个特定的callback函数需要接受消息。

    channel.basic_consume(queue='hello',
                          auto_ack=True,
                          on_message_callback=callback)
    

    auto_ack参数的含义会在后面的文章中解释。
      最后我们创建一个永不停止的循环,用于接收消息:

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    

    实践出真知

      上面的部分介绍了“Hello World”的理论方面,接下来,我们会分别使用Python和Java程序来分别实现这个例子。

    Python

      sent.py程序如下:

    # -*- coding: utf-8 -*-
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World from Python!')
    print(" [x] Sent 'Hello World!'")
    connection.close()
    

      receive.py程序如下:

    # -*- coding: utf-8 -*-
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    
    
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    

    先启动receive.py,程序会提示“ [*] Waiting for messages. To exit press CTRL+C”,表明该消费者在等待接收消息。在运行sent.py,该程序会发送“Hello World from Python!”至队列,同时receive.py会输出该消息。每运行一次sent.py,receive.py会就会输出一个该消息,如下图:

    Java

      我们使用Gradle来构建这个项目,项目结构如下:

    Java项目结构

      在build.gradle中,我们引入第三方jar包,内容如下:

    plugins {
        id 'java'
    }
    
    group 'rabbitmq'
    version '1.0-SNAPSHOT'
    
    sourceCompatibility = 1.8
    
    repositories {
        mavenCentral()
    }
    
    dependencies {
        testCompile group: 'junit', name: 'junit', version: '4.12'
        // https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
        compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0'
        // https://mvnrepository.com/artifact/org.slf4j/slf4j-api
        compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.26'
        // https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
        testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26'
    
    }
    

      Send.java代码如下:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.nio.charset.StandardCharsets;
    
    public class Send {
    
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                 String message = "Hello World from Java!";
                 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                 System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
    

      Recv.java的代码如下:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
    
    

    具体的操作方法同Python一样。

    Python与Java的交互

      如果我们把Python的“Hello World”看成一个简单的小系统,而Java的“Hello World”也看成一个简单的小系统,那么RabbitMQ可以沟通这两个系统,这也是RabbitMQ的一个特定:系统对接。
      我们在Python中运行receive.py,而运行Java的Send.java三次,运行Python的sent.py两次,结果如下:

    Python与Java的交互

    这样的测试结果是令人吃惊的,因为我们用RabbitMQ打通了两个不同语言的系统!

    总结

      本文作为RabbitMQ入门的第一篇,希望能对大家有所帮助。笔者也是初学RabbitM,文章中肯定有不足之处,恳请大家批评指正。
      感谢大家的阅读~

    参考网站

    1. Python操作rabbitmq系列(一): https://zhuanlan.zhihu.com/p/29800710
    2. RabbitMQ教程: https://blog.csdn.net/hellozpc/article/details/81436980
    3. RabbitMQ Tutorials:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
    4. 透彻rabbitmq: https://zhuanlan.zhihu.com/p/63700605
    5. MQ和RabbitMQ作用特点: https://blog.csdn.net/weixin_40792878/article/details/82555791
  • 相关阅读:
    mongodb
    python中读取文件的read、readline、readlines方法区别
    uva 129 Krypton Factor
    hdu 4734
    hdu 5182 PM2.5
    hdu 5179 beautiful number
    hdu 5178 pairs
    hdu 5176 The Experience of Love
    hdu 5175 Misaki's Kiss again
    hdu 5174 Ferries Wheel
  • 原文地址:https://www.cnblogs.com/jclian91/p/12196186.html
Copyright © 2011-2022 走看看