配置文件:
1 [local_db] 2 host=127.0.0.1 3 port=3306 4 user=root 5 password=123456 6 database=userinfo
配置文件解析:
import copy import configparser conf_file = "conf" class ConfParser(): def __init__(self, filename, section=None, option=None): self.conf_file = filename self.conf_parser = configparser.ConfigParser() self._parser() def _parser(self): self.conf_parser.read(self.conf_file) def sections(self): return self.conf_parser.sections() def options(self, section): return self.conf_parser.options(section) def get_section_value(self, section, keys=None): options = self.conf_parser.options(section) key_value = {option: self.conf_parser.get(section, option) for option in options} result = copy.deepcopy(key_value) if keys: result = {i: key_value.get(i, None) for i in keys} return result if __name__ == '__main__': parser = ConfParser(filename=conf_file) print(parser.get_section_value('local_db'))
日志文件:
日志分段配置类在logging.handler中
import logging from logging.handlers import RotatingFileHandler class Logger(object): def __init__(self, name=None, level=logging.DEBUG): # (1)创建logger对象 self._logger = logging.getLogger(name=name) self._logger.setLevel(level=level) # (2)定义默认日志格式 self._fmt = logging.Formatter( '[%(asctime)s %(name)s-%(filename)s[%(levelname)s]-%(process)d:%(threadName)s-%(thread)d]:%(message)s') def create_log(self, filename, fh_fmt=None, fh_level=logging.DEBUG, sh_level=None, sh_fmt=None): self.file_handler(filename, fh_fmt, fh_level) if sh_level: self.stream_handler(sh_fmt, sh_level) return self._logger def file_handler(self, filename, fmt=None, level=logging.DEBUG): level = getattr(logging, level.upper()) if type(level) is str else level # (3)1日志操作符(文件) # fh = logging.FileHandler(filename=filename, encoding='utf-8') fh = RotatingFileHandler(filename, maxBytes=1024*1024, backupCount=2) fh.setFormatter(fmt=fmt if fmt else self._fmt) fh.setLevel(level=level) self._logger.addHandler(fh) return self._logger def stream_handler(self, fmt=None, level=logging.INFO): level = getattr(logging, level.upper()) if type(level) is str else level # (3)2日志操作符(屏幕) sh = logging.StreamHandler() sh.setFormatter(fmt=fmt if fmt else self._fmt) sh.setLevel(level=level) self._logger.addHandler(sh) return self._logger if __name__ == '__main__': filename = 'test.log' log_obj = Logger(name='test_log') logger = log_obj.create_log(filename, sh_level='debug') for i in range(100): logger.debug(msg=f'{i}debug') logger.warning(msg=f'{i}warning') logger.info(msg=f'{i}info') logger.error(msg=f'{i}error') logger.critical(msg=f'{i}critical')
数据库类:
import pymysql from log import Logger from conf_parse import ConfParser log_obj = Logger(name='test_log') log = log_obj.create_log(filename='test.log', sh_level='info') class DbConnection(): def __init__(self, host, user, password, database, port=3306, cursorclass=pymysql.cursors.Cursor, autocommit=False, **kwargs): """ :param host: 主机ip :param user: 用户名 :param password: 密码 :param database: 数据库 :param port: 端口号 :param cursorclass: 游标类(输出结果样式) :param autocommit: 是否自动提交 :param kwargs: 其它参数 """ __kwargs = {"host": host, "port": int(port), "user": user, "password": password, "database": database, "cursorclass": cursorclass, "autocommit": autocommit} self._reconnect(**__kwargs, **kwargs) def _reconnect(self, **kwargs): self.close() self._db_conn = pymysql.connect(**kwargs) # 是否自动提交,可以再实例化建立连接时指定参数autocommit,后续在写操作时无需commit() # self._db_conn.autocommit(value=True) def close(self): if getattr(self, '_db_conn', None) is not None: self._db_conn.close() self._db_conn = None def write(self, sql): """ :param sql: DDL>>insert, update, delete,DML:create;DCL>>grant, remove :return: None """ _cursor = self._db_conn.cursor() thread_id = self._db_conn.thread_id() try: _cursor.execute(sql) # 事务提交,autocommit=True自动开启 self._db_conn.commit() except Exception as e: self._db_conn.rollback() log.error('write fail[error:%s]:[Thread-%s]%s' % (e, thread_id, sql)) else: log.debug('write success:[Thread-%s]%s' % (thread_id, sql)) finally: _cursor.close() def read(self, sql, allinfo=True, size=None): """ :param sql:DQL>>select; :param all: False--fetchone, True--fetchall() :param size: size--int :return: """ result = None _cursor = self._db_conn.cursor() thread_id = self._db_conn.thread_id() try: _cursor.execute(sql) except Exception as e: log.error('read fail[error:%s]:[Thread-%s]%s' % (e, thread_id, sql)) else: log.debug('read success:[Thread-%s]%s' % (thread_id, sql)) if size: result = _cursor.fetchmany(size) elif not allinfo: result = _cursor.fetchone() elif allinfo: result = _cursor.fetchall() finally: _cursor.close() return result if __name__ == '__main__': conf_file = "conf" parser = ConfParser(filename=conf_file) db_conf = parser.get_section_value('local_db') db_conn = DbConnection(**db_conf, cursorclass=pymysql.cursors.DictCursor) sql = "select * from username;" ret = db_conn.read(sql, allinfo=True) # 插入数据 # sql = "insert into username values('c',14),('d',15);" # db_conn.write(sql) sql = "select * from username;" ret = db_conn.read(sql) print(ret) sql = "select * from username;" ret = db_conn.read(sql,size=3) print(ret) sql = "select * from username;" ret = db_conn.read(sql, allinfo=False) print(ret) db_conn.close()