zoukankan      html  css  js  c++  java
  • 工作队列.py

    #对列模式图
    Work Queue背后的主要思想是避免立即执行资源密集型任务的时,需要等待其他任务完成。
    所以我们把任务安排的晚一些,我们封装一个任务到消息中并把它发送到队列,
    一个进程运行在后端发送并最终执行这个工作,当你运行多个消费者的时候这个任务将在他们之间共享。

    send.py
    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'

    """
    生产者/发送方
    """

    import sys
    import pika

    # 远程主机的RabbitMQ Server设置的用户名密码
    credentials = pika.PlainCredentials("admin", "admin123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.102', 5672, '/', credentials))

    # 创建通道
    channel = connection.channel()

    # 声明队列task_queue,RabbitMQ的消息队列机制如果队列不存在那么数据将会被丢掉,下面我们声明一个队列如果不存在创建
    channel.queue_declare(queue='task_queue')

    # 在队列中添加消息
    for i in range(100):
    message = '%s Meassage '% i or "Hello World!"
    # 发送消息
    channel.basic_publish(exchange='',
    routing_key='task_queue',
    body=message,
                    )

    # 发送消息结束,并关闭通道
    print(" [x] Sent %r" % message)

    channel.close()

    receive1.py
    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'

    """
    消费者/接收方
    """

    import time
    import pika

    # 认证信息
    credentials = pika.PlainCredentials("admin", "admin123")
    connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.102", 5672, "/", credentials))
    # 建立通道
    channel = connection.channel()
    # 创建队列
    channel.queue_declare("task_queue")


    # 订阅的回调函数这个订阅回调函数是由pika库来调用的
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body.count(b'.'))
    time.sleep(body.count(b'.'))
    print(" [x] Done")

    # 定义通道消费者参数
    channel.basic_consume(callback,
    queue="task_queue",
    no_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
    channel.start_consuming()

    receive2.py
    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'

    """
    消费者/接收方
    """

    import time
    import pika

    # 认证信息
    credentials = pika.PlainCredentials("admin", "admin123")
    connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.102", 5672, "/", credentials))

    # 建立通道
    channel = connection.channel()

    # 创建队列
    channel.queue_declare("task_queue")


    # 订阅的回调函数这个订阅回调函数是由pika库来调用的
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body.count(b'.'))
    time.sleep(body.count(b'.'))
    print(" [x] Done")

    # 定义通道消费者参数
    channel.basic_consume(callback,
    queue="task_queue",
    no_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
    channel.start_consuming()

    默认RabbitMQ按照顺序发送每一个消息,每个消费者会获得相同的数量消息,这种分发消息的方式称之为循环。

  • 相关阅读:
    第07组 Alpha事后诸葛亮
    第07组 Alpha冲刺(4/4)
    第07组 Alpha冲刺(3/4)
    第07组 Alpha冲刺(2/4)
    第07组 Alpha冲刺(1/4)
    2021-7-15
    2021-7-13工作笔记
    第07组 Beta版本演示
    第07组 Beta冲刺(2/4)
    第07组 Beta冲刺(3/4)
  • 原文地址:https://www.cnblogs.com/luoyan01/p/9734121.html
Copyright © 2011-2022 走看看