zoukankan      html  css  js  c++  java
  • do_sings_user_threading.py

    #!/usr/bin/env python
    from struct import *
    import pandas as pd
    import numpy as np
    import os
    import re
    import pathlib
    
    import threading
    import time
    
    
    
    
    def get_new_data(pathdir=r"G:\datas of status\python codes\everyday_data"):
        """Load every ``.csv`` file in *pathdir* into one DataFrame.

        After loading, the ``name`` column is rewritten so that it can be used
        directly as a signal file name: ``"SZ"`` becomes ``"0_"``, ``"SH"``
        becomes ``"1_"`` and ``".dat"`` is appended.  A ``begin`` column holding
        ``name`` concatenated with ``date`` (both as strings) is added.

        Args:
            pathdir: Directory containing the new source CSV files.  Defaults
                to the location the script has always read from.

        Returns:
            The concatenated DataFrame.  Row indices of the individual files
            are kept as-is (they are not renumbered).

        Raises:
            FileNotFoundError: If *pathdir* contains no ``.csv`` file.
        """
        # Sorted so that the concatenation order (and therefore which row wins
        # a later "keep last" de-duplication) does not depend on listdir order.
        csv_paths = [
            os.path.join(pathdir, entry)
            for entry in sorted(os.listdir(pathdir))
            if entry.endswith(".csv")
        ]
        if not csv_paths:
            raise FileNotFoundError("no .csv files found in {!r}".format(pathdir))

        frames = []
        for csv_path in csv_paths:
            print(csv_path)
            frames.append(pd.read_csv(csv_path))
        # One concat instead of repeated DataFrame.append (quadratic, and
        # removed in pandas 2.0).
        df_all = pd.concat(frames)

        # Plain-text replacements; regex=False keeps that explicit.
        df_all["name"] = df_all["name"].str.replace("SZ", "0_", regex=False)
        df_all["name"] = df_all["name"].str.replace("SH", "1_", regex=False)
        df_all["name"] = df_all["name"] + ".dat"
        df_all["begin"] = df_all["name"].map(str) + df_all["date"].map(str)
        return df_all
    
    
    
    
    
    def read_file_out(file):
        """Read a binary signal file into a DataFrame.

        The file is a sequence of 8-byte records: a 4-byte unsigned integer
        (column ``date``) followed by a 4-byte float (column ``content``), in
        native byte order.  Trailing bytes that do not form a complete record
        are ignored.

        Args:
            file: Path of the binary file to read.

        Returns:
            A DataFrame with columns ``date`` (int64) and ``content``
            (float64) and a fresh 0..n-1 index.  An empty file yields an empty
            DataFrame that still has both columns.
        """
        record_size = 8
        # Context manager so the handle is closed even if read() fails.
        with open(file, mode='rb') as f3:
            buf = f3.read()

        # Drop an incomplete trailing record, if any.
        usable = len(buf) - len(buf) % record_size
        # "=If": native byte order, fixed 4-byte unsigned int + 4-byte float.
        records = list(iter_unpack("=If", buf[:usable]))

        # Build the frame in one go rather than appending row by row.
        df_all = pd.DataFrame(records, columns=["date", "content"])
        return df_all.astype({"date": "int64", "content": "float64"})
    
    
    def chang_to_bin(filew, df_all):
        """Append the rows of *df_all* to a binary signal file.

        Each row is written as an 8-byte record: ``date`` as a 4-byte unsigned
        integer followed by ``content`` as a 4-byte float, in native byte
        order.  The file is opened in append mode, so existing records are
        kept and the file is created if it does not exist.

        Args:
            filew: Path of the binary file to append to.
            df_all: DataFrame with ``date`` and ``content`` columns.  Rows are
                written in positional order, whatever the index labels are.
        """
        # Encode everything first so a bad value cannot leave a half-written
        # record behind, then hand the OS a single write.
        payload = b"".join(
            pack("=If", int(date), content)
            for date, content in zip(df_all["date"], df_all["content"])
        )
        # Append mode; use mode='wb' instead to rewrite the file from scratch.
        with open(filew, mode='ab') as fw:
            fw.write(payload)
    
    
    # Maps a data column name to the signal sub-directory its values are
    # written to.  This is the table dowork() iterates over.
    dir_dict = dict(
        z_da="signals_user_43",
        z_te_da="signals_user_44",
        f_da="signals_user_47",
        f_te_da="signals_user_48",
    )

    # Extended variant of the same mapping covering eight columns; it is not
    # referenced by the code visible in this file.
    dir_dict2 = dict(
        z_xiao="signals_user_41",
        z_zhong="signals_user_42",
        z_da="signals_user_43",
        z_te_da="signals_user_44",
        f_xiao="signals_user_45",
        f_zhong="signals_user_46",
        f_da="signals_user_47",
        f_te_da="signals_user_48",
    )
    
    
    
    
    def dowork(df_only_name):
        """Write the new records of each given stock to its signal files.

        For every name in *df_only_name* and every ``(column, sub-directory)``
        pair in ``dir_dict``, the rows of the module-level ``df_new_data`` that
        belong to that name are reduced to ``date`` plus that column (renamed
        to ``content``), de-duplicated on ``date`` keeping the last row, and
        appended to ``<pathdir>/<sub-directory>/<name>`` via ``chang_to_bin``.

        Relies on the module-level globals ``df_new_data``, ``pathdir`` and
        ``dir_dict``, which the ``__main__`` block sets up before any worker
        thread is started.

        Args:
            df_only_name: Iterable of file names (values of the ``name``
                column) to process.
        """
        for name in df_only_name:
            df_t_name = df_new_data[df_new_data["name"] == name]
            for key, subdir in dir_dict.items():
                file_t_name = os.path.join(pathdir, subdir, name)

                # Build a fresh two-column frame (a missing column becomes
                # NaN, as pd.DataFrame(..., columns=...) reindexes), then
                # rename instead of assigning into a derived frame.
                df_t_name_key = (
                    pd.DataFrame(df_t_name, columns=["date", key])
                    .rename(columns={key: "content"})
                    .drop_duplicates(subset=["date"], keep="last")
                    .reset_index(drop=True)
                )

                if not os.path.isfile(file_t_name):
                    pathlib.Path(file_t_name).touch()  # create an empty file

                # The new rows are appended to whatever the file already
                # holds.  To merge with the existing records instead, load
                # them with read_file_out(), concatenate, drop duplicate
                # dates (keep last) and switch chang_to_bin to overwrite mode.
                chang_to_bin(file_t_name, df_t_name_key)
    
    
    def run(df_only_name1, semaphore):
        """Thread entry point: process one batch of names under *semaphore*.

        Args:
            df_only_name1: The batch of file names handed to ``dowork``.
            semaphore: Semaphore limiting how many batches run at once.  It is
                acquired before the work starts and always released
                afterwards, even if ``dowork`` raises.
        """
        with semaphore:
            dowork(df_only_name1)
    
    if __name__ == '__main__':
        # NOTE: pathdir and df_new_data are deliberately module-level names;
        # dowork() reads them as globals from the worker threads.

        # Root directory that holds the signals_user_* output directories.
        # Raw string: "\u" in a normal string literal is a syntax error.
        pathdir = r"F:\python\untitled1\core\do_sings_user"
        # Alternative output root (signals directory of the trading client):
        # pathdir = r"F:\通达信\New tdx vip2020.03开心果整合版红顶栏\New tdx vip2020.03开心果整合版\T0002\signals"
        df_new_data = get_new_data()

        # One entry per distinct stock file name.
        df_only_name = df_new_data["name"].drop_duplicates(keep='last')
        all_nums = len(df_only_name)
        print(all_nums)

        every_batch = 13  # names handled by each worker thread
        epochs = -(-all_nums // every_batch)  # number of batches, rounded up
        print(epochs)

        # Upper bound on batches running dowork() at the same time.
        num_of_thread = 303
        semaphore = threading.BoundedSemaphore(num_of_thread)

        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        threads = []
        for i, begin in enumerate(range(0, all_nums, every_batch)):
            # Slice ends are exclusive, so clamp to all_nums (not all_nums - 1)
            # or the last name would never be processed.
            end = min(begin + every_batch, all_nums)
            df_only_name1 = df_only_name[begin:end]
            t = threading.Thread(target=run, args=(df_only_name1, semaphore))
            t.start()
            threads.append(t)
            print(i, begin, end)
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

        # Wait for exactly the threads started above rather than polling the
        # global thread count, which never reaches 1 if the host process has
        # threads of its own.
        for t in threads:
            t.join()

        print('-----all threads done-----')
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
  • 相关阅读:
    flask 指定前端文件路径以及静态文件路径
    pycharm git修改密码
    Web应用搭建
    python学习
    python解析jSON文件
    通过DLNA将电脑视频投射到电视屏幕
    U盘自动复制文件
    kali PIN码破解
    mdk3洪水攻击教程
    sqlmap(网站数据库注入)
  • 原文地址:https://www.cnblogs.com/rongye/p/13196067.html
Copyright © 2011-2022 走看看