zoukankan      html  css  js  c++  java
  • python使用rabbitMQ介绍二(工作队列模式)

    一模式介绍

    第一章节的生产-消费者模式,是非常简单的模式,一发一收。在实际的应用中,消费者有的时候需要工作较长的时间,则需要增加消费者。

    队列模型:

    这时mq实现了一下几个功能:

    • rabbitmq循环调度,将消息循环发送给不同的消费者
    • 消息确认机制。为了确保一个消息不会丢失,RabbitMQ支持消息的确认 , 一个 ack(acknowlegement) 是从消费者端发送一个确认去告诉RabbitMQ 消息已经接收了、处理了,RabbitMQ可以释放并删除掉了。如果一个消费者死掉了(channel关闭、connection关闭、或者TCP连接断开了)而没有发送ack,RabbitMQ 就会认为这个消息没有被消费者处理,并会重新发送到生产者的队列里,如果同时有另外一个消费者在线,rabbitmq将会将消息很快转发到另外一个消费者中。 那样的话你就能确保虽然一个消费者死掉,但消息不会丢失。 这个是没有超时的,当消费方(consumer)死掉后RabbitMQ会重新转发消息,即使处理这个消息需要很长很长时间也没有问题。消息的 acknowlegments 默认是打开的,在前面的例子中关闭了: no_ack = True . 现在删除这个标识 然后 发送一个 acknowledgment。
    • 消息持久化,将消息写入硬盘中。
    • 公平调度。在一个消费者未处理完一个消息之前不要分发新的消息给它,而是将这个新消息分发给另一个不是很忙的消费者进行处理。为了解决这个问题我们可以在消费者代码中使用 channel.basic.qos ( prefetch_count = 1 ),将消费者设置为公平调度。

    二 代码示例

    生产者:

     1 #!/usr/bin/env python
     2 import pika
     3 import sys
     4 
     5 parameters = pika.ConnectionParameters(host='localhost')
     6 connection = pika.BlockingConnection(parameters)
     7 
     8 channel = connection.channel()
     9 channel.queue_declare(queue='task_queue', durable=True)
    10 
    11 for i in range(10):
    12     message = 'Hello World: {}'.format(i)
    13     channel.basic_publish(exchange='',
    14                           routing_key='task_queue',
    15                           body=message,
    16                           properties=pika.BasicProperties(delivery_mode=2))
    17     print(" [x] Sent %r " % message)
    18 
    19 connection.close()
    View Code

    消费者:

    #!/usr/bin/env python
    import pika
    import time
    
    parameters = pika.ConnectionParameters(host='localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Warting for messages. To exit press CTRL+C')
    
    
    def call_back(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(call_back, queue='task_queue')
    channel.start_consuming()
    View Code

    生产者打印输出:

     1 Task:
     2  [x] Sent 'Hello World: 0' 
     3  [x] Sent 'Hello World: 1' 
     4  [x] Sent 'Hello World: 2' 
     5  [x] Sent 'Hello World: 3' 
     6  [x] Sent 'Hello World: 4' 
     7  [x] Sent 'Hello World: 5' 
     8  [x] Sent 'Hello World: 6' 
     9  [x] Sent 'Hello World: 7' 
    10  [x] Sent 'Hello World: 8' 
    11  [x] Sent 'Hello World: 9'

    woerk1输出:

    [*] Warting for messages. To exit press CTRL+C
    
     [x] Received b'Hello World: 0'
    
     [x] Done
    
     [x] Received b'Hello World: 2'
    
     [x] Done
    
     [x] Received b'Hello World: 4'
    
     [x] Done
    
     [x] Received b'Hello World: 6'
    
     [x] Done
    
     [x] Received b'Hello World: 7'
    
     [x] Done
    
     [x] Received b'Hello World: 9'
    
     [x] Done

    worker二输出:

    [*] Warting for messages. To exit press CTRL+C
     [x] Received b'Hello World: 1'
     [x] Done
     [x] Received b'Hello World: 3'
     [x] Done
     [x] Received b'Hello World: 5'
     [x] Done
     [x] Received b'Hello World: 8'
     [x] Done

    三 队列信息

    在web管理页面,可以看到channel情况

     

    在queue页面,可以看到

     

     

  • 相关阅读:
    React Native 使用 react-native-webview 渲染 HTML
    如何对 React 函数式组件进行优化?
    如何在前端中使用protobuf?
    Grunt之预处理
    基于Hooks 的 Redux 速成课
    AssemblyScript 入门指南
    webpack常用构建优化总览
    如何在前端中使用protobuf(node篇)
    哪种编程语言最适合区块链?
    hdu 相遇周期
  • 原文地址:https://www.cnblogs.com/StitchSun/p/9392701.html
Copyright © 2011-2022 走看看