zoukankan      html  css  js  c++  java
  • 【Python之路Day12】网络篇之Python操作RabbitMQ

    基础知识

    分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦。消息中间件这块在我们前面的学习中,是使用python中的queue模块来提供,但这个模块仅限于在本机的内存中使用,假设这个队列需要其他服务器的程序也访问的话,就需要利用socket了。不过,现成的方案很多,轮子已经有了,我们没有必要反复造轮子。直接拿来用就可以了。

    消息中间件解决方案

    流行的消息队列解决方案很多:

    ZeroMQ,号称最快的消息队列,由于支持的模式特别多: TCP、IPC、inproc、Multicas,基本已经打到替代Socket的地步了。站点地址:http://zeromq.org/

    Kafka,是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目。 一个消息发布订阅系统,现在常用于日志团队使用的工具,如程序将操作日志批量异步的发送到Kafka集群中,而不是保存在本地或者DB中。Kafka可以提供批量提交消息/压缩等,对Producer而言,几乎感觉不到性能的开销。Consumer可以使用Hadoop等其他系统化的存储和数据分析等。站点:http://kafka.apache.org/

    RocketMQ, 阿里开源的一款高性能、高吞吐量的消息中间件, 纯Java开发。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。站点: https://github.com/alibaba/RocketMQ

    RabbitMQ, RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。

    等等....

    使用消息中间件的理由?

    使用消息中间件的10个理由,请参照oschina的这篇博文: http://www.oschina.net/translate/top-10-uses-for-message-queue

    RabbitMQ

    一. 历史

       RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。

       RabbitMQ是由RabbitMQ Technologies Ltd开发并且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。其实VMWare,Pivotal和EMC本质上是一家的。不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在并没有上市。

    官方站点:https://www.rabbitmq.com/

    RabbitMQ中文: https://rabbitmq-into-chinese.readthedocs.io/zh_CN/latest/

    二. 应用架构

     

    这个系统架构图版权属于sunjun041640。

    RabbitMQ Server: 是一种传入服务。 它的角色是维护一条从生产者(Producer) 到 消费者(Consumer)的路线,从而保证数据能够按照指定的方式进行传入。但是也并不是100%的保证,但杜宇普通的应用来说,应该是足够的。当然对于要求可靠性、完整性绝对的场景,可以再走一层数据一致性的guard, 就可以保证了。

    Client A 和 Client B:生产者(Producer), 数据的生产者, 发送方。一个消息(Message)有两个部分:有效载荷(payload) 和 标签(label). 

    • payload: 传入的数据
    • lable: exchange的名字或者说是一个tag, payload的描述信息,而且RabbitMQ是通过这个lable来决定把这个消息(Message)发给那个消费者(Consumer).AMQP仅仅描述了lable, 而RabbitMQ决定了如何使用这个lable的规则。

    Client1, client2, client3: 消费者(Consumer), 接受消息的应用程序。当有消息(Message)到达某个和Consumer关联的某个队列后,RabbitMQ会把它发送Consumer。当然也可能会发送给多个Consumer。

    一个数据从Producer到Consumer,还需要明白三个概念: exchanges, queue 和 bindings

      queue:用于消息存储的缓冲

    Connection: 一个TCP连接

    Channels:虚拟连接。它建立在TCP连接中,数据流动都是channel中进行的。一般情况是起始建立TCP连接,第二部就是建立这个Channel。

    三. 安装RabbitMQ

    Ubuntu/Debian

    apt-get install -y rabbitmq-server 

    RHEL6:

    #安装配置epel源
       # rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
     
    #安装erlang
       # yum -y install erlang
     
    #安装RabbitMQ
       # yum -y install rabbitmq-server

    服务启动/停止

    #停止服务
    #service rabbitmq-server stop
    
    #启动服务
    #service rabbitmq-server start
    
    #重启服务
    #service rabbitmq-server restart

    安装Python API(Python3)

    pip3 install pika
    
    
    #源码安装:
    https://pypi.python.org/pypi/pika
    
    #过程略...

    四. 基本操作

    基于queue(Python2.7中模块是Queue)实现的生产者消费者模型

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    import queue
    import threading
    
    
    message = queue.Queue(10)
    
    
    def producer(i):
        '''
        生产者
        :param i:
        :return:
        '''
        while True:
            message.put(i)
    
    
    def consumer(i):
        '''
        消费者
        :param i:
        :return:
        '''
        while True:
            msg = message.get()
    
    
    for i in range(12):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
    
    for j in range(10):
        t = threading.Thread(target=consumer, args=(j,))
        t.start()
    View Code

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    import pika  #导入模块
    
    ########### 生产者 #################
    
    connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='172.16.30.162') #端口如果是默认的话,不用谢
            )   #建立TCP连接
    
    channel = connection.channel()  #虚拟连接,建立在上面的TCP连接基础上
    #为什么使用Channel,而不用TCP连接?
    #因为对于OS来说,建立和关闭TCP连接是有代价的,尤其是频繁的建立和关闭. 而且TCP的连接数默认在系统内核中也有限制, 这也限制了系统处理高并发的能力.
    #但是,如果在TCP连接中建立 Channel是没有代价的,对于Procuder或者Consumer来讲,可以并发的使用多个channel来进行publish或者receive.
    
    channel.queue_declare(queue='hello')
    #Consumer和Procuder都可以使用 queue_declar创建queue.对某个channel来说,Consumer不能declare一个queue,却可以订阅queue,当然也可以创建私有的queue.
    #这样就只有APP本身才能使用这个queue. queue也可以自动删除,被标记auto-delete的queue在最后一个Consumer unsubscribe后会被自动删除.
    #如果创建一个已经存在的queue是不会有任何影响的, 就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改.
    # queue 对 load balance的处理是完美的,对于多个Consumer来说, RabbitMQ使用循环的方式轮训(Round-robin)来均衡发送给不同的Consumer
    
    
    channel.basic_publish(
        exchange='',             #消息是不能直接发送到队列的,它需要发送到交换机(exchange),下面会谈这个,此处使用一个空字符串来标识.
        routing_key='hello',  #必须指定为队列的名称
        body='How are you'       #消息主体
    )
    
    print('Send ok')
    connection.close()    #关闭连接
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    ################################消费者###########################################
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='172.16.30.162')
    )
    
    channel = connection.channel()
    channel.queue_declare('hello')  #使用queue_declare创建一个队列——我们可以运行这个命令很多次,但是只有一个队列会被创建。
    
    def callback(ch, method, properties, body):
        '''
        回调方法,当我们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。
        :param ch:  虚拟通道
        :param method: 方法
        :param properties: 消息属性
        :param body:  消息主体
        :return:
        '''
        print('Received: %r' %body)
    
    while True:
        channel.basic_consume(    #需要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:
            callback,
            queue='hello',
            no_ack=True
        )
    
        print('Wating for messages. To exit press CTRL+C')
    
        channel.start_consuming()

    1. 消息确认

    为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。

    消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

    消息响应:默认是开启的 no_ack, 在上面的例子中,我们把标识给置为True(关闭了)。开启后,完成一个任务后,会发送一个响应。

    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
        time.sleep( body.count('.') )
        print " [x] Done"
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello')

    PS: 这一来一往的是提高了消息的安全性,但是,开销是难免的,和默认的不加确认消息相比,这种方案性能肯定会有下降,但是这不就是一种妥协么?就看应用场景了.

    切记, 不要忘记确认(basic_ack)!消息会在你退出程序之后就重新发送,如果它不能够释放没响应的信息,RabbitMQ就会占用越来越多的内存!

    2. 消息持久化

    有个参数durable, 默认情况下,如果没有显式告诉RabbitMQ这条消息需要持久化,那么它(rabbitmq)在自己退出或者崩溃的时候,将会丢失所有队列和消息。

    为了确保不丢失,需要注意:

    必须把队列消息设置为持久化。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    import pika  #导入模块
    
    connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='172.16.30.162') #端口如果是默认的话,不用谢
            )   #建立TCP连接
    
    channel = connection.channel()  #虚拟连接,建立在上面的TCP连接基础上
    
    #队列持久化
    channel.queue_declare(queue='hello',durable=True)
    
    
    channel.basic_publish(
        exchange='',          #消息是不能直接发送到队列的,它需要发送到交换机(exchange),下面会谈这个,此处使用一个空字符串来标识.
        routing_key='hello',  #必须指定为队列的名称
        body='How are you' ,   #消息主体
        properties=pika.BasicProperties(delivery_mode=2)    #持久化
    
    )
    
    print(" [x] Sent 'Hello World!'")
    connection.close()        #关闭连接
    生产者
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    ################################消费者###########################################
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='172.16.30.162')
    )
    
    channel = connection.channel()
    channel.queue_declare('hello')  #使用queue_declare创建一个队列——我们可以运行这个命令很多次,但是只有一个队列会被创建。
    
    def callback(ch, method, properties, body):
        '''
        回调方法,当我们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。
        :param ch:  虚拟通道
        :param method: 方法
        :param properties: 消息属性
        :param body:  消息主体
        :return:
        '''
        print('Received: %r' %body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    while True:
        channel.basic_consume(    #需要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:
            callback,
            queue='hello',
            no_ack=False    #no_ack 置为False
        )
    
        print('Wating for messages. To exit press CTRL+C')
    
        channel.start_consuming()
    消费者

    3. 公平调度

    默认消息队列中的数据是按照顺序来消费的。比如有两个workers,处理奇数消息的特别忙,而处理偶数的比较轻松,而RabbitMQ默认的规则还是一如既往的派发消息。它默认才不管你忙不忙!

    可以使用basic_qos方法,并设置 prefetch_count=1,告诉RabbitMQ,在同一时刻,不要发送超过一条消息给一个worker,直到它处理了上一条消息并且做出了响应。这样,RabbitMQ就能把消息分给下一个空闲的Worker了。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    ################################消费者###########################################
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='172.16.30.162')
    )
    
    channel = connection.channel()
    channel.queue_declare('hello')  #使用queue_declare创建一个队列——我们可以运行这个命令很多次,但是只有一个队列会被创建。
    
    def callback(ch, method, properties, body):
        '''
        回调方法,当我们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。
        :param ch:  虚拟通道
        :param method: 方法
        :param properties: 消息属性
        :param body:  消息主体
        :return:
        '''
        print('Received: %r' %body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)    #设置 prefetch_count=1,告诉RabbitMQ,在同一时刻,不要发送超过一条消息给一个worker,直到它处理了上一条消息并且做出了响应。
    
    while True:
        channel.basic_consume(    #需要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:
            callback,
            queue='hello',
            no_ack=False
        )
    
        print('Wating for messages. To exit press CTRL+C')
    
        channel.start_consuming()
    消费者

    PS: 如果所有的工作者都处理很繁忙,你的队列就可能会被填满,需要留意这个问题,要么添加更多的Workers,要么使用其他策略!

    4. 发布/订阅

    分发一个消息给多个消费者(Consumers), 这种模式,称为"发布/订阅" 

    发布者只需要把消息发送给exchange。exchange一边从发布者放接受消息,一边推送到队列。

    exchange必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。

    exchange的几个类型:

      direct: 直连, 通过binding key的完全匹配来传递消息到相应的队列中

      topic:主题交换机,exchange将传入的”路由值“ 和”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    • # 表示可以匹配0个 或者多个单词
    • * 表示只能匹配一个单词
    #发送者路由值          队列中
    www.dbq168.com    www.*     #不匹配
    www.dbq168.com    www.#     #匹配成功

      fanout:扇形交换机, 把消息发送给和他关联的所有的队列

      headers: 头交换机

    root@test2-ubunut:~# rabbitmqctl list_exchanges
    Listing exchanges ...
        direct
    amq.direct    direct
    amq.fanout    fanout
    amq.headers    headers
    amq.match    headers
    amq.rabbitmq.log    topic
    amq.rabbitmq.trace    topic
    amq.topic    topic
    direct_logs    direct
    logs    fanout
    ...done.

    fanout,绑定(binding)

    创建一个fanout类型的exchange 和队列, 而后告诉交换机如何发送消息给我们的队列。exchange和queue之间的关系为 绑定(binding)

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    #fanout
    import pika  #导入模块
    
    connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='172.16.30.162') #端口如果是默认的话,不用谢
            )   #建立TCP连接
    
    channel = connection.channel()  #虚拟连接,建立在上面的TCP连接基础上
    
    channel.exchange_declare(exchange='logs',
                             type='fanout')   #使用fanout类型
    
    message = 'baslkdfk2sdf'
    channel.basic_publish(
        exchange='logs',      #消息是不能直接发送到队列的,它需要发送到交换机(exchange),exchange值必须为定义好的exchange值
        routing_key='',
        body=message        #消息主体
    )
    
    print(" [x] Sent %s"%message)
    connection.close()        #关闭连接
    发布
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    # fanout
    ################################消费者###########################################
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='172.16.30.162')
    )
    
    channel = connection.channel()
    channel.exchange_declare(
        exchange='logs',
        type='fanout'
    )
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    channel.queue_bind(
        exchange='logs',
        queue=queue_name
    )
    print('Wating for messages. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        '''
        回调方法,当我们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。
        :param ch:  虚拟通道
        :param method: 方法
        :param properties: 消息属性
        :param body:  消息主体
        :return:
        '''
        print('Received: %r' %body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    while True:
        channel.basic_consume(    #需要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:
            callback,
            queue=queue_name,
            no_ack=False
        )
        channel.start_consuming()
    订阅

    direct,多个绑定(Multiple bindings)

    多个队列使用相同的绑定键是合法的。上图这个例子中,我们可以添加一个X和Q1之间的绑定,使用black绑定键。这样一来,直连交换机就和扇型交换机的行为一样,会将消息广播到所有匹配的队列。带有black路由键的消息会同时发送到Q1和Q2。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    #fanout
    import pika  #导入模块
    
    connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='172.16.30.162') #端口如果是默认的话,不用写
            )   #建立TCP连接
    
    channel = connection.channel()  #虚拟连接,建立在上面的TCP连接基础上
    
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')   #使用direct类型
    
    message = 'info message'
    facility = 'info'      #如果生产者发送一个info消息的话,两个消费者都能收到; 如果发送一个error级别的消息,只有 error级别的能收到..
    
    channel.basic_publish(
        exchange='direct_logs',      #消息是不能直接发送到队列的,它需要发送到交换机(exchange),exchange值必须为定义好的exchange值
        routing_key=facility,
        body=message        #消息主体
    )
    
    print(" [x] Sent %s"%message)
    connection.close()        #关闭连接
    生产
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    # fanout
    ################################消费者###########################################
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='172.16.30.162')
    )
    
    channel = connection.channel()
    channel.exchange_declare(
        exchange='direct_logs',
        type='direct'
    )
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    facility = ['info']   #日志级别定义为info
    for severity in facility:   #递归绑定到队列
        channel.queue_bind(
            exchange='direct_logs',
            queue=queue_name,
            routing_key=severity
        )
    print('Wating for messages. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        '''
        回调方法,当我们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。
        :param ch:  虚拟通道
        :param method: 方法
        :param properties: 消息属性
        :param body:  消息主体
        :return:
        '''
        print('Received: %r' %body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    while True:
        channel.basic_consume(    #需要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:
            callback,
            queue=queue_name,
            no_ack=False
        )
        channel.start_consuming()
    订阅一(info)
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    # fanout
    ################################消费者###########################################
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='172.16.30.162')
    )
    
    channel = connection.channel()
    channel.exchange_declare(
        exchange='direct_logs',
        type='direct'
    )
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    facility = ['info','warnning','error']   #定义为info, warnning, error的级别
    for severity in facility:   #递归绑定队列
        channel.queue_bind(
            exchange='direct_logs',
            queue=queue_name,
            routing_key=severity
        )
    print('Wating for messages. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        '''
        回调方法,当我们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。
        :param ch:  虚拟通道
        :param method: 方法
        :param properties: 消息属性
        :param body:  消息主体
        :return:
        '''
        print('Received: %r' %body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    while True:
        channel.basic_consume(    #需要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:
            callback,
            queue=queue_name,
            no_ack=False
        )
        channel.start_consuming()
    订阅二(info,warnning,error)

    Topic

    发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数可以随意,但是不要超过255字节。

    • * (星号) 用来表示一个单词.
    • # (井号) 用来表示任意数量(零个或多个)单词
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    #fanout
    import pika  #导入模块
    
    connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='172.16.30.162') #端口如果是默认的话,不用谢
            )   #建立TCP连接
    
    channel = connection.channel()  #虚拟连接,建立在上面的TCP连接基础上
    
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')   #使用topic类型
    
    message = 'How are you'
    routing_key = 'anonymouns.info'
    
    channel.basic_publish(
        exchange='topic_logs',      #消息是不能直接发送到队列的,它需要发送到交换机(exchange),exchange值必须为定义好的exchange值
        routing_key=routing_key,
        body=message        #消息主体
    )
    
    print(" [x] Sent %s"%message)
    connection.close()        #关闭连接
    生产
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: DBQ(Du Baoqiang)
    
    # fanout
    ################################消费者###########################################
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='172.16.30.162')
    )
    
    channel = connection.channel()
    channel.exchange_declare(
        exchange='topic_logs',
        type='topic'    #topic类型
    )
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    binding_key = ['anonymouns.info']
    
    for items in binding_key:
        channel.queue_bind(
            exchange='topic_logs',
            queue=queue_name,
            routing_key=items
        )
    print('Wating for messages. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        '''
        回调方法,当我们获取到消息的时候,Pika库就会调用此回调函数。这个回调函数会将接收到的消息内容输出到屏幕上。
        :param ch:  虚拟通道
        :param method: 方法
        :param properties: 消息属性
        :param body:  消息主体
        :return:
        '''
        print('Received: %r' %body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    while True:
        channel.basic_consume(    #需要告诉RabbitMQ这个回调函数将会从名为"hello"的队列中接收消息:
            callback,
            queue=queue_name,
            no_ack=False
        )
        channel.start_consuming()
    消费

    更多,请参照官方文档

    参考博客:

    • http://www.cnblogs.com/wupeiqi/articles/5132791.html
    • http://blog.csdn.net/anzhsoft/article/details/19563091
    • http://rabbitmq.mr-ping.com/

    MySQL、ORM框架

    移步另一篇博文>>> 

    Paramiko

    移步另一篇博文>> 

  • 相关阅读:
    PHP生成pdf文档
    PHP将数据库数据批量生成word文档
    三个常用的PHP图表类库
    Javascript——(1)
    python学习HTML之CSS(2)
    python学习HTML之CSS
    python学习之HTML
    python学习之rabbitmq
    第10周15/16/17
    多进程
  • 原文地址:https://www.cnblogs.com/dubq/p/5702663.html
Copyright © 2011-2022 走看看