# encoding: utf-8
import requests
import time
from Crypto.Cipher import AES
import base64
import hashlib
import json
from queue import Queue
import threading
import xlrd
import yaml
import logging.config
import os
import csv
class TokenHandler(object):
    """Obtain an API token from the token endpoint with a signed request.

    The request carries ``appid``, ``secret`` and ``time`` plus a ``sign``
    value: the SHA-256 hex digest of all other parameters rendered as
    ``key=value`` pairs, sorted by key and joined with ``||``.
    """

    # Placeholder credentials -- replace with real values before running.
    appid = 'appidappidappid'
    secret = 'secretsecretsecretsecret'
    # Seconds to wait for the endpoint; without a timeout a stalled
    # connection would block the caller forever.
    request_timeout = 30

    def __init__(self):
        pass

    @staticmethod
    def _build_sign_text(params):
        """Join every parameter except ``sign`` as key-sorted ``key=value`` pairs separated by ``||``."""
        return '||'.join(
            key + '=' + params[key] for key in sorted(params) if key != 'sign'
        )

    @staticmethod
    def _sign(params):
        """Return the SHA-256 hex digest of the sign text built from *params*."""
        sign_text = TokenHandler._build_sign_text(params)
        return hashlib.sha256(sign_text.encode('utf-8')).hexdigest()

    def get_token(self):
        """Request a fresh token and return it.

        Returns:
            The value found at ``data.token`` in the JSON response.

        Raises:
            ValueError: if the response is not valid JSON or carries no token.
            requests.RequestException: on network failure or timeout.
        """
        url = 'http://1.1.1.1/api/token'
        params = {
            'appid': self.appid,
            'secret': self.secret,
            'time': str(int(time.time())),
            'sign': '',
        }
        params['sign'] = self._sign(params)
        logging.info(params['sign'])
        # The secret is long-lived, so keep it out of the log file.
        logging.info('params: ' + str(dict(params, secret='***')))

        # Send the request.
        res = requests.post(url=url, params=params, timeout=self.request_timeout)
        # Parse the body as JSON directly. Decoding it with 'unicode_escape'
        # first would turn JSON escapes such as \" or \n into raw characters
        # and read non-ASCII bytes as Latin-1, producing invalid JSON.
        res_dict = res.json()
        logging.info(json.dumps(res_dict, ensure_ascii=False))

        data = res_dict.get('data') if isinstance(res_dict, dict) else None
        token = data.get('token') if isinstance(data, dict) else None
        if not token:
            raise ValueError(
                'token endpoint returned no token: '
                + json.dumps(res_dict, ensure_ascii=False)
            )
        logging.info(token)
        return token

    def logout(self):
        pass
