zoukankan      html  css  js  c++  java
  • RabbitMQ之工作队列

    工作队列

    工作队列(又称:任务队列Task Queues)是为了避免等待一些占用大量资源、时间的操作,当我们把任务Task当做消息发送队列中,一个运行在后台的工作者worker进程就会取出任务然后处理。

    当有多个works,任务在它们之间共享

    创建任务

    创建任务的new_task.py

    #!/usr/bin/env python
    #-*- coding:utf8 -*-
    import sys 
    import pika
    import logging
    
    logging.basicConfig(format='%(levlename)s:%(message)s',level=logging.CRITICAL)
    
    def send():
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        channel = connection.channel()
    
        #创建一个队列,并且设置队列可以持久化,durable=True
        channel.queue_declare(queue='task_queue',durable=True)
    
        #将输入参数按照.号串联起来,后续在消费的时候每个点都sleep一秒钟
        if len(sys.argv) == 1:
            message = "Hello World!"
        else:
            message = '.'.join(sys.argv[1])
                                                                                                      
        #向队列task_queue发送消息,routing_key指定,与queue_declare中对应
        #发送消息为message,对应参数body
        #设置消息持久化
        channel.basic_publish(exchange='',
                routing_key = 'task_queue',
                body = message,
                properties = pika.BasicProperties(delivery_mode=2),
                )
        print " [x] Send %r" % (message,)
    
        connection.close()
    
    if __name__ == '__main__':
        send()
    

    需要重点说明:

    1、消息队列持久化,设置channel.queue_declare中durable参数为True,这样在RabbitMQ-server重启之后,消息不会丢失

    2、消息持久化,设置delivery_mode等于2

    消息持久化

    将消息持久化并不能完全保证不会丢失,以上代码只是告诉RabbitMQ要把消息存放到磁盘上,但是从RabbitMQ收到消息到保存之间还存在很小的时间间隔。RabbitMQ并不是所有的消息都使用fsync(2),可能保存到缓存中,并不一定会写到磁盘中。并不能保证真正的持久化,但可以应付简单的队列工作。

    如果需要持久化,需要修改代码支持事务。

    执行任务

    执行任务,通过work.py来操作

    #!/usr/bin/env python
    #-*-coding:utf8 -*-
    
    import pika
    import time
    
    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
    
        #每遇到一个点,就sleep一秒钟,模拟长时间任务
        sleep_time = body.count('.')*100
        print "slepp_time=%d" % sleep_time
                                                                                                      
        time.sleep(body.count('.')*100)
    
        print " [x] Done"
    
        #任务执行完成之后返回确认包
        #这样对于没有返回确认包的消息就不会丢失
        ch.basic_ack(delivery_tag= method.delivery_tag)
    
    def work():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
        channel = connection.channel()
    
        channel.queue_declare(queue='task_queue',durable=True)
    
        print "[*] Wating for messages.To exit press CTRL+C"
    
        #保证消息的公平分发,设置prefetch告诉同一时刻,不要发送超过1条消息给一个工作者
        #直到它已经处理上一条消息并且做出响应
        channel.basic_qos(prefetch_count=1)
    
        #开始消费消息
        channel.basic_consume(callback,queue='task_queue')
    
        #循环消费
        #channel.start_consuming()
    
    if __name__ == '__main__':
        try:
            work()
        except KeyboardInterrupt,e:
            print "Exit"
    

    重点说明

    1、使用工作队列的一个好处就是能够并行的处理队列。如果任务堆积,只需要添加更多的工作者work即可

    2、对于多个work,RabbitMQ会按照顺序把消息发送给每个消费者,这种方式为轮询(round-robin)

    3、消息响应:如果一个work挂掉,上面代码实现将这个消息发送给其他work,而不是丢弃。

         因此需要消息响应机制,每个work处理完成任务的时候,会发送一个ack,告诉RabbitMQ-server已经收到并处理某条消息,然后RabbitMQ-server释放并删除这条消息。

    4、消息ack没有超时的概念,这样在处理一个非常耗时的消息任务时候就不会出现问题

    5、消息ack默认是开启的,通过no_ack=True标识关闭,在回调函数中basic_ack中

    6、如果忘记调用basic_ack的话,这样消息在程序退出后重新发送,会导致RabbitMQ-server中消息堆积,占用越来越多的内存。通过如下命令进行确认:

    guosong@guosong:~/code/rabbitmq/ch2$ rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues ...
    task_queue	3	0
    ...done.
    

    存在三个堆积的任务

    7、关于队列大小:如果所有的工作者都在处理任务,队列就会被填满。需要留意这个问题,要么添加更多的工作者,要么使用其他策略,例如设置队列大小等。

    send和receive

    ChannelClosed: (406, "PRECONDITION_FAILED - parameters for queue 'rt_test_dba_queue' in vhost '/' not equivalent")
    (406, "PRECONDITION_FAILED - parameters for queue 'rt_test_dba_queue' in vhost '/' not equivalent")

    参考链接

    http://adamlu.net/rabbitmq/tutorial-two-python

    https://pika.readthedocs.org/en/latest/modules/index.html

  • 相关阅读:
    序列模型
    conda安装库时报错Solving environment: failed with initial frozen solve. Retrying with flexible solve.
    OverflowError: mktime argument out of range问题
    Supervised ML-1
    CKE(Collaborative Knowledge Base Embedding for Recommender Systems)笔记
    Word2vec学习
    BERT
    DeText: A Deep Text Ranking Framework with BERT论文笔记
    解决Server returns invalid timezone. Go to 'Advanced' tab and set 'serverTimezone' property manually.(IDEA连接mysql数据库)
    django-settings配置介绍
  • 原文地址:https://www.cnblogs.com/gsblog/p/3821377.html
Copyright © 2011-2022 走看看