zoukankan      html  css  js  c++  java
  • Python 多线程同步队列模型

    Python 多线程同步队列模型


    我面临的问题是有个非常慢的处理逻辑(比如分词、句法),有大量的语料,想用多线程来处理。

    这一个过程可以抽象成一个叫“同步队列”的模型。 具体来讲,有一个生产者(Dispatcher)一方面从语料中读入句子,并且存入队列中,一方面看有没有空闲的消费者(Segmentor),如果有,就把句子从队列中弹出并交给这个空闲的消费者处理。 然后消费者把处理完成的结果交给生产者输出,生产者要保证输出与输入顺序一致。

    消费者是典型的threading,它需要看见生成者的队列,从而从队列中拿一些数据。

    对于生产者,python中有一个叫Queue的module,实现了FIFO的同步队列。 但它只能保证输入与交付消费者的顺序的有序,但不能保障生产者在输出时有序,所以需要一个buffer来保存输出顺序。 程序的模型大概是这样的。有一个master(),用来分发任务。有N个多线程的slave用来处理任务。

    具体程序如下:

    #!/usr/bin/env python
    # real    3m0.263s
    # user    0m0.016s
    # sys     0m0.012s
    
    from time import sleep
    from random import random
    from Queue import Queue
    from threading import Thread, Lock
    
    class Segmentor(Thread):
        def __init__(self, dispatcher):
            Thread.__init__(self)
            self.d = dispatcher
    
        def run(self):
            while True:
                idx, item = self.d.get()
                # segment section
                sleep(random() * 5)
                # output section
                d.output( idx, item )
                self.d.task_done()
    
    class Dispatcher(Queue):
        def __init__(self):
            Queue.__init__(self)
            self.idx = 0
            self.box = {}
            self.lock = Lock()
    
        def output(self, idx, item):
            self.lock.acquire()
            if idx > self.idx:
                self.box[idx] = item
            elif idx == self.idx:
                self._output(item)
                self.idx += 1
                while self.idx in self.box:
                    item = self.box[self.idx]
                    self._output(item)
                    self.idx += 1
    
            self.lock.release()
    
        def _output(self, item):
            print item
    
    if __name__=="__main__":
        d = Dispatcher()
        for i in xrange(4):
            t = Segmentor(d)
            t.daemon = True
            t.start()
    
        num = 0
        for line in open("data", "r"):
            d.put( (num, line.strip()) )
            num += 1
    
        d.join()
    

    在300句的条件下,单线程的处理速度约为750s=12m,开4个线程后3m可以处理完成,并且输出是有序的。

    其他语言应该可以仿照这个方式编写程序,对于没有同步队列的语言,实现时可以参考这个http://hg.python.org/cpython/file/2.7/Lib/Queue.py

     

     

  • 相关阅读:
    C++ 虚基类表指针字节对齐
    C++ 虚函数的内存分配
    虚函数&&虚继承
    内存管理简便复习总结
    stack,heap的区别
    内存泄漏(memory leak)和内存溢出
    php+mysqli预处理技术实现添加、修改及删除多条数据的方法
    JavaScript 常用方法总结
    6个超实用的PHP代码片段
    php 备份数据库代码(生成word,excel,json,xml,sql)
  • 原文地址:https://www.cnblogs.com/timssd/p/7221110.html
Copyright © 2011-2022 走看看