zoukankan      html  css  js  c++  java
  • 大数据智能加工系统进度报告

    核心技术:

    • Flask框架
    • Pandas
    • 文件上传
    • 数据字典查看

    进度报告:

    主要实现了用户登录、文件上传、数据字典查看功能。

    核心代码:

    • 文件导入
    #文件导入
    @app.route('/import_data', methods=['POST', 'GET'])
    def import_data():
        flag=0;
        the_file = request.files.get("file")    #接收前端发送过来的文件,获取文件对象
        type=the_file.filename.split(".")[1]    #根据文件名获取文件类型
        print(type)                             #输出文件类型
    
        #根据文件类型调用对应函数保存文件
        if(type=="csv" or type=="txt"):
            the_file.save("score_table/" + the_file.filename)  # 保存文件到指定路径(score_table路径下)
            flag=connectsql.read_csv(the_file.filename)        #导入文件到数据库
        elif(type=="xlsx" or type=="xls"):
            the_file.save("excel_example/" + the_file.filename)  # 保存文件到指定路径(excel_example路径下)
            flag = connectsql.read_example(the_file.filename)
        elif(type=="docx"):
            the_file.save("word_data/" + the_file.filename)  # 保存文件到指定路径(word_data路径下)
        else:
            the_file.save("test_data/" + the_file.filename)  # 保存文件到指定路径(test_data路径下)
        if(flag==1):
            return jsonify({"code": 0, "msg": "", "data": ""})  #code代表操作状态,msg是描述信息,data是请求的业务数据。
        else:
            return jsonify({"code": -1, "msg": "", "data": ""})
    • 查询已导入文件
    @app.route('/get_table_list')
    def get_table_list():
        data=[]
        data=dictionary.get_table_data()
        data_re=[]
        for table_name,database_name,rows,data_time in data:
            #time strftime() 函数接收以时间元组,并返回以可读字符串表示的当地时间,"%Y-%m-%d %H:%M:%S"返回时间类型:2021-11-05, 10:24:28
            data_time_str=data_time.strftime("%Y-%m-%d %H:%M:%S")
            #append() 方法用于在列表末尾添加新的对象,该方法无返回值,但是会修改原来的列表
            data_re.append({"table_name":table_name,"database_name":database_name,"rows_num":rows,"create_time":data_time_str})
        count= len(data)
        print(data)
        return jsonify({"code": 0, "msg": "", "count": count,"data":data_re})
    • 查看数据字典
    @app.route('/get_look_dictionary')
    def get_look_dictionary():
        table_name=request.values.get("table_name")
        database_name=request.values.get("database_name")
        table_data,table_unit=dictionary.get_dictionary(table_name,database_name)
        data_re=[]
        count=len(table_data)
        for index in range(len(table_data)):
            print(table_data[index][4],table_unit[index])
            data_re.append({"key_english":table_data[index][0],"key_china":table_data[index][1],"key_type":table_data[index][2],
                            "key_long":table_data[index][3],"key_null":table_data[index][4],"key_unit":table_unit[index]})
        return jsonify({"code": 0, "msg": "", "count": count, "data": data_re})
    • 读取样表生成数据字典
    def read_example(path):
        flag=1
        conn, cursor = get_conn_mysql()     #连接数据库
        #将excel转换为csv文件
        data = pd.read_excel('excel_example/'+path, 'Sheet1')   #使用pandas读取excel文件
        data.fillna('', inplace=True)       #fillna——缺失值替代,inplace=True直接修改原对象,inplace=False创建副本,修改副本
        print(data)
        csv_name = path.split(".")[0]       #split()——指定分隔符对字符串进行切片,以'.'进行分割
        # 编写表创建语句(字段类型就设为string)
        # 表名
        table_name = path.split(".")[0]
        sql = "CREATE TABLE IF NOT EXISTS " + csv_name + " ("
        # 获取key值 CREATE TABLE `bigwork_data`.`table_test` (    
        # 循环加入key值
        keys_china = ""
        keys=""
        key_china=data.keys()
        j=0
        for i in data.values.tolist()[1]:
            sql = sql + i + " VARCHAR(45) NOT NULL DEFAULT '#' comment '"+key_china[j]+"',"
            j=j+1;
            keys = keys + i + ","
        keys_china = keys_china[0:-1]
        keys = keys[0:-1]
        creat_sql = sql[0:-1] + ") ENGINE = InnoDB DEFAULT CHARACTER SET = utf8 COLLATE = utf8_bin;"
        print(creat_sql)
        # 获取%s
        s = ','.join(['%s' for _ in range(len(data.columns))])
        # 获取values
        keys_unit=data.values.tolist()[0];
        values=[]
        values.append(data.values.tolist()[0])
        for i in data.values.tolist()[2:]:
            values.append(i)
        print(values)
        # 组装insert语句
        insert_sql = 'insert into {}({}) values({})'.format(table_name, keys, s)
        print(insert_sql)
        # 创建表
        try:
            cursor.execute(creat_sql)
        except:
            traceback.print_exc()
            flag=0
            print("表创建失败")
        # # 插入数据
        try:
            for i in values:
                cursor.execute(insert_sql, i)
                print(insert_sql)
                print(i)
            conn.commit()
        except:
            traceback.print_exc()
            flag=0
            print("写入错误")
        close_conn_mysql(cursor, conn)
        return flag
    • 读取excel文件
    def read_excel(path):
        conn, cursor = get_conn_mysql()     #连接数据库
        #将excel转换为csv文件
        data = pd.read_excel('excel_data/'+path, 'Sheet1')
        csv_name = path.split(".")[0]   
        # 编写表创建语句(字段类型就设为string)
        # 表名
        table_name = path.split(".")[0]
        sql = "CREATE TABLE " + csv_name + " ("
        # 获取key值 CREATE TABLE `bigwork_data`.`table_test` (   
        # 循环加入key值
        keys = ""
        for i in data.keys():
            sql = sql + i + " VARCHAR(45) NOT NULL,"
            keys = keys + i + ","
        keys = keys[0:-1]
        creat_sql = sql[0:-1] + ") ENGINE = InnoDB DEFAULT CHARACTER SET = utf8 COLLATE = utf8_bin;"
        # 获取%s
        s = ','.join(['%s' for _ in range(len(data.columns))])
        # 获取values
        values = data.values.tolist()
        print(values)
        # 组装insert语句
        insert_sql = 'insert into {}({}) values({})'.format(table_name, keys, s)
        print(insert_sql)
        print(creat_sql)
        print(keys);
        print(values)
        
        close_conn_mysql(cursor, conn)
    • 读取csv文件
    def read_csv(path):
        conn, cursor=get_conn_mysql()
        flag=1
        data=pd.read_csv("score_table/"+path)
        data.fillna('', inplace=True)
        #编写表创建语句(字段类型就设为string)
        #表名
        table_name = path.split(".")[0]
        sql = "CREATE TABLE IF NOT EXISTS " + table_name + " ("
        # 获取key值 CREATE TABLE `bigwork_data`.`table_test` (    
        # 循环加入key值
        keys_china = ""
        keys = ""
        key_china = data.keys()
        j = 0
        for i in data.values.tolist()[1]:
            sql = sql + i + " VARCHAR(45) NOT NULL DEFAULT '#' comment '" + key_china[j] + "',"
            j = j + 1;
            keys = keys + i + ","
        keys_china = keys_china[0:-1]
        keys = keys[0:-1]
        creat_sql = sql[0:-1] + ") ENGINE = InnoDB DEFAULT CHARACTER SET = utf8 COLLATE = utf8_bin;"
        print(creat_sql)
        # 获取%s
        s = ','.join(['%s' for _ in range(len(data.columns))])
        # 获取values
        keys_unit = data.values.tolist()[0];
        values = []
        values.append(data.values.tolist()[0])
        for i in data.values.tolist()[2:]:
            values.append(i)
        print(values)
        # 组装insert语句
        insert_sql = 'insert into {}({}) values({})'.format(table_name, keys, s)
        print(insert_sql)    
        # 创建表
        try:
            cursor.execute(creat_sql)
        except:
            traceback.print_exc()
            flag = 0
            print("表创建失败")
        # # 插入数据
        try:
            for i in values:
                cursor.execute(insert_sql, i)
                print(insert_sql)
                print(i)
            conn.commit()
        except:
            traceback.print_exc()
            flag = 0
            print("写入错误")
        close_conn_mysql(cursor, conn)
        return flag
    • 获取表的数据字典
    def get_dictionary(name_table,database_name):   
        sql="select column_name,column_comment ,data_type,CHARACTER_MAXIMUM_LENGTH,COLUMN_DEFAULT " \
            "from information_schema.columns " \
            "where table_name='"+name_table+"' and table_schema='"+database_name+"'"
        res = query_mysql(sql)
        sql="select * from "+name_table+" limit 1"
        res2=query_mysql(sql)
        print(res)
        print(res2)
        return res,res2[0]
        pass
    • 获取表信息
    def get_table_data():
        sql="SELECT TABLE_NAME,TABLE_SCHEMA,TABLE_ROWS,CREATE_TIME " \
            "FROM information_schema.TABLES " \
            "where  TABLE_SCHEMA='bigdata';"
        res = query_mysql(sql)
        print(res)
        return res
        pass

    运行结果:

  • 相关阅读:
    创建数据库指定编码格式
    java开发环境配置
    Eclipse 配置工程
    声明式事务管理 的5 种方式
    web容器启动顺序
    2.1 实践篇:使用ping来检测网速
    1.1 mysql安装
    1.2 测试人员与开发人员比例
    1.0 软件测试能力
    1.4 测试各阶段(单元、集成、系统 、Alpha、Beta、验收)
  • 原文地址:https://www.cnblogs.com/zyj3955/p/15516930.html
Copyright © 2011-2022 走看看