zoukankan      html  css  js  c++  java
  • RabbitMQ(二) -- Work Queues

    RabbitMQ(一) -- Work Queues

      RabbitMQ使用Work Queues的主要目的是为了避免资源使用密集的任务,它不同于定时任务处理的方式,而是把任务封装为消息添加到队列中。而消息队列正是共享于多个工作者中使用,它们可以随意pop出数据进行处理。

    消息的持久化 Message durability

    为了保证`rabbitmq`意外重启等原因造成的消息丢失,通过设置消息的durable来实现数据的持久化,但是需要生产者和消费者同时设置持久化才能生效。

    需要注意的是,`rabbitmq`并不允许更改已经创建的消息队列的属性,假如之前已经创建过非持久化的hello消息队列,那么会返回一个错误信息。

    设置消息队列的可持久化属性(第二个参数):

    channel.queue_declare(queue='hello', durable=True)

    在消息发送时,需要指定`delivery_mode`来实现消息持久化:

    channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))

    平均分配 Fair dispatch

    `rabbitmq`实现了消息均分的功能,通过设置`basic.qos`方法的`prefetch_count`来实现。它会告诉`rabbitmq`的生产者不要给一个消费者分配过多的任务,也就是说不要在消费者处理完成已经接收到的任务之前分配新的任务。

    channel.basic_qos(prefetch_count=1)

    其中prefetch_count为可以接受处理的任务个数,如果未达到上限rabbitmq会继续向消费者推送任务。

    实例

    生产者

    #!/usr/bin/env python
    # coding=utf-8
    
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    
    for i in range(100):
        message = str(i) + ' Hello World!'
        channel.basic_publish(exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))
        print " [x] Sent %r" % (message,)
        time.sleep(1)
    connection.close()

    消费者

    #!/usr/bin/env python
    # coding=utf-8
    
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    print ' [*] Waiting for messages. To exit press CTRL+C'
    
    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
        time.sleep(2)
        print " [x] Done"
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='task_queue')
    
    channel.start_consuming()
  • 相关阅读:
    java实现二叉树的构建以及三种遍历
    binary-tree-preorder-traversal二叉树的前序遍历
    insertion-sort-list使用插入排序对链表进行排序
    binary-tree-postorder-traversa二叉树的后序遍历
    sort-list
    Redis的数据类型
    在Windows上搭建Redis服务器
    Eureka源码分析
    Eureka概念理解
    Spring Cloud Eureka
  • 原文地址:https://www.cnblogs.com/coder2012/p/4339565.html
Copyright © 2011-2022 走看看