zoukankan      html  css  js  c++  java
  • pymysql连接池操作MySQL

    pymysql连接池操作MySQL

    安装:

    pip install PyMySQL
    pip install DBUtils
    

    实例:

    import logging
    import pymysql
    from dbutils.pooled_db import PooledDB
    
    
    class MysqlPool:
    	def __init__(self):
    		self.POOL = PooledDB(
    			creator=pymysql,  # 使用链接数据库的模块
    			maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    			mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    			maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    			# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    			maxshared=3,
    			blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    			maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    			setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    			# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    			ping=0,
    			host='127.0.0.1',
    			port=3306,
    			user='root',
    			password='123456',
    			database='play_open',
    			charset='utf8'
    		)
    
    	def __new__(cls, *args, **kw):
    		"""
    		启用单例模式
    		:param args:
    		:param kw:
    		:return:
    		"""
    		if not hasattr(cls, '_instance'):
    			cls._instance = object.__new__(cls)
    		return cls._instance
    
    	def connect(self):
    		"""
    		启动连接
    		:return:
    		"""
    		try:
    			conn = self.POOL.connection()
    			cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    			return conn, cursor
    		except Exception as e:
    			logging.error(f'mysql connect error: {e}')
    
    	@staticmethod
    	def connect_close(conn, cursor):
    		"""
    		关闭连接
    		:param conn:
    		:param cursor:
    		:return:
    		"""
    		try:
    			cursor.close()
    			conn.close()
    		except Exception as e:
    			logging.error(f'mysql close error: {e}')
    
    	def fetch_all(self, sql, args=None):
    		"""
    		批量查询
    		:param sql:
    		:param args:
    		:return:
    		"""
    		try:
    			conn, cursor = self.connect()
    			cursor.execute(sql, args)
    			record_list = cursor.fetchall()
    			self.connect_close(conn, cursor)
    
    			return record_list
    		except Exception as e:
    			logging.error(f'fetchall error: {e}')
    
    	def fetch_one(self, sql, args=None):
    		"""
    		查询单条数据
    		:param sql:
    		:param args:
    		:return:
    		"""
    		try:
    			conn, cursor = self.connect()
    			cursor.execute(sql, args)
    			result = cursor.fetchone()
    			self.connect_close(conn, cursor)
    			return result
    		except Exception as e:
    			logging.error(f'fetchone error: {e}')
    
    	def insert(self, sql, args=None):
    		"""
    		插入数据
    		:param sql:
    		:param args:
    		:return:
    		"""
    		conn, cursor = self.connect()
    		try:
    			row = cursor.execute(sql, args)
    			conn.commit()
    		except Exception as e:
    			logging.error(f'insert error: {e}')
    			conn.rollback()
    			row = {}
    		self.connect_close(conn, cursor)
    		return row
    
    	# 增加多行
    	def insertmany(self, sql, param):
    		"""
    		:param sql:
    		:param param: 必须是元组或列表[(),()]或((),())
    		:return:
    		"""
    		conn, cursor = self.connect()
    		try:
    			row = cursor.executemany(sql, param)
    			conn.commit()
    		except Exception as e:
    			logging.error(f'insertmany error: {e}')
    			conn.rollback()
    			row = "更新失败"
    		self.connect_close(cursor, conn)
    		return row
    
    	# 删除
    	def delete(self, sql, param=None):
    		conn, cursor = self.connect()
    		try:
    			row = cursor.execute(sql, param)
    			# conn.commit()
    		except Exception as e:
    			logging.error(f'delete error: {e}')
    			conn.rollback()
    			row = "更新失败"
    		self.connect_close(cursor, conn)
    		return row
    
    	# 更新
    	def update(self, sql, param=None):
    		conn, cursor = self.connect()
    		try:
    			row = cursor.execute(sql, param)
    			conn.commit()
    		except Exception as e:
    			logging.error(f'update error: {e}')
    			conn.rollback()
    			row = "更新失败"
    		self.connect_close(cursor, conn)
    		return row
    
    
    if __name__ == '__main__':
    	# 实例化
    	mp = MysqlPool()
    	# """
    	# 查询单条
    	# """
    	data = mp.fetch_all("select * from domain_list where cloud='bs' and app_name!='offline' limit 10;")
    	data = mp.fetch_one("select * from domain_list where cloud='bs' and app_name!='offline' limit 10;")
    	print(data)
    
    	# """
    	# 批量读取
    	# """
    	# data = mp.fetch_all("select * from domain_list where id=%s", (1,))
    	# print(data)
    
    	# """
    	# 插入数据
    	# """
    	# data = mp.insert("insert into domain_list(domain,status,cloud,app_name)values(%s,%s,%s,%s)", ('www.testlbl.com',"online", 'test-tx', 'damao-test'))
    	# print(data)
    
    	# 增加多条
    	# sql3 = 'insert into domain_list (domain,status,cloud,app_name) VALUES (%s,%s,%s,%s)'
    	# li = [
    	#     ('www.testlbl1.com',"online", 'test-tx', 'damao-test'),
    	#     ('www.testlbl2.com',"online", 'test-tx', 'damao-test'),
    	#     ('www.testlbl3.com', "online", 'test-tx', 'damao-test')
    	# ]
    	# ret = mp.insertmany(sql3,li)
    	# print(ret)
    
    	# 删除(commit 注释了,如需删除取消注释)
    	# sql4 = 'delete from domain_list WHERE id > %s'
    	# args = 1487
    	# ret = mp.delete(sql4, args)
    	# print(ret)
    
    	# 更新
    	# sql5 = r'update domain_list set Code_Success_rate=%s WHERE domain=%s and app_name=%s'
    	# args = ('yes', 'www.testlbl.com', 'damao-test')
    	# ret = mp.update(sql5, args)
    	# print(ret)
    

    参考来源:

    https://blog.csdn.net/weixin_45389126/article/details/115859808
    
  • 相关阅读:
    Disruptor详细介绍之快速入门
    Java魔法类:sun.misc.Unsafe
    Java开源框架推荐
    CAS(Compare and Swap)无锁算法之volatile不能保证原子性而Atomic可以
    数据库乐观锁、悲观锁、共享锁、排它锁、行锁、表锁概念的理解
    Linux上查找线程使用的CPU时间最长
    虚拟机内存结构
    插入排序和归并排序(算法学习1)
    POJ-2942:吃糖果
    POJ-4004:数字组合(用位移方法解组合数问题,Java版)
  • 原文地址:https://www.cnblogs.com/baolin2200/p/15775373.html
Copyright © 2011-2022 走看看