class QueryData(object):
    """Worker that queries the remote API for every queued user.

    Consumes user dicts (``name``, ``phone``, ``idcard``) from
    ``Queues.user_info_queue`` and produces result dicts on
    ``Queues.result_queue``. Once the input queue is drained, an
    ``{'is_finished': True}`` end marker is put on the result queue.
    ``completed_user`` holds the id-card numbers that were already
    processed; those users are skipped.
    """

    # Placeholder credentials -- replace with real values before running.
    appid = 'appidappidappid'
    secret = 'secretsecretsecretsecret'
    appkey = 'appkeyappkeyappkey'
    iv = 'appkeyappkeyappkey'
    token = ''
    completed_user = []
    # Seconds to wait for the API; without a timeout a stalled connection
    # would hang the worker thread forever.
    request_timeout = 30
    # do_query runs on several threads at once; this lock serializes the
    # create-or-append of error_idcard.csv.
    _error_file_lock = threading.Lock()

    def __init__(self):
        pass

    @staticmethod
    def _build_sign_text(params):
        """Join every parameter except ``sign`` and ``token`` as key-sorted ``key=value`` pairs separated by ``||``."""
        return '||'.join(
            key + '=' + params[key]
            for key in sorted(params)
            if key not in ('sign', 'token')
        )

    @staticmethod
    def _sign(params):
        """Return the SHA-256 hex digest of the sign text built from *params*."""
        sign_text = QueryData._build_sign_text(params)
        logging.info('sign_text: ' + sign_text)
        return hashlib.sha256(sign_text.encode('utf-8')).hexdigest()

    def _request(self, name, phone, idcard):
        """Send one signed query for a user and return the decoded JSON response."""
        url = 'http://1.1.1.1/api/package/b'
        params = {
            'token': QueryData.token,
            'name': Encryptor.encrypt(name, QueryData.appkey, QueryData.iv),
            'phone': Encryptor.encrypt(phone, QueryData.appkey, QueryData.iv),
            'id_card': Encryptor.encrypt(idcard, QueryData.appkey, QueryData.iv),
            'order_id': 'a123',
            'time': str(int(time.time())),
            'sign': '',
        }
        params['sign'] = self._sign(params)
        logging.info(params['sign'])
        logging.info('params: ' + str(params))
        res = requests.post(url=url, params=params, timeout=QueryData.request_timeout)
        # Parse the body as JSON directly; a 'unicode_escape' decode before
        # json.loads would corrupt escaped quotes/newlines and non-ASCII bytes.
        res_dict = res.json()
        logging.info(json.dumps(res_dict, ensure_ascii=False))
        return res_dict

    def _record_error(self, idcard, response):
        """Append one failed id-card and its response to error_idcard.csv."""
        error_dict_res = {'idcard': idcard, 'response': response}
        err_headers = list(error_dict_res.keys())
        logging.info(("headers", err_headers))
        with QueryData._error_file_lock:
            # Decide about the header and append under the same lock so two
            # threads cannot both try to create the file.
            need_header = not os.path.exists('./error_idcard.csv')
            with open('error_idcard.csv', mode='a', encoding='utf-8', newline='') as error_file:
                dict_writer = csv.DictWriter(error_file, err_headers)
                if need_header:
                    dict_writer.writeheader()
                dict_writer.writerow(error_dict_res)

    def _process_user(self, user_info_dict):
        """Query one user and route the outcome to the result queue or the error file."""
        name = user_info_dict.get('name')
        phone = user_info_dict.get('phone')
        idcard = user_info_dict.get('idcard')
        if idcard in QueryData.completed_user:
            logging.info(idcard + "已经查询过了,结果在result.csv中,跳过。。。")
            return

        res_dict = self._request(name, phone, idcard)
        if res_dict.get('errno') == 106:
            # Token expired: refresh it and retry this user once, otherwise
            # the user would be recorded as failed without a real attempt.
            QueryData.token = TokenHandler().get_token()
            res_dict = self._request(name, phone, idcard)

        if res_dict.get('errno') == 0:
            plaintext_data = Encryptor.decrypt(
                res_dict.get('data'), QueryData.appkey, QueryData.iv
            )
            # decrypt() returns str. The code below needs a dict (it sets
            # 'idcard' and the writer emits it as a CSV row), so the payload
            # is parsed as JSON -- assumes the API returns a JSON object.
            result = json.loads(plaintext_data)
            result['idcard'] = idcard
            # Hand the assembled result dict to the writer.
            logging.info(result)
            Queues.result_queue.put(result)
        else:
            logging.info(("请注意,这个身份证号查询失败了", idcard))
            self._record_error(idcard, res_dict)

    def do_query(self):
        """Drain the user queue, then put the end marker on the result queue.

        A failure while handling one user is logged and recorded in
        error_idcard.csv; the loop continues with the next user. The end
        marker is always sent, even if the worker aborts, so the result
        writer can terminate.
        """
        # Local import: the file only imports ``Queue`` from ``queue``.
        from queue import Empty

        try:
            while True:
                # get_nowait() instead of empty()+get(): with several workers
                # another thread can take the last item between the two
                # calls, and a blocking get() would then wait forever.
                try:
                    user_info_dict = Queues.user_info_queue.get_nowait()
                except Empty:
                    break
                try:
                    self._process_user(user_info_dict)
                except Exception as exc:
                    logging.exception('query failed for %s', user_info_dict.get('idcard'))
                    self._record_error(user_info_dict.get('idcard'), repr(exc))
            logging.info('user_info_queue is empty!')
        finally:
            # End marker for the result writer.
            Queues.result_queue.put({'is_finished': True})
