zoukankan      html  css  js  c++  java
  • python各模块组合实例

    # encoding: utf-8
    import requests
    import time
    from Crypto.Cipher import AES
    import base64
    import hashlib
    import json
    from queue import Queue
    import threading
    import xlrd
    import yaml
    import logging.config
    import os
    import csv
    
    class TokenHandler(object):
        """Fetch an access token from the token endpoint.

        The request is signed: every parameter except ``sign`` is rendered as
        ``key=value``, the pairs are ordered by key and joined with ``||``, and
        the SHA-256 hex digest of that text is sent as the ``sign`` parameter.
        """

        # NOTE(review): these look like placeholder credentials - confirm they
        # are replaced with real values before running.
        appid = 'appidappidappid'
        secret = 'secretsecretsecretsecret'
        # Seconds to wait for the endpoint; without a timeout requests.post
        # can block forever on an unresponsive server.
        request_timeout = 30

        def __init__(self):
            pass

        def get_token(self):
            """Request a fresh token and return it.

            Returns:
                The ``token`` value found under ``data`` in the JSON response.

            Raises:
                ValueError: if the response is not valid JSON or carries no
                    ``data.token``.
                requests.RequestException: on network failure or timeout.
            """
            url = 'http://1.1.1.1/api/token'
            params = {
                'appid': TokenHandler.appid,
                'secret': TokenHandler.secret,
                'time': str(int(time.time())),
                'sign': '',
            }
            # Sign every parameter except 'sign' itself: key=value pairs in
            # ascending key order, joined with '||'.
            sign_text = '||'.join(
                key + '=' + params[key] for key in sorted(params) if key != 'sign'
            )
            # NOTE(review): sign_text and params contain the secret and are
            # written to the log - consider masking them.
            logging.info('sign_text: ' + sign_text)
            sign_text_sha = hashlib.sha256(sign_text.encode('utf-8')).hexdigest()
            logging.info(sign_text_sha)
            params['sign'] = sign_text_sha
            logging.info('params: ' + str(params))

            res = requests.post(url=url, params=params,
                                timeout=TokenHandler.request_timeout)
            # Parse the raw body as JSON. Decoding it with 'unicode_escape'
            # first corrupts UTF-8 text and turns escaped quotes/newlines into
            # literal ones, which can make valid JSON unparseable.
            res_dict = res.json()
            # ensure_ascii=False keeps non-ASCII text readable in the log.
            logging.info(json.dumps(res_dict, ensure_ascii=False))

            token = (res_dict.get('data') or {}).get('token')
            if not token:
                # Fail with the server's answer instead of an opaque
                # AttributeError on a missing 'data' object.
                raise ValueError('token missing from response: '
                                 + json.dumps(res_dict, ensure_ascii=False))
            logging.info(token)
            return token

        def logout(self):
            """Placeholder; logging out is not implemented."""
            pass
    
    class QueryData(object):
        """
        Consumer of Queues.user_info_queue and producer for Queues.result_queue.

        A worker drains user_info_queue, queries the remote API for each user
        and pushes the decrypted result onto result_queue.  When it stops it
        pushes an end marker ({'is_finished': True}) onto result_queue.
        completed_user holds the id-card numbers that were already processed;
        users found there are skipped.
        """
        appid = 'appidappidappid'
        secret = 'secretsecretsecretsecret'
        appkey = 'appkeyappkeyappkey'
        iv = 'appkeyappkeyappkey'
        token = ''
        completed_user = []
        # Seconds to wait for the API; without a timeout a worker can hang forever.
        request_timeout = 30
        # Serialises writes to error_idcard.csv: several query threads may
        # report failures at the same time.
        _error_file_lock = threading.Lock()

        def __init__(self):
            pass

        def do_query(self):
            """Process users until user_info_queue is exhausted.

            The end marker is pushed onto result_queue even when a query raises,
            so the writer thread is never left waiting for a worker that died.
            """
            from queue import Empty

            try:
                while True:
                    # get_nowait() instead of empty()+get(): with several
                    # workers another thread can take the last item between
                    # the two calls, leaving this one blocked forever.
                    try:
                        user_info_dict = Queues.user_info_queue.get_nowait()
                    except Empty:
                        break
                    self._query_one(user_info_dict)
                logging.info('user_info_queue is empty!')
            finally:
                # Tell the writer that this producer is done.
                Queues.result_queue.put({'is_finished': True})

        def _query_one(self, user_info_dict):
            """Query the API for one user and queue the result or record the failure."""
            name = user_info_dict.get('name')
            phone = user_info_dict.get('phone')
            idcard = user_info_dict.get('idcard')
            if idcard in QueryData.completed_user:
                logging.info(idcard + "已经查询过了,结果在result.csv中,跳过。。。")
                return

            url = 'http://1.1.1.1/api/package/b'
            params = {
                'token': QueryData.token,
                'name': Encryptor.encrypt(name, QueryData.appkey, QueryData.iv),
                'phone': Encryptor.encrypt(phone, QueryData.appkey, QueryData.iv),
                'id_card': Encryptor.encrypt(idcard, QueryData.appkey, QueryData.iv),
                'order_id': 'a123',
                'time': str(int(time.time())),
                'sign': '',
            }
            # Sign every parameter except 'sign' and 'token': key=value pairs
            # in ascending key order, joined with '||', then SHA-256.
            sign_text = '||'.join(
                key + '=' + params[key]
                for key in sorted(params)
                if key not in ('sign', 'token')
            )
            logging.info('sign_text: ' + sign_text)
            sign_text_sha = hashlib.sha256(sign_text.encode('utf-8')).hexdigest()
            logging.info(sign_text_sha)
            params['sign'] = sign_text_sha
            logging.info('params: ' + str(params))

            res = requests.post(url=url, params=params,
                                timeout=QueryData.request_timeout)
            # Parse the raw body; 'unicode_escape'-decoding it first corrupts
            # UTF-8 text and can break otherwise valid JSON.
            res_dict = res.json()
            logging.info(json.dumps(res_dict, ensure_ascii=False))

            # errno 106: token expired, refresh it for the following requests.
            # NOTE(review): the current user is not retried; it is recorded in
            # error_idcard.csv below.
            if res_dict['errno'] == 106:
                QueryData.token = TokenHandler().get_token()

            if res_dict['errno'] == 0:
                cipher_data = res_dict.get('data')
                plaintext_data = Encryptor.decrypt(cipher_data, QueryData.appkey, QueryData.iv)
                # decrypt() returns text; it has to be parsed into a dict
                # before a key can be added and before csv.DictWriter can use it.
                res_plaintext_data_dict = json.loads(plaintext_data)
                logging.info(res_plaintext_data_dict)
                res_plaintext_data_dict['idcard'] = idcard
                logging.info(res_plaintext_data_dict)
                Queues.result_queue.put(res_plaintext_data_dict)
            else:
                logging.info(("请注意,这个身份证号查询失败了",idcard))
                self._record_failure(idcard, res_dict)

        def _record_failure(self, idcard, res_dict):
            """Append the failed id-card number and the API response to error_idcard.csv."""
            error_dict_res = {'idcard': idcard, 'response': res_dict}
            err_headers = list(error_dict_res.keys())
            logging.info(("headers", err_headers))
            with QueryData._error_file_lock:
                # Decide about the header before opening: append mode creates
                # the file if it does not exist yet.
                need_header = not os.path.exists('./error_idcard.csv')
                with open('error_idcard.csv', mode='a', encoding='utf-8', newline='') as error_csvfile:
                    dict_writer = csv.DictWriter(error_csvfile, err_headers)
                    if need_header:
                        dict_writer.writeheader()
                    dict_writer.writerow(error_dict_res)
    
    class ResultWriter(object):
        """
        Consumer of Queues.result_queue: writes each result to result.csv and
        counts end markers; writing stops once as many end markers as there are
        producers have been received.
        """
        def __init__(self):
            pass

        def do_write(self, producer_count=10):
            """Drain result_queue into result.csv until every producer has finished.

            Args:
                producer_count: number of end markers ({'is_finished': ...}) to
                    wait for, i.e. the number of producer threads.  Defaults
                    to 10, the value that used to be hard-coded here.
            """
            finished_thread_counter = 0
            # Only the keys matter: they define the CSV columns and their order.
            sample_data = {"idcard": "110101198709292519", "is_high_risk_user": "",
                           "last_visit_dt": "2018-09-21 16:12:20", "30d_overdue_cnt": "", "his_overdue_amt": "较高",
                           "last_overdue_dt": "2018-09-21", "last_overdue_amt": "较低", "curr_overdue_amt": "",
                           "curr_overdue_days": "", "first_overdue_dt": "2018-08-17", "first_overdue_amt": "",
                           "last_repay_tm": "2018-09-21 16:24:27", "repay_times": "", "curr_debt_product_cnt": "",
                           "total_in_order_cnt": "", "total_in_order_amt": "极高"}
            headers = list(sample_data.keys())
            logging.info(("headers", headers))
            # Create the file with a header row on first use; later runs append.
            if not os.path.exists('./result.csv'):
                with open('result.csv', mode='x', encoding='utf-8', newline='') as result_csvfile_x:
                    csv.DictWriter(result_csvfile_x, headers).writeheader()
            with open('result.csv', mode='a', encoding='utf-8', newline='') as result_csvfile_a:
                dict_writer = csv.DictWriter(result_csvfile_a, headers)
                while finished_thread_counter < producer_count:
                    # Blocking get() replaces the empty()/sleep(1) polling loop.
                    res_dict = Queues.result_queue.get()
                    if 'is_finished' in res_dict:
                        finished_thread_counter += 1
                    else:
                        dict_writer.writerow(res_dict)
                        # Flush per row so an interrupted run keeps the rows
                        # that a later run reads back to skip completed users.
                        result_csvfile_a.flush()
                    logging.info('finished_thread_counter:' + str(finished_thread_counter))
    
    class Encryptor(object):
        """AES-CBC helpers that exchange ciphertext as base64 text."""

        @staticmethod
        def encrypt(text, key, iv):
            """Encrypt ``text`` with AES-CBC and return the ciphertext as a base64 str.

            ``text``, ``key`` and ``iv`` may be str (encoded as UTF-8) or bytes.
            The plaintext is padded up to a multiple of 16 bytes with N bytes of
            value N.  Input whose length is already a multiple of 16 is NOT
            padded, which differs from standard PKCS#7.
            NOTE(review): presumably the remote side expects exactly this
            scheme - confirm before changing it.
            """
            if isinstance(text, str):
                text = text.encode(encoding='utf_8', errors='strict')
            if isinstance(key, str):
                key = key.encode(encoding='utf_8', errors='strict')
            if isinstance(iv, str):
                iv = iv.encode(encoding='utf_8', errors='strict')
            remainder = len(text) % 16
            if remainder:
                pad_len = 16 - remainder
                text += bytes([pad_len]) * pad_len
            cipher_bytes = AES.new(key, mode=AES.MODE_CBC, IV=iv).encrypt(text)
            return base64.b64encode(cipher_bytes).decode(encoding='utf_8', errors='strict')

        @staticmethod
        def decrypt(cipher_text, key, iv):
            """Decrypt base64-encoded AES-CBC ciphertext and return the plaintext str.

            ``cipher_text`` is base64 and may be str or bytes; ``key`` and ``iv``
            may be str (encoded as UTF-8) or bytes.  The result is decoded as
            UTF-8.
            """
            if isinstance(key, str):
                key = key.encode(encoding='utf-8', errors='strict')
            if isinstance(iv, str):
                iv = iv.encode(encoding='utf-8', errors='strict')
            # b64decode accepts str and bytes alike.  Decoding unconditionally
            # avoids the UnboundLocalError that a non-str argument used to hit.
            cipher_text_bytes = base64.b64decode(cipher_text)
            plaintext_bytes = AES.new(key, mode=AES.MODE_CBC, IV=iv).decrypt(cipher_text_bytes)
            # Remove the padding: strip trailing bytes of value 1, then 2, ...
            # up to 16.  The order is kept as-is on purpose; note that trailing
            # control characters of the message itself may be stripped as well.
            for pad_value in range(1, 17):
                plaintext_bytes = plaintext_bytes.rstrip(bytes([pad_value]))
            return plaintext_bytes.decode(encoding='utf-8', errors='strict')
    
    class QueryThread(threading.Thread):
        """Thread that runs one QueryData worker until it returns."""

        def __init__(self, name):
            super().__init__()
            self.name = name

        def run(self):
            """Log the start, run QueryData().do_query(), then log completion."""
            start_message = self.name + 'start...'
            logging.info(start_message)
            worker = QueryData()
            worker.do_query()
            done_message = self.name + 'completed!'
            logging.info(done_message)
    
    class WriteResultThread(threading.Thread):
        """Thread that runs the ResultWriter until it returns."""

        def __init__(self, name):
            super().__init__()
            self.name = name

        def run(self):
            """Log the start, run ResultWriter().do_write(), then log completion."""
            start_message = self.name + 'start...'
            logging.info(start_message)
            writer = ResultWriter()
            writer.do_write()
            done_message = self.name + 'completed!'
            logging.info(done_message)
    
    class Queues(object):
        """Holder for the two queues shared by the worker threads.

        The attributes are read through the class (``Queues.user_info_queue``,
        ``Queues.result_queue``); the ``__main__`` block replaces both with the
        queues it builds before starting the threads.
        """
        # Users waiting to be queried; consumed by QueryData.do_query.
        user_info_queue = Queue()
        # Query results and end markers; produced by QueryData.do_query and
        # consumed by ResultWriter.do_write.
        result_queue = Queue()
    
    def setup_logging(default_path="log_config.yaml", default_level=logging.INFO, env_key="LOG_CFG"):
        """Configure the logging module from a YAML file, or fall back to basicConfig.

        Args:
            default_path: config file used when the environment variable is
                unset or empty.
            default_level: level passed to ``logging.basicConfig`` when no
                config file exists.
            env_key: name of the environment variable that may override the
                config file path.
        """
        path = os.getenv(env_key) or default_path
        if os.path.exists(path):
            with open(path, "r") as f:
                # safe_load builds plain Python objects only.  yaml.load()
                # without an explicit Loader is deprecated and a TypeError on
                # PyYAML 6, and it can construct arbitrary objects.
                config = yaml.safe_load(f)
            logging.config.dictConfig(config)
        else:
            logging.basicConfig(level=default_level)
    
    if __name__ == '__main__':
        # Install pycryptodome before running (the original note pins 3.7.0):
        #     pip install pycryptodome
        setup_logging()
        test = TokenHandler()
        QueryData.token = test.get_token()
        # The input queue is filled completely before any consumer starts, so
        # it must be unbounded: a bounded queue would block put() forever once
        # the sheet has more rows than the bound.
        user_info_queue = Queue()
        result_queue = Queue(maxsize=10001)

        # Read the users from the first sheet (row 0 is skipped as the header)
        # and queue one dict per user.
        # NOTE(review): xlrd 2.x no longer reads .xlsx files - confirm the
        # installed xlrd version supports this workbook.
        book = xlrd.open_workbook('testdata_1.xlsx')
        sheet = book.sheet_by_index(0)
        for i in range(1, sheet.nrows):
            name_value = sheet.cell(i, 0).value
            if len(name_value) == 0:
                # Row without a name: queueing an empty dict would crash the
                # worker that picks it up, so skip the row.
                continue
            user_info_dict = {
                'name': name_value.strip(),
                'idcard': sheet.cell(i, 1).value.strip(),
                'phone': str(int(sheet.cell(i, 2).value)).strip(),
            }
            logging.info(user_info_dict)
            user_info_queue.put(user_info_dict)

        # Load the id-card numbers already present in result.csv so that those
        # users are skipped.  Only the keys of sample_data matter: they are the
        # CSV columns.
        sample_data = {"idcard": "110101198709292519", "is_high_risk_user": "",
                       "last_visit_dt": "2018-09-21 16:12:20", "30d_overdue_cnt": "", "his_overdue_amt": "较高",
                       "last_overdue_dt": "2018-09-21", "last_overdue_amt": "较低", "curr_overdue_amt": "",
                       "curr_overdue_days": "", "first_overdue_dt": "2018-08-17", "first_overdue_amt": "",
                       "last_repay_tm": "2018-09-21 16:24:27", "repay_times": "", "curr_debt_product_cnt": "",
                       "total_in_order_cnt": "", "total_in_order_amt": "极高"}
        headers = list(sample_data.keys())
        logging.info(("headers", headers))
        if not os.path.exists('./result.csv'):
            with open('result.csv', mode='x', encoding='utf-8', newline='') as result_csvfile_x:
                csv.DictWriter(result_csvfile_x, headers).writeheader()
        # Read with the same encoding the file is written with; the platform
        # default is not necessarily UTF-8.
        with open('result.csv', encoding='utf-8', newline='') as f:
            list_completed_user_idcard = []
            for row in csv.DictReader(f):
                if 'idcard' in row:
                    logging.info(('get completed_user idcard',row['idcard']))
                    list_completed_user_idcard.append(row['idcard'])
            logging.info(('these idcard will skiped ', list_completed_user_idcard))
            QueryData.completed_user = list_completed_user_idcard

        Queues.user_info_queue = user_info_queue
        Queues.result_queue = result_queue
        # 10 query threads: ResultWriter.do_write waits for 10 end markers, so
        # the two numbers have to stay in step.
        for i in range(10):
            thread_q = QueryThread("queryThread" + str(i))
            thread_q.start()

        thread_w = WriteResultThread("writeResuleThread" )
        thread_w.start()
    View Code
    # Logging configuration in the dictConfig schema; setup_logging() loads this
    # file (log_config.yaml) and hands it to logging.config.dictConfig.
    version: 1
    # Keep loggers that were created before this configuration is applied.
    disable_existing_loggers: False
    formatters:
            # Single shared format: time, logger, level, thread, line number, message.
            simple:
                format: "%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(lineno)d - %(message)s"
    handlers:
        # Everything from DEBUG upwards goes to standard output.
        console:
                class: logging.StreamHandler
                level: DEBUG
                formatter: simple
                stream: ext://sys.stdout
        # INFO and above go to info.log, rotated at 10485760 bytes (10 MiB), 20 backups kept.
        info_file_handler:
                class: logging.handlers.RotatingFileHandler
                level: INFO
                formatter: simple
                filename: info.log
                maxBytes: 10485760
                backupCount: 20
                encoding: utf8
        # ERROR and above go to errors.log with the same rotation settings.
        error_file_handler:
                class: logging.handlers.RotatingFileHandler
                level: ERROR
                formatter: simple
                filename: errors.log
                maxBytes: 10485760
                backupCount: 20
                encoding: utf8
    loggers:
        # Logger "my_module": ERROR and above only, written to info_file_handler,
        # not propagated to the root logger.
        my_module:
                level: ERROR
                handlers: [info_file_handler]
                propagate: no
    # Root logger: INFO and above, sent to all three handlers.
    root:
        level: INFO
        handlers: [console,info_file_handler,error_file_handler]
    log_config.yaml
  • 相关阅读:
    YAML 语法小结
    小程序之脚本语言
    小程序WXML 使用小结
    微信小程序 js逻辑
    小程序开发1
    联想Y7000安装Ubuntu16.04/Win10双系统,wifi问题,显卡驱动和CUDA10安装
    VS2015中配置Eigen
    联想Y700安装显卡驱动和CUDA8.0
    php微信生成微信公众号二维码扫描进入公众号带参数
    Y7000 (1)安装ubuntu1604遇到的问题
  • 原文地址:https://www.cnblogs.com/xiaodebing/p/9883953.html
Copyright © 2011-2022 走看看