zoukankan      html  css  js  c++  java
  • RabbitMQ 消息队列

    一、前提

    Python中的队列:

    1.线程QUEUE

      线程队列不能跨进程,只是单线程下的多个线程间的数据交互

    2.进程QUEUE

      支持父进程于子进程进行交互,或者同属于同一父进程下的多个子进程进行交互。

      因此,两个独立的程序之间是不能使用Python中的QUEUE实现交互。(因为每个程序是独立的,是一个独立的进程,所以Python的队列无法实现两个独立的程序之间的交互)。

      所以想要实现两个独立的进程间的通信,可以使用下面几种方法:

      如:

      在独立的程序之间建立socket,实现通信;

      将要通信的内容通过json写到硬盘中,a程序写入到硬盘中,b程序中硬盘中读取(耗时);

      使用broker中间商代理,a程序与代理建立socket,b程序也与代理建立socket,a程序的消息发给代理,代理再发给a程序(易于维护)。

      本节主要介绍broker的使用,目前流行的broker有RabbitMQ,ZeroMQ,ActiveMQ,MSMQ。

    二、RabbitMQ 消息队列介绍

      RabbitMQ是用erlang开发的,所以RabbitMQ依赖于erlang语言。在windows上安装和使用RabbitMQ的时候,要先装上erlang语言。

     

      在windows上安装完成erlang和RabbitMQ之后就自动启动,RabbitMQ在任务管理器的“服务”中可以进行查看确认。

      RabbitMQ支持多种语言,如:Java,.NET,Ruby,Python,PHP,JavaScript等,可以在官网 http://www.rabbitmq.com查看相应的语言支持的模块。
      在Python中:

      pika,a pure-Python AMQP 0-9-1 client ( source code ,API reference )  

      Celery ,a distributed task queue for Django and pure Python

      Halgha, an asynchrounous AMQP 0-0-1 client based on libevent ( the source code and docs are on github )

      其中,pika是本文介绍使用的模块。

    三、RabbitMQ基础信息

    在RabbitMQ中,关于在Python中如何使用的官方文档:http://www.rabbitmq.com/tutorials/tutorial-one-python.html

    3.1 RabbitMQ官方介绍

    RabbitMQ是一个消息代理(中间商):它接收并推送消息。

    想象一下,有一个邮政局:当你把想要邮寄出去的信放入邮箱,你可以确定邮政员先生最终可以将你的信发送到你的收信方。类似的,RabbitMQ在这种情况下,就是一个邮箱,邮政局和邮政员先生。

    RabbitMQ与邮政局最大的区别是,邮政局可以发送纸质的信息,但是RabbitMQ不行。RabbitMQ只接收,存储和发送二进制的数据信息。

    RabbitMQ一般情况下使用一些术语来传输信息。

    3.1.1 数据定义

    Producing就是发送。发送信息的程序是生产者。用P来指代。

    Consuming就是接收。接收信息的程序就是消费者。用C来指代。

    queue(队列)是在RabbitMQ内的一个邮箱的名称。尽管消息通过RabbitMQ和你的应用程序传输,信息只能被存储在queue中。队列仅有主机的内存和字旁限制绑定,它本质上是一个大的消息缓冲区。许多生产者可以发送信息到指定的队列,许多的消费者可以从指定的队列中收取信息。用queue_name表示。

    注意:生产者,消费者和代理不必驻留在同一台主机上。在实际大多数应用中,也是如此。

    3.1.2 RabbitMQ库介绍 

    RabbitMQ说的是AMQP 0.9.1,它是一个开放的、通用的消息传递协议。有许多不同语言的RabbitMQ客户端。在这个实例中,我们将使用Pika,这是RabbitMQ团队推荐的Python模块。要安装它,您可以使用pip包管理工具。

    pip install pika

    3.1.3 生产者,消费者和代理之间实现通信的图示 

    在生产者消费者模型中,生产者产生的信息发送隔离Broker,由Broker发送给相应的消费者。

    四、在Python中利用pika模块实现不同类型的队列通信

    实例1:最简单的使用pika实例——Hello World!

    在官方文档中,关于使用pika实现的最简单的Hello World案例中,将用Python写两个小程序。Proceder(发送方)发送信息,Consumer(接收方)接收信息并且打印这些信息。要传输的信息就是'Hello World!'

    在下面的图表中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列——RabbitMQ代表消费者保存的消息缓冲区。

    我们的整体设计将会是:

    生产者将消息发送到“hello”队列。消费者从该队列接收消息。

     

    Sending

     

    第一个程序send.py将会把单条信息'Hello World!'发送给队列hello。在send.py文件中,我们需要做的第一个事就是建立和RabbitMQ server的连接。

    import pika  #导入模块pika
    
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
    channel=connection.channel()  #建立和RabbitMQ server的连接我们 

    通过上面步骤,我们的send.py小程序实现了连接到本地机器上的broker(代理)。如果我们想要连接到另一台机器上的代理,我们只需在这里指定它的名称或IP地址。

     

    接下来,在发送之前,我们需要确定收件人队列是否存在。如果我们将消息发送到不存在的位置,RabbitMQ将会删除该消息。让我们创建一个名为hello的队列,send发送的消息将会发送到hello队列:

    channel.queue_declare(queue='hello')
    

    在这时,我们准备发送一条消息。我们的第一个消息仅为'Hello World !'字符串。我们想把它发送到我们的hello队列。

     

    在RabbitMQ中,消息从来不能直接发送到队列中,它总是需要通过交换。但是,让我们不要被细节所拖累——如果想要了解关于exchange的内容,可以在本教程的第三部分阅读更多关于交流exchange的内容。现在我们需要知道的是如何使用由空字符串标识的默认交换(exchange='')。这个交换exchange是特殊的——它允许我们精确地指定消息应该发送到哪个队列。在routing_key参数中需要指定队列名称:

    channel.basic_publish(exchange='',routing_key='hello',body='Hello World!')
    print("[x] Sent 'Hello World!'")
    

    在退出程序之前,我们需要确保网络缓冲区已被刷新,并且我们的消息已发送到RabbitMQ。我们可以关闭连接。

    connection.close()

    综上所述:

     1 import pika
     2 #第一步:与RabbitMQ server建立连接
     3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
     4 channel=connection.channel()
     5 
     6 #第二步:生产指定要发送的队列
     7 channel.queue_declare(queue='hello')
     8 
     9 #第三步:通过交换exchange,发送数据到相应的队列;通过routing_key指定queue name;body中存放要传输的信息
    10 channel.basic_publish(exchange='',routing_key='hello',body='Hello World!')
    11 print("[x] Sent 'Hello World!'")
    12 
    13 #第四步:关闭连接
    14 connection.close()

    可能出现的问题:Sending doesn't work!

    如果这是你第一次使用RabbitMQ,而你没有看到“发送”的消息,那么你可能会挠头,想知道到底出了什么问题。可能代理开始时没有足够的空闲磁盘空间(默认情况下,它至少需要200 MB),因此代理拒绝接收消息。检查代理日志文件以确认并减少必要的限制。配置文件文档将向您展示如何设置disk_free_limit。

     

     

    Receiving

     

      

    我们的第二个程序receive.py将从队列queue接收消息并在屏幕上打印它们。

    首先,我们需要连接到RabbitMQ服务器。连接到RabbitMQ的代码和send.py中一样。

    import pika  #导入模块pika
    
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
    channel=connection.channel()  #建立和RabbitMQ server的连接我们 
    

      

    第二步,就像send.py一样,确保队列存在。使用queue_declare创建队列是具有幂等性的——我们可以按照我们喜欢的次数运行这个命令,并且只创建一个命令。

    channel.queue_declare(queue='hello')
    

      您可能会问为什么我们再次声明队列——我们已经在以前的代码中声明了它。如果我们确信队列已经存在,我们可以避免这种情况。例如,如果send.py程序之前运行过。但是我们还不能确定先运行哪个程序。在这种情况下,重复在两个程序中声明队列是很好的做法。

    Listing queues中看RabbitMQ中的queues和messages

    您可能希望看到RabbitMQ中有哪些队列,有多少消息在队列中。可以使用rabbitmqctl工具(作为特权用户):

    sudo rabbitmqctl list_queues
    

    在Windows上,省略了sudo

    rabbitmqctl.bat list_queues
    

      

    从队列接收消息更加复杂。它的工作方式是订阅一个回调函数到一个队列。当我们收到消息时,这个回调函数被Pika库调用。在我们的例子中,这个函数将在屏幕上打印消息的内容。

    def callback(ch,method,properties,body):
        print("[x] Received %r"%body)
    

      

     接下来,我们需要告诉RabbitMQ,这个特定的回调函数应该接收来自我们的hello队列的消息:

    channel.basic_consume(callback,queue='hello',no_ack=True)

    Receiving.py要成功运行,我们必须确保我们希望订阅的队列是存在的。幸运的是,我们对此有信心——我们已经创建了一个上面的队列——使用queue_declare。

    no_ack参数在之后会进行解释。

     

    最后,我们输入一个永无止境的循环,等待数据并在必要时运行回调函数。

    print("[x] Waiting for messages.To exit press CTRL")
    channel.start_consuming()
    

    综上所述:

     1 import pika
     2 #第一步:与RabbitMQ server建立连接
     3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
     4 channel=connection.channel()
     5 
     6 #第二步:生产指定要接收的队列
     7 channel.queue_declare(queue='hello')
     8 
     9 #第三步:用回调函数到队列queue中,接收到信息时回调函数会被Pika库调用
    10 def callback(ch,method,properties,body):
    11     print("[x] Received %r"%body)
    12 
    13 #第四步:告诉RabbitMQ,这个特定的回调函数将接收来自hello队列的信息
    14 channel.basic_consume(callback,queue='hello',no_ack=True)
    15 
    16 #第五步:进入死循环等待数据,在必要的时候运行回调函数。
    17 print("[x] Waiting for messages.To exit press CTRL + C")
    18 channel.start_consuming()

    现在我们就完成了一个一对一的Producer和Consumer的单条消息队列传输。

    首先运行receive.py,它会一直等待数据。再运行send.py之后,Producer会在数据传输完成后停止。Consumer还会继续保持等待数据的状态。(receive.py可以一直运行,但是可能可以被Ctrl+C打断)

    消息分发轮询:

    先运行N个receive.py,再运行send.py。每个receive.py会按照运行的先后相继收到新触发的send.py的消息。每个Consumer轮询地去接收Producer的信息。

    实例2:构建一个简单的工作队列 Work queues

    在上面的第一个"Hello World!"的例子中,我们编写了一些程序来从一个已命名的队列发送和接收消息。在这个过程中,我们将创建一个工作队列work queue,用于在多个工作之间分配耗时的任务。

    工作队列work queues(即任务队列task queues)背后的主要思想是避免立即执行资源密集型任务,并避免进程必须等待它完成。相反,我们可以把任务安排在以后做。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程会弹出任务并最终执行任务。当你运行多个工作时,任务将在他们之间共享。

    Work queues这个概念在web应用程序中特别有用,因为在短HTTP请求窗口中无法处理复杂的任务。

    Preparation

    在教程的"Hello World!"例子中,我们发送了一个包含“Hello World !”的消息。现在我们将发送用于复杂任务的字符串。我们没有现实世界的任务,比如要调整图像大小或者呈现pdf文件,所以让我们通过使用time . sleep()函数来假装我们很忙。我们取字符串中的点dot的个数作为它的复杂度的依据(假设情况);字符串中有多少个dot,就说明处理该条字符串需要多少秒例如,一个“Hello…”字符串中有三个点,因此假定处理该字符串需要三秒钟。

    我们会稍微修改一下send.py,使得Producer允许从命令行发送任意消息。这个修改过的程序将任务调度到我们的工作队列中。我们将其命名为new_task.py:

     1 import sys
     2 import pika
     3 
     4 #step1:create connection with RabbitMQ Server
     5 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
     6 channel=connection.channel()
     7 
     8 #step2:create a named queue
     9 channel.queue_declare(queue='hello')
    10 
    11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数
    12 message=''.join(sys.argv[1:]) or "Hello World!"
    13 channel.basic_publish(exchange='',routing_key='hello',body=message)
    14 print("[x] Sent %r"%message)
    15 
    16 #step4:close
    17 connection.close()

    我们也要稍微修改一下receive.py,使得接收的数据不立即在屏幕上打印,而是等待数据处理(依据字符串的dot数作为处理时长)完成后再向屏幕打印数据。新的Consumer将被命名为work.py:

     1 import pika
     2 import time
     3 
     4 #step1:create connection with RabbitMQ Server
     5 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
     6 channel=connection.channel()
     7 
     8 #step2:create a named queue
     9 channel.queue_declare(queue='hello')
    10 
    11 #step3:define callback function.
    12 def callback(ch,method,properties,body):
    13     print('[x] Received %r'%body)
    14     time.sleep(body.count(b'.'))
    15     print('[x] Done.')
    16 
    17 #step4:when Consumer receive message,the program call callback function to deal with message.
    18 channel.basic_consume(callback,queue='hello',no_ack=True)
    19 
    20 print('[x] Waiting for messages. To exit press "CTRL+C"')
    21 
    22 #write an endless loop.
    23 channel.start_consuming()

    Round-robin dispatching循环调度

    使用任务队列的优点之一是能够很容易地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务,这样就可以很容易地扩大规模。

    首先,让我们同时运行两个worker.py脚本。他们将从队列中获取消息,但具体如何呢?让我们来看看。

    你需要三个控制台。两个将会运行这个worker.py脚本。这些控制台将是我们的两个消费者——C1和C2。

    # shell 1
    python worker.py
    # => [*] Waiting for messages. To exit press CTRL+C
    

      

    # shell 2
    python worker.py
    # => [*] Waiting for messages. To exit press CTRL+C

    在第三个console,可以发布Producer任务:

    # shell 3
    python new_task.py First message.
    python new_task.py Second message..
    python new_task.py Third message...
    python new_task.py Fourth message....
    python new_task.py Fifth message.....

    返回结果:

    # shell 1
    python worker.py
    # => [*] Waiting for messages. To exit press CTRL+C
    # => [x] Received 'First message.'
    # => [x] Received 'Third message...'
    # => [x] Received 'Fifth message.....'
    

      

    # shell 2
    python worker.py
    # => [*] Waiting for messages. To exit press CTRL+C
    # => [x] Received 'Second message..'
    # => [x] Received 'Fourth message....'
    

      默认情况下,RabbitMQ将按顺序将每个消息发送到下一个消费者。平均每个消费者将得到相同数量的消息。这种分配消息的方式称为循环。

     

    Message acknowledgment消息确认

    完成一个任务可能需要几秒钟。如果其中一个消费者开始了一项很长的任务,而只在一定程度上完成了,那么会发生什么呢?使用我们当前的代码,一旦RabbitMQ将消息传递给Consumer,RabbitMQ就会立即从内存中删除队列中的这条信息。在这种情况下,如果Consumer中断,我们将失去这个Consumer正在处理的信息。我们也将丢失所有发送给这个特定Consumer的消息,但是实际上这条信息还没有被Consumer处理。 

    但我们不想失去任何任务信息。如果一个Consumer中断,我们希望把任务交给另一个Consumer。

    为了确保消息不会丢失,RabbitMQ支持消息确认。Consumer在处理完该任务信息后,会给RabbitMQ发送一个ack(全称 acknowledgement),告诉RabbittMQ可以删除它。

    如果一个Consumer中断(它的通道关闭了,连接关闭了,或者TCP连接丢失了),而没有发送ack,RabbitMQ将理解一条消息没有被完全处理,并且将重新排队。如果同时有其他Consumer在运行,它会很快将其转递给另一个Consumer。这样我们就可以确信,即使Consumer偶尔死亡,也不会失去任何信息。 

    其中,没有任何消息超时的限制。RabbitMQ将在Consumer中段后重新传递消息。即使处理消息需要很长时间,也不会由于超时原因导致失败。 

    默认情况下打开消息确认。在前面的例子中,我们明确地通过no_ack = True标记关闭了它们。

    所以,在默认情况下,我们都不用设置no_ack,直接使用默认的no_ack=False即可。这样确保每次Consumer收到处理完消息后给Producer返回ack消息。

    def callback(ch, method, properties, body):
        print ("[x] Received %r" % (body,))
        time.sleep( body.count('.') )
        print ("[x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,queue='hello')  #删除了no_ack=True

      把worker.py脚本中的代码按照上面的代码进行修改,就能保证Consumer中止,未给RabbitMQ发送acknowlegement,队列中的信息不会被删除。再次运行新的Consumer,信息还是会再次发送给新的在运行中的Consumer。

    Forgotten acknowledgment

      忽略basic_ack是一个常见的错误。这是一个简单的错误,但后果却是严重的。当您的客户端退出时,消息将会被重新发送(看起来可能是随机的再次发送),但是RabbitMQ将会消耗越来越多的内存,因为它将无法释放任何未加处理的消息。

      为了调试这种错误,您可以使用rabbitmqctl来打印messages_unacknowledged字段:

    rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
    

      如果脚本中忽略了basic_ack,那么队列中的信息会不停地在内存中堆积,队列中已经删除的信息,在内存中还是会放着。

    c:Program FilesRabbitMQ Server
    abbitmq_server-3.6.10sbin>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
    Listing queues
    hello   0       3
    

      通过上面在终端中查看的结果发现,队列中所有的信息已经被Consumer处理完后,messages

    _unacknowleged的数量并未随着Consumer处理掉的信息(序列中删除的信息)的结果而减少相应的条数。

      加上了basic_ack之后,随着Consumer的处理结束,messages_unacknowleged的数量也随之减少,最后降为0.

    Message durability 消息持久化 

    我们已经学会了如何确保即使Consumer中断或停止,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。 

    当RabbitMQ退出或崩溃时,它将忘记队列queues和消息messages。为了将队列和消息持久化,我们需要做两件事.。

    首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了这样做,我们需要宣布它是持久的:

    channel.queue_declare(queue='hello',durable=True)

    尽管这个命令本身是正确的,但它在我们的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许您重新定义一个具有不同参数的现有队列,并将对试图执行此操作的任何程序返回一个错误。但是有一个快速的解决方案——让我们用不同的名称来声明一个队列,例如task_queue:

    channel.queue_declare(queue='task_queue', durable=True)

    上面的queue_declare的修改在Producer和Consumer之间都需要修改。

    在这一点上,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要把我们的信息标记为persisitent持久的。标记信息为persisitent,可以通过下面的代码实现:

    channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent
                          ))
    

      

    Note on message persistence

    将消息标记为持久的并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受了消息并保存消息期间,仍然有很短的时间窗口。而且,RabbitMQ对每条消息都不执行fsync(2),它可能只保存到缓存,而不是真正写入磁盘。持久性保证不强,但是对于简单的任务队列来说已经足够了。如果你需要一个更有力的保证,那么你可以使用publisher confirms.

     

    Fair dispatch 消息公平分发

    您可能已经注意到,调度仍然不像我们希望的那样工作。例如,在一个有两个Consumer在运行的情况下,当部分消息需要很长时间处理,部分消息只需极短的时间处理时,一个Consumer会不停地在处理,另一个Consumer几乎没有什么计算量。RabbitMQ对此一无所知,并且仍然还是会轮询地平均分配消息。

    这是因为当消息进入队列时,RabbitMQ才会发送一条消息。它不考虑Consumer未确认的消息messages_noackownledgement的数量。它只是盲目地向n个Consumer发送n个消息。

     

    为了解决上面的这个问题,我们可以是basic.qos方法,将prefetch_count设置为1。

    channel.basic_qos(prefetch_count=1)
    

    这告诉RabbitMQ一次不给一个Consumer发送一个以上的消息。或者,换句话说,在Consumer没有处理并确认之前的消息之前,不要向员工发送新消息。相反,它会把它发送给下一个不在处理的Consumer。

    Note about queue size

     如果所有在运行中的Consumers都在处理中。如果想要对此保持关注,那么可能需要增加更多的Consumer,或者使用message TTL

    最后,下面是实现消息确认+消息和队列持久化+消息公平分发的Work queues:

     new_task.py:

     1 import sys
     2 import pika
     3 
     4 #step1:create connection with RabbitMQ Server
     5 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
     6 channel=connection.channel()
     7 
     8 #step2:create a named queue
     9 channel.queue_declare(queue='hello',durable=True)  #durable=True实现队列持久化
    10 
    11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数
    12 message=''.join(sys.argv[1:]) or "Hello World!"
    13 channel.basic_publish(exchange='',routing_key='hello',body=message,
    14                       properties=pika.BasicProperties(delivery_mode=2,)  #make message persisitent实现消息持久化
    15                       )
    16 print("[x] Sent %r"%message)
    17 
    18 #step4:close
    19 connection.close()

    work.py:

     1 import pika
     2 import time
     3 
     4 #step1:create connection with RabbitMQ Server
     5 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
     6 channel=connection.channel()
     7 
     8 #step2:create a named queue
     9 channel.queue_declare(queue='hello',durable=True)  #durable=True实现队列持久化,C和P都要保持一致
    10 
    11 #step3:define callback function.
    12 def callback(ch,method,properties,body):
    13     print('[x] Received %r'%body)
    14     time.sleep(body.count(b'.'))
    15     print('[x] Done.')
    16     ch.basic_ack(delivery_tag=method.delivery_tag)  #basic_ack是消息确认的不可缺少的一部分,少了它内存中的messages_noacknowledgement不会自动较少
    17 
    18 
    19 #step4:when Consumer receive message,the program call callback function to deal with message.
    20 channel.basic_qos(prefetch_count=1)  #消息公平分发Fair dispatch ,设置RabbitMQ只向每个C发送一条message
    21 channel.basic_consume(callback,queue='hello') #使用no_ack=False的默认值,确保消息确认
    22 
    23 print('[x] Waiting for messages. To exit press "CTRL+C"')
    24 
    25 #write an endless loop.
    26 channel.start_consuming()

    实例3:构建一个简单的日志记录系统:消息发布订阅Publish/Subscribe

    在第二个实例中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都交付给一个worker。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递消息。这种模式被称为“发布/订阅”。

    为了说明这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。

    在我们的日志系统中,接收程序Consumer的每一个运行副本都将得到消息。这样我们就能运行一个接收器,并将日志引导到磁盘;同时,我们可以运行另一个接收器,在屏幕上打印日志。

    基本上,发布的日志消息将被所有Consumer接收,这种类型叫做广播broadcast。

     

    Exchanges

    在前两个实例中,Producer发送消息到队列,Consumer从队列中接收消息列。现在,我们来介绍一下RabbitMQ的完整消息传递模型。

    让我们快速回顾一下前面的教程中介绍的内容:

      生产者是发送消息的用户应用程序。

      队列是存储消息的缓冲区。

      消费者是接收消息的用户应用程序。

    RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到哪个队列。

    生产者只能将消息发送到exchange。exchange接收来自生产者的消息,再将消息推送到队列中。

    exchange必须知道如何处理它收到的消息:

      它是否应该附加到特定的队列?

      它应该被附加到许多队列吗?

      或者应该被抛弃。

    这些规则由交换类型定义。

    可用的交换类型有:direct,topic,headers,fanout。

    1.fanout  以类似广播的方式向所有的C发送信息:所有bind到类型为fanout的这个exchange的queue都可以接收信息

    2.direct  通过routingKey和exchange决定的那个唯一的queue可以接收消息

    3.topic 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

    4.headers

     

    在这个实例中,我们将使用fanout类型的exchange来实现。

    channel.exchange_declare(exchange='logs', type='fanout')

    上面的代码声明了类型为fanout,名为logs的exchange。

    fanout交换非常简单。它只是将接收到的所有消息广播到它所知道的所有队列。

    代码实现:

    channel.basic_publish(exchange='log',  #exchange的名称为logs
                          routing_key='',   #不指定序列名称
                          body=message)
                     
    

      

     

    Listing exchanges

    想要内存在RabbitMQ Server中的所有exchanges,也可以使用rabbitmqctl工具:

    rabbitmqctl list_exchanges
    

    在返回的列表中会有一些amq.*交换 和 默认的(未命名)交换。这些exchange是默认创建的,但现在我们不太可能需要使用它们。

     

    The default exchange

    在本教程的前几部分中,我们对交换一无所知,但仍然能够将消息发送到队列。这是因为我们使用的是默认交换,我们通过空字符串(“”)来识别它。

    交换参数是交换的名称。空字符串表示默认或匿名交换:消息被发送到参数routing_key指定的队列中去。

     

    Temporary queues 临时队列

    在前两个实例中,我们使用有指定名称的队列(如:hello)。对我们来说,能够命名队列是至关重要的——我们需要将Consumer指向相同的队列。当您想要在生产者和消费者之间共享队列时,给队列命名是很重要的。

    但在这个实例中,命名队列的情况却并非如此。

    我们想要听到所有日志信息,而不仅仅是他们的子集。我们也只对当前flowing messages感兴趣,而不是旧消息。要解决这两个问题,我们需要两步。

    第一步,当我们连接RabbbitMQ时,我们需要一个新的空队列。我们可以创建一个随机名称的队列或让server选择一个随机队列名称。这个可以通过不在queue_declare()中设置参数queue来实现:

    result = channel.queue_declare()

    result.method.queue包含了一个随机队列名。比方说“amq.gen-JzTY20BRgKO-HjmUJj0wLg”这样的类似的名称。

    第二步,一旦Consumer不和该随机名称的队列连接了,这个队列就可以删除。通过设置exclusive参数实现。

    result = channel.queue_declare(exclusive=True)  
    

    Bindings

     我们已经创建了一个类型为fanout的exchange和一个队列。现在我们需要告诉exchange将消息发送到我们的队列中去。exchange和queue之间的联系被称为binding绑定。

    channel.queue_bind(exchange='logs',
                queue=result.method.queue #result.method.queue中生成一个随机名的queue )

    从现在起,名为logs的exchange将会向我们的队列中添加消息。

    Listing bindings

     我们可以通过rabbitmqctl工具列出现有的bindings。

    rabbitmqctl list_bindings
    

      

    综上所述:

    Producer程序负责发送日志信息,大部分实现和前面两个实例中的Producer没有大的区别。最重要的变化是,我们现在想要把消息发布到名为logs,类型为fanout的exchange中去,而不是默认的exchange。我们需要在发送时提供一个routing_key,但是它的值被忽略了。

    下面是Producer的代码,emit_log.py:

     1 import pika
     2 import sys
     3 
     4 #step1:create connection with RabbitMQ server
     5 connection =pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
     6 channel = connection.channel()
     7 
     8 #step2:declare a fanout exchange
     9 channel.exchange_declare(exchange='logs',
    10                          type='fanout')
    11 
    12 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    13 
    14 #step3:send message to exchange 'log'
    15 channel.basic_publish(exchange='logs',
    16                       routing_key='',
    17                       body=message)
    18 print(" [x] Sent %r" % message)
    19 
    20 #step4:close
    21 connection.close()

    正如您所看到的,在建立连接之后,我们声明了exchange。这个步骤是必要的,因为将信息发送给一个不存在的exchange是不行的。 

    如果没有队列绑定到交换中,消息将丢失,但这对我们来说是可以的;如果没有消费者在听,我们可以安全地丢弃这个消息。

     

    receive_logs.py的代码:

     1 import pika
     2 
     3 #step1:create connection with RabbitMQ server
     4 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
     5 channel = connection.channel()
     6 
     7 #step2:declare a fanout exchange
     8 channel.exchange_declare(exchange='logs',
     9                          type='fanout')
    10 
    11 #step3:declare a random name queue
    12 result = channel.queue_declare(exclusive=True)  #exclusive=True使得生成的随机名称的queue在使用后在内存中删除。
    13 queue_name = result.method.queue  #result.method.queue生成一个随机名称的queue
    14 
    15 #step4:create a bind with logs and queue
    16 channel.queue_bind(exchange='logs',
    17                    queue=queue_name)
    18 
    19 print(' [*] Waiting for logs. To exit press CTRL+C')
    20 
    21 #step5:define a callback function
    22 def callback(ch, method, properties, body):
    23     print(" [x] %r" % body)
    24 
    25 #step6:call a callback function when C receives a message
    26 channel.basic_consume(callback,
    27                       queue=queue_name,
    28                       no_ack=True)
    29 
    30 #step7:write a endless loop
    31 channel.start_consuming()

    Question:为什么在Consumer端需要一个随机名称的queue呢?

    因为Producer不会直接把消息发送给queue,而是先发送给exchange。exchange(fanout类型的)收到消息后,会遍历绑定在这个exchange上的所有的queue,一次将消息发送给所有与之bind的queue。再由quque将消息发送给Consumer。

    由于在这个过程中,queue在Consumer接收处理完数据后就自动删除了。所以就用了随机名称的queue。

    实例4:日志记录系统有选择的接收消息:Routing 

    在实例3中,我们构建了一个简单的日志记录系统。通过这个简单的日志记录系统,我们可以像广播的形式将日志信息发送给许多接收端。

    在实例4中,我们会给它增加一些特性——我们将使它可以只订阅消息的一部分,而不是全部消息。例如,我们将能够将关键error消息直接发给日志文件(以节省磁盘空间),同时还能够在控制台上打印所有日志消息。

    Bindings

    在前面的示例中,我们已经创建了binding。创建binding的代码:

    channel.queue_bind(exchange=exchange_name,queue=queue_name)
    

    绑定binding是交换exchange和队列queue之间的关系。换言之,队列对bind的exchange传来的消息感兴趣。

    绑定binding可以使用一个额外的参数routing_key。为了避免与basic_publish参数的混淆,我们将把它称为绑定键binding_key。下面是创建一个具有键binding_key的绑定binding:

    channel.queue_bind(exchange=exchange_name,queue=queue_name,
                       routing_key='black') #增加了routing_key参数
    

    绑定键binding_key的含义取决于交换类型exchange type。实例3中使用的fanout exchange,只是忽略了它的值。

    Direct exchange

    我们希望扩展实例3中的日志记录系统的功能,依据消息的严重性来过滤消息。

    例如,我们可能想要将日志消息写入磁盘的脚本只接收关键错误error,而不是在警告warning或信息info日志消息上浪费磁盘空间。

    实例3中使用的是fanout exchange,它不会给我们太多的灵活性——它只会无意识地将信息传递给所有bind它的队列。

    想要实现这个扩展的功能,实现日志信息记录的过滤接收,可以使用direct exchange。direct exchange的算法很简单——一条消息传递给队列,queue的绑定键binding_key与publish消息的路由键routing_key完全匹配。

     

    为了说明这一点,请考虑以下设置:

    在这个设置中,我们可以看到direct exchange和两个queues绑定。第一个队列与绑定键为orange的exchange绑定,第二个队列有两个绑定,一个绑定键为black,另一个绑定键位green。

    在这样的设置中,通过routing_key=orange的direct exchange发送的消息,将会被Q1接收。通过routing_key=black或routing_key=green的direct exchange发送的消息,将会被Q2接收。所有其他消息将被丢弃。

    Multiple bindings

     

    用相同的绑定键binding key绑定多个队列是完全合法的。在我们的示例中,我们可以在X和Q1之间添加绑定键black。在这种情况下,direct exchange表现为fanout,并将消息广播到所有匹配队列。一个带有routing_key=black的消息将被发送到Q1和Q2。

    Emitting logs

    我们将在我们的日志系统中使用这个模型。

    我们将发送消息到direct exchange。

    我们将提供日志的严重性log serverity作为路由键routing key。

    这样Consumer将能够选择它想要接收的信息。让我们先关注一下emiiting logs。

     

    需要创建一个exchange:

    channel.exchange_declare(exchange='direct_logs', type='direct')
    

    发送信息:

    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    

    为了简化问题,我们假设“严重性”severity可以是“信息”info、“警告”warning、“错误”error。

    Subscribing

     

    接收信息像实例3一样运行,只有一个例外——我们将为每个serverity创建一个新的binding。

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    

      

    综上所述:

    emit_log_direct.py的代码:

     1 import pika
     2 import sys
     3 
     4 #step1: create connection with RabbitMQ Server
     5 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
     6 channel = connection.channel()
     7 
     8 #step2: declare a  direct exchange
     9 channel.exchange_declare(exchange='direct_logs',
    10                          type='direct')
    11 
    12 #step3: aacept message severity
    13 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    14 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    15 
    16 #step4: send message to exchange with the specific severity
    17 channel.basic_publish(exchange='direct_logs',
    18                       routing_key=severity,
    19                       body=message)
    20 print(" [x] Sent %r:%r" % (severity, message))
    21 
    22 #step5:close
    23 connection.close()

    receive_logs_direct.py的代码:

     1 import pika
     2 import sys
     3 
     4 #step1: create connection with RabbitMQ server
     5 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
     6 channel = connection.channel()
     7 
     8 #step2: declare a direct exchange 
     9 channel.exchange_declare(exchange='direct_logs',
    10                          type='direct')
    11 
    12 #step3:declare a random name queue
    13 result = channel.queue_declare(exclusive=True)
    14 queue_name = result.method.queue
    15 
    16 #step4: 
    17 severities = sys.argv[1:]
    18 if not severities:
    19     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    20     sys.exit(1)
    21 
    22 for severity in severities:
    23     channel.queue_bind(exchange='direct_logs',
    24                        queue=queue_name,
    25                        routing_key=severity)
    26 
    27 print(' [*] Waiting for logs. To exit press CTRL+C')
    28 
    29 #step5: define a callback function 
    30 def callback(ch, method, properties, body):
    31     print(" [x] %r:%r" % (method.routing_key, body))
    32 
    33 #step6: call a callback function when receive a message 
    34 channel.basic_consume(callback,
    35                       queue=queue_name,
    36                       no_ack=True)
    37 
    38 #step7: write a endless loop
    39 channel.start_consuming()

    实例5:日志记录系统有选择的接收消息升级版:Topics

    在实例4中,我们改进了日志系统。我们使用了一个direct exchange,而不是使用一个仅能进行虚拟广播的fanout exchange。在实例4中实现了选择性地接收日志的可能性。 

    但是实例4改进过的日志系统还是具有它的局限性——不能基于多个标准进行选择性地接收信息。

    在我们的日志系统中,我们可能希望订阅的不仅是基于严重性severity为标准的日志,还应该基于发出日志的来源。我们可能从syslog unix工具中了解这个概念,该工具根据严重性severity(info / warn / crit…)和设施facility(auth / cron / kern…)来发送日志。 

    这将给我们带来很大的灵活性——我们可以收到来自“cron”和“kern”的关键错误。

    要在我们的日志系统中实现这一点,我们需要了解一个更复杂的topic exchange。

     

    Topic exchange

    发送到topic exchange的消息不能有任意的routing_key——它必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定一些与消息相关的特性。一些有效的routing_key示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。在routing_key中可以有任意多个单词,最多可达255个字节。

    绑定键binding key也必须以相同的形式出现。top exchange背后的逻辑类似于direct exchange——一个带有特定routing key的消息将被传送到绑定一个匹配绑定键binding_key的所有队列。然而,绑定键有两个重要的特殊情况: 

      *(star)可以代替一个词。

      #(hash)可以替代零个或更多的词。

    在一个例子中,最简单的解释是:

     

    在这个例子中,我们将发送所有描述动物的信息。消息将发送一个由三个单词组成的routing key(两个点)。routing key中的第一个词将描述一个celerity,第二个词将描述一种colour,第三个词将描述一种species:“< celerity > . <colour> . <species>”。

    我们创建了三个binding key:Q1绑定binding key " * . orange . * ",Q2 绑定binding key" *.*.rabbit "和" lazy.# "。 

    这些绑定可以概括为:

      Q1对所有的orange动物都感兴趣。

      Q2对所有的rabbit都敢兴趣,同时也对所有的lazy的动物感兴趣。

    一个routing_key=" quick.orange.rabbit "的消息,将会被发送到队列Q1,Q2中去;

    一个routing_key=" lazy.orange.elephant "的消息,也将会被发送到队列Q1,Q2中去;

    一个routing_key=" quick.orange.fox "的消息,仅会被发送到队列Q1中去;

    一个routing_key=" lazy.brown.fox "的消息,仅会被发送到队列Q2中去;

    一个routing_key=" lazy.pink.rabbit "的消息,仅会被发送到队列Q2中去,即便它有符合两条binding key " *.*.rabbit "和" lazy.# ";

    一个routing_key=" quick.brown.fox "的消息,没有和任何一条binding key匹配上,所以这条消息会被Q1,Q2丢弃。

     

    如果我们违反规定(routing_key只能是以两个dot分隔的三个单词列表),用一个或四个单词发送一条信息,比如一条信息的routing_key="orange"或"quick.orange.male.rabbit"。这些消息不匹配任何binding key,将丢失。

    另一方面," lazy.orange.male.rabbit "。虽然它有四个单词,但它将匹配最后一个绑定" lazy.# ",并将被传送到队列Q2 。

     

    Topic exchange

    topic exchange很强大,它不仅能实现更复杂的数据筛选接收,还能实现fanout exchange和direct exchange一样的效果。

    当队列与“#”(hash)binding key绑定时,它将接收所有消息,实现了fanout exchange的功能。

    当特殊字符“*”(star)和“#”(hash)没有在binding key中使用时,实现了direct exchange的功能。

     

    综上所述:

    我们将用topic exchange实现一个新的日志系统。

    我们假设日志的routing key将由两个单词组成:" <facility>.<severity>"。

    emit_log_topic.py的代码:

     1 import pika
     2 import sys
     3 
     4 #step1:create a connection with RabbitMQ Server
     5 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
     6 channel = connection.channel()
     7 
     8 #step2:declare a topic exchange
     9 channel.exchange_declare(exchange='topic_logs', type='topic')
    10 
    11 #step3:receive routing_key message from sys.argv
    12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    13 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    14 
    15 #step4:send message to topic_logs
    16 channel.basic_publish(exchange='topic_logs',
    17                       routing_key=routing_key,
    18                       body=message)
    19 
    20 print(" [x] Sent %r:%r" % (routing_key, message))
    21 
    22 #step5:close.
    23 connection.close()

    receive_logs_topic.py的代码:

     1 import pika
     2 import sys
     3 
     4 #step1:create a connection with RabbitMQ Server
     5 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
     6 channel = connection.channel()
     7 
     8 #step2:declare a topic exchange
     9 channel.exchange_declare(exchange='topic_logs',
    10                          type='topic')
    11 
    12 #step3:declare a random name queue
    13 result = channel.queue_declare(exclusive=True)
    14 queue_name = result.method.queue
    15 
    16 #step4:receive messages from sys.argv
    17 binding_keys = sys.argv[1:]
    18 if not binding_keys:
    19     sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
    20     sys.exit(1)
    21 
    22 #step5:bind a topic exchange with a queue 
    23 for binding_key in binding_keys:
    24     channel.queue_bind(exchange='topic_logs',
    25                        queue=queue_name,
    26                        routing_key=binding_key)
    27 
    28 print(' [*] Waiting for logs. To exit press CTRL+C')
    29 
    30 #step6:create a callback function
    31 def callback(ch, method, properties, body):
    32     print(" [x] %r:%r" % (method.routing_key, body))
    33 
    34 #step7:call a callback function when receive a message
    35 channel.basic_consume(callback,
    36                       queue=queue_name,
    37                       no_ack=True)
    38 
    39 #step8:write a endless loop
    40 channel.start_consuming()

     

     

    实例6:Remote procedure call (RPC)

    在实例2中,我们学习了如何使用Work Queues在多个工作进程之间分配耗时的任务。(消息公平分发channel.basic_qos(prefetch_count=1) )

    但如果我们需要在远程计算机上运行一个函数并等待结果呢?那是另一回事了。这种模式通常被称为远程过程调用或RPC。

    在实例6中,我们将使用RabbitMQ构建一个RPC系统:客户机client和可伸缩的RPC服务器scalable RPC server。由于我们没有任何可分配的耗时任务,因此我们将创建一个能够返回斐波那契数列的虚拟RPC服务。

     

    Client interface客户端接口

    为了说明RPC服务如何使用,我们将创建一个简单的客户端类。它将公开一个名为call的方法,call方法将发送一个RPC请求和块,直到收到响应:

    fibonacci_rpc = FibonacciRpcClient()
    result = fibonacci_rpc.call(4)
    print("fib(4) is %r" % result)
    

      

    A note on RPC

    虽然RPC在计算中是一个很常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是RPC时,问题就出现了。这样的混淆导致了不可预测的系统结果,并增加了调试的复杂性。与简化软件不同,误用的RPC可能会导致代码无法维护。

    考虑到这一点,有以下建议: 

      确保函数调用来源的明确性,调用的函数是本地的还是远程的清晰可判断。

      用文件记录系统。使组件之间的依赖关系变得清晰。

      处理错误情况。当RPC服务器停机很长时间时,客户机应该如何反应?

    当有疑问要时要避免使用RPC。如果可以,应该使用异步管道asynchronous pipeline(而不是像rpc一样的阻塞进程),异步地将结果推进到下一个计算阶段。

     

    Callback queue

    一般来说,在RabbitMQ上执行RPC是很容易的。客户端发送请求消息,服务器响应消息。为了接收响应,客户端需要发送一个“callback” 队列地址。让我们试一试:

    result = channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    
    channel.basic_publish(exchange='',
                          routing_key='rpc_queue',
                          properties=pika.BasicProperties(
                                reply_to = callback_queue,
                                ),
                          body=request)
    
    # ... and some code to read a response message from the callback_queue ...
    

      

    Message properties

    AMQP 0 - 9 - 1协议预先定义了一个14个属性集合。除以下情况外,大多数属性都很少使用:

      delivery_mode:将消息标记为持久性(值为2)或瞬态transient(任何其他值)。您可能还记得实例2中的这个属性。

      content_type:用于描述编码的MIME类型。例如,对于经常使用的JSON编码,将此属性设置为:

    应用程序/ JSON是一种很好的做法。

      reply_to:通常用于命名回调队列callback queue。

      correlationship _id:将RPC响应与请求联系起来.

     

    Correlation id

    在上述方法中,我们建议为每个RPC请求创建一个回调队列。这很低效,但幸运的是有更好的方法——让我们为每个客户端创建一个回调队列。 

    这引发了一个新的问题,在该队列中收到了响应,不清楚响应属于哪个请求。这就是使用correlation_id属性时的情况。我们将为每个请求设置唯一值。稍后,当我们在回调队列中收到消息时,我们将查看correlation_id,并基于correlation_id,我们将能够匹配响应和请求。如果我们看到一个未知的correlationship _id值,我们可以安全地丢弃该消息——它不属于我们的请求。

    您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是由于报错而失败呢?这是由于服务器端可能出现race condition。虽然不太可能,但是RPC服务器可能在发送答案后,但未发送请求确认acknowlegement信息前。如果发生这种情况,重新启动的RPC服务器将再次处理此请求。这就是为什么在客户端我们必须优雅地处理重复的响应,由于RPC服务是幂等的idempotent(意味着可以安全地对失败请求进行重试)。

     

    综上所述:

    我们的RPC将这样工作: 

    当客户机启动时,它创建一个匿名回调队列。

    对于一个RPC请求,客户机发送一条信息(有两个属性:reply_to和correlationship_id):reply_to是用来设置回调序列的,correlation_id是用来设置每个请求的唯一值的。

    请求被发送到一个rpc_queue队列。

    RPC客户端在等待从这个队列中接收请求。当一个请求出现时,它处理完请求的工作并且发送处理结果信息给客户端时,通过检查correlation_id属性找到唯一的该请求,通过reply_to回调队列发送回客户端。

    客户机在回调队列上等待数据。当消息出现时,它检查correlationship _id属性。如果它与请求的值相匹配,则返回对应用程序的响应。

    rpc_server.py的代码:

     1 import pika
     2 
     3 #建立连接
     4 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
     5 channel = connection.channel()
     6 
     7 #声明队列
     8 channel.queue_declare(queue='rpc_queue')
     9 
    10 #声明fibonacci函数,只假设有效的正整数输入。
    11 def fib(n):
    12     if n == 0:
    13         return 0
    14     elif n == 1:
    15         return 1
    16     else:
    17         return fib(n-1) + fib(n-2)
    18 
    19 
    20 def on_request(ch, method, props, body):
    21     n = int(body)
    22 
    23     print(" [.] fib(%s)" % n)
    24     response = fib(n)
    25 
    26     ch.basic_publish(exchange='',
    27                      routing_key=props.reply_to,
    28                      properties=pika.BasicProperties(correlation_id = 
    29                                                          props.correlation_id),
    30                      body=str(response))
    31     ch.basic_ack(delivery_tag = method.delivery_tag)
    32 
    33 channel.basic_qos(prefetch_count=1)  #为了在多个服务器上平均分配负载,我们需要设置prefetch_count设置。
    34 channel.basic_consume(on_request, queue='rpc_queue')
    35 #我们声明对basic_consume的回调,RPC服务器的核心。在接收请求时执行它。它执行工作并发送响应。
    36 
    37 print(" [x] Awaiting RPC requests")
    38 channel.start_consuming()

     

    rpc_client.py的代码:

     1 import pika
     2 import uuid
     3 
     4 class FibonacciRpcClient(object):
     5     def __init__(self):
     6         self.connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
     7       #建立连接
     8         self.channel = self.connection.channel()
     9 
    10         result = self.channel.queue_declare(exclusive=True)  #声明一个排他的回调队列
    11         self.callback_queue = result.method.queue
    12 
    13         self.channel.basic_consume(self.on_response, no_ack=True,
    14                                    queue=self.callback_queue)
    15 
    16     def on_response(self, ch, method, props, body):
    17         if self.corr_id == props.correlation_id:
    18             self.response = body
    19 
    20     def call(self, n):
    21         self.response = None
    22         self.corr_id = str(uuid.uuid4())
    23         self.channel.basic_publish(exchange='',
    24                                    routing_key='rpc_queue',
    25                                    properties=pika.BasicProperties(
    26                                          reply_to = self.callback_queue,
    27                                          correlation_id = self.corr_id,
    28                                          ),
    29                                    body=str(n))
    30         while self.response is None:
    31             self.connection.process_data_events()
    32         return int(self.response)
    33 
    34 fibonacci_rpc = FibonacciRpcClient()
    35 
    36 print(" [x] Requesting fib(30)")
    37 response = fibonacci_rpc.call(30)
    38 print(" [.] Got %r" % response)
  • 相关阅读:
    FCKeditor的问题
    每天学习一点点(2010年二月)
    Excel使用小技巧
    JavaScript 取页面属性
    附加 数据库错误 5120
    CSS中元素水平居中显示的方法
    css中height:100%不起作用的解决方法
    SQL SERVER数据库开发之存储过程应用(转载)
    双路由器双小型交换机组建公司网络,2个公网IP上网案例(转载)
    如何解决VS2005没有代码智能提示(联想)的问题(转载)
  • 原文地址:https://www.cnblogs.com/zoe233/p/7327770.html
Copyright © 2011-2022 走看看