zoukankan      html  css  js  c++  java
  • 工作队列.py

    #对列模式图
    Work Queue背后的主要思想是避免立即执行资源密集型任务的时,需要等待其他任务完成。
    所以我们把任务安排的晚一些,我们封装一个任务到消息中并把它发送到队列,
    一个进程运行在后端发送并最终执行这个工作,当你运行多个消费者的时候这个任务将在他们之间共享。

    send.py
    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'

    """
    生产者/发送方
    """

    import sys
    import pika

    # 远程主机的RabbitMQ Server设置的用户名密码
    credentials = pika.PlainCredentials("admin", "admin123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.102', 5672, '/', credentials))

    # 创建通道
    channel = connection.channel()

    # 声明队列task_queue,RabbitMQ的消息队列机制如果队列不存在那么数据将会被丢掉,下面我们声明一个队列如果不存在创建
    channel.queue_declare(queue='task_queue')

    # 在队列中添加消息
    for i in range(100):
    message = '%s Meassage '% i or "Hello World!"
    # 发送消息
    channel.basic_publish(exchange='',
    routing_key='task_queue',
    body=message,
                    )

    # 发送消息结束,并关闭通道
    print(" [x] Sent %r" % message)

    channel.close()

    receive1.py
    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'

    """
    消费者/接收方
    """

    import time
    import pika

    # 认证信息
    credentials = pika.PlainCredentials("admin", "admin123")
    connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.102", 5672, "/", credentials))
    # 建立通道
    channel = connection.channel()
    # 创建队列
    channel.queue_declare("task_queue")


    # 订阅的回调函数这个订阅回调函数是由pika库来调用的
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body.count(b'.'))
    time.sleep(body.count(b'.'))
    print(" [x] Done")

    # 定义通道消费者参数
    channel.basic_consume(callback,
    queue="task_queue",
    no_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
    channel.start_consuming()

    receive2.py
    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'

    """
    消费者/接收方
    """

    import time
    import pika

    # 认证信息
    credentials = pika.PlainCredentials("admin", "admin123")
    connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.102", 5672, "/", credentials))

    # 建立通道
    channel = connection.channel()

    # 创建队列
    channel.queue_declare("task_queue")


    # 订阅的回调函数这个订阅回调函数是由pika库来调用的
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body.count(b'.'))
    time.sleep(body.count(b'.'))
    print(" [x] Done")

    # 定义通道消费者参数
    channel.basic_consume(callback,
    queue="task_queue",
    no_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
    channel.start_consuming()

    默认RabbitMQ按照顺序发送每一个消息,每个消费者会获得相同的数量消息,这种分发消息的方式称之为循环。

  • 相关阅读:
    Ext.Net 1.2.0_利用 Ext.Net 自定义 GridPanel Ajax 控件
    ASP.NET_0404_ASP.NET 重定向:页面传值
    程序设计_洗牌程序
    表单/验证表单——千万不要做一个只会拖控件、“照猫画虎”、copy/paste 程序员
    ASP.NET_0204_ASP.NET 重定向:如何将用户重定向到另一页
    Oracle 11g R1(11.1) Joins表连接
    隐藏 iframe 技术——Ajax 时代一个重要的环节
    Ext.Net 1.2.0_改变 Ext.Net.GridPanel 某行或某列的式样
    数据结构冒泡排序和直接插入排序
    XMLHttpRequest——Ajax 时代的到来
  • 原文地址:https://www.cnblogs.com/luoyan01/p/9734121.html
Copyright © 2011-2022 走看看