zoukankan      html  css  js  c++  java
  • RabbitMQ上手记录–part 3-发送消息

      接上一part<<RabbitMQ上手记录–part 2 - 安装RabbitMQ>>,这里我们来看看如何通过代码实现对RabbitMQ的调用。

    RabbitMQ通常是安装在服务器端,那么要体现它的功能,当然还需要一个客户端来调用。这个客户端通常就是我们的业务系统,那么如何操作RabbitMQ呢?不可能让程序员写底层的传输代码或者局限于用erlang开发语言。RabbitMQ已经贴心的准备好了API Client,没错就是程序员喜闻乐见的API(或者说是调包)。

      RabbitMQ官方(http://www.rabbitmq.com/clients.html)提供了Java,.NET和erlang的API包,Java和.NET是开发业务系统的两大阵型,erlang是RabbitMQ的开发语言,所以官方主推了这三个语言的API包。那其他语言呢?没问题,RabbitMQ虽然没有提供实现,但是给出了”amqp”的URI规范(http://www.rabbitmq.com/uri-spec.html),也就是跟服务器通讯的一套协议,其他开发语言照着这个规范实现就可以了。有了官方的指引文档,加上活跃热心的社区群众,其他语言也有各自的API包,具体可参考http://www.rabbitmq.com/devtools.html

     

    这里采用很火的Python作为示例代码,演示消费者如何订阅消息,生产者如何发布消息。

    准备工作

    1.已安装好RabbitMQ,并确保服务是在运行的()。

    2.有可用的Python环境,并安装了RabbitMQ的API包pika。

     

    开始编码

    a.新建文件rabbitMQConfig.py,代码如下

    import pika, sys
    
    
    def getDefaultChannel():
         credentials = pika.PlainCredentials("guest", "guest")
         conn_params = pika.ConnectionParameters(
             "localhost", credentials=credentials)
         conn_broker = pika.BlockingConnection(conn_params)
         channel = conn_broker.channel()
         return channel

    各行代码大致的意思

    credentials = pika.PlainCredentials("guest", "guest")

    guest是RabbitMQ默认的一个访客账号,只能用于连接本机localhost服务,密码默认也是guest。这里是创建明文的访问账号信息。

    conn_params = pika.ConnectionParameters("localhost", credentials=credentials)

    创建连接参数,有点类似于数据库的连接字符串信息

    conn_broker = pika.BlockingConnection(conn_params)
    channel = conn_broker.channel()

    这里是创建一个连接,并且是一个阻塞的连接,会一直阻塞直到收到服务器的响应。然后创建一个通讯的channel,这个channel是进行后续操作的关键接口。

     

    b.新建文件HelloWorldConsumer.py,代码如下

    import rabbitMQConfig
    import pika
    
    channel = rabbitMQConfig.getDefaultChannel()
    
    channel.exchange_declare(
         exchange="hello-exchange",
         exchange_type="direct",
         passive=False,
         durable=True,
         auto_delete=False)
    
    channel.queue_declare(queue="hello-queue")
    channel.queue_bind(
         queue="hello-queue", exchange="hello-exchange", routing_key="hola")
    
    
    def msg_consumer(channel, method, header, body):
         channel.basic_ack(delivery_tag=method.delivery_tag)
         if body == "quit":
             channel.basic_cancel(consumer_tag="hello-consumer")
             channel.stop_consuming()
         else:
             print body
    
        return
    
    
    channel.basic_consume(
         msg_consumer, queue="hello-queue", consumer_tag="hello-consumer")
    channel.start_consuming()

    代码解释

    在之前创建channel代码的基础上,我们依次完成了以下动作

    channel.exchange_declare(
         exchange="hello-exchange",
        exchange_type="direct",
         passive=False,
         durable=True,
        auto_delete=False)

    创建exchange,名称为”hello-exchange”,类型是”direct”,也就是这个exchange是严格按照routing_key的值来匹配消息队列的。(具体相关的基本概念请参考《RabbitMQ 上手记录-part 1-基础概念》)

    channel.queue_declare(queue="hello-queue")
    channel.queue_bind(
         queue="hello-queue", exchange="hello-exchange", routing_key="hola")

    创建queue,名称为”hello-queue“,并且绑定到”hello-exchange”,关联的routing_key是”hola”。这里的queue就是用于存放消息的消息队列。

     

    def msg_consumer(channel, method, header, body):

    这里定义了收到消息后的回调函数,这个函数会回确认消息已收到,同时会打印出收到的消息或者根据消息内容关闭连接。

     

    channel.basic_consume(
        msg_consumer, queue="hello-queue", consumer_tag="hello-consumer")
    channel.start_consuming()

    最后的就是开始消费这个队列的消息,这是一个阻塞的操作,会一直等待有消息进入队列并推送到客户端。

     

    到此消费者端的代码就完成了,接下来看看生产者端的代码。

    c.新建文件HelloWorldProducer.py

    import sys
    import pika
    import rabbitMQConfig
    
    channel = rabbitMQConfig.getDefaultChannel()
    
    channel.exchange_declare(
         exchange="hello-exchange",
         exchange_type="direct",
         passive=False,
         durable=True,
         auto_delete=False)
    
    msg = sys.argv[1]
    msg_props = pika.BasicProperties()
    msg_props.content_type = "text/plain"
    
    channel.basic_publish(
         body=msg,
         exchange="hello-exchange",
         properties=msg_props,
         routing_key="hola")

    这里的代码前面几行跟消费者的调用代码类似,都是通过配置拿到channel信息,然后声明一个exchange。可能比较奇怪,为什么之前消费者的代码里也调用了同样的代码声明同样的exchange。在声明exchange的时候,系统会判断是否已存在了相同名称的exchange,如果存在了直接返回。

    msg = sys.argv[1]
    msg_props = pika.BasicProperties()
    msg_props.content_type = "text/plain"

    这里定义了需要发送的消息内容,发送的文本从命令行获取,并且设置消息的content_type为”text/plain”

    channel.basic_publish(
        body=msg,
         exchange="hello-exchange",
         properties=msg_props,
         routing_key="hola")

    接下来是发布消息,关键的参数是指定发布到哪个exchange,消息内容,以及routing_key。

    运行

    python HelloWorldConsumer.py

    然后另起一个bash/命令行

    python HelloWorldProducer.py hello

    运行之后在第一个命令行就能看到消息输出

    image

    以上就是最基本的发布消息和订阅消息的过程,后续我们来看看如何实现RabbitMQ集群。

     

  • 相关阅读:
    Redis持久化
    Redis进阶——事务、TTL、排序、消息通知、管道
    行为设计模式
    Redis数据类型
    ASP.NET并发处理
    c# 泛型有什么作用?
    IBatisNet 升级到 .Net Framework 4.0 时发现 IBatisNet 一小BUG
    【Python3】用for循环和while循环分别打印出如下格式“九九乘法表”
    Visual Studio 2010 SP1 WPF程序属性重命名BUG,小心!
    为什么用SOA架构?
  • 原文地址:https://www.cnblogs.com/shenba/p/8727699.html
Copyright © 2011-2022 走看看