zoukankan      html  css  js  c++  java
  • 在Python中使用多进程快速处理数据

    转自:https://blog.csdn.net/bryan__/article/details/78786648

    数据分片:可以将数据分片处理的任务适合用多进程代码处理,核心思路是将data分片,对每一片数据处理返回结果(可能是无序的),然后合并。应用场景:多进程爬虫,类mapreduce任务。缺点是子进程会拷贝父进程所有状态,内存浪费严重。

    import math
    from multiprocessing import Pool
    
    def run(data, index, size):  # data 传入数据,index 数据分片索引,size进程数
        size = math.ceil(len(data) / size)
        start = size * index
        end = (index + 1) * size if (index + 1) * size < len(data) else len(data)
        temp_data = data[start:end]
        # do something
        return data  # 可以返回数据,在后面收集起来
    
    processor = 40
    res = []
    p = Pool(processor)
    for i in range(processor):
        res.append(p.apply_async(run, args=(data, i, processor,)))
        print(str(i) + ' processor started !')
    p.close()
    p.join()
    for i in res:
        print(i.get())  # 使用get获得多进程处理的结果

    分文件处理:当内存受限时,不能再继续使用数据分片,因为子进程会拷贝父进程的所有状态,导致内存的浪费。这时候可以考虑先把大文件分片保存到磁盘,然后del 释放掉数据,接着在多进程处理的函数里面分别读取,这样子进程就会分别读取需要处理的数据,而不会占用大量内存。

    from multiprocessing import Pool
    import pandas as pd
    import math
    data=pd.DataFrame({'user_id':[1,2,3,4],'item_id':[6,7,8,9]})
    users=pd.DataFrame(data['user_id'].unique(),columns=['user_id'])
    processor=4
    p=Pool(processor)
    l_data = len(users)
    size = math.ceil(l_data / processor)
    res = []
    def run(i):
        data=pd.read_csv('../data/user_'+str(i)+'.csv')
        #todo
    return data
    
    for i in range(processor):
        start = size * i
        end = (i + 1) * size if (i + 1) * size < l_data else l_data
        user = users[start:end]
        t_data = pd.merge(data, user, on='user_id').reset_index(drop=True)
        t_data.to_csv('../data/user_'+str(i)+'.csv',index=False)
        print(len(t_data))
    
    del data,l_data,users
    for i in range(processor):
        res.append(p.apply_async(run, args=(i,)))
        print(str(i) + ' processor started !')
    p.close()
    p.join()
    data = pd.concat([i.get() for i in res])
    

      

    多进程数据共享:当需要修改共享的数据时,那么这个时候可以使用数据共享:

    from multiprocessing import Process, Manager
    # 每个子进程执行的函数
    # 参数中,传递了一个用于多进程之间数据共享的特殊字典
    def func(i, d):
        d[i] = i + 100
        print(d.values())
    # 在主进程中创建特殊字典
    m = Manager()
    d = m.dict()
    for i in range(5):
        # 让子进程去修改主进程的特殊字典
        p = Process(target=func, args=(i, d))
        p.start()
    p.join()
    ------------
    [100]
    [100, 101]
    [100, 101, 102, 103]
    [100, 101, 102, 103]
    [100, 101, 102, 103, 104]
    

      

  • 相关阅读:
    String.getBytes()未设置字符集导致打印的pdf乱码
    git更新代码报错,error: The following untracked working tree files would be overwritten by ch
    thinkpad X1 extreme 安装Ubuntu 18.04.2 LTS
    plsql的sql窗口中文模糊查询没有作用
    mysql 触发器和存储过程组合使用,实现定时触发操作
    css 实现table 隔行变色
    meta标签详解:源http://blog.csdn.net/kongjiea/article/details/17092413
    Spring+Quartz实现定时任务的配置方法
    ECToch随笔
    转载:ecshop自定义销量
  • 原文地址:https://www.cnblogs.com/fujian-code/p/8977744.html
Copyright © 2011-2022 走看看