先安装pymysql import pymysql # 连接到数据库 connection = pymysql.connect(host='localhost',port=3306, user='root', password='123', db='slb', charset='utf8', ) try: # with connection.cursor() as cursor: # # 插入新的数据 # sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)" # cursor.execute(sql, ('webmaster@python.org', 'very-secret')) # # # 默认情况下,连接不是自动提交。所以你必须自行保存好 # connection.commit() with connection.cursor() as cursor: # 读取单个记录 sql = "SELECT * FROM slb.slb_machine_maintain" cursor.execute(sql) result = cursor.fetchone() print(result) finally: connection.close()
from DBUtils.PooledDB import PooledDB pool = PooledDB(pymysql,30, host="127.0.0.1", user='root',passwd='root1234', db='slb', port=3306, charset="utf8") def exec_sql(method, sql): if len(method) == 0 or len(sql) == 0: return {'status': -1, 'msg': 'sql or method 不能为空'} conn = pool.connection() try: with conn.cursor() as cursor: if method == 'select': cursor.execute(sql) data = cursor.fetchall() #data =('ding', 28), ('shaf', 25), ('akif', 32) select返回查询结果的元组 if len(data) == 0: res = {'status': 0, 'msg': '查询结果为空'} return res res = {'status': 1, 'msg': data} return res elif method == 'insert' or 'update' or 'delete': data = cursor.execute(sql) print('返回的data:', data) conn.commit() if method == 'insert': # data结果为int, insert操作返回影响的行数 print('data = %s'%data) if method == 'update': print('data = %s'%data) #data结果为int, update操作返回影响的行数 if method == 'delete': print(type(data)) #<class 'int'> print('data = %s'%data) #data结果为int, update操作返回影响的行数 res = {'status': 1, 'msg': data} return res else: res = {'status': -1, 'msg': '无此操作'} return res conn.close() except Exception as e: conn.rollback() # 发生错误时回滚 res = {'status': -1, 'msg': '执行sql:%s 发生异常!!:%s' % (sql, e)} return res print(exec_sql('delete', 'delete from dyf_test_table1 where age = 100'))