zoukankan      html  css  js  c++  java
  • python并行任务之生产消费模式

    一. 生产者/消费者模式

    概念:生产者产生一块数据,放到buffer与此同时,消费者在从buffer中取出并消耗这些数据

    理解:像生活中厂家生产出产品,顾客购买消耗这些产品,buffer就是存放商品的仓库。

    二. 生产者/消费者模式在python中的实现

    相关模块:Queue模块

    简单介绍:Python中,队列是线程间最常用的交换数据的形式之一。Queue模块是python中提供队列操作的模块。

    原理:它创建一个"队列"对象(即用于存放数据的buffer), 然后不断产生数据并存入该"队列",同时也在不断

       地从该队列中取出数据。

    具体函数

    (1)创建一个队列对象

    1 >>> import Queue
    2 >>> q = Queue.Queue()

    注:队列长度可为无限或有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。若maxsize小于1则表示队列长度无限,例:

    (2)向队列中存入数据

    方法: q.put(item, block=True, timeout=None)

    >>> q.put('a')
    

    注:put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。

      如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

    (3)从队列中取出数据

    方法: q.get(block=True, timeout=None)

    >>> q.get()

    注:get方法可选参数为block,默认为True。

      如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

    Queue.Queue中常用方法:

    q.qsize()           返回队列的大小
    q.empty()           如果队列为空,返回True,反之False
    q.full()            如果队列已满,返回True,反之False。与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait()         相当q.get(False) 非阻塞 
    q.put(item,timeout)    写入队列,timeout等待时间
    q.put_nowait(item)      相当q.put(item, False)
    q.task_done()         完成一项工作之后, 函数向任务已经完成的队列发送一个信号
    q.join()            表示等到队列为空,再执行别的操作

    实例测试:

    #!/usr/bin/env python
    #coding=utf-8
    
    import threading, time
    import Queue    #导入消息队列模块
    import random   #导入随机数模块,是为了模拟生产者与消费者速度不一致的情形
    
    q = Queue.Queue()         #实例化一个队列对象,当有多个线程共享一个东西的时候就可以用它了
    
    def Producer():           #生产者函数
        for i in range(20):
            q.put(i)          #将结果放入消息队列中
            print '[+] Product %s' %i
            time.sleep(random.randrange(3))    #生产者的生产速度,3s内
    def Consumer():           #消费者函数
        count = 0
        while count < 20:
            data = q.get()    #取用消息队列中存放的结果
            print '[-] Consume %s' %data
            count += 1
            time.sleep(random.randrange(4))    #消费者的消费速度,4s内
    
    producter = threading.Thread(target = Producer)
    consumer = threading.Thread(target = Consumer)
    
    producter.start()
    consumer.start()

     运行结果:

           

     

  • 相关阅读:
    oracle数据库闪回执行步骤——oracle数据库回退
    10.20总结
    10.11总结
    10.10总结
    10.9总结
    10.8总结
    10.7总结
    10.6总结
    10.5总结
    10.4总结
  • 原文地址:https://www.cnblogs.com/ssooking/p/5843417.html
Copyright © 2011-2022 走看看