从数据库中获取数据
SELECT re.tenant_id as adm_div_code, re.pa_year, re.pa_bt_bt_no nt_pa_vo_id, #re.pa_bt_bt_no, pi.created_date AS bi_date, re.accept_bt_code , '' as exec_bt_name, '' as bt_fax_proj_code, tt.bt_fax_pro_code as bt_fax_code, tt.bt_fax_pro_name as bt_fax_name, tt.bt_type as bt_in_category_code, '' as bt_in_category_name, tt.in_sort_code as in_sort_code, '' as in_sort_name, re.bi_no as bt_fax_pa_no, re.pa_bt_name as act_payer_name, '' as charge_stand_name, re.receivable_pb_am as pb_am, re.ac_pb_am as pbd_am, p.qty as pa_number, re.trade_time as pa_date# FROM bt_pa_red re LEFT JOIN bt_pa_pro_be p ON re.bid = p.ref_pa_record_bid and p.is_normal='1' LEFT JOIN nt_prd tt ON tt.bid = p.ref_project_bid and tt.is_normal='1' LEFT JOIN bt_bi pi ON pi.bi_no = re.bi_no and pi.is_normal='1' where re.is_normal='1' ;
可以看出从bt_pa_red查询联表bt_pa_pro_be,nt_prd, bt_bi。
三张表数据量:
bt_bi 2900461
bt_pa_pro_be 084380
bt_pa_red 2970776
nt_prd 2370
实际查询过程中,执行查询的机器64G内存都不够用,无法查询出想要的数据。
使用python逐行读取bt_pa_red表,再按sql中jion条件去获取数据
在写代码执行发现某一行数据重复很多,重复量达到289052。找到有问题的数据行调试,原来bt_pa_pro_be 表中bi_no 列存在空值,导致匹配数据量很大。
python代码加判断 if bt_fax_pa_no is not None and len(bt_fax_pa_no)>0: python在执行没有重复数据。同时把sql修改条件,最后where 增加 and length(bi_no)>2,sql 也就能查询,最终查询出300万行数据
python 最终代码
#encoding: utf-8 #!/bin/python3 import sys print(sys.path) import pymysql import time # sql 插入完整版 conn = pymysql.connect(host="127.0.0.1",user="bt",password="123456",db="bt",port=3306) db1 = pymysql.connect(host="127.0.0.1",user="bt",password="123456",db="bt",port=3306) db2 = pymysql.connect(host="127.0.0.1",user="bt",password="123456",db="bt",port=3306) db3 = pymysql.connect(host="127.0.0.1",user="bt",password="123456",db="bt",port=3306) db4 = pymysql.connect(host="127.0.0.1",user="bt",password="123456",db="bt",port=3306) if __name__ == "__main__": print(time.asctime( time.localtime(time.time()) )) cursor = pymysql.cursors.SSCursor(conn) cursor.execute("select * from bt_pa_red where is_normal='1' and trade_time >='2021-01-01 00:00:00' and trade_time <='2021-06-30 23:59:59'") count=0 while True: count=count+1 #计数,放在最开始 row = cursor.fetchone() if not row: break adm_div_code = row[9] pa_year=row[10] nt_pa_vo_id=row[17] bi_date=None accept_bt_code=row[12] exec_bt_name="" bt_fax_proj_code="" bt_fax_code=None bt_fax_name=None bt_in_category_code=None bt_in_category_name="" in_sort_code=None in_sort_name="" bt_fax_pa_no=row[22] act_payer_name=row[23] charge_stand_name="" pb_am=row[18] pbd_am=row[31] pa_number=None pa_date=row[33] #联表 bt_pa_pro_be rebid = row[11] bt_pa_pro_be_sql = "select * from bt_pa_pro_be where ref_pa_record_bid='"+rebid+"' and is_normal='1'" cursor_bt_pa_pro_be=db1.cursor() cursor_bt_pa_pro_be.execute(bt_pa_pro_be_sql) bt_pa_pro_be_all_data = cursor_bt_pa_pro_be.fetchall() for detail_item in bt_pa_pro_be_all_data: pa_number = detail_item[15] detail_ref_project_bid = detail_item[13] #开始处理 bt_bi 表 if bt_fax_pa_no is not None and len(bt_fax_pa_no)>0: #增加长度判断 去掉重复 bt_bi_sql = "select * from bt_bi where bi_no = '"+bt_fax_pa_no+"' and is_normal='1'" # print(bt_bi_sql) bt_bi_cursor=db2.cursor() bt_bi_cursor.execute(bt_bi_sql) bt_bi_all_data=bt_bi_cursor.fetchall() for bt_bi_item in bt_bi_all_data: bi_date = bt_bi_item[3] #开始处理nt_prd nt_prd_sql ="select * from nt_prd where bid ='"+detail_ref_project_bid+"' and is_normal='1'" nt_prd_cursor=db3.cursor() nt_prd_cursor.execute(nt_prd_sql) nt_prd_all_data=nt_prd_cursor.fetchall() for nt_prd_item in nt_prd_all_data: bt_fax_code =nt_prd_item[12] bt_fax_name=nt_prd_item[13] bt_in_category_code=nt_prd_item[14] in_sort_code=nt_prd_item[16] insert_sql= """insert into p_result values("{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}","{}")""".format(adm_div_code,adm_div_name,pa_year,nt_pa_vo_id,bi_date,accept_bt_code,exec_bt_name,bt_fax_proj_code,bt_fax_code,bt_fax_name,bt_in_category_code,bt_in_category_name,in_sort_code,in_sort_name,bt_fax_pa_no,act_payer_name,charge_stand_name,pb_am,pbd_am,pa_number,pa_date) content = "{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{} ".format(adm_div_code,adm_div_name,pa_year,nt_pa_vo_id,bi_date,accept_bt_code,exec_bt_name,bt_fax_proj_code,bt_fax_code,bt_fax_name,bt_in_category_code,bt_in_category_name,in_sort_code,in_sort_name,bt_fax_pa_no,act_payer_name,charge_stand_name,pb_am,pbd_am,pa_number,pa_date) # print(content) # 写入文件内容 fout = open('result', 'a+', encoding='utf8') fout.write(content) fout.close() #print(insert_sql) insert_cursor = db4.cursor() insert_cursor.execute(insert_sql) db4.commit() print(time.asctime( time.localtime(time.time()) ))