zoukankan      html  css  js  c++  java
  • python 线程之 数据同步 Queue

    Queue:将数据从一个线程发往另外一个线程比较通用的方式是使用queue模块的Queue类

    1, 首先创建一个Queue模块的对象,创建Queue对象可以传递maxsize也可以不传递

    2. 使用对象的put和get函数用来添加和移除元素。

     1 import _thread
     2 import queue
     3 import time
     4 
     5 consumercount = 2
     6 producercount = 4
     7 nummessages = 4
     8 
     9 safeprint = _thread.allocate_lock()
    10 dataQueue = queue.Queue()
    11 
    12 
    13 def producer(idnum):
    14     for msgnum in range(nummessages):
    15         time.sleep(idnum)
    16         dataQueue.put('producer id = {0},count={1}'.format(idnum, msgnum))
    17 
    18 def consumer(idnum):
    19     while True:
    20         time.sleep(0.1)
    21         try:
    22             data = dataQueue.get(block=False)
    23         except queue.Empty:
    24             pass
    25         else:
    26             with safeprint:
    27                 print('consumer id = {0}, got => {1}'.format(idnum, data))
    28 
    29 if __name__ == "__main__":
    30     for i in range(producercount):
    31         _thread.start_new_thread(producer, (i,))
    32     for i in range(consumercount):
    33         _thread.start_new_thread(consumer, (i,))
    34     time.sleep(10)
    35     print('main process existing')
    View Code

    在调用get的方法时,如果设置block为True,那么队列为空的时候调用get方法,会使调用线程阻塞。

    当使用队列,如何对生产者和消费者的关闭过程进行同步。一般情况下使用一个特殊的终止值,当将这个值放入队列时消费者线程退出

     1 import queue
     2 import time
     3 import threading
     4 
     5 _sentinel = object()
     6 dataQueue = queue.Queue()
     7 
     8 
     9 def producer(out_q):
    10     n = 0
    11     while n < 10:
    12         n += 1
    13         dataQueue.put('input {0}'.format(n))
    14 
    15     dataQueue.put(_sentinel)
    16 
    17 
    18 def consumer(in_q):
    19     while True:
    20         data = in_q.get(block=False)
    21         if data is _sentinel:
    22             in_q.put(_sentinel)
    23             break
    24         print(data)
    25         time.sleep(1)
    26 
    27 threading.Thread(target=producer, args=(dataQueue,)).start()
    28 threading.Thread(target=consumer, args=(dataQueue,)).start()
    View Code

    在这个实例中,当消费者接收到这个特殊的终止值后,会立刻将其重新放回到队列中,这么做使得在同一队列上监听其他消费者线程也能接收到终止值。因此可以一个一个地将他们都关闭掉。

    Queue为线程安全的类型,所以在添加和移除的过程中会自动获取所需的锁。

  • 相关阅读:
    【数据挖掘导论】——绪论
    Debian Customer PPA RFC (by quqi99)
    uva 11248 Frequency Hopping (最大流)
    非常easy的JAVA反射教程
    【Spark】RDD操作具体解释4——Action算子
    NHibernate剖析:Mapping篇之Mapping-By-Code(1):概览
    eclipse中文凝视字体太小解决方法
    cocos2d-x-3.x bringToFront &amp; sendToBack实现
    POJ 1018 Communication System 题解
    监听器和 利 用观察者设计模式设计一个程序
  • 原文地址:https://www.cnblogs.com/someoneHan/p/6220264.html
Copyright © 2011-2022 走看看