zoukankan      html  css  js  c++  java
  • pymysql DAO简单封装

    import pymysql
    import logging
    
    logger = logging.getLogger('app')
    
    
    class MysqlBase:
        def __init__(self, **args):
            self.host = args.get('host')
            self.user = args.get('user')
            self.pwd = args.get('pwd')
            self.db = args.get('db')
            self.port = int(args.get('port', 3306))
            self.charset = args.get('charset', 'utf8')
    
        def __enter__(self):
            try:
                self.conn = pymysql.connect(
                    host=self.host,
                    user=self.user,
                    passwd=self.pwd,
                    db=self.db,
                    charset=self.charset,
                    port=self.port
                )
                self.cur = self.conn.cursor()
                return self
            except Exception as e:
                raise ValueError('mysql connect error: {}'.format((self.host, e)))
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.cur.close()
            self.conn.close()
    
        def query(self, sql, args=None):
            try:
                self.cur.execute(sql, args=args)  # 执行sql语句
                res = self.cur.fetchall()  # 获取查询的所有记录
            except Exception as e:
                logger.error("query error: {}".format(e))
                print("query error: {}".format(e))
                raise e
            return res
    
        def update(self, sql):
            effect_rows = 0
            try:
                effect_rows = self.cur.execute(sql)
                # 提交
                self.conn.commit()
            except Exception as e:
                # 错误回滚
                self.conn.rollback()
                logger.error("update error: {}".format(e))
                print("update error: {}".format(e))
            return effect_rows
    
    
    if __name__ == "__main__":
        with MysqlBase(host='10.51.1.37', port=3306, user='root', pwd='ysyhl9T!') as db:
            res = db.query(
                "select user,authentication_string from mysql.user where user='whisky2' group by authentication_string;")
            print(res)
    #!/usr/bin/env python
    # -*-coding:utf-8-*-
    '''
    role   : mysql操作
    '''
    
    import pymysql
    from opssdk.logs import Log
    
    
    class MysqlBase:
        def __init__(self, **args):
            self.host = args.get('host')
            self.user = args.get('user')
            self.pswd = args.get('passwd')
            self.db = args.get('db')
            self.port = int(args.get('port', 3306))
            self.charset = args.get('charset', 'utf8')
    
            log_path = '/log/yunwei/yunwei_mysql.log'
            self.log_ins = Log('111', log_path)
            try:
                self.conn = pymysql.connect(host=self.host, user=self.user,
                                            password=self.pswd, db=self.db, port=self.port, charset=self.charset)
                self.cur = self.conn.cursor()
            except:
                raise ValueError('mysql connect error {0}'.format(self.host))
    
        ###释放资源
        def __del__(self):
            self.cur.close()
            self.conn.close()
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.cur.close()
            self.conn.close()
    
        ### 查询
        def query(self, sql):
            try:
                self.cur.execute(sql)  # 执行sql语句
                res = self.cur.fetchall()  # 获取查询的所有记录
            except Exception as e:
                self.log_ins.write_log("error", e)
                raise e
    
            return res
    
        def change(self, sql):
            resnum = 0
            try:
                resnum = self.cur.execute(sql)
                # 提交
                self.conn.commit()
            except Exception as e:
                # 错误回滚
                self.log_ins.write_log("error", e)
                self.conn.rollback()
            return resnum
    #!/usr/bin/env python
    # -*-coding:utf-8 -*-
    #
    #  无法执行多个query,self.conn.close()放在CdbConn类的单独函数中,每次query之后要手动close;否则多次query,会自动关闭
    import pymysql
    
    
    class CdbConn():
        def __init__(self, db_host,  db_user, db_pwd, db_name, db_port=3306):
            self.db_host = db_host
            self.db_port = db_port
            self.db_user = db_user
            self.db_pwd = db_pwd
            self.db_name = db_name
            self.status = True
            self.conn = self.getConnection()
    
    
        def getConnection(self):
            try:
                conn =  pymysql.Connect(
                    host=self.db_host,  # 设置MYSQL地址
                    port=int(self.db_port),  # 设置端口号
                    user=self.db_user,  # 设置用户名
                    passwd=self.db_pwd,  # 设置密码
                    db=self.db_name,  # 数据库名
                    charset='utf8',  # 设置编码
                    use_unicode=True
                )
                return conn
            except Exception as e:
                self.status = False
                print('数据库连接异常: ', e)
    
        def query(self, sqlString):
            cursor = self.conn.cursor()
            cursor.execute(sqlString)
            returnData = cursor.fetchall()
            cursor.close()
            # self.conn.close()
            return returnData
    
        def close(self):
            self.conn.close()
    
        def update(self, sqlString):
            cursor = self.conn.cursor()
            cursor.execute(sqlString)
            self.conn.commit()
            cursor.close()
            # self.conn.close()

    v2改进版本: https://www.cnblogs.com/blueskyyj/p/10039972.html

    # !/usr/bin/env python
    # -*-coding:utf-8 -*-
    # 多次调用query之后,要手动调用close方法关闭db连接
    import pymysql
    
    
    class CdbConn():
        def __init__(self, db_host, db_user, db_pwd, db_name, db_port=3306):
            self.db_host = db_host
            self.db_port = db_port
            self.db_user = db_user
            self.db_pwd = db_pwd
            self.db_name = db_name
            self.status = True
            self.conn = self.getConnection()
    
        def getConnection(self):
            try:
                conn = pymysql.Connect(
                    host=self.db_host,  # 设置MYSQL地址
                    port=int(self.db_port),  # 设置端口号
                    user=self.db_user,  # 设置用户名
                    passwd=self.db_pwd,  # 设置密码
                    db=self.db_name,  # 数据库名
                    charset='utf8',  # 设置编码
                    use_unicode=True
                )
                return conn
            except Exception as e:
                self.status = False
                print('数据库连接异常:{}, detail: {} '.format(
                    e, (self.db_host, self.db_name)))
                return None
    
        def query(self, sqlString):
            cursor = self.conn.cursor()
            try:
                cursor.execute(sqlString)
                returnData = cursor.fetchall()
                return returnData
            except Exception as e:
                print("fetch failed, detail:{}".format(e))
            finally:
                if cursor:
                    cursor.close()  # 多个cursor不能同时存在? https://blog.csdn.net/emaste_r/article/details/75068902
                # self.conn.close() # 调用close方法关闭db连接
    
        def update(self, sqlString):
            cursor = self.conn.cursor()
            try:
                cursor.execute(sqlString)
                self.conn.commit()
            except Exception as e:
                self.conn.rollback()
                print("update failed, detail:{}".format(e))
            finally:
                if cursor:
                    cursor.close()
                # self.conn.close() # 需先判断数据库连接是否存在 if self.conn: ,再调用close方法关闭db连接
    
        def close(self):
            if self.conn:
                self.conn.close()
    
    
    if __name__ == "__main__":
        db = CdbConn(db_host='10.51.1.37', db_port=3306, db_name='information_schema', db_user='root', db_pwd='ysyhl9T!')
        db2 = CdbConn(db_host='10.51.1.37', db_port=3306, db_name='information_schema', db_user='root', db_pwd='ysyhl9T!')
        tt = db.query("show databases")
        ss = db.query("show databases")
        # dd = db2.query("show tables from mysql")
        print(tt)
        print(ss)
        # print(dd)
    # -*- coding: UTF-8 -*- 
    
    import MySQLdb
    import MySQLdb.cursors
    import logging
    from django.db import connection
    import traceback
    logger = logging.getLogger('default')
    import json
    
    
    class Dao(object):
        _CHART_DAYS = 90
    
        # 测试指定的mysql实例连接是否成功
        def check_mysql_connect(self, ipaddress, mysqlport, username, password):
            listDb = []
            conn = None
            cursor = None
            connectflag=2
    
            try:
                conn = MySQLdb.connect(host=ipaddress, port=mysqlport, user=username, passwd=password,
                                       charset='utf8',connect_timeout=10)
                cursor = conn.cursor()
                sql = "show databases"
                n = cursor.execute(sql)
                listDb = [row[0] for row in cursor.fetchall()
                          if row[0] not in ('information_schema', 'performance_schema', 'mysql', 'test')]
                connectflag=1
            except MySQLdb.Warning as w:
                traceback.print_exc()
                #raise Exception(w)
            except MySQLdb.Error as e:
                traceback.print_exc()
                #raise Exception(e)
            finally:
                if cursor is not None:
                    cursor.close()
                if conn is not None:
                    conn.commit()
                    conn.close()
            return connectflag
    
    
        # 检查复制同步状态
        def check_slave_status(self, ipaddress, mysqlport, username, password):
            listDb = []
            conn = None
            cursor = None
    
            master_host='NOHOST'
            master_port=0
            master_user=''
            master_log_file=''
            read_master_log_pos=0
            relay_master_log_file=''
            exec_master_log_pos=0
            slave_io_running='No'
            slave_sql_running='No'
            seconds_behind_master=-1
            count=0
            io_succesed_count=0
            sql_successed_count=0
            try:
                conn = MySQLdb.connect(host=ipaddress, port=mysqlport, user=username, passwd=password,
                                       charset='utf8', connect_timeout=10, cursorclass=MySQLdb.cursors.DictCursor)
                cursor = conn.cursor()
                sql = "show slave status"
                n = cursor.execute(sql)
                for row in cursor.fetchall():
                    count = count +1
                    master_host=row['Master_Host']
                    master_port=row['Master_Port']
                    master_user = row['Master_User']
                    master_log_file = row['Master_Log_File']
                    read_master_log_pos = row['Read_Master_Log_Pos']
                    relay_master_log_file = row['Relay_Master_Log_File']
                    exec_master_log_pos = row['Exec_Master_Log_Pos']
                    cur_slave_io_running = row['Slave_IO_Running']
                    cur_slave_sql_running = row['Slave_SQL_Running']
                    cur_seconds_behind_master = row['Seconds_Behind_Master']
                    if (cur_slave_io_running == 'Yes'):
                        io_succesed_count = io_succesed_count + 1
                    if (cur_slave_sql_running == 'Yes'):
                        sql_successed_count = sql_successed_count + 1
                    if (cur_seconds_behind_master == 'NULL'):
                        seconds_behind_master = -1
                    elif (cur_seconds_behind_master > seconds_behind_master):
                        seconds_behind_master = cur_seconds_behind_master
                if ( io_succesed_count == count ):
                    slave_io_running = 'Yes'
                if ( sql_successed_count == count ):
                    slave_sql_running = 'Yes'
                if (count == 0 ):
                    slave_io_running = 'No'
                    slave_sql_running = 'No'
            except MySQLdb.Warning as w:
                traceback.print_exc()
                #raise Exception(w)
            except MySQLdb.Error as e:
                traceback.print_exc()
                #raise Exception(e)
            finally:
                if cursor is not None:
                    cursor.close()
                if conn is not None:
                    conn.commit()
                    conn.close()
            return (master_host,master_port,master_user,master_log_file,read_master_log_pos,relay_master_log_file,exec_master_log_pos,slave_io_running,slave_sql_running,seconds_behind_master)
    
    
    
        # 检查库表数量
        def check_table_num(self, ipaddress, mysqlport, username, password):
            listDb = []
            conn = None
            cursor = None
    
            dbnum = 0
            tablenum = 0
            try:
                conn = MySQLdb.connect(host=ipaddress, port=mysqlport, user=username, passwd=password,
                                       charset='utf8', cursorclass=MySQLdb.cursors.DictCursor)
                conn.select_db('information_schema')
                cursor = conn.cursor()
                sql = "select count(*) dbnum from SCHEMATA where SCHEMA_NAME not in ('information_schema','mysql','performance_schema','sys')"
                effect_row = cursor.execute(sql)
                row = cursor.fetchone()
                dbnum = row['dbnum']
                sql = "select count(*) tablenum from TABLES where TABLE_SCHEMA not in ('information_schema','mysql','performance_schema','sys')"
                effect_row = cursor.execute(sql)
                row = cursor.fetchone()
                tablenum = row['tablenum']
            except MySQLdb.Warning as w:
                traceback.print_exc()
                # raise Exception(w)
            except MySQLdb.Error as e:
                traceback.print_exc()
                # raise Exception(e)
            finally:
                if cursor is not None:
                    cursor.close()
                if conn is not None:
                    conn.commit()
                    conn.close()
            return (dbnum,tablenum)
    
        # 连进指定的mysql实例里,读取所有databases并返回
        def getAlldbByCluster(self, masterHost, masterPort, masterUser, masterPassword):
            listDb = []
            conn = None
            cursor = None
    
            try:
                conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword,
                                       charset='utf8')
                cursor = conn.cursor()
                sql = "show databases"
                n = cursor.execute(sql)
                listDb = [row[0] for row in cursor.fetchall()
                          if row[0] not in ('information_schema', 'performance_schema', 'mysql', 'test')]
            except MySQLdb.Warning as w:
                raise Exception(w)
            except MySQLdb.Error as e:
                raise Exception(e)
            finally:
                if cursor is not None:
                    cursor.close()
                if conn is not None:
                    conn.commit()
                    conn.close()
            return listDb
    
        # 连进指定的mysql实例里,读取所有tables并返回
        def getAllTableByDb(self, masterHost, masterPort, masterUser, masterPassword, dbName):
            listTb = []
            conn = None
            cursor = None
    
            try:
                conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=dbName,
                                       charset='utf8')
                cursor = conn.cursor()
                sql = "show tables"
                n = cursor.execute(sql)
                listTb = [row[0] for row in cursor.fetchall()
                          if row[0] not in (
                              'test')]
            except MySQLdb.Warning as w:
                raise Exception(w)
            except MySQLdb.Error as e:
                raise Exception(e)
            finally:
                if cursor is not None:
                    cursor.close()
                if conn is not None:
                    conn.commit()
                    conn.close()
            return listTb
    
        # 连进指定的mysql实例里,读取所有Columns并返回
        def getAllColumnsByTb(self, masterHost, masterPort, masterUser, masterPassword, dbName, tbName):
            listCol = []
            conn = None
            cursor = None
    
            try:
                conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=dbName,
                                       charset='utf8')
                cursor = conn.cursor()
                sql = "SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s';" % (
                    dbName, tbName)
                n = cursor.execute(sql)
                listCol = [row[0] for row in cursor.fetchall()]
            except MySQLdb.Warning as w:
                raise Exception(w)
            except MySQLdb.Error as e:
                raise Exception(e)
            finally:
                if cursor is not None:
                    cursor.close()
                if conn is not None:
                    conn.commit()
                    conn.close()
            return listCol
    
        # 连进指定的mysql实例里,执行sql并返回
        def mysql_query(self, masterHost, masterPort, masterUser, masterPassword, dbName, sql, limit_num=0):
            result = {'column_list': [], 'rows': [], 'effect_row': 0}
            conn = None
            cursor = None
    
            try:
                conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=dbName,
                                       charset='utf8')
                cursor = conn.cursor()
                effect_row = cursor.execute(sql)
                if int(limit_num) > 0:
                    rows = cursor.fetchmany(size=int(limit_num))
                else:
                    rows = cursor.fetchall()
                fields = cursor.description
    
                column_list = []
                if fields:
                    for i in fields:
                        column_list.append(i[0])
                result = {}
                result['column_list'] = column_list
                result['rows'] = rows
                result['effect_row'] = effect_row
    
            except MySQLdb.Warning as w:
                logger.warning(str(w))
                result['Warning'] = str(w)
            except MySQLdb.Error as e:
                logger.error(str(e))
                result['Error'] = str(e)
            finally:
                if cursor is not None:
                    cursor.close()
                if conn is not None:
                    try:
                        conn.rollback()
                        conn.close()
                    except:
                        conn.close()
            return result
    
        # 连进指定的mysql实例里,执行sql并返回
        def mysql_execute(self, masterHost, masterPort, masterUser, masterPassword, dbName, sql):
            result = {}
            conn = None
            cursor = None
    
            try:
                conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=dbName,
                                       charset='utf8')
                cursor = conn.cursor()
                effect_row = cursor.execute(sql)
                # result = {}
                # result['effect_row'] = effect_row
                conn.commit()
            except MySQLdb.Warning as w:
                logger.warning(str(w))
                result['Warning'] = str(w)
            except MySQLdb.Error as e:
                logger.error(str(e))
                result['Error'] = str(e)
            finally:
                if result.get('Error') or result.get('Warning'):
                    conn.close()
                elif cursor is not None:
                    cursor.close()
                    conn.close()
            return result
    
        def getWorkChartsByMonth(self):
            cursor = connection.cursor()
            sql = "select date_format(create_time, '%%m-%%d'),count(*) from sql_workflow where create_time>=date_add(now(),interval -%s day) group by date_format(create_time, '%%m-%%d') order by 1 asc;" % (
                Dao._CHART_DAYS)
            cursor.execute(sql)
            result = cursor.fetchall()
            return result
    
        def getWorkChartsByPerson(self):
            cursor = connection.cursor()
            sql = "select engineer, count(*) as cnt from sql_workflow where create_time>=date_add(now(),interval -%s day) group by engineer order by cnt desc limit 50;" % (
                Dao._CHART_DAYS)
            cursor.execute(sql)
            result = cursor.fetchall()
            return result
    
    
        # 取出otter中的频道、管道、cannal相关信息
        def get_otter_pipeline_infos(self, ipaddress, mysqlport, username, password,dbname):
            otter_pipeline_info_dict_list = []
            conn = None
            cursor = None
    
            try:
                conn = MySQLdb.connect(host=ipaddress, port=mysqlport, user=username, passwd=password,db=dbname,
                                       charset='utf8', connect_timeout=10, cursorclass=MySQLdb.cursors.DictCursor)
                cursor = conn.cursor()
                sql = """SELECT c.ID channelid ,c.`NAME` channelname ,p.ID pipelineid ,p.`NAME` pipelinename ,p.PARAMETERS pipelineparams
    FROM CHANNEL c,PIPELINE p
    where c.id = p.CHANNEL_ID
    order by c.ID
                """
                n = cursor.execute(sql)
                for row in cursor.fetchall():
                    otter_pipeline_info_dict={}
                    otter_pipeline_info_dict["channelid"] = row['channelid']
                    otter_pipeline_info_dict["channelname"] = row['channelname']
                    otter_pipeline_info_dict["pipelineid"] = row['pipelineid']
                    otter_pipeline_info_dict["pipelinename"] = row['pipelinename']
                    pipelineparams=row['pipelineparams']
                    jsonmsg=json.loads(pipelineparams)
                    canalname=jsonmsg['destinationName']
                    otter_pipeline_info_dict["canalname"] = canalname
                    otter_pipeline_info_dict_list.append(otter_pipeline_info_dict)
            except MySQLdb.Warning as w:
                traceback.print_exc()
                #raise Exception(w)
            except MySQLdb.Error as e:
                traceback.print_exc()
                #raise Exception(e)
            finally:
                if cursor is not None:
                    cursor.close()
                if conn is not None:
                    conn.commit()
                    conn.close()
            return otter_pipeline_info_dict_list
    MySQLDB dao封装

      

    https://www.cnblogs.com/wardensky/p/4783010.html

    #coding=utf-8 
    #!/usr/bin/python
    
    import pymysql
    
    
    class MYSQL:
        """
        对pymysql的简单封装
        """
        def __init__(self,host,user,pwd,db):
            self.host = host
            self.user = user
            self.pwd = pwd
            self.db = db
    
        def __GetConnect(self):
            """
            得到连接信息
            返回: conn.cursor()
            """
            if not self.db:
                raise(NameError,"没有设置数据库信息")
            self.conn = pymysql.connect(host=self.host,user=self.user,password=self.pwd,database=self.db,charset="utf8")
            cur = self.conn.cursor()
            if not cur:
                raise(NameError,"连接数据库失败")
            else:
                return cur
    
        def ExecQuery(self,sql):
            """
            执行查询语句
            返回的是一个包含tuple的list,list的元素是记录行,tuple的元素是每行记录的字段
    
            调用示例:
                    ms = MYSQL(host="localhost",user="sa",pwd="123456",db="PythonWeiboStatistics")
                    resList = ms.ExecQuery("SELECT id,NickName FROM WeiBoUser")
                    for (id,NickName) in resList:
                        print str(id),NickName
            """
            cur = self.__GetConnect()
            cur.execute(sql)
            resList = cur.fetchall()
    
            #查询完毕后必须关闭连接
            self.conn.close()
            return resList
    
        def ExecNonQuery(self,sql):
            """
            执行非查询语句
    
            调用示例:
                cur = self.__GetConnect()
                cur.execute(sql)
                self.conn.commit()
                self.conn.close()
            """
            cur = self.__GetConnect()
            cur.execute(sql)
            self.conn.commit()
            self.conn.close()
    
    def main():
    
        mysql = MYSQL(host="192.168.163.36",user="wisdomhr",pwd="wisdomhr",db="WISDOMHR")
        resList = mysql.ExecQuery("SELECT CITY FROM RES_SCHOOL")
        for inst in resList:
            print(inst)
    if __name__ == '__main__':
        main()
    # 调用:
    #!/usr/bin/python
    
    #version 3.4
    
    import wispymysql
    
    mysql = wispymysql.MYSQL(host="192.168.163.36",user="wisdomhr",pwd="wisdomhr",db="WISDOMHR")
    selectsql = "SELECT ID, CITY FROM RES_SCHOOL WHERE CITY LIKE '%
    %'"
    result = mysql.ExecQuery(selectsql)
    
    for (dbid, city) in result:
        rightcity = city.replace('
    ','')
        updatesql= "UPDATE RES_SCHOOL SET CITY = '" + rightcity + "' WHERE ID = " + str(dbid)
        print(updatesql)
        mysql.ExecNonQuery(updatesql)

    http://justcode.ikeepstudying.com/2019/01/python%EF%BC%9Amysql%E8%BF%9E%E5%BA%93%E5%8F%8A%E7%AE%80%E5%8D%95%E5%B0%81%E8%A3%85%E4%BD%BF%E7%94%A8-python-mysql%E6%93%8D%E4%BD%9C%E7%B1%BB/

  • 相关阅读:
    【BUAA软工】Beta阶段设计与计划
    Visual Lab Online —— 事后分析
    Alpha阶段项目展示
    Visual Lab Online —— Alpha版本发布声明
    【BUAA软工】Alpha阶段测试报告
    Scrum Meeting #10 2020/04/19
    BUAA-OO-第一单元总结
    [软工顶级理解组] 0520第26次会议
    [软工顶级理解组] 0519第25次会议
    [软工顶级理解组] 0517第24次会议
  • 原文地址:https://www.cnblogs.com/yum777/p/11676806.html
Copyright © 2011-2022 走看看