  • Multiprocess versions of a Python word counter for the Java files in a given directory

      

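           Note that the serial version is already fast enough. On my Intel Core dual-core machine running Debian 7.6 it takes only 0.2 s, which is hard to beat. The multiprocess versions cannot avoid the overhead of creating many processes and of synchronizing and transferring data between them. As a result they actually run slower than the serial version and are useful only as learning examples. I will try to optimize them later.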
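The serial version mentioned above is not listed in this post. For comparison, here is a minimal sketch of what such a serial counter might look like. It is written in Python 3, unlike the Python 2 code further down, and it assumes that a "word" is any run of `\w` characters. The function name `count_java_words` is hypothetical.

```python
import os
import re
from collections import Counter

# Assumption: a "word" is any run of \w characters (letters, digits, underscore).
WORD_RE = re.compile(r"\w+")

def count_java_words(dirpath):
    """Walk dirpath serially and count the words in every .java file."""
    counts = Counter()
    for root, _dirs, filenames in os.walk(dirpath):
        for name in filenames:
            if name.endswith(".java"):
                path = os.path.join(root, name)
                # errors="replace" keeps one badly encoded file from aborting the run
                with open(path, encoding="utf-8", errors="replace") as f:
                    counts.update(WORD_RE.findall(f.read()))
    return counts
```

For example, `count_java_words("/path/to/src").most_common(30)` gives the 30 most frequent words. This is the same output as `obtainTopN(30)` in the code below.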

     

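           Two basic patterns of concurrent program design:

              1.  Split a large dataset into several small ones, process them in parallel, and merge the results. The hard part is load balancing.

              2.  Split a complex task into several subtasks that run concurrently as a pipeline. The hard part is coordinating and synchronizing the subtasks. Sender and receiver must agree on some protocol so that the receiver does not exit too early.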
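Pattern 1 can be sketched as follows in Python 3. `divide_n_parts` mirrors the `divideNParts` helper in the second version. `count_chunk` and `split_count_merge` are hypothetical names. The `mapper` parameter is an assumption of this sketch. It defaults to the built-in `map`, which runs serially. Passing the `map` method of a `multiprocessing.Pool` runs the chunks in parallel, and the decompose and merge steps stay the same.

```python
import re
from collections import Counter

WORD_RE = re.compile(r"\w+")

def divide_n_parts(total, n):
    """Split range(total) into n contiguous (begin, end) slices; the last one absorbs the remainder."""
    each = total // n
    return [(i * each, total if i == n - 1 else (i + 1) * each) for i in range(n)]

def count_chunk(lines):
    """Worker step: count the words in one chunk of lines."""
    return Counter(WORD_RE.findall("".join(lines)))

def split_count_merge(lines, n, mapper=map):
    """Decompose the data, process each part (in parallel if mapper is Pool.map), then merge."""
    chunks = [lines[b:e] for b, e in divide_n_parts(len(lines), n)]
    total = Counter()
    for partial in mapper(count_chunk, chunks):
        total += partial  # merge step
    return total
```

For a parallel run, use an `if __name__ == "__main__":` guard and write `with Pool(4) as pool: counts = split_count_merge(lines, 4, pool.map)`. Equal-sized slices only balance the load when every line costs about the same to process. That is exactly the load-balancing difficulty mentioned above.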

     

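          Practical scenarios:

              1.  Task processing. Split a complex task into a pipeline of subtasks (multiple processes), and within each subtask process the whole dataset in parallel (multiple threads).

              2.  Real-world simulation. Each object is an atomic unit of concurrent activity. Objects constrain one another's behavior through message passing and mutually exclusive access to shared resources.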
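Scenario 2 can be illustrated with a toy, thread-based sketch in Python 3. It is not from the original post, and the `Actor`, `Account` and `Teller` classes are hypothetical. Each teller is an actor that handles messages from its own inbox. The shared account is the mutually exclusive resource and is guarded by a lock. A sentinel message stops each actor, which is a simple form of the sender/receiver protocol from pattern 2.

```python
import queue
import threading

STOP = object()  # sentinel message that tells an actor to shut down

class Actor:
    """Minimal actor: one thread that handles messages from its own inbox, one at a time."""
    def __init__(self):
        self.inbox = queue.Queue()
        self._thread = threading.Thread(target=self._loop)
        self._thread.start()

    def send(self, msg):
        self.inbox.put(msg)

    def stop(self):
        self.inbox.put(STOP)  # queued after all earlier messages, so those are handled first
        self._thread.join()

    def _loop(self):
        while True:
            msg = self.inbox.get()
            if msg is STOP:
                break
            self.handle(msg)

    def handle(self, msg):
        raise NotImplementedError

class Account:
    """Shared resource: concurrent deposits are serialized by a lock."""
    def __init__(self):
        self.balance = 0
        self._lock = threading.Lock()

    def deposit(self, amount):
        with self._lock:
            self.balance += amount

class Teller(Actor):
    """Hypothetical actor that applies deposit messages to a shared Account."""
    def __init__(self, account):
        self.account = account  # set before Actor.__init__ starts the thread
        super().__init__()

    def handle(self, amount):
        self.account.deposit(amount)

def simulate(n_deposits, n_tellers=2):
    account = Account()
    tellers = [Teller(account) for _ in range(n_tellers)]
    for i in range(n_deposits):
        tellers[i % n_tellers].send(1)
    for t in tellers:
        t.stop()
    return account.balance
```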

      

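           An important lesson: the more complex a concurrent program becomes, the harder it is to control its processes and keep it running stably. The subtleties of concurrency can also make optimization efforts ineffective.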

     

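           Two multiprocess implementations are given below. What I actually had in mind was three processes:

           - a file-reading process that reads files with multiple threads internally,
           - a word-parsing process that parses words with multiple threads internally,
           - the main process.

           However, CPython has a GIL, so only one thread can execute Python bytecode at a time. Multithreading therefore cannot deliver the full benefit of concurrency here, and both versions below use process pools instead.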

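           Notes on the first version:

           1.  WordReading reads files with multiple processes internally, and WordAnalyzing parses words with multiple processes internally. Thanks to good encapsulation, the internal implementation can be changed freely (from serial to concurrent) while the external interface stays the same;

           2.  Transferring a large number of file lines piece by piece incurs heavy synchronization overhead. WordReading therefore reads all the file lines in one go and then passes them to WordAnalyzing, so the two subtasks still run serially;

           3.  Multiple queues were originally meant to avoid heavy contention when multiple producers and multiple consumers read and write a single queue. Since the two subtasks run serially, they are of no use here (the code uses mqNum = 1).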
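Note 1 describes an interface that stays stable whatever the internal strategy. A Python 3 sketch of that idea follows. `WordReader` and `read_lines` are hypothetical names and are not part of the code below. Any `concurrent.futures` executor, using threads or processes, can be passed in. Because `Executor.map` preserves input order just like the built-in `map`, `read_all()` returns the same lines in the same order in every case.

```python
def read_lines(path):
    """Return the lines of one file, or [] if it cannot be read (like readFile below)."""
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            return f.readlines()
    except OSError:
        return []

class WordReader:
    """read_all() is the whole public interface; serial vs. concurrent is an internal detail."""

    def __init__(self, paths, executor=None):
        self.paths = list(paths)
        self.executor = executor  # None means read serially

    def read_all(self):
        # Executor.map, like the built-in map, yields results in input order
        mapper = map if self.executor is None else self.executor.map
        lines = []
        for file_lines in mapper(read_lines, self.paths):
            lines.extend(file_lines)
        return lines
```

`WordReader(paths).read_all()` reads the files serially. `with ThreadPoolExecutor(8) as ex: WordReader(paths, ex).read_all()` returns the same list, and callers never see the difference.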

       

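         Notes on the second version:

           1.  The main idea is that WordReading reads the lines of only part of the files at a time and passes them to WordAnalyzing for parsing, so the two subtasks run concurrently.

           2.  The difficulty is that the queue alone cannot tell whether all file lines have been read, or whether reading is still in progress and has simply produced nothing yet. The program signals the end with a special message, EOF FINISHED. Normal messages are lists and the end message is a string, so the two cannot be confused.

           3.  File reading is launched from a thread. That thread waits for the batches read by the process pool and forwards them to the queue. Line parsing runs in the main process, so the two run concurrently.

           4.  With multiple queues, the end message may be written to any one of them. When the end message is detected, the consumer must not exit immediately. Instead it records that queue, stops taking messages from it, and keeps reading the other queues until all messages have been taken out.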
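In the second version, the consumer remembers which queue delivered EOF FINISHED and then polls the remaining queues until they all time out. A different protocol is sketched here as a swap-in; it is not the one the code below uses. The producer puts one end marker into every queue. The consumer drops each queue as soon as that queue's own marker arrives and stops when none are left. Short timeouts are still used so that one idle queue cannot block the others. However, termination no longer depends on a timeout deciding that the data is exhausted. This is a minimal Python 3 sketch with threads and `queue.Queue`; the names `producer`, `consumer` and `run` are hypothetical.

```python
import queue
import threading

END = "EOF FINISHED"  # end marker; normal messages are lists, so they never equal it

def producer(batches, queues):
    """Spread batches round-robin over the queues, then send one END marker per queue."""
    for i, batch in enumerate(batches):
        queues[i % len(queues)].put(batch)
    for q in queues:
        q.put(END)  # FIFO: each marker arrives after every batch put into that queue

def consumer(queues, handle):
    """Drain every queue until each one has delivered its own END marker."""
    live = list(queues)
    while live:
        for q in list(live):
            try:
                msg = q.get(timeout=0.01)  # short timeout so one idle queue cannot block the others
            except queue.Empty:
                continue  # nothing yet; the producer may still be working
            if msg == END:
                live.remove(q)  # this queue is finished; keep draining the rest
            else:
                handle(msg)

def run(batches, nqueues=3):
    queues = [queue.Queue() for _ in range(nqueues)]
    received = []
    t = threading.Thread(target=producer, args=(batches, queues))
    t.start()
    consumer(queues, received.append)
    t.join()
    return received
```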

      

         Version 1:

    #-------------------------------------------------------------------------------
    # Name:        wordstat_multiprocessing.py
    # Purpose:     count words in the Java files of a given directory using multiprocessing
    #
    # Author:      qin.shuq
    #
    # Created:     09/10/2014
    # Copyright:   (c) qin.shuq 2014
    # Licence:     <your licence>
    #-------------------------------------------------------------------------------
    
    import re
    import os
    import time
    import logging
    from Queue import Empty
    from multiprocessing import Process, Manager, Pool, Pipe, cpu_count
    
    LOG_LEVELS = {
        'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
        'WARN': logging.WARNING, 'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL
    }
    
    ncpu = cpu_count()
    
    def initlog(filename) :
    
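        # one named logger per file: getLogger() without a name returns the shared
        # root logger, which would send every message to both error.log and info.log
        logger = logging.getLogger(filename)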
        hdlr = logging.FileHandler(filename)
        formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        hdlr.setFormatter(formatter)
        logger.addHandler(hdlr)
        logger.setLevel(LOG_LEVELS['INFO'])
    
        return logger
    
    
    errlog = initlog("error.log")
    infolog = initlog("info.log")
    
    
    class FileObtainer(object):
    
        def __init__(self, dirpath, fileFilterFunc=None):
            self.dirpath = dirpath
            self.fileFilterFunc = fileFilterFunc
    
        def findAllFilesInDir(self):
            files = []
            for path, dirs, filenames in os.walk(self.dirpath):
                if len(filenames) > 0:
                    for filename in filenames:
                        files.append(path+'/'+filename)
    
            if self.fileFilterFunc is None:
                return files
            else:
                return filter(self.fileFilterFunc, files)
    
    class MultiQueue(object):
    
        def __init__(self, qnum, timeout):
            manager = Manager()
            self.timeout = timeout
            self.qnum = qnum
            self.queues = []
            self.pindex = 0
            for i in range(self.qnum):
                qLines = manager.Queue()
                self.queues.append(qLines)
    
        def put(self, obj):
            self.queues[self.pindex].put(obj)
            self.pindex = (self.pindex+1) % self.qnum
    
        def get(self):
            for i in range(self.qnum):
                try:
                    obj = self.queues[i].get(True, self.timeout)
                    return obj
                except Empty, emp:
                    print 'Not Get.'
                    errlog.error('In WordReading:' + str(emp))
            return None
    
    def readFile(filename):
        '''return the lines of filename, or [] if it cannot be read'''
        try:
            with open(filename, 'r') as f:
                lines = f.readlines()
            infolog.info('[successful read file %s]' % filename)
            return lines
        except IOError, err:
            errlog.error('cannot read file %s: %s' % (filename, err))
            return []
    
    def batchReadFiles(fileList, ioPool, mq):
        futureResult = []
        for filename in fileList:
            futureResult.append(ioPool.apply_async(readFile, args=(filename,)))
        
        allLines = []
        for res in futureResult:
            allLines.extend(res.get())
        mq.put(allLines)
    
    
    class WordReading(object):
    
        def __init__(self, allFiles, mq):
            self.allFiles = allFiles
            self.mq = mq
            self.ioPool = Pool(ncpu*3)
            infolog.info('WordReading Initialized')
        
        def run(self):
            batchReadFiles(self.allFiles, self.ioPool, self.mq)
    
    def processLines(lines):
        result = {}
        linesContent = ''.join(lines)
        matches = WordAnalyzing.wordRegex.findall(linesContent)
        if matches:
            for word in matches:
                if result.get(word) is None:
                    result[word] = 0
                result[word] += 1
        return result
    
    def mergeToSrcMap(srcMap, destMap):
        for key, value in destMap.iteritems():
            if srcMap.get(key):
                srcMap[key] = srcMap.get(key)+destMap.get(key)
            else:
                srcMap[key] = destMap.get(key)
        return srcMap
    
    class WordAnalyzing(object):
        '''
         return Map<Word, count>  the occurrence times of each word
        '''
        wordRegex = re.compile(r"\w+")   # runs of letters, digits and underscores
    
        def __init__(self, mq, conn):
            self.mq = mq
            self.cpuPool = Pool(ncpu)
            self.conn = conn
            self.resultMap = {}
    
            infolog.info('WordAnalyzing Initialized')
    
        def run(self):
            starttime = time.time()
            lines = []
            futureResult = []
            while True:
                lines = self.mq.get()
                if lines is None:
                    break
                futureResult.append(self.cpuPool.apply_async(processLines, args=(lines,)))
    
            resultMap = {}
            for res in futureResult:
                mergeToSrcMap(self.resultMap, res.get())
            endtime = time.time()
            print 'WordAnalyzing analyze cost: ', (endtime-starttime)*1000 , 'ms'
    
            self.conn.send('OK')
            self.conn.close()
    
        def obtainResult(self):
            return self.resultMap
    
    
    class PostProcessing(object):
    
        def __init__(self, resultMap):
            self.resultMap = resultMap
    
        def sortByValue(self):
            return sorted(self.resultMap.items(),key=lambda e:e[1], reverse=True)
    
        def obtainTopN(self, topN):
            sortedResult = self.sortByValue()
            sortedNum = len(sortedResult)
            topN = sortedNum if topN > sortedNum else topN
            for i in range(topN):
                topi = sortedResult[i]
                print topi[0], ' counts: ', topi[1]
    
    if __name__ == "__main__":
    
        dirpath = "/home/lovesqcc/workspace/java/javastudy/src/"
    
        if not os.path.exists(dirpath):
            print 'dir %s not found.' % dirpath
            exit(1)
    
        fileObtainer = FileObtainer(dirpath, lambda f: f.endswith('.java'))
        allFiles = fileObtainer.findAllFilesInDir()
        
        mqTimeout = 0.01
        mqNum = 1
    
        mq = MultiQueue(mqNum, timeout=mqTimeout)
        p_conn, c_conn = Pipe()
        wr = WordReading(allFiles, mq)
        wa = WordAnalyzing(mq, c_conn)
    
        wr.run()
        wa.run()
    
        msg = p_conn.recv()
        if msg == 'OK':
            pass
    
        # taking less time, parallel not needed.
        postproc = PostProcessing(wa.obtainResult())
        postproc.obtainTopN(30)
    
        print 'exit the program.'

       Version 2:

    #-------------------------------------------------------------------------------
    # Name:        wordstat_multiprocessing.py
    # Purpose:     count words in the Java files of a given directory using multiprocessing
    #
    # Author:      qin.shuq
    #
    # Created:     09/10/2014
    # Copyright:   (c) qin.shuq 2014
    # Licence:     <your licence>
    #-------------------------------------------------------------------------------
    
    import re
    import os
    import time
    import logging
    import threading
    from Queue import Empty
    from multiprocessing import Process, Manager, Pool, Pipe, cpu_count
    
    LOG_LEVELS = {
        'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
        'WARN': logging.WARNING, 'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL
    }
    
    ncpu = cpu_count()
    
    CompletedMsg = "EOF FINISHED"
    
    def initlog(filename) :
    
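        # one named logger per file: getLogger() without a name returns the shared
        # root logger, which would send every message to both error.log and info.log
        logger = logging.getLogger(filename)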
        hdlr = logging.FileHandler(filename)
        formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        hdlr.setFormatter(formatter)
        logger.addHandler(hdlr)
        logger.setLevel(LOG_LEVELS['INFO'])
    
        return logger
    
    
    errlog = initlog("error.log")
    infolog = initlog("info.log")
    
    
    class FileObtainer(object):
    
        def __init__(self, dirpath, fileFilterFunc=None):
            self.dirpath = dirpath
            self.fileFilterFunc = fileFilterFunc
    
        def findAllFilesInDir(self):
            files = []
            for path, dirs, filenames in os.walk(self.dirpath):
                if len(filenames) > 0:
                    for filename in filenames:
                        files.append(path+'/'+filename)
    
            if self.fileFilterFunc is None:
                return files
            else:
                return filter(self.fileFilterFunc, files)
    
    class MultiQueue(object):
    
        def __init__(self, qnum, CompletedMsg, timeout=0.01):
            manager = Manager()
            self.timeout = timeout
            self.qnum = qnum
            self.CompletedMsg = CompletedMsg
            self.queues = []
            self.pindex = 0
            self.endIndex = -1
            for i in range(self.qnum):
                qLines = manager.Queue()
                self.queues.append(qLines)
    
        def put(self, obj):
            self.queues[self.pindex].put(obj)
            self.pindex = (self.pindex+1) % self.qnum
    
        def get(self, timeout=None):
            if timeout is None:
                timeout = self.timeout   # fall back to the timeout given to __init__
            for i in range(self.qnum):
                if i != self.endIndex:
                    try:
                        obj = self.queues[i].get(True, timeout)
                        if obj == self.CompletedMsg:
                            self.endIndex = i   # this queue holds the 'finish flag' msg; skip it from now on
                            self.queues[i].put(self.CompletedMsg)
                            continue
                        return obj
                    except Empty, emp:
                        errlog.error('In WordReading:' + str(emp))
            if self.endIndex != -1:
                return self.CompletedMsg
            return None
    
    def readFile(filename):
        '''return the lines of filename, or [] if it cannot be read'''
        try:
            with open(filename, 'r') as f:
                lines = f.readlines()
            infolog.info('[successful read file %s]' % filename)
            return lines
        except IOError, err:
            errlog.error('cannot read file %s: %s' % (filename, err))
            return []
    
    
    def divideNParts(total, N):
        '''
           divide [0, total) into N parts:
            return [(0, total/N), (total/N, 2*total/N), ..., ((N-1)*total/N, total)]
        '''
    
        each = total // N   # floor division: slice indices must be integers
        parts = []
        for index in range(N):
            begin = index*each
            if index == N-1:
                end = total
            else:
                end = begin + each
            parts.append((begin, end))
        return parts
    
    def batchReadFiles(fileList):
        allLines = []
        for filename in fileList:
            allLines.extend(readFile(filename))
        return allLines
    
    def putResult(futureResult, mq):
        for res in futureResult:
            mq.put(res.get())
        mq.put(CompletedMsg)
    
    class WordReading(object):
    
        def __init__(self, allFiles, mq):
            self.allFiles = allFiles
            self.mq = mq
            self.ioPool = Pool(ncpu*3)
            infolog.info('WordReading Initialized')
    
        def run(self):
    
            parts = divideNParts(len(self.allFiles), ncpu*3)
            futureResult = []
            for (begin, end) in parts:
                futureResult.append(self.ioPool.apply_async(func=batchReadFiles, args=(self.allFiles[begin:end],)))
    
            t = threading.Thread(target=putResult, args=(futureResult, self.mq))
            t.start()
    
            print 'Now quit'
    
    def processLines(lines):
        result = {}
        linesContent = ''.join(lines)
        matches = WordAnalyzing.wordRegex.findall(linesContent)
        if matches:
            for word in matches:
                if result.get(word) is None:
                    result[word] = 0
                result[word] += 1
        return result
    
    def mergeToSrcMap(srcMap, destMap):
        for key, value in destMap.iteritems():
            if srcMap.get(key):
                srcMap[key] = srcMap.get(key)+destMap.get(key)
            else:
                srcMap[key] = destMap.get(key)
        return srcMap
    
    class WordAnalyzing(object):
        '''
         return Map<Word, count>  the occurrence times of each word
        '''
        wordRegex = re.compile(r"\w+")   # runs of letters, digits and underscores
    
        def __init__(self, mq, conn):
            self.mq = mq
            self.cpuPool = Pool(ncpu)
            self.conn = conn
            self.resultMap = {}
    
            infolog.info('WordAnalyzing Initialized')
    
        def run(self):
            starttime = time.time()
            lines = []
            futureResult = []
            while True:
                lines = self.mq.get()
                if lines is None:   # nothing available yet; poll again
                    continue
                if lines == CompletedMsg:
                    break
                futureResult.append(self.cpuPool.apply_async(processLines, args=(lines,)))
    
            resultMap = {}
            for res in futureResult:
                mergeToSrcMap(self.resultMap, res.get())
            endtime = time.time()
            print 'WordAnalyzing analyze cost: ', (endtime-starttime)*1000 , 'ms'
    
            self.conn.send('OK')
            self.conn.close()
    
        def obtainResult(self):
            return self.resultMap
    
    
    class PostProcessing(object):
    
        def __init__(self, resultMap):
            self.resultMap = resultMap
    
        def sortByValue(self):
            return sorted(self.resultMap.items(),key=lambda e:e[1], reverse=True)
    
        def obtainTopN(self, topN):
            sortedResult = self.sortByValue()
            sortedNum = len(sortedResult)
            topN = sortedNum if topN > sortedNum else topN
            for i in range(topN):
                topi = sortedResult[i]
                print topi[0], ' counts: ', topi[1]
    
    if __name__ == "__main__":
    
        #dirpath = "/home/lovesqcc/workspace/java/javastudy/src/"
        dirpath = r"c:\Users\qin.shuq\Desktop\region_master\src"   # raw string, otherwise "\r" becomes a carriage return
    
        if not os.path.exists(dirpath):
            print 'dir %s not found.' % dirpath
            exit(1)
    
        fileObtainer = FileObtainer(dirpath, lambda f: f.endswith('.java'))
        allFiles = fileObtainer.findAllFilesInDir()
    
        mqTimeout = 0.01
        mqNum = 3
    
        mq = MultiQueue(mqNum, CompletedMsg, timeout=mqTimeout)
        p_conn, c_conn = Pipe()
        wr = WordReading(allFiles, mq)
        wa = WordAnalyzing(mq, c_conn)
    
        wr.run()
        wa.run()
    
        msg = p_conn.recv()
        if msg == 'OK':
            pass
    
        # taking less time, parallel not needed.
        postproc = PostProcessing(wa.obtainResult())
        postproc.obtainTopN(30)
    
        print 'exit the program.'
  • Original article: https://www.cnblogs.com/lovesqcc/p/4044777.html