zoukankan      html  css  js  c++  java
  • readzip_multiprocessing多进程

    #!/usr/bin/env python
    import multiprocessing
    import os
    import queue
    import re
    import shutil
    import time

    import numpy as np
    import pandas as pd
    import py7zr
    #import math
    
    def fun_time_l2(a,b):
        """Return 1 when ``a`` is less than or equal to ``b``, otherwise 0.

        Both arguments are converted with ``float()`` before the comparison,
        so numeric strings are accepted as well as numbers.
        """
        return int(float(a) <= float(b))
    
    def read_files(filename):
        """Read one tab-separated tick file and return its summary statistics.

        Lines containing the text "find" are ignored.  Every other line is
        stripped and split on tabs into exactly three fields: time (kept as
        text), price and volume (both parsed as integers).  The signed amount
        of a row is ``price * volume``.

        Args:
            filename: path of the text file to read.

        Returns:
            list: ``[vol_t, amount_t, z_xiao, z_zhong, z_da, z_te_da,
            f_xiao, f_zhong, f_da, f_te_da, re_min_L2, time_l2]`` where

            * ``vol_t`` / ``amount_t`` are the sums of absolute volume / amount;
            * ``z_*`` are sums of positive amounts in the bands (0, 40000),
              [40000, 200000), [200000, 1000000) and [1000000, +inf);
            * ``f_*`` are sums of negative amounts in the mirrored bands;
            * ``re_min_L2`` is the percentage of absolute volume traded at a
              price below 1.02 times the minimum price (NaN when the total
              volume is zero);
            * ``time_l2`` is the number of distinct time values, with their
              last two characters removed, among those low-priced rows
              (presumably distinct minutes -- TODO confirm the time format).

        Raises:
            ValueError: if the file has no data rows or a row does not split
                into exactly three fields.
        """
        small, medium, large = 40000, 200000, 1000000

        with open(filename, "r") as f:
            rows = [line for line in f if "find" not in line]
        if not rows:
            raise ValueError("no data rows in %r" % (filename,))

        fields = pd.Series(rows).str.strip().str.split("\t", expand=True)
        if fields.shape[1] != 3:
            raise ValueError(
                "expected 3 tab-separated fields per row in %r, found %d"
                % (filename, fields.shape[1]))

        # Convert once, and to an explicit 64-bit type: the platform "int" is
        # only 32 bits wide on some systems, where price * volume can overflow.
        tick_time = fields[0]
        price = fields[1].astype("int64")
        vol = fields[2].astype("int64")
        amount = price * vol

        vol_t = vol.abs().sum()
        amount_t = amount.abs().sum()

        # Negative amounts, banded by size.
        f_xiao = amount[(amount < 0) & (amount > -small)].sum()
        f_zhong = amount[(amount <= -small) & (amount > -medium)].sum()
        f_da = amount[(amount <= -medium) & (amount > -large)].sum()
        f_te_da = amount[amount <= -large].sum()

        # Positive amounts, banded by size.
        z_xiao = amount[(amount > 0) & (amount < small)].sum()
        z_zhong = amount[(amount >= small) & (amount < medium)].sum()
        z_da = amount[(amount >= medium) & (amount < large)].sum()
        z_te_da = amount[amount >= large].sum()

        # Share of volume traded within 2% of the lowest price.
        near_low = price < price.min() * 1.02
        low_vol = vol[near_low].abs().sum()
        re_min_L2 = low_vol / vol_t * 100 if vol_t else float("nan")

        # Distinct time prefixes (last two characters dropped) near the low.
        time_l2 = len(tick_time[near_low].str[:-2].drop_duplicates())

        return [vol_t, amount_t, z_xiao, z_zhong, z_da, z_te_da,
                f_xiao, f_zhong, f_da, f_te_da, re_min_L2, time_l2]
    
    
    def extract_files(filename):
        """Extract a 7z archive below ``pathsave`` and return the folder path.

        The folder name is the first path component of the archive's first
        member.  A folder with that name already present under the
        module-level ``pathsave`` is deleted and recreated empty before the
        whole archive is extracted into ``pathsave``.

        Args:
            filename: path of the .7z archive.

        Returns:
            ``pathsave`` concatenated with the archive's top-level folder name.
        """
        with py7zr.SevenZipFile(filename, 'r') as archive:
            member_names = archive.getnames()  # names of the archive members
            top_folder = member_names[0].partition("/")[0]  # folder inside the archive
            target = pathsave + str(top_folder)
            # Start from a clean folder: remove any leftover of the same name.
            if os.path.exists(target):
                shutil.rmtree(target)
            os.mkdir(target)
            archive.extractall(pathsave)  # unpack everything under pathsave
        return target
    def read_dirs(savedir):
        """Return the entries of ``savedir`` as a numpy array of full paths.

        Each name from ``os.listdir(savedir)`` is prefixed with ``savedir``
        and the platform path separator.

        Args:
            savedir: directory to list.

        Returns:
            numpy string array with one path per directory entry; empty when
            the directory has no entries.
        """
        # Force a string dtype: an empty listing would otherwise become a
        # float array, which np.char.add rejects.
        names = np.array(os.listdir(savedir), dtype=str)
        return np.char.add(savedir + os.sep, names)
    def sub_process(df_only_name1,q):
        """Worker body: summarise each file and put all rows on ``q`` at once.

        For every path in ``df_only_name1`` a zero-size file is reported on
        stdout and skipped; any other file is passed to ``read_files`` and
        its result is stored with the file's base name (no directory, no
        extension) as the first element.  The collected rows are put on the
        queue without blocking, after which the function exits with status 0.

        Args:
            df_only_name1: iterable of file paths to process.
            q: queue that receives the list of result rows.
        """
        rows = []
        for path in df_only_name1:
            if os.path.getsize(path):
                base_name = os.path.splitext(os.path.split(path)[1])[0]
                rows.append([base_name] + read_files(path))
                continue
            # Zero-size file: report it and move on.
            print("file siz = 0")
            print(path)
        q.put(rows, block=False)
        exit(0)
    
    def _collect_worker_results(process_list, q, expected):
        """Return ``expected`` result batches taken from ``q``, then join the workers.

        Blocks on the queue rather than polling ``qsize()``, which is not
        implemented on every platform.  If a worker exits with a non-zero
        code before all batches arrived, the remaining workers are terminated
        and RuntimeError is raised instead of waiting forever.
        """
        batches = []
        while len(batches) < expected:
            try:
                batches.append(q.get(timeout=3))
            except queue.Empty:
                dead = [p for p in process_list if p.exitcode not in (None, 0)]
                if dead:
                    for p in process_list:
                        if p.is_alive():
                            p.terminate()
                    raise RuntimeError(
                        "%d worker process(es) exited abnormally before "
                        "returning results" % len(dead))
        # Queues are drained, so joining cannot deadlock on unflushed data.
        for p in process_list:
            p.join()
        return batches


    if __name__ == '__main__':
        path = r'G:\datas of status\tick-by-tick trade'  # where the data archives are stored
        # pathsave is also read as a global by extract_files(); keep the name.
        pathsave = 'G:\\datas of status\\python codes\\'  # root for extracted files and CSV output
        pathTemp = 'G:\\datas of status\\python codes\\everyday_data\\temp'
        timestamp_format = "%Y-%m-%d %H:%M:%S"

        list_columns = ["name", "date", "vol", "amount", "z_xiao", "z_zhong", "z_da", "z_te_da", "f_xiao", "f_zhong",
                        "f_da", "f_te_da", "re_min_L2", "time_l2"]
        list_columns1 = ["name", "vol", "amount", "z_xiao", "z_zhong", "z_da", "z_te_da", "f_xiao", "f_zhong",
                         "f_da", "f_te_da", "re_min_L2", "time_l2"]

        # Month folders are selected by position below, so sort them:
        # os.listdir() returns entries in arbitrary order.
        listM = np.array(sorted(os.listdir(path)), dtype=str)
        print(listM)
        listM = np.char.add(path + "\\", listM)  # full paths of the month folders
        # ==================== start work
        m = 9  # index of the first month folder to process
        do_num = 3  # number of month folders to process, walking backwards from m
        for n in range(do_num):
            i = m - n
            print(listM[i])
            listD = np.array(os.listdir(listM[i]), dtype=str)  # daily archives in this month
            print(listD)
            listD = np.char.add(listM[i] + "\\", listD)  # full paths of the daily archives
            print(listD)

            daily_frames = []
            first_date = None
            for filename in listD:
                print("=========")
                print(time.strftime(timestamp_format, time.localtime()))
                savedir = extract_files(filename)
                print(time.strftime(timestamp_format, time.localtime()))
                # The date key is the trailing digit run of the extracted
                # folder path once hyphens are removed.  Hyphens are stripped
                # from a copy only: savedir must keep naming the real folder.
                findt = re.search(r"\d+$", savedir.replace("-", ""))
                if findt is None:
                    raise ValueError(
                        "cannot derive a date from extracted folder %r" % (savedir,))
                tempdir = findt.group()
                # ==================== fan the files out to worker processes
                file_names = read_dirs(savedir)
                all_nums = len(file_names)
                epochs = 3  # number of worker processes
                step = all_nums // epochs
                q = multiprocessing.Queue(maxsize=epochs)
                process_list = []
                for k in range(epochs):
                    begin = k * step
                    # The last worker also takes the remainder.
                    end = all_nums if k == epochs - 1 else begin + step
                    tmp_process = multiprocessing.Process(
                        target=sub_process, args=(file_names[begin:end], q))
                    process_list.append(tmp_process)
                for process in process_list:
                    process.start()

                rows = []
                for batch in _collect_worker_results(process_list, q, epochs):
                    rows.extend(batch)
                # =======================
                shutil.rmtree(savedir)
                pdD_t = pd.DataFrame(rows, columns=list_columns1)
                print(len(pdD_t))
                pdD_t.insert(1, "date", tempdir, allow_duplicates=False)
                # =========== one CSV per day
                day_dir = os.path.join(pathsave, "everyday_data")
                os.makedirs(day_dir, exist_ok=True)
                save_dfile = os.path.join(day_dir, tempdir + ".csv")
                pdD_t = pdD_t.sort_values(by=['time_l2'], ascending=True)
                pdD_t.to_csv(save_dfile, sep=",", index=False, header=True)
                daily_frames.append(pdD_t)
                if first_date is None:
                    first_date = tempdir
                print(filename)
                print(time.strftime(timestamp_format, time.localtime()))

            if not daily_frames:
                print("no daily archives found in", listM[i])
                continue
            # One CSV per month, named after the first six characters of the
            # first processed date.
            pdM_all = pd.concat(daily_frames, ignore_index=True)[list_columns]
            save_file = pathsave + first_date[0:6] + ".csv"
            print(save_file)
            pdM_all.to_csv(save_file, sep=",", index=False, header=True)
        raise SystemExit(0)
    

      

  • 相关阅读:
    高斯消元学习
    HDU 4596 Yet another end of the world(解一阶不定方程)
    Codeforces Round #318 div2
    HDU 4463 Outlets(一条边固定的最小生成树)
    HDU 4458 Shoot the Airplane(计算几何 判断点是否在n边形内)
    HDU 4112 Break the Chocolate(简单的数学推导)
    HDU 4111 Alice and Bob (博弈)
    POJ 2481 Cows(线段树单点更新)
    HDU 4288 Coder(STL水过)
    zoj 2563 Long Dominoes
  • 原文地址:https://www.cnblogs.com/rongye/p/13338554.html
Copyright © 2011-2022 走看看