zoukankan      html  css  js  c++  java
  • python 多线程处理框架

    多线程处理框架

    python2.7
    python3.5

    多线程通用任务处理型驱动框架
    probe_type 探测类型rtsp或者http
    task_queue 任务队列
    task_handler 任务处理函数
    thread_count 线程数目
    result_queue 结果存放队列
    args,kwargs为可变参数列表,为扩展性考虑
    

    2016-8-26
    python3 版本新增中断(Ctrl+C)操作

    #!/usr/bin/env python2
    # coding=utf-8
    import threading
    import argparse
    import Queue
    
    
    class MultiThreadHandler(object):
        """
        Generic multi-threaded task-processing driver.

        task_queue    queue holding the pending tasks
        task_handler  callable invoked for every task
        result_queue  queue handed to the handler for storing results
        thread_count  number of worker threads to start
        args, kwargs  extra arguments forwarded unchanged to every worker
        """

        def __init__(self, task_queue, task_handler, result_queue=None, thread_count=1, *args, **kwargs):
            self.task_queue, self.task_handler = task_queue, task_handler
            self.result_queue = result_queue
            self.thread_count = thread_count
            # NOTE: "kwagrs" is misspelled, but it is the attribute name this
            # class has always exposed, so it is kept as-is.
            self.args, self.kwagrs = args, kwargs

        def run(self, block_flag):
            """Start the workers; wait for all of them when block_flag is truthy."""
            workers = [
                _TaskHandler(self.task_queue, self.task_handler, self.result_queue, *self.args, **self.kwagrs)
                for _ in range(self.thread_count)
            ]
            for worker in workers:
                worker.start()
            if not block_flag:
                return
            # Block until every worker thread has finished.
            for worker in workers:
                worker.join()
    
    
    class _TaskHandler(threading.Thread):
        """
        Worker thread that drains task_queue.

        Each item taken from task_queue is passed to
        task_handler(item, result_queue, *args, **kwargs). The thread exits
        once task_queue is found empty.
        """

        def __init__(self, task_queue, task_handler, result_queue=None, *args, **kwargs):
            threading.Thread.__init__(self)
            self.task_queue = task_queue
            self.task_handler = task_handler
            self.result_queue = result_queue
            self.args = args
            self.kwargs = kwargs

        def run(self):
            """Process tasks until the task queue is empty."""
            while True:
                # Only the non-blocking get() may signal "no more work"; a
                # Queue.Empty raised inside the handler must not end the worker.
                try:
                    item = self.task_queue.get(False)
                except Queue.Empty:
                    print("all task has done!")
                    break
                try:
                    self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
                except Exception as e:
                    # A failing task is reported and skipped so the remaining
                    # tasks are still processed.
                    print("error: %s" % (e,))
                finally:
                    # Always balance the get(), even when the handler failed,
                    # otherwise task_queue.join() would never return.
                    self.task_queue.task_done()
    
    
    def out(item, result_queue):
        """Sample task handler (replace with your own): store the task item in result_queue."""
        result_queue.put(item)
    
    
    if __name__ == '__main__':
        # Parse the command-line arguments. The input and output paths are
        # mandatory: without them open() below would fail on None.
        parse = argparse.ArgumentParser()
        parse.add_argument("-f", "--file", required=True, help="the target file")
        parse.add_argument("-th", "--thread", type=int, default=1, help="the thread number")
        parse.add_argument("-o", "--outfile", required=True, help="the outputfile")
        results = parse.parse_args()
        filename = results.file
        th = results.thread
        outfile = results.outfile
        task_queue = Queue.Queue()
        out_queue = Queue.Queue()
        # Every non-blank line of the input file becomes one task.
        with open(filename) as f:
            for line in f:
                line = line.rstrip()
                if line:
                    task_queue.put(line)

        MultiThreadHandler(task_queue, out, out_queue, th).run(True)

        with open(outfile, "w") as f:
            # Check for emptiness before get(): a blocking get() on an empty
            # queue (e.g. an empty input file) would hang forever.
            while not out_queue.empty():
                f.write(out_queue.get() + '\n')
    
    #! python3
    # coding=utf-8
    
    
    import queue
    import argparse
    import threading
    import time
    
    
    class MultiThreadHandler(object):
        """
        Generic multi-threaded task-processing driver.

        task_queue    queue holding the pending tasks
        task_handler  callable invoked for every task
        result_queue  queue handed to the handler for storing results
        thread_count  number of worker threads to start
        args, kwargs  extra arguments forwarded unchanged to every worker
        """

        def __init__(self, task_queue, task_handler, result_queue=None, thread_count=1, *args, **kwargs):
            self.task_queue = task_queue
            self.task_handler = task_handler
            self.result_queue = result_queue
            self.thread_count = thread_count
            self.args = args
            # NOTE: "kwagrs" is misspelled, but it is the attribute name this
            # class has always exposed, so it is kept as-is.
            self.kwagrs = kwargs
            self.thread_pool = []

        def run(self, block_flag):
            """
            Start the worker threads.

            When block_flag is truthy, wait until every worker has finished
            or Ctrl+C is pressed; on Ctrl+C all workers are asked to stop.
            When block_flag is falsy, return right after starting the workers.
            """
            # Build a fresh pool on every call: a Thread object can be started
            # only once, so reusing workers from an earlier run() would raise.
            self.thread_pool = [
                _TaskHandler(self.task_queue, self.task_handler, self.result_queue, *self.args, **self.kwagrs)
                for _ in range(self.thread_count)
            ]
            for th in self.thread_pool:
                # Daemon threads do not keep the process alive after an interrupt.
                th.daemon = True
                th.start()
            if not block_flag:
                return
            try:
                for th in self.thread_pool:
                    # Join with a timeout so the main thread regularly gets a
                    # chance to see KeyboardInterrupt, yet returns as soon as
                    # the worker ends.
                    while th.is_alive():
                        th.join(1)
            except KeyboardInterrupt:
                print('KeyboardInterruption')
                self.stop_all()
            print('>>>all Done')

        def _check_stop(self):
            """Return True while at least one thread in the pool is still alive."""
            return any(th.is_alive() for th in self.thread_pool)

        def stop_all(self):
            """Call stop() on every worker thread in the pool."""
            for th in self.thread_pool:
                th.stop()
    
    
    class _TaskHandler(threading.Thread):
        """
        Worker thread that drains task_queue and can be stopped from outside.

        Each item taken from task_queue is passed to
        task_handler(item, result_queue, *args, **kwargs). The thread exits
        once task_queue is found empty or after stop() has been called.
        """

        def __init__(self, task_queue, task_handler, result_queue=None, *args, **kwargs):
            threading.Thread.__init__(self)
            self.task_queue = task_queue
            self.task_handler = task_handler
            self.result_queue = result_queue
            self.args = args
            self.kwargs = kwargs
            # NOTE: despite its name, True means "keep running"; stop() sets
            # it to False. The name is kept for backward compatibility.
            self.is_stoped = True

        def run(self):
            """Process tasks until the queue is empty or stop() is requested."""
            while self.is_stoped:
                # Only the non-blocking get() may signal "no more work"; a
                # queue.Empty raised inside the handler must not end the worker.
                try:
                    item = self.task_queue.get(False)
                except queue.Empty:
                    print("all task has done!")
                    break
                try:
                    self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
                except Exception as e:
                    # A failing task is reported and skipped so the remaining
                    # tasks are still processed.
                    print("error:", e)
                finally:
                    # Always balance the get(), even when the handler failed,
                    # otherwise task_queue.join() would never return.
                    self.task_queue.task_done()

        def stop(self):
            """Ask the worker to exit after the task it is currently handling."""
            self.is_stoped = False
    
    
    def out(item, result_queue):
        """Sample task handler: store the task item in result_queue unchanged."""
        enqueue = result_queue.put
        enqueue(item)
    
    
    if __name__ == '__main__':
        start = time.time()
        # Parse the command-line arguments. The input and output paths are
        # mandatory: without them open() below would fail on None.
        parse = argparse.ArgumentParser()
        parse.add_argument("-f", "--file", required=True, help="the target file")
        parse.add_argument("-th", "--thread", type=int, default=1, help="the thread number")
        parse.add_argument("-o", "--outfile", required=True, help="the outputfile")
        results = parse.parse_args()
        filename = results.file
        th = results.thread
        outfile = results.outfile
        task_queue = queue.Queue()
        out_queue = queue.Queue()
        # The input is only read, so open it read-only ("r+" would also demand
        # write permission). Every non-blank line becomes one task.
        with open(filename, "r") as f:
            for line in f:
                line = line.rstrip()
                if line:
                    task_queue.put(line)

        MultiThreadHandler(task_queue, out, out_queue, th).run(True)

        with open(outfile, "w") as f:
            # Check for emptiness before get(): a blocking get() on an empty
            # queue (e.g. an empty input file) would hang forever.
            while not out_queue.empty():
                f.write(out_queue.get() + '\n')
        end = time.time()
        # Report the elapsed wall-clock time in seconds.
        print(end - start)
    
  • 相关阅读:
    Android笔记(adb命令--reboot loader)
    Android笔记(预安装APK)
    Linux驱动学习(编写一个最简单的模块)
    const关键字与指针
    C++函数重载遇到了函数默认参数情况
    uboot环境变量分析
    ftp服务
    Samba服务
    mariadb_2 单表的增删改查
    mariadb_1 数据库介绍及基本操作
  • 原文地址:https://www.cnblogs.com/sinanorz/p/7636100.html
Copyright © 2011-2022 走看看