''' 封装MySQL的CURD ''' import pymysql import os import sys sys.path.append(os.path.dirname(os.path.abspath(__file__))) from common.base import ENV,DB class MysqlUtil: __conn = None __cur = None __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(MysqlUtil, cls).__new__(cls, *args) return cls.__instance # 初始化构造方法 def __init__(self): db = DB[ENV] if db is not None: self.__connect(db[ "host" ], db[ "port" ], db[ "user_name" ], db[ "password" ]) ''' 释放资源(系统GC自动调用) :return: ''' def __del__(self): try: self.__cur.close( ) self.__conn.close( ) except: print("释放MySQL资源出错!") ''' 根据连接参数,创建MySQL连接 :param host:数据库IP :param port:端口 :param user:用户名 :param password:密码 :param db:数据库 :param charset:字符集 :return: ''' def __connect(self, host, port, user, password, charset='utf8'): try: self.__conn = pymysql.connect(host=host, port=port, user=user, password=password, charset=charset) except pymysql.Error as e: print('MySQL连接出错!%d:%s' % (e.args[0], e.args[1])) self.__cur = self.__conn.cursor() ''' 执行 select 语句 :param sql:查询SQL :return: ''' def query(self, sql, args = None): try: result = self.__cur.execute(sql, args) except pymysql.Error as e: print("select出错!%d:%s" % (e.args[0], e.args[1])) result = False return result ''' 执行以字典Cursor返回方式的Select查询语句 :param sql:查询SQL :return: ''' def queryOutputDict(self, sql, args = None): self.__cur = self.__conn.cursor(pymysql.cursors.DictCursor) try: result = self.__cur.execute(sql, args) except pymysql.Error as e: print( "字典Cursor方式select出错!%d:%s" % (e.args[ 0 ], e.args[ 1 ]) ) result = False return result ''' 执行update或delete语句 :param sql: :return: ''' def update(self, sql, args = None): try: self.__cur.execute(sql, args) affectedRows = self.__conn._affected_rows self.__commit() except pymysql.Error as e: print( "update或delete出错!%d:%s" % (e.args[ 0 ], e.args[ 1 ]) ) affectedRows = False return affectedRows ''' 执行insert语句,若主键是自增ID,则返回新生成的ID :param sql: :return: ''' def insert(self, sql, args = None): try: self.__cur.execute(sql, args) insertId = self.__conn.insert_id() self.__commit() return insertId except pymysql.Error as e: print( "insert出错!%d:%s" % (e.args[0], e.args[1])) return False ''' 获取表的列名 :return: List ''' def getColumnNames(self): desc = self.__cur.description columnNames = [] for i in range(len(desc)): columnNames.append(desc[i][0]) return columnNames ''' 返回一行结果,然后游标指向下一行。到达最后一行以后,返回None :return: ''' def fetchOneRow( self ): return self.__cur.fetchone( ) ''' 返回所有结果 :return: ''' def fetchAllRows( self ): return self.__cur.fetchall( ) ''' 获取结果行数 :return: ''' def getRowCount( self ): return self.__cur.rowcount ''' 数据库commit操作 :return: ''' def __commit( self ): self.__conn.commit( ) ''' 数据库回滚操作 :return: ''' def __rollback( self ): self.__conn.rollback( ) ''' 关闭数据库连接 :return: ''' def __close( self ): self.__del__( ) if __name__=='__main__': '''使用范例''' mysql_util = MysqlUtil( ) # 创建MYSQL实例 results = mysql_util.query("select * from 数据库表") allRows = mysql_util.fetchAllRows() #取得所有结果 for i in range(results): print(str(allRows[i]) + ' ')