1. Multithreaded database access with pymysql
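```python
import pymysql
from concurrent.futures import ThreadPoolExecutor

# Connection details are assumptions mirroring the SQLAlchemy URL in section 2;
# sql_password is defined elsewhere in the original script.
conn = pymysql.connect(host="localhost", port=3306, user="root",
                       password=sql_password, database="springboot")

sql = 'insert into novel_book(provide_name, start_url, state) values(%s, %s, %s)'


def pymysql_task(sql_pattern):
    # Every thread uses the same module-level conn: this is the bug being demonstrated
    cursor = conn.cursor()
    cursor.execute(sql_pattern, ("test", "test", 0))
    conn.commit()
    print(cursor.lastrowid)
    return cursor.lastrowid


exe = ThreadPoolExecutor(max_workers=10)
tasks = []
for i in range(10):
    tasks.append(exe.submit(pymysql_task, sql))
for task in tasks:
    print(task)
```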
The output is shown below. Only the first task completed successfully; the other nine raised exceptions:
```
<Future at 0x23ed4165240 state=finished returned int>
<Future at 0x23ed5902c50 state=finished raised InternalError>
<Future at 0x23ed5902e80 state=finished raised InternalError>
<Future at 0x23ed59020f0 state=finished raised InternalError>
<Future at 0x23ed5902828 state=finished raised OperationalError>
<Future at 0x23ed59d9588 state=finished raised OperationalError>
<Future at 0x23ed59d9550 state=finished raised OperationalError>
<Future at 0x23ed59d9940 state=finished raised OperationalError>
<Future at 0x23ed59d9b00 state=finished raised InternalError>
<Future at 0x23ed59d9c88 state=finished raised OperationalError>
```
The detailed messages for the InternalError and the OperationalError are, respectively:
```
InternalError: Packet sequence number wrong - got 1 expected 2
OperationalError: (2013, 'Lost connection to MySQL server during query')
```
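Printing a `Future` only shows the exception class. To get messages like the ones above, each future can be asked for its exception. The sketch below is a hypothetical helper (`summarize` and the failing `flaky_task` are illustrative names, not from the original script) built only on the standard `Future.exception()` and `Future.result()` methods:

```python
from concurrent.futures import ThreadPoolExecutor, Future
from typing import List, Tuple


def summarize(futures: List[Future]) -> List[Tuple[str, str]]:
    """Wait for each future and report ('ok', repr(result)) or ('error', 'ExcType: message')."""
    summary = []
    for fut in futures:
        exc = fut.exception()  # blocks until this future has finished
        if exc is None:
            summary.append(("ok", repr(fut.result())))
        else:
            summary.append(("error", f"{type(exc).__name__}: {exc}"))
    return summary


def flaky_task(i: int) -> int:
    # Hypothetical stand-in for pymysql_task: odd task ids fail
    if i % 2:
        raise RuntimeError(f"task {i} failed")
    return i


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as exe:
        futures = [exe.submit(flaky_task, i) for i in range(4)]
    for status, detail in summarize(futures):
        print(status, detail)
```

Applied to the `tasks` list from the pymysql example, `summarize(tasks)` would list the full error text for each failed insert.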
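The InternalError occurs because the threads share a single connection. PyMySQL connections are not thread-safe (the module declares DB-API threadsafety = 1: threads may share the module but not connections), so concurrent queries on one connection scramble the protocol's packet sequence. For the detailed cause and solution, see: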
The fix is to hold a lock while using conn:
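```python
import threading

_lock = threading.Lock()  # one lock guarding the shared conn


def pymysql_task(sql_pattern):
    # `with` releases the lock even if execute/commit raises; a bare
    # acquire()/release() pair would leave it held and block every other thread.
    with _lock:
        cursor = conn.cursor()
        cursor.execute(sql_pattern, ("test", "test", 0))
        conn.commit()
    print(cursor.lastrowid)
    return cursor.lastrowid
```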
Output: the problem is solved, and all ten tasks return successfully:
```
<Future at 0x23ed59d9048 state=finished returned int>
<Future at 0x23ed59d91d0 state=finished returned int>
<Future at 0x23ed59e78d0 state=finished returned int>
<Future at 0x23ed59e7a58 state=finished returned int>
<Future at 0x23ed59e7be0 state=finished returned int>
<Future at 0x23ed59e7d68 state=finished returned int>
<Future at 0x23ed59e7ef0 state=finished returned int>
<Future at 0x23ed59ef0b8 state=finished returned int>
<Future at 0x23ed59ef240 state=finished returned int>
<Future at 0x23ed59ef3c8 state=finished returned int>
```
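To try the locking pattern without a MySQL server, here is a sketch that swaps in Python's built-in `sqlite3` as a stand-in for pymysql (an assumption for illustration only: sqlite3 uses `?` placeholders, and `check_same_thread=False` is needed so one connection may be used from several threads). The names `locked_insert` and `SQL` are hypothetical:

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the shared pymysql connection: one sqlite3 connection used by all threads.
# check_same_thread=False turns off sqlite3's own same-thread check, so the lock
# below is what serializes access.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute(
    "CREATE TABLE novel_book (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "provide_name TEXT, start_url TEXT, state INTEGER)"
)
_lock = threading.Lock()

# sqlite3 uses ? placeholders where pymysql uses %s
SQL = "INSERT INTO novel_book (provide_name, start_url, state) VALUES (?, ?, ?)"


def locked_insert(sql: str) -> int:
    # Hold the lock for execute + commit so no other thread can
    # interleave statements on the shared connection.
    with _lock:
        cursor = conn.cursor()
        cursor.execute(sql, ("test", "test", 0))
        conn.commit()
        return cursor.lastrowid


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=10) as exe:
        ids = list(exe.map(locked_insert, [SQL] * 10))
    print(sorted(ids))
```

The lock makes the threads take turns, so correctness comes at the cost of running the inserts one at a time.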
2. Multithreaded database access with SQLAlchemy
```python
from sqlalchemy import Column, String, create_engine, Integer
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# Base class for the mapped classes
Base = declarative_base()


# Define the Book model
class Book(Base):
    # Table name
    __tablename__ = 'novel_book'

    # Table columns
    id = Column(Integer, primary_key=True, autoincrement=True)
    provide_name = Column(String(45))
    start_url = Column(String(45))
    state = Column(Integer)


# Initialize the database connection (sql_password is defined elsewhere).
# 10 threads access the database concurrently; the author observed that without
# pool_size (default 5) some threads timed out waiting for a connection.
# (QueuePool also allows up to max_overflow=10 extra connections by default.)
engine = create_engine('mysql+pymysql://root:%s@localhost:3306/springboot' % sql_password,
                       pool_size=10)

# Create the DBSession factory
DBSession = sessionmaker(bind=engine)
```
When the tasks run, each session effectively corresponds to one connection checked out from the pool:
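```python
def sqlalchemy_task():
    session = DBSession()  # each session checks out its own connection from the pool
    try:
        book = Book(provide_name="sqlalchemy", start_url="test", state=0)
        session.add(book)
        session.commit()
        return book.id  # auto-increment primary key of the inserted row
    finally:
        session.close()  # give the connection back to the pool


exe_2 = ThreadPoolExecutor(max_workers=10)
tasks = []
for i in range(10):
    tasks.append(exe_2.submit(sqlalchemy_task))
for task in tasks:
    print(task.result())  # blocks until the task finishes, then prints the new id
```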
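The pool_size timeout behaviour can be illustrated with a toy fixed-size pool built on `queue.Queue`. This is a simplified sketch, not SQLAlchemy's `QueuePool`: it pre-creates every connection and has no overflow, whereas `QueuePool` creates connections lazily and allows `max_overflow` extras. `SimplePool` and its `factory` argument are hypothetical names:

```python
import queue
from contextlib import contextmanager


class SimplePool:
    """Toy fixed-size pool: a queue of pre-created connections.

    A checkout blocks until a connection is free and raises TimeoutError after
    `timeout` seconds, roughly what happens when more threads than pooled
    connections want one at the same time.
    """

    def __init__(self, factory, pool_size=5, timeout=30.0):
        self._idle = queue.Queue()
        self._timeout = timeout
        for _ in range(pool_size):
            self._idle.put(factory())

    @contextmanager
    def connection(self):
        try:
            conn = self._idle.get(timeout=self._timeout)
        except queue.Empty:
            raise TimeoutError("no connection available in pool") from None
        try:
            yield conn
        finally:
            self._idle.put(conn)  # return it so the next thread can use it


if __name__ == "__main__":
    pool = SimplePool(factory=object, pool_size=2, timeout=0.1)
    with pool.connection() as a, pool.connection() as b:
        try:
            with pool.connection():
                pass
        except TimeoutError as exc:
            print("third checkout:", exc)
```

Returning the connection in `finally` mirrors why `session.close()` matters: a connection that is never handed back stays unavailable to other threads.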
Once SQLAlchemy's connection pool is in place, access is much faster than either locking a shared connection or having each thread open a new connection.
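For comparison, the "each thread gets its own connection" approach can be sketched with `threading.local`, so that a worker thread opens one connection and reuses it for every task it runs. `get_connection` and the `factory` callable are hypothetical; with pymysql the factory might wrap `pymysql.connect(...)`, but here a plain `object()` stands in so the sketch runs anywhere:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()


def get_connection(factory):
    """Return this thread's connection, creating it on first use.

    `factory` is a hypothetical zero-argument callable that opens a connection.
    """
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = factory()
        _local.conn = conn
    return conn


if __name__ == "__main__":
    created = []

    def factory():
        # Stand-in: a plain object plays the role of a connection
        conn = object()
        created.append(conn)
        return conn

    with ThreadPoolExecutor(max_workers=3) as exe:
        same = list(exe.map(lambda _: get_connection(factory) is get_connection(factory), range(6)))
    print(all(same), len(created))
```

Each worker creates at most one connection, so no lock is needed; the trade-off is one open connection per thread and no built-in cleanup when a thread exits.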
The timing measurements here are seriously flawed: if the cursor is also shared between threads, the final elapsed time is only 0.00299072265625.
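One way to make such timings comparable is to stop the clock only after every future has finished and to surface task errors, so a run where most tasks failed quickly is not mistaken for a fast one. The helper below is a hypothetical sketch (`time_concurrent` is not from the original post) using `time.perf_counter` and `concurrent.futures.wait`:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def time_concurrent(task, n_tasks=10, max_workers=10):
    """Return wall-clock seconds to run `task` n_tasks times on a thread pool.

    The clock stops only after wait() reports every future done, so the number
    covers the actual work rather than just the submission loop.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as exe:
        start = time.perf_counter()
        futures = [exe.submit(task) for _ in range(n_tasks)]
        wait(futures)
        elapsed = time.perf_counter() - start
    for fut in futures:
        fut.result()  # re-raise any task error so failed runs are not reported as fast
    return elapsed


if __name__ == "__main__":
    # Hypothetical task: sleep 0.1 s to mimic a database round trip
    print(f"{time_concurrent(lambda: time.sleep(0.1)):.3f} s")
```

The same helper could wrap the locked pymysql task and the SQLAlchemy task in turn, so both are measured the same way.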
https://blog.csdn.net/zslngu/article/details/95937871