封装pymysql, 代码展示:
# encoding:utf-8 import pymysql.cursors class MysqlOperation(object): def __init__(self, config): self.connection = pymysql.connect(host=config['mysql_host'], port=config['mysql_port'], user=config['mysql_user'], # pymysql直接连接是passwd,用连接池连接是password passwd=config['mysql_passwd'], db=config['mysql_db'], charset='utf8', cursorclass=pymysql.cursors.DictCursor ) def read_sql(self, sql): with self.connection.cursor() as cursor: try: cursor.execute(sql) result = cursor.fetchall() return result except Exception as e: self.connection.rollback() # 回滚 print('事务失败', e) def insert_sql(self, sql): with self.connection.cursor() as cursor: try: cursor.execute(sql) self.connection.commit() except Exception as e: self.connection.rollback() print('事务失败', e) def update_sql(self, sql): # sql_update ="update user set username = '%s' where id = %d" with self.connection.cursor() as cursor: try: cursor.execute(sql) # 像sql语句传递参数 # 提交 self.connection.commit() except Exception as e: # 错误回滚 self.connection.rollback() def delect_sql(self, sql_delete): with self.connection.cursor() as cursor: try: cursor.execute(sql_delete) # 像sql语句传递参数 # 提交 self.connection.commit() except Exception as e: # 错误回滚 self.connection.rollback() def read_one(self, sql): with self.connection.cursor() as cursor: try: cursor.execute(sql) result = cursor.fetchone() return result except Exception as e: self.connection.rollback() # 回滚 print('事务失败', e) def reConnect(self): try: self.connection.ping() except: self.connection() def callpro_sql(self, proc_name, *args): with self.connection.cursor(self.connection.cursorclass) as cursor: try: cursor.callproc(proc_name, args) # 传递存储过程名和参数 self.connection.commit() # 提交 print('调用存储过程结束') except Exception as e: self.connection.rollback() # 回滚 print('事务失败', e)