zoukankan      html  css  js  c++  java
  • [rabbitmq] python版本(二) 工作队列

    贴一个写的比较好的rabbitmq小结,部分函数有疑惑可以查这个博客里面:https://www.jianshu.com/p/18ffa93fe5d2

    • p:生产者
    • 中间task queue:消息队列
    • c1,c2:多个消费者

    worker.py

    #!/usr/bin/env python
    import pika
    import time
    
    #连接操作
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    #声明task_queue队列
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    '''
    回调函数:
    关于str前缀u--unicode b--byte r--raw等可以参考这篇文章:https://blog.csdn.net/anlian523/article/details/80504699?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1
    根据body.count()数出来的'.'来决定time.sleep()睡几秒--模拟1秒钟的操作
    为了防止一个worker挂掉后丢失消息,消费者通过该一个ack,告诉rabbitmq已经收到并处理信息,之后rabbitmq释放删除此消息
    '''
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        #显示发送确认信息,告诉服务器消息被处理了
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    #每次只为消费者分配一条信息,处理完之后才分配第二条信息
    channel.basic_qos(prefetch_count=1)
    #channel.basic_consume(callback, queue='hello', no_ack=True) 使用no_ack=True可以关闭消息响应
    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    
    channel.start_consuming()
    

    new_task.py

    #!/usr/bin/env python
    import pika
    import sys
    
    
    #建立连接
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    #声明一个叫task_queue的队列
    channel.queue_declare(queue='task_queue', durable=True)
    
    '''
    第一句是说可以输入命令行参数作为发送的随意的信息
    默认交换机,选定task_queue队列
    '''
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [x] Sent %r" % message)
    connection.close()
    

    运行结果:
    最右边是作为生产者,左边2个(其实是三个,有一个没有拆分到这个界面),可以看到worker采用的是轮询(round-robin)的方式,每个worker依次“执行一次操作”

    日积月累,水滴石穿
  • 相关阅读:
    python基础
    python基础
    python基础
    在hive下使用dual伪表
    mariadb 压缩包gz安装方式
    linux下 mysql5.7.20安装(精华)
    在开启kerberos 后,hbase存在数据命名空间的问题(解决方案)
    LINUX下解决TIME_WAIT等网络问题
    常用Oracle进程资源查询语句(运维必看)
    linux 下oracle 11g静默安装(完整版)
  • 原文地址:https://www.cnblogs.com/lonelyisland/p/12747137.html
Copyright © 2011-2022 走看看