新版,只创建一次连接池
# -*- coding: utf-8 -*-
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from taskcenter.config import config


def create_pool():
    """Build the SQLAlchemy engine (and its connection pool) for the database.

    Connection settings come from the ``database`` section returned by
    ``config.GetConfig()``; the keys read are ``dbusr``, ``dbpasswd``,
    ``dbhost``, ``dbport`` and ``dbname``.

    Returns:
        The engine returned by ``create_engine``.
    """
    dbconfig = config.GetConfig().get('database')
    dbusr = dbconfig["dbusr"]
    dbpasswd = dbconfig["dbpasswd"]
    # format() instead of "+" so a port configured as an integer does not
    # raise TypeError.
    dburl = "{}:{}".format(dbconfig["dbhost"], dbconfig["dbport"])
    dbname = dbconfig["dbname"]

    # The pymysql driver reports "Warning 1366" against MySQL 5.7, so the
    # mysqlconnector driver is used instead of mysql+pymysql://{}:{}@{}/{}.
    # NOTE(review): user and password are interpolated unescaped; characters
    # such as "@" or "/" in them would need URL-encoding - confirm against the
    # values stored in the config.
    engine = create_engine(
        "mysql+mysqlconnector://{}:{}@{}/{}".format(dbusr, dbpasswd, dburl, dbname),
        max_overflow=0,  # no extra connections beyond pool_size
        pool_size=5,  # number of connections kept in the pool
        pool_timeout=30,  # seconds to wait for a free connection before raising
        # Recycle connections older than one hour. The previous value (-1)
        # disabled recycling, so connections closed by the MySQL server's
        # idle timeout stayed in the pool and failed on their next use.
        pool_recycle=3600,
    )
    print("创建了一次pool")
    return engine
# metaclass实现单例模式
class Singleton(type):
    """Metaclass that gives every class using it a single shared instance.

    The first call of such a class constructs the instance with the supplied
    arguments; every later call returns that same object and ignores its
    arguments. Instance creation is not synchronized.
    """

    # Maps each class to its one instance; shared by all classes that use
    # this metaclass.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, creating it on first use."""
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]
class GetEngine(metaclass=Singleton):
    """Holder for the shared database engine.

    Uses the ``Singleton`` metaclass, so ``GetEngine()`` returns the same
    object on every call and ``create_pool()`` runs only for the first one.
    """

    def __init__(self):
        # The engine produced by create_pool(); read by callers as `.engine`.
        self.engine = create_pool()
# 全局变量模式实现单例,但是不支持编译,注释掉
# engine = create_pool()
def create_session_factory():
    """Return a new ``sessionmaker`` bound to the shared engine.

    The engine is taken from ``GetEngine().engine``; a fresh factory object is
    built on every call.
    """
    DBsession = sessionmaker(bind=GetEngine().engine)
    return DBsession
def get_session():
    """Return a new session produced by ``create_session_factory()``.

    The function does not close the session it returns.
    """
    DBsessionfactory = create_session_factory()
    return DBsessionfactory()
# 执行sql,返回第i+1列的字段数组,paramdict为补全sql里的占位符
def executeSqlFieldList(sqlstr, paramdict, i):
    """Run a SQL statement and collect one column of its result.

    Args:
        sqlstr: The SQL to execute. A plain string is wrapped in ``text()``;
            any other object is passed to the session unchanged.
        paramdict: Values for the placeholders in ``sqlstr``.
        i: Zero-based index of the column to collect from each row.

    Returns:
        A list of the values in column ``i``, in row order, with ``None``
        values skipped.
    """
    # Imported here so the block does not depend on the file-level imports.
    from sqlalchemy import text

    # Session.execute() no longer accepts a bare string in SQLAlchemy 2.0
    # (deprecated in 1.4); text() is the explicit form of the same statement.
    statement = text(sqlstr) if isinstance(sqlstr, str) else sqlstr

    dbsession = get_session()
    try:
        rows = dbsession.execute(statement, paramdict).fetchall()
        column = (tuple(row)[i] for row in rows)
        return [value for value in column if value is not None]
    finally:
        # Always close the session, whether or not the query succeeded.
        dbsession.close()
代码如下:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# NOTE(review): credentials are hard-coded in source; presumably sample
# values - confirm, and load real ones from configuration instead.
BIZ_DB = {"dbusr":"test","dbpasswd":"123456","dburl":"127.0.0.1:3306","dbname":"test"}
OFFLINE_DB = {"dbusr":"test","dbpasswd":"123456","dburl":"127.0.0.1:3306","dbname":"test"}

# Engines already built, keyed by connection URL. Reusing them stops every
# create_session() call from opening a new connection pool that is never
# disposed.
_ENGINES = {}


def create_session(dbusr, dbpasswd, dburl, dbname):
    """Return a new session for the given MySQL database.

    Args:
        dbusr: Database user name.
        dbpasswd: Database password.
        dburl: Server address in ``host:port`` form.
        dbname: Name of the database to connect to.

    Returns:
        A new session bound to the engine for these settings. The engine is
        created on first use and reused by later calls with the same settings.
    """
    url = "mysql+pymysql://{}:{}@{}/{}".format(dbusr, dbpasswd, dburl, dbname)
    engine = _ENGINES.get(url)
    if engine is None:
        engine = _ENGINES[url] = create_engine(url)
    DBsession = sessionmaker(bind=engine)
    return DBsession()


def _session_from(settings):
    """Open a session from a settings dict with the four ``db*`` keys."""
    return create_session(
        settings["dbusr"],
        settings["dbpasswd"],
        settings["dburl"],
        settings["dbname"],
    )


def create_session_biz():
    """Return a new session for the database described by ``BIZ_DB``."""
    return _session_from(BIZ_DB)


def create_session_offline():
    """Return a new session for the database described by ``OFFLINE_DB``."""
    return _session_from(OFFLINE_DB)