核心技术:
- Flask框架
- Pandas
- 文件上传
- 数据字典查看
进度报告:
主要实现了用户登录、文件上传、数据字典查看功能。
核心代码:
- 文件导入
#文件导入 @app.route('/import_data', methods=['POST', 'GET']) def import_data(): flag=0; the_file = request.files.get("file") #接收前端发送过来的文件,获取文件对象 type=the_file.filename.split(".")[1] #根据文件名获取文件类型 print(type) #输出文件类型 #根据文件类型调用对应函数保存文件 if(type=="csv" or type=="txt"): the_file.save("score_table/" + the_file.filename) # 保存文件到指定路径(score_table路径下) flag=connectsql.read_csv(the_file.filename) #导入文件到数据库 elif(type=="xlsx" or type=="xls"): the_file.save("excel_example/" + the_file.filename) # 保存文件到指定路径(excel_example路径下) flag = connectsql.read_example(the_file.filename) elif(type=="docx"): the_file.save("word_data/" + the_file.filename) # 保存文件到指定路径(word_data路径下) else: the_file.save("test_data/" + the_file.filename) # 保存文件到指定路径(test_data路径下) if(flag==1): return jsonify({"code": 0, "msg": "", "data": ""}) #code代表操作状态,msg是描述信息,data是请求的业务数据。 else: return jsonify({"code": -1, "msg": "", "data": ""})
- 查询已导入文件
@app.route('/get_table_list') def get_table_list(): data=[] data=dictionary.get_table_data() data_re=[] for table_name,database_name,rows,data_time in data: #time strftime() 函数接收以时间元组,并返回以可读字符串表示的当地时间,"%Y-%m-%d %H:%M:%S"返回时间类型:2021-11-05, 10:24:28 data_time_str=data_time.strftime("%Y-%m-%d %H:%M:%S") #append() 方法用于在列表末尾添加新的对象,该方法无返回值,但是会修改原来的列表 data_re.append({"table_name":table_name,"database_name":database_name,"rows_num":rows,"create_time":data_time_str}) count= len(data) print(data) return jsonify({"code": 0, "msg": "", "count": count,"data":data_re})
- 查看数据字典
@app.route('/get_look_dictionary') def get_look_dictionary(): table_name=request.values.get("table_name") database_name=request.values.get("database_name") table_data,table_unit=dictionary.get_dictionary(table_name,database_name) data_re=[] count=len(table_data) for index in range(len(table_data)): print(table_data[index][4],table_unit[index]) data_re.append({"key_english":table_data[index][0],"key_china":table_data[index][1],"key_type":table_data[index][2], "key_long":table_data[index][3],"key_null":table_data[index][4],"key_unit":table_unit[index]}) return jsonify({"code": 0, "msg": "", "count": count, "data": data_re})
- 读取样表生成数据字典
def read_example(path): flag=1 conn, cursor = get_conn_mysql() #连接数据库 #将excel转换为csv文件 data = pd.read_excel('excel_example/'+path, 'Sheet1') #使用pandas读取excel文件 data.fillna('', inplace=True) #fillna——缺失值替代,inplace=True直接修改原对象,inplace=False创建副本,修改副本 print(data) csv_name = path.split(".")[0] #split()——指定分隔符对字符串进行切片,以'.'进行分割 # 编写表创建语句(字段类型就设为string) # 表名 table_name = path.split(".")[0] sql = "CREATE TABLE IF NOT EXISTS " + csv_name + " (" # 获取key值 CREATE TABLE `bigwork_data`.`table_test` ( # 循环加入key值 keys_china = "" keys="" key_china=data.keys() j=0 for i in data.values.tolist()[1]: sql = sql + i + " VARCHAR(45) NOT NULL DEFAULT '#' comment '"+key_china[j]+"'," j=j+1; keys = keys + i + "," keys_china = keys_china[0:-1] keys = keys[0:-1] creat_sql = sql[0:-1] + ") ENGINE = InnoDB DEFAULT CHARACTER SET = utf8 COLLATE = utf8_bin;" print(creat_sql) # 获取%s s = ','.join(['%s' for _ in range(len(data.columns))]) # 获取values keys_unit=data.values.tolist()[0]; values=[] values.append(data.values.tolist()[0]) for i in data.values.tolist()[2:]: values.append(i) print(values) # 组装insert语句 insert_sql = 'insert into {}({}) values({})'.format(table_name, keys, s) print(insert_sql) # 创建表 try: cursor.execute(creat_sql) except: traceback.print_exc() flag=0 print("表创建失败") # # 插入数据 try: for i in values: cursor.execute(insert_sql, i) print(insert_sql) print(i) conn.commit() except: traceback.print_exc() flag=0 print("写入错误") close_conn_mysql(cursor, conn) return flag
- 读取excel文件
def read_excel(path): conn, cursor = get_conn_mysql() #连接数据库 #将excel转换为csv文件 data = pd.read_excel('excel_data/'+path, 'Sheet1') csv_name = path.split(".")[0] # 编写表创建语句(字段类型就设为string) # 表名 table_name = path.split(".")[0] sql = "CREATE TABLE " + csv_name + " (" # 获取key值 CREATE TABLE `bigwork_data`.`table_test` ( # 循环加入key值 keys = "" for i in data.keys(): sql = sql + i + " VARCHAR(45) NOT NULL," keys = keys + i + "," keys = keys[0:-1] creat_sql = sql[0:-1] + ") ENGINE = InnoDB DEFAULT CHARACTER SET = utf8 COLLATE = utf8_bin;" # 获取%s s = ','.join(['%s' for _ in range(len(data.columns))]) # 获取values values = data.values.tolist() print(values) # 组装insert语句 insert_sql = 'insert into {}({}) values({})'.format(table_name, keys, s) print(insert_sql) print(creat_sql) print(keys); print(values) close_conn_mysql(cursor, conn)
- 读取csv文件
def read_csv(path): conn, cursor=get_conn_mysql() flag=1 data=pd.read_csv("score_table/"+path) data.fillna('', inplace=True) #编写表创建语句(字段类型就设为string) #表名 table_name = path.split(".")[0] sql = "CREATE TABLE IF NOT EXISTS " + table_name + " (" # 获取key值 CREATE TABLE `bigwork_data`.`table_test` ( # 循环加入key值 keys_china = "" keys = "" key_china = data.keys() j = 0 for i in data.values.tolist()[1]: sql = sql + i + " VARCHAR(45) NOT NULL DEFAULT '#' comment '" + key_china[j] + "'," j = j + 1; keys = keys + i + "," keys_china = keys_china[0:-1] keys = keys[0:-1] creat_sql = sql[0:-1] + ") ENGINE = InnoDB DEFAULT CHARACTER SET = utf8 COLLATE = utf8_bin;" print(creat_sql) # 获取%s s = ','.join(['%s' for _ in range(len(data.columns))]) # 获取values keys_unit = data.values.tolist()[0]; values = [] values.append(data.values.tolist()[0]) for i in data.values.tolist()[2:]: values.append(i) print(values) # 组装insert语句 insert_sql = 'insert into {}({}) values({})'.format(table_name, keys, s) print(insert_sql) # 创建表 try: cursor.execute(creat_sql) except: traceback.print_exc() flag = 0 print("表创建失败") # # 插入数据 try: for i in values: cursor.execute(insert_sql, i) print(insert_sql) print(i) conn.commit() except: traceback.print_exc() flag = 0 print("写入错误") close_conn_mysql(cursor, conn) return flag
- 获取表的数据字典
def get_dictionary(name_table,database_name): sql="select column_name,column_comment ,data_type,CHARACTER_MAXIMUM_LENGTH,COLUMN_DEFAULT " \ "from information_schema.columns " \ "where table_name='"+name_table+"' and table_schema='"+database_name+"'" res = query_mysql(sql) sql="select * from "+name_table+" limit 1" res2=query_mysql(sql) print(res) print(res2) return res,res2[0] pass
- 获取表信息
def get_table_data(): sql="SELECT TABLE_NAME,TABLE_SCHEMA,TABLE_ROWS,CREATE_TIME " \ "FROM information_schema.TABLES " \ "where TABLE_SCHEMA='bigdata';" res = query_mysql(sql) print(res) return res pass