zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记(例子)

    合理使用消息队列(Messaging Queue)可大幅降低网络系统架构的耦合度和复杂度,让各集成部件拥有更灵活的自主弹性。同时异步处理机制在高性能和高可靠性上也有极佳的表现,是一种较理想的集成解决方案。

    ActiveMQZeroMQRabbitMQ 之间徘徊许久,最终还是选择 RabbitMQ。ZeroMQ 和 RabbitMQ 都支持开源消息协议 AMQP,不过 ZeroMQ 暂时不支持消息持久化和崩溃恢复,且稳定度稍差。

    1. 基础概念

    AMQP 有四个非常重要的概念:虚拟机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。

    • 虚拟机: 通常是应用的外在边界,我们可以为不同的虚拟机分配访问权限。虚拟机可持有多个交换机、队列和绑定。
    • 交换机: 从连接通道(Channel)接收消息,并按照特定的路由规则发送给队列。
    • 队列: 消息最终的存储容器,直到消费客户端(Consumer)将其取走。
    • 绑定: 也就是所谓的路由规则,告诉交换机将何种类型的消息发送到某个队列中。

    通常的操作流程是:

    • (1) 消费者: 创建信息通道。
    • (2) 消费者: 定义消息队列。
    • (3) 消费者: 定义特定类型的交换机。
    • (4) 消费者: 设定绑定规则 (包括交换机名称、队列名称以及路由键)。
    • (5) 消费者: 等待消息。
    • (6) 生产者: 创建消息。
    • (7) 生产者: 将消息投递给信息通道 (注明接收交换机名称和路由键)。
    • (8) 交换机: 获取消息,依据交换机类型决定是否匹配路由规则 (如需匹配,则对比消息路由键和绑定路由键)。
    • (9) 消费者: 获取并处理消息,发送反馈。
    • (10) 结束: 关闭通道和连接。

    2. 系统安装

    Ubuntu 默认已经有 Erlang 环境了,直接安装即可。

    $ sudo apt-get install rabbitmq-server


    使用 rabbitmqctl 管理工具查看状态。

    $ sudo rabbitmqctl status
    
    Status of node 'rabbit@yuhen-server64' ...
    [{running_applications,[{rabbit,"RabbitMQ","1.7.2"},
                            {mnesia,"MNESIA  CXC 138 12","4.4.12"},
                            {os_mon,"CPO  CXC 138 46","2.2.4"},
                            {sasl,"SASL  CXC 138 11","2.1.8"},
                            {stdlib,"ERTS  CXC 138 10","1.16.4"},
                            {kernel,"ERTS  CXC 138 10","2.13.4"}]},
     {nodes,['rabbit@yuhen-server64']},
     {running_nodes,['rabbit@yuhen-server64']}]
    ...done.


    安装完成后默认创建 "/" 虚拟主机,和 "guest/guest" 账号。

    别忘了 py-amqplib

    $ sudo easy_install -U amqplib


    3. 详细流程

    (1) 消费者:使用授权账号连接到特定的虚拟机上。

    >>> from amqplib import client_0_8 as amqp
    >>> conn = amqp.Connection(host = "localhost:5672", userid = "guest", password = "guest", virtual_host = "/")


    在实际应用时,我们最好删除默认账号 guest。

    $ sudo rabbitmqctl add_user mymq mypwd
    $ sudo rabbitmqctl set_permissions -p "/" mymq ".*" ".*" ".*"
    $ sudo rabbitmqctl delete_user guest


    这回就不能用默认参数连接了。

    >>> conn = amqp.Connection()
    ------------------------------------------------------------
    Traceback (most recent call last):
    ... ...
    IOError: Socket closed
    
    >>> conn = amqp.Connection(userid = "mymq", password = "mypwd")


    (2) 消费者: 创建信息通道(Channel),定义交换机和队列。

    >>> chan = conn.channel()
    >>> chan.queue_declare(queue = "my_queue", auto_delete = False)
    >>> chan.exchange_declare(exchange = "my_exchange", type = "direct", auto_delete = False)


    队列定义参数:

    • exclusive: 仅创建者可以使用的私有队列,断开后自动删除。
    • auto_delete: 当所有消费客户端连接断开后,是否自动删除队列。

    交换机定义参数:

    • type: 交换机类型,包括 fanout, direct 和 topic。
    • auto_delete: 当所有绑定队列都不再使用时,是否自动删除该交换机。

    如所定义队列和交换机已存在,queue_declare 和 exchange_declare 将直接使用,不会抛出异常。

    交换机类型:

    • Fanout: 不处理路由键,将消息广播给绑定到该交换机的所有队列。
    • Direct: 处理路由键,对消息路径进行全文匹配。消息路由键 "dog" 只能匹配 "dog" 绑定,不匹配 "dog.puppy" 这类绑定。
    • Topic: 处理路由键,按模式匹配路由键。模式符号 "#" 表示一个或多个单词,"*" 仅匹配一个单词。如 "audit.#" 可匹配 "audit.irs.corporate",但 "audit.*" 只匹配 "audit.irs"。

    (3) 消费者: 绑定交换机和队列。

    >>> chan.queue_bind(queue = "my_queue", exchange = "my_exchange", routing_key = "my_msg")


    将指定名称的交换机和队列绑定起来,并指明路由键。交换机可绑定到多个队列(名称不同)上,每个绑定队列都会接受到同一份消息副本。

    (4) 消费者: 等待接收消息。

    >>> while True:
    ...     msg = chan.basic_get("my_queue")
    ...     if msg:
    ...         print msg.body
    ...         chan.basic_ack(msg.delivery_tag)
    ...     else:
    ...         time.sleep(1)
    ...


    通常我们要在接收到消息后,使用 basic_ack() 发送一个回馈,否则这些消息将会被其他连接到该队列的消费客户端再次收取。也可以在 basic_get() 中指定参数 no_ack = True,告知 AMQP 服务器无需等待回馈。

    chan.basic_get(self, queue='', no_ack=False, ticket=None)


    当然,这种循环等待模型不怎么好看,我们可以用回调方式重构一下。

    >>> def callback(msg):
    ...     print current_process().pid, msg.body
    ...     if msg.body == "cancel":
    ...         chan.basic_cancel("x")
    ...         sys.exit(0)
    ...     
    >>> chan.basic_consume(queue = queue, no_ack = True, callback = callback, consumer_tag = "x")
    >>> while True: chan.wait()


    chan.basic_consume 注册一个命名(consumer_tag)回调,chan.wait() 阻塞等待消息接收,chan.basic_cancel() 取消回调。

    (5) 生产者:发送消息,消息中指明路由键。

    生产者无需定义队列、交换机和绑定,只需将消息投递给信息通道即可。

    >>> msg = amqp.Message("message")
    >>> chan.basic_publish(msg, exchange = "my_exchange", routing_key = "my_msg")


    (6) 关闭通道和连接。

    任务结束后,别忘了关系生产和消费者的通道和链接。

    >>> chan.close()
    >>> conn.close()


    Connection 和 channel 都实现了 __enter__ / __exit__,建议使用 with 。

    完整演示源码:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from sys import exit
    from amqplib import client_0_8 as amqp
    from time import sleep
    from multiprocessing import Process, current_process
    
    userid = "mymq"
    password = "mypwd"
    queue = "my_queue"
    exchange = "my_exchange"
    routing_key = "my_msg"
    
    def consumer(queue, exchange, type, routing_key):
        with amqp.Connection(userid = userid, password = password) as conn:
            with conn.channel() as chan:
    
                chan.queue_declare(queue = queue, auto_delete = False)
                chan.exchange_declare(exchange = exchange, type = type, auto_delete = False)
                chan.queue_bind(queue = queue, exchange = exchange, routing_key = routing_key)
    
    #            while True:
    #                msg = chan.basic_get(queue)
    #                if msg:
    #                    print current_process().pid, msg.body
    #                    chan.basic_ack(msg.delivery_tag)
    #                else:
    #                    sleep(1)
    
                def cb(msg):
                    print current_process().pid, msg.body
                    if msg.body == "cancel":
                        chan.basic_cancel("x")
                        print "Consumer Exit!"
                        exit(0)
    
                chan.basic_consume(queue = queue, no_ack = True, callback = cb, consumer_tag = "x")
                while True: chan.wait()
    
    def publisher(exchange, routing_key):
        with amqp.Connection(userid = userid, password = password) as conn:
            with conn.channel() as chan:
                x = 0
                while True:
                    msg = amqp.Message("message {0}".format(x) if x < 10 else "cancel")
                    chan.basic_publish(msg, exchange = exchange, routing_key = routing_key)
    
                    if x >= 10:
                        print "Publisher Exit!"
                        exit(0)
                    else:
                        x += 1
                        sleep(1)
    
    if __name__ == "__main__":
        pub = Process(target = publisher, args = [exchange, routing_key])
        pub.start()
    
        con = Process(target = consumer, args = [queue, exchange, "direct", routing_key])
        con.start()
    
        pub.join()
        con.join()


    --------- 疲倦的分割线 -----------------

    转载自:http://blog.csdn.net/zhangxinrun/article/details/6411845

  • 相关阅读:
    Hello_Area_Description 任务三:Project Tango采集区域描述数据
    智能小车 机器人
    Hello_Depth_Perception 任务二:Project Tango采集深度感知数据
    Project Tango Explorer
    make运行阶段划分
    关于chroot
    xargs命令
    debian配置集锦
    gdb使用技巧
    gdb调试使用autotools工程的项目
  • 原文地址:https://www.cnblogs.com/langqi250/p/2703807.html
Copyright © 2011-2022 走看看