zoukankan      html  css  js  c++  java
  • python多线程/进程文件读取

    如果要获取数据并分析,例如用for循环,那只能按顺序读取,这样就会造成效率低下:

    循环读取多文件过慢,本文分别使用多线程、多进程方法对文件进行读取

    多线程

    由于处理完文件往往需要获取返回值,可以使用以下两种方法:

    import queue
    q = queue.Queue()
    def read_file(file):
        with open(os.path.join(path,file),'r') as f:
            q.put()
    

    1 自定义get_result()方法,取返回值

    import threading
    class ReadThread(threading.Thread):
        def __init__(self,file):
            threading.Thread.__init__(self) #super(ReadThread, self).__init__()
            self.file = file
            
        def run(self):
            self.res = read_file(self.file)
            
        def get_result(self):
            #注:此方法特别慢
            return self.res
    
    threads = []
    for file in os.listdir(path):
    	t = ReadThread(file)
        threads.append(t)
        
    [t.start() for t in threads]
    [t.join() for t in threads]
    for t in threads:
    	t.get_result()
    

    2 使用队列

    #改用多线程读取
    import threading
    
    start = time.time()
    df = pd.DataFrame()
    
    threads = []
    for file in os.listdir(path):
        t = threading.Thread(target=read_file,args=(file,))
        threads.append(t)
        
    [t.start() for t in threads]
    [t.join() for t in threads]
    
    while not q.empty():
        q.get()
        q.task_done()
        
    print("read time:",time.time()-start)
    

    关于task_done

    如果线程里每从队列里取一次,但没有执行task_done(),则join无法判断队列到底有没有结束,在最后执行个join()是等不到结果的,会一直挂起。

    理解Queue队列中join()与task_done()的关系 - andyzhang- - 博客园 (cnblogs.com)

    python伪多线程,适合IO密集型任务

    如果是一个计算为主的程序(专业一点称为CPU密集型程序),这一点确实是比较吃亏的,每个线程运行一遍,就相当于单线程再跑,甚至比单线程还要慢——CPU切换线程的上下文也是要有开销的。但是,如果是一个磁盘或网络为主的程序(IO密集型)就不同了。一个线程处在IO等待的时候,另一个线程还可以在CPU里面跑,有时候CPU闲着没事干,所有的线程都在等着IO,这时候他们就是同时的了,而单线程的话此时还是在一个一个等待的。我们都知道IO的速度比起CPU来是慢到令人发指的,python的多线程就在这时候发挥作用了。比方说多线程网络传输,多线程往不同的目录写文件,等等

    python 实现多线程并返回函数返回值的三种方法_zxc的博客-CSDN博客_python多线程函数返回值

    线程池

    最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

    多线程线程数设置多少合适_东东的专栏-CSDN博客_线程数设置多少合适

    本文阻塞、非阻塞主要指取结果时是否使用回调函数,回调函数可以避免阻塞

    线程池 +阻塞

    def read_file(file):
        with open(os.path.join(path,file),'r') as f:
        	data = json.load(f)
            return data
    
    #线程池,取结果时会阻塞
    from concurrent.futures import ThreadPoolExecutor,as_completed
    
    df = pd.DataFrame()
    
    start_time = time.time()
    with ThreadPoolExecutor(20) as executor: # 创建 ThreadPoolExecutor 
        future_list = [executor.submit(read_file, file) for file in os.listdir(path)] # 提交任务
    
     
    for future in as_completed(future_list):
        result = future.result() # 获取任务结果
        df = df.append(result,ignore_index=True)
    
    print(time.time()-start_time)
    

    线程池 +非阻塞

    #线程池,非阻塞
    from concurrent.futures import ThreadPoolExecutor,as_completed
    
    df = pd.DataFrame()
    
    start_time = time.time()
    with ThreadPoolExecutor(20) as executor: # 创建 ThreadPoolExecutor 
        future_list = [executor.submit(read_file, file) for file in os.listdir(path)] # 提交任务
    
    
    def get_result(future):
        global df
        df = df.append(future.result(),ignore_index=True)
    
        
    for future in as_completed(future_list):
        future.add_done_callback(get_result)
    
    print(time.time()-start_time)
    

    进程池

    进程数一般设置为 核数-1

    进程池 +阻塞

    from concurrent.futures import ThreadPoolExecutor,as_completed
    
    net_df = pd.DataFrame()
    
    start_time = time.time()
    # 提交任务
    with ThreadPoolExecutor(20) as executor: # 创建 ThreadPoolExecutor 
        future_list = [executor.submit(read_file, file) for file in os.listdir(path)] 
    
        
    for future in as_completed(future_list):
    	result = future.result() # 获取任务结果
    	net_df = net_df.append(result,ignore_index=True)
    
    print(time.time()-start_time)
    
    

    进程池 +非阻塞

    #进程池,非阻塞获取结果
    from concurrent.futures import ProcessPoolExecutor,as_completed
    
    df = pd.DataFrame()
    
    start_time = time.time()
    # 提交任务
    with ProcessPoolExecutor(7) as executor: # 创建 ThreadPoolExecutor 
        future_list = [executor.submit(read_file, file) for file in os.listdir(path)] 
    
    def get_result(future):
        global df
        df = df.append(future.result(),ignore_index=True)
    
        
    for future in as_completed(future_list):
        future.add_done_callback(get_result)
    
    print(time.time()-start_time)
    

    Python线程池及其原理和使用(超级详细) (biancheng.net)

    [第53天: Python 线程池 - 纯洁的微笑 - 博客园 (cnblogs.com)](https://www.cnblogs.com/ityouknow/p/12993166.html#:~:text=在 python 中使用线程池有两种方式,一种是基于第三方库 threadpool,,另一种是基于 python3 新引入的库 concurrent.futures.ThreadPoolExecutor 。)

    Python concurrent.future线程池和进程池_Gordennizaicunzai的博客-CSDN博客

  • 相关阅读:
    【每日一题-leetcode】 47.全排列 II
    【每日一题-leetcode】46.全排列
    【每日一题-leetcode】 77.组合
    【每日一题-leetcode】105.从前序与中序遍历序列构造二叉树
    【每日一题-leetcode】297.二叉树的序列化与反序列化
    【读书笔记】《淘宝技术这十年》
    python第17天-网络复习
    python编码风格
    python第16天-网络4
    python第15天-网络3
  • 原文地址:https://www.cnblogs.com/gongyanzh/p/14820926.html
Copyright © 2011-2022 走看看