class ResultWriter(object):
    """Consumer of ``Queues.result_queue`` that appends results to result.csv.

    Counts the ``{'is_finished': True}`` end markers taken from the queue
    and stops once as many markers as there are producers have been seen.
    """

    # Column order of result.csv.
    RESULT_HEADERS = [
        "idcard", "is_high_risk_user", "last_visit_dt", "30d_overdue_cnt",
        "his_overdue_amt", "last_overdue_dt", "last_overdue_amt",
        "curr_overdue_amt", "curr_overdue_days", "first_overdue_dt",
        "first_overdue_amt", "last_repay_tm", "repay_times",
        "curr_debt_product_cnt", "total_in_order_cnt", "total_in_order_amt",
    ]

    def __init__(self):
        pass

    def do_write(self, producer_count=10):
        """Write queued result dicts to result.csv until all producers finish.

        Args:
            producer_count: number of end markers to wait for, i.e. the
                number of producer threads feeding the result queue.

        Raises:
            ValueError: from ``csv.DictWriter`` if a result dict contains a
                key that is not in ``RESULT_HEADERS``.
        """
        finished_thread_counter = 0
        headers = list(self.RESULT_HEADERS)
        logging.info(("headers", headers))
        # Write the header row only when the file is being created.
        need_header = not os.path.exists('./result.csv')
        with open('result.csv', mode='a', encoding='utf-8', newline='') as result_file:
            dict_writer = csv.DictWriter(result_file, headers)
            if need_header:
                dict_writer.writeheader()
                result_file.flush()
            while finished_thread_counter < producer_count:
                # Blocking get(): wakes up as soon as an item arrives instead
                # of polling empty() with a one-second sleep.
                res_dict = Queues.result_queue.get()
                if 'is_finished' in res_dict:
                    finished_thread_counter += 1
                else:
                    dict_writer.writerow(res_dict)
                    # Flush per row so an interrupted run leaves every
                    # written result on disk.
                    result_file.flush()
                logging.info('finished_thread_counter:' + str(finished_thread_counter))
class Encryptor(object):
    """AES-CBC helpers that exchange ciphertext as base64 text."""

    BLOCK_SIZE = 16

    @staticmethod
    def _to_bytes(value):
        """Return *value* UTF-8 encoded if it is a str, otherwise unchanged."""
        if isinstance(value, str):
            return value.encode(encoding='utf_8', errors='strict')
        return value

    @staticmethod
    def _pad(data):
        """Pad *data* to a multiple of 16 bytes with N bytes of value N.

        NOTE(review): input that is already block-aligned is left unpadded,
        exactly as before. Standard PKCS#7 would append a full block; confirm
        what the server expects before changing this.
        """
        remainder = len(data) % Encryptor.BLOCK_SIZE
        if remainder:
            pad_len = Encryptor.BLOCK_SIZE - remainder
            data += bytes([pad_len]) * pad_len
        return data

    @staticmethod
    def _unpad(data):
        """Strip a trailing run of N bytes of value N (1 <= N <= 16), if present.

        Only a complete, consistent padding run is removed, so plaintext that
        legitimately ends in a tab, newline or carriage return (byte values
        9, 10, 13) is no longer truncated.
        """
        if not data:
            return data
        pad_len = data[-1]
        if 1 <= pad_len <= Encryptor.BLOCK_SIZE and data.endswith(bytes([pad_len]) * pad_len):
            return data[:-pad_len]
        return data

    @staticmethod
    def encrypt(text, key, iv):
        """Encrypt *text* with AES-CBC and return the ciphertext as a base64 str.

        *text*, *key* and *iv* may each be str (UTF-8 encoded here) or bytes.
        """
        text = Encryptor._pad(Encryptor._to_bytes(text))
        key = Encryptor._to_bytes(key)
        iv = Encryptor._to_bytes(iv)
        cipher_bytes = AES.new(key, mode=AES.MODE_CBC, IV=iv).encrypt(text)
        return base64.b64encode(cipher_bytes).decode(encoding='utf_8', errors='strict')

    @staticmethod
    def decrypt(cipher_text, key, iv):
        """Decrypt base64 *cipher_text* with AES-CBC and return the plaintext str.

        *cipher_text* may be str or bytes; previously a bytes argument left
        the decoded buffer unbound and raised ``UnboundLocalError``.
        """
        key = Encryptor._to_bytes(key)
        iv = Encryptor._to_bytes(iv)
        cipher_text_bytes = base64.b64decode(cipher_text)
        # AES decryption.
        plaintext_bytes = AES.new(key, mode=AES.MODE_CBC, IV=iv).decrypt(cipher_text_bytes)
        # Remove the padding bytes.
        plaintext_bytes = Encryptor._unpad(plaintext_bytes)
        return plaintext_bytes.decode(encoding='utf-8', errors='strict')
class QueryThread(threading.Thread):
    """Thread whose body runs one ``QueryData().do_query()`` call."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        """Log start, run the query loop, log completion."""
        logging.info('{}start...'.format(self.name))
        worker = QueryData()
        worker.do_query()
        logging.info('{}completed!'.format(self.name))
class WriteResultThread(threading.Thread):
    """Thread whose body runs one ``ResultWriter().do_write()`` call."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        """Log start, run the writer loop, log completion."""
        logging.info('{}start...'.format(self.name))
        writer = ResultWriter()
        writer.do_write()
        logging.info('{}completed!'.format(self.name))
class Queues(object):
    """Holder for the two queues used by this script.

    Both attributes start out as empty, unbounded queues.
    """

    user_info_queue, result_queue = Queue(), Queue()
def setup_logging(default_path="log_config.yaml", default_level=logging.INFO, env_key="LOG_CFG"):
    """Configure logging from a YAML dictConfig file, or fall back to basicConfig.

    Args:
        default_path: config file used when the environment variable named
            by *env_key* is unset or empty.
        default_level: level passed to ``logging.basicConfig`` when no usable
            config file is found.
        env_key: name of the environment variable that may override the
            config path.
    """
    path = os.getenv(env_key) or default_path
    if os.path.exists(path):
        with open(path, "r") as f:
            # safe_load: yaml.load() without an explicit Loader can build
            # arbitrary Python objects and is rejected by PyYAML >= 6.
            config = yaml.safe_load(f)
        if isinstance(config, dict):
            logging.config.dictConfig(config)
            return
        # An empty or non-mapping file cannot be handed to dictConfig.
        logging.basicConfig(level=default_level)
        logging.warning("logging config %s is not a mapping; using basicConfig", path)
    else:
        logging.basicConfig(level=default_level)
def main():
    """Load users from the Excel sheet, skip processed ones, start the threads.

    Steps: configure logging, fetch a token, queue every user row from
    ``testdata_1.xlsx``, read the id-cards already present in ``result.csv``
    into ``QueryData.completed_user``, then start the query threads and the
    single writer thread.
    """
    # Install pycryptodome before running (the original note pins 3.7.0):
    #     pip install pycryptodome
    worker_count = 10  # ResultWriter.do_write waits for this many end markers by default

    setup_logging()
    QueryData.token = TokenHandler().get_token()

    # The input queue is filled completely before any consumer starts, so it
    # must be unbounded: a bounded queue would block put() forever once the
    # sheet has more rows than the bound.
    user_info_queue = Queue()
    result_queue = Queue(maxsize=10001)

    # Read the Excel rows into dicts and queue them.
    # NOTE(review): xlrd >= 2.0 no longer reads .xlsx files -- confirm the
    # installed xlrd version.
    book = xlrd.open_workbook('testdata_1.xlsx')
    sheet = book.sheet_by_index(0)  # first sheet
    for i in range(1, sheet.nrows):  # row 0 is skipped
        if len(sheet.cell(i, 0).value) > 0:
            user_info_dict = {
                'name': sheet.cell(i, 0).value.strip(),
                'idcard': sheet.cell(i, 1).value.strip(),
                'phone': str(int(sheet.cell(i, 2).value)).strip(),
            }
            logging.info(user_info_dict)
            user_info_queue.put(user_info_dict)

    # Collect the id-cards already present in result.csv so they are skipped.
    headers = [
        "idcard", "is_high_risk_user", "last_visit_dt", "30d_overdue_cnt",
        "his_overdue_amt", "last_overdue_dt", "last_overdue_amt",
        "curr_overdue_amt", "curr_overdue_days", "first_overdue_dt",
        "first_overdue_amt", "last_repay_tm", "repay_times",
        "curr_debt_product_cnt", "total_in_order_cnt", "total_in_order_amt",
    ]
    logging.info(("headers", headers))
    if not os.path.exists('./result.csv'):
        with open('result.csv', mode='x', encoding='utf-8', newline='') as result_csvfile_x:
            csv.DictWriter(result_csvfile_x, headers).writeheader()
    # result.csv is written as UTF-8, so it must be read back as UTF-8; the
    # locale default encoding can differ (e.g. on Windows).
    with open('result.csv', encoding='utf-8', newline='') as f:
        list_completed_user_idcard = []
        for row in csv.DictReader(f):
            if 'idcard' in row:
                logging.info(('get completed_user idcard', row['idcard']))
                list_completed_user_idcard.append(row['idcard'])
    logging.info(('these idcard will skiped ', list_completed_user_idcard))
    # A set gives O(1) membership tests for the workers' skip check.
    QueryData.completed_user = set(list_completed_user_idcard)

    Queues.user_info_queue = user_info_queue
    Queues.result_queue = result_queue
    for i in range(worker_count):
        QueryThread("queryThread" + str(i)).start()
    WriteResultThread("writeResuleThread").start()


if __name__ == '__main__':
    main()