zoukankan      html  css  js  c++  java
  • MySql 实例化

    MySql公共方法

    #!/usr/bin/env python
    # coding:utf-8

    import pymysql
    from utils.logger import log
    from config.env.pathconfig import ENV_CONFIG_PATH, API_YML_PATH
    from utils.common import load_config, load_yml
    import datetime
    import decimal
    from utils.DB import DB

    db = DB()
    sql_data = load_yml(API_YML_PATH)


    class MySql(object):
    def __init__(self, env=''):
    db_info = load_config(ENV_CONFIG_PATH)[env]
    # 数据库初始化
    self.host = db_info['host']
    self.port = int(db_info['port'])
    self.user = db_info['user']
    self.passwd = db_info['passwd']
    self.database = db_info['database']
    self.con = None
    self.active = False

    def _start(self):
    '''
    connect database if it's not active
    :return:
    '''
    if not self.active:
    self.connect()

    def execute_sql(self, sql):
    user_list = db.query_all(sql_data['delete_db_user'])
    if 'delete' in sql:
    for i in user_list:
    data_base = i['database_list'].split(',')
    for j in data_base:
    if j in sql:
    log.info('替换用户名')
    self.user = i['username']
    self.passwd = i['passwd']
    self._start() # connect db if it's not active
    self.con.automommit = True
    log.info('execute sql {0}'.format(sql))

    if sql.startswith('select') or sql.startswith('SELECT'):
    result = self.query_all(sql)
    return result
    elif sql.startswith('drop') or sql.startswith('DROP'):
    return None, '禁止使用drop命令'
    elif sql.startswith('create') or sql.startswith('CREATE'):
    return None, '禁止使用create命令'
    else:
    result = self.change_datas(sql)
    return result
    # self.closes()

    def fetch_data(self, sql, limited_row):
    data = []
    cursors = self.con.cursor()
    log.info('execute query all sql: {0}'.format(sql))
    try:
    cursors.execute(sql)
    if not limited_row:
    resu = cursors.fetchone()
    data.append(resu)
    else:
    data = cursors.fetchmany(int(limited_row))
    cursors.close()
    log.info('query all result: {0}'.format(data))
    return data, 'sql操作执行成功!'
    except Exception as e:
    log.error('query failed with error: {0}'.format(e))
    return None, 'sql error! {}'.format(e)

    def query_all(self, sql):
    result = []
    cursors = self.con.cursor()
    log.info('execute query all sql: {0}'.format(sql))
    try:
    cursors.execute(sql)
    data = cursors.fetchall()
    for i in data:
    for k, j in i.items():
    if isinstance(j, datetime.datetime):
    i[k] = datetime.datetime.strftime(
    j, '%Y-%m-%d %H:%M:%S')
    if isinstance(j, decimal.Decimal):
    i[k] = str(j)
    result.append(i)
    cursors.close()
    log.info('query all result: {0}'.format(data))
    return result, 'sql操作执行成功!'
    except Exception as e:
    log.error('query failed with error: {0}'.format(e))
    return None, 'sql error! {}'.format(e)

    def change_datas(self, sql):
    # 增,删,改
    cursors = self.con.cursor()
    log.info('update database with sql: {0}'.format(sql))
    try:
    effect_row = cursors.execute(sql)
    self.con.commit() # commit update
    cursors.close() # close cursor
    log.info('inert into result: {0}'.format(effect_row))
    return effect_row, 'sql操作执行成功!'
    except Exception as e:
    log.error('inert into failed with error: {0}'.format(e))
    self.con.rollback() # rollback if update failed
    return None, 'sql error! {}'.format(e)

    def execute_batch_import_sql(self, sql, values):
    try:
    self._start()
    cursors = self.con.cursor()
    result = cursors.executemany(sql, values)
    cursors.close()
    self.con.commit()
    return result
    except Exception as err:
    log.error('import failed with error: {0}'.format(err))
    return None

    def connect(self):
    # 连接数据库
    try:
    connection = pymysql.connect(
    host=self.host,
    port=int(self.port),
    user=self.user,
    passwd=self.passwd,
    database=self.database,
    cursorclass=pymysql.cursors.DictCursor,
    charset='utf8',
    use_unicode=True)
    log.info('{0} :database is connecting successful'.format(
    self.host))
    self.con = connection
    except Exception as e:
    log.error('{0} :database is connecting failed : {1}'.format(
    self.host, e))

    def closes(self):
    # 关闭数据库连接
    try:
    self.active = False
    self.con.close()
    log.info('database is closed: {0}'.format(self.host))
    except Exception as e:
    self.active = False
    log.error('database closed failed with error: {0}'.format(e))
    raise 'server is error {0}'.format(e)





  • 相关阅读:
    策略模式
    模板方法模式

    kafka
    Linux下部署MongoDB
    Linux下安装ssdb
    ssdb常用知识点
    Eclipse 的 Java Web 项目环境搭建
    PLSQL连接Oracle
    redis书籍
  • 原文地址:https://www.cnblogs.com/dearddu/p/12457219.html
Copyright © 2011-2022 走看看