pymsql
pymysql
这款第三方库可以帮助我们利用python
语言与mysql
进行链接
基本使用
首先要下载pymysql
pip install pymsql
以下是pymysql
的基本使用
import pymysql
# 链接,C/S架构,TCP链接
conn = pymysql.connect(
host="localhost",
database="db1",
charset="utf8mb4",
user="root",
cursorclass=pymysql.cursors.DictCursor, # 记录结果,字典显示
# password = "your password",
)
# 游标
cursor = conn.cursor()
# 执行sql
sql = "show tables"
res = cursor.execute(sql) # 提交执行,返回sql影响成功的行数
print(res) # 2 代表该数据库下有2个表
print(cursor.fetchall()) # [{'Tables_in_db1': 't1'}, {'Tables_in_db1': 't2'}]
cursor.close() # 关闭游标
conn.close()
游标概念
可以看到在上面的示例中有一个游标的概念,其实这个也非常简单,就等同于光标的上下移动,每移动一次代表一条记录。
在pymsql
中,对于select
等操作返回的结果都可以通过游标的移动配合相应方法函数来进行读取。
sql注入
如果你的某些sql
语句要进行字符串拼接,那么一定要使用pymysql
提供的execute()
方法进行拼接,不要去用python
中的%
或format()
方法,这可能导致出现sql
注入问题带来不安全的隐患。
注意:使用
execute()
时,不可传入表名,数据库名。否则会抛出语法错误,这是因为在拼接时会自动添加上``号
import pymysql
# 链接
conn = pymysql.connect(
host="localhost",
database="db1",
charset="utf8mb4",
user="root",
cursorclass=pymysql.cursors.DictCursor, # 记录结果,字典显示
# password = "your password",
)
# 游标
cursor=conn.cursor()
# 执行sql
sql = "select * from t1 where id=%s"
res = cursor.execute(sql,("1",)) # 提交执行,返回sql影响成功的行数 这里拼接能预防sql注入问题
print(res) # 1 查出一条记录
print(cursor.fetchall()) # 拿到所有记录的结果
cursor.close() # 关闭游标
conn.close()
事务提交
在执行UPDATE/INSERT/DELETE
之类的操作,必须使用conn.commit()
进行事务提交后方可生效。
或者你可以在实例化conn
对象时为他指定auto_commit
参数为true
即可自动提交事务。
import pymysql
# 链接
conn = pymysql.connect(
host="localhost",
database="db1",
charset="utf8mb4",
user="root",
cursorclass=pymysql.cursors.DictCursor, # 记录结果,字典显示
autocommit = True, # 自动提交
# password = "your password",
)
# 游标
cursor=conn.cursor()
# 执行sql
sql = "insert into t1(name) values(%s)"
res = cursor.execute(sql,("新记录",)) # 提交执行,返回sql影响成功的行数 这里拼接能预防sql注入问题
print(res) # 1 成功插入一条记录
print(cursor.lastrowid) #在插入语句后查看,查看最后一条记录的行号
print(cursor.fetchall())
# conn.commit() # 手动提交
cursor.close() # 关闭游标
conn.close()
提交多条
使用cursor.executemany()
方法可一次性提交多条sql
操作。
import pymysql
# 链接
conn = pymysql.connect(
host="localhost",
database="db1",
charset="utf8mb4",
user="root",
cursorclass=pymysql.cursors.DictCursor, # 记录结果,字典显示
autocommit = True, # 自动提交
# password = "your password",
)
# 游标
cursor=conn.cursor()
# 执行sql
sql = "insert into t1(name) values(%s)" # 同一条命令,执行3次
res = cursor.executemany(sql,[("新记录1"),("新纪录2"),("新纪录3")]) # 提交执行,返回sql影响成功的行数 这里拼接能预防sql注入问题
print(res) # 3 成功插入三条记录
print(cursor.lastrowid) #在插入语句后查看,查看最后一条记录的行号
print(cursor.fetchall())
cursor.close() # 关闭游标
conn.close()
游标相关
获取到一条记录后,我们可以控制游标移动。
也可以控制查看游标后的多少条记录
游标每移动一次代表一条记录
命令解析 | 描述 |
---|---|
cursor.scroll(3,mode='absolute') | 游标以绝对位置向后移动3条记录 |
cursor.scroll(3,mode='relative') | 游标以当前位置向后移动3条记录 |
注意:游标移动的条数即为记录的条数,如果移动值为负N就代表上N条记录 |
如果我们想获取记录,可使用以下三个方法
命令解析 | 描述 |
---|---|
cursor.fetchone() | 获取第一条记录,游标向下移动一行 |
cursor.fetchmany(2) | 获取接下来的两条记录,游标向下移动两行 |
cursor.fetchall() | 获取全部记录,游标移动到末尾,返回的是一个列表 |
import pymysql
# 链接
conn = pymysql.connect(
host="localhost",
database="db1",
charset="utf8mb4",
user="root",
cursorclass=pymysql.cursors.DictCursor, # 记录结果,字典显示
autocommit = True, # 自动提交
# password = "your password",
)
# 游标
cursor=conn.cursor()
# 执行sql
sql = "select * from t1" # t1表中4条记录
cursor.execute(sql)
print(cursor.fetchone()) 游标移动到2的位置
cursor.scroll(2,mode='relative') 向下移动2,当前游标为4
print(cursor.fetchone())
cursor.close() # 关闭游标
conn.close()
"""
{'id': 1, 'name': '记录1'}
{'id': 4, 'name': '记录4'}
"""
插入行号
如果执行的是INSERT
操作,可以在插入后查看最后插入的ID
行号
import pymysql
# 链接
conn = pymysql.connect(
host="localhost",
database="db1",
charset="utf8mb4",
user="root",
cursorclass=pymysql.cursors.DictCursor, # 记录结果,字典显示
autocommit = True, # 自动提交
# password = "your password",
)
# 游标
cursor=conn.cursor()
# 执行sql
sql = "insert into t1(name) values(%s)" # 同一条命令,执行3次
res = cursor.executemany(sql,[("新记录1"),("新纪录2"),("新纪录3")]) # 提交执行,返回sql影响成功的行数 这里拼接能预防sql注入问题
print(res) # 3 成功插入三条记录
print(cursor.lastrowid) #在插入语句后查看,查看最后一条记录的行号
print(cursor.fetchall())
# conn.commit() # 手动提交
cursor.close() # 关闭游标
conn.close()
存储过程
MySQL
中的存储过程也可在pymysql
中进行调用。
-- MySQL中创建了一个存储过程
delimiter $ -- delimiter是指自定义结束符,mysql中以;号结束,使用自定义结束符后则以自定义结束符为准
CREATE PROCEDURE p2(in n1 int, out res int) -- 创建存储过程 参数1,传入参数,int类型,参数2,返回值,int类型
BEGIN
-- 书写程序逻辑
select id from t1 where id = n1;
set res = 1; -- 设置返回值
-- 逻辑完毕
END $ -- 存储过程创建完毕
delimiter ;
-- MySQL调用
set @res = 0; -- 先将接受返回值的变量进行ding'yi
call p2(1,@res); -- 参数1,传入值,参数2,返回值
select @res; -- 查看返回值
# Python中调用存储过程
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306,
user='root', passwd='123', db='t1')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# 执行存储过程
cursor.callproc('p2', args=(1, 0)) # 第一个参数为传入参数,第二个参数为返回值。相当于set @res = 0;
# 获取执行完存储的参数
cursor.execute("select @_p2_0,@_p2_1") # @_p2_0 传入参数,@_p2_1 返回值
result = cursor.fetchall()
conn.commit()
cursor.close()
conn.close()
print(result)
DBUtils
使用DBUtils
的链接池来与MySQL
服务端建立链接,避免了频繁的socket
链接请求,这常用于并发性的访问。
下载dbutils
模块:
pip install dbutils
基本使用:
import pymysql
from dbutils.pooled_db import PooledDB
class MySqlPool(object):
config = {
"creator": pymysql, # 使用链接数据库的模块
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "",
"db": "db1",
"charset": "utf8mb4",
"autocommit": True,
"mincached": 10, # 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
"maxconnections": 70, # 连接池最大连接数量
"maxcached": 10, # 连接池中允许的闲置的最多连接数量
"maxshared": 10, # 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
"blocking": True, # 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
"maxusage": 0, # 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
"setsession": [], # 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...],常用于做初始化命令
"cursorclass": pymysql.cursors.DictCursor # 返回类型,dict
}
pool = PooledDB(**config) # 单例模式创建出池子
def get_db(self):
self.conn = MySqlPool.pool.connection()
self.cursor = self.conn.cursor()
return self
def close(self):
self.cursor.close()
self.conn.close()
def __enter__(self):
return self.get_db()
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
# 使用方式一
with MySqlPool() as db:
db.cursor.execute("select * from app01_user;")
print(db.cursor.fetchall())
# 使用方式二
db = MySqlPool().get_db()
db.cursor.execute("insert into app01_user (user_name,user_password,user_type) values ('baby',123,3);")
print(db.cursor.fetchall())
db.close()