zoukankan      html  css  js  c++  java
  • RabbitMQ--work queues(二)

    封装一个task到一个message,并发送到queue。consumer会去除task并执行这个task。

    这里我们简化了操作,发送消息到队列中,consumer取出消息计算里面'.'号有几个就sleep几秒。

    task.py

    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent
                          ))
    print(" [x] Sent %r" % message)
    connection.close()

    work.py

    #!/usr/bin/env python
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
                          queue='task_queue')
    
    channel.start_consuming()

    代码解释

    channel.queue_declare(queue='task_queue', durable=True)
    告诉rabbitmq永不丢失queue,即使rabbitmq server挂掉。
    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
        time.sleep( body.count('.') )
        print " [x] Done"
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello')
    
    在先前例子中,如果consumer突然死掉,回丢失掉正在处理的信息。如何避免呢?如果consumer死掉,怎么将这条信息发送的其他consumer呢?就是使用上面代码
    channel.basic_qos(prefetch_count=1)
    
    This tells RabbitMQ not to give more than one message to a worker at a time. 
    Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one.
    Instead, it will dispatch it to the next worker that is not still busy.
    直到消费完这个消息再给consumer派送新的消息,如果没消费完,将消息发送给另一个consumer.
  • 相关阅读:
    数据仓库_Linux(3)
    2.1(构造序对)
    要修改一万个位置的jdk版本
    8个球7个一样重的,有一个偏重,一个天平,如何两次找出偏重的小球
    玄学
    异常:java.lang.NoClassDefFoundError: javax/servlet/jsp/jstl/core/LoopTag
    提高输入效率
    fan
    idea
    打印整数的补码
  • 原文地址:https://www.cnblogs.com/xiaoming279/p/6279756.html
Copyright © 2011-2022 走看看