Python 操作 MySQL 数据库常用的有两种方法。
方法1：使用 pymysql 模块，直接执行 SQL 语句。
import pymysql
# Connect to MySQL (the credentials here are placeholders).
db = pymysql.connect(user='root', password='password', host='127.0.0.1', database='tmpdb')
try:
    with db.cursor() as cursor:  # the cursor is closed when this block exits
        # `sql` is the concrete SQL statement to run. Pass user-supplied values
        # as cursor.execute(sql, args) instead of formatting them into `sql`.
        cursor.execute(sql)
    db.commit()  # required when the statement inserts, updates or deletes rows
finally:
    db.close()  # release the connection even if execute/commit raised
方法2：使用 sqlalchemy 模块（ORM 方式）。数据库增删改查可参考：简书《SQLAlchemy 权威指南》、SQLAlchemy 指南、SQLAlchemy 底层示意图。
数据库表是一个二维表，包含多行多列，每一行都可以看作一个对象（object）。把关系数据库的表结构映射到对象上，就是所谓的 ORM 技术（Object-Relational Mapping）。下面先用普通 Python 类演示“一行对应一个对象”的思路，之后再用 SQLAlchemy 实现真正的 ORM。使用前先通过 pip（或 easy_install）安装 SQLAlchemy：pip install sqlalchemy
class User:
    """Plain Python object standing in for one row of a user table.

    Illustrates the ORM idea: each table row maps to one object whose
    attributes mirror the row's columns.
    """

    def __init__(self, id, name):
        self.id = id
        self.name = name

    def __repr__(self):
        return "User(id={!r}, name={!r})".format(self.id, self.name)


# A "table" with three rows, expressed as a list of row objects. The list is
# bound to a name so the example data is actually kept rather than discarded.
users = [
    User('1', 'Michael'),
    User('2', 'Bob'),
    User('3', 'Adam'),
]
from sqlalchemy import create_engine, Column, String, Integer, DateTime, Float # sqlalchemy 中数据类型
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker # sessionmaker生成的是数据库会话类,这个类的实例session可用于操作数据库
from deal.conf import MYSQL
from urllib.parse import quote  # stdlib; escapes special characters in the password

# create_engine() returns an Engine, the core interface to the database; the
# "mysql+pymysql" dialect handles the MySQL/PyMySQL-specific details.
# URL layout: user : password @ host / database. The password is percent-encoded
# (safe="" so "/" is escaped too) because "@", "/" or ":" inside it would
# otherwise be parsed as URL delimiters.
engine = create_engine(
    "mysql+pymysql://{}:{}@{}/{}".format(MYSQL.user, quote(MYSQL.pw, safe=""), MYSQL.host, MYSQL.db),
    echo=False,  # True logs every SQL statement that is executed
    # Replace pooled connections once they are older than 7 hours; presumably
    # chosen to stay below MySQL's default 8-hour wait_timeout -- confirm.
    # See https://blog.csdn.net/u013673976/article/details/45939297
    pool_recycle=60 * 60 * 7,
)
# sessionmaker builds a Session class bound to `engine`; each Session()
# instance is used to talk to the database.
Session = sessionmaker(bind=engine)
# Declarative base class; the mapped classes below inherit from it.
Base = declarative_base()
'''
在使用ORM技术时,配置过程分为两步:
1. 描述要操作的数据库表;
2. 定义用来映射这些表的类。
其中第1、2步一般一起完成:通过Declarative方法,我们在定义映射类的同时描述它所映射的实际数据库表。使用Declarative方法定义的映射类都继承自同一个基类,这个基类维护着类与数据表之间映射关系的目录——也就是所谓的Declarative base class。一个应用通常只需要在一个公共模块中创建一个Base实例。我们通过declarative_base()函数创建这个基类:
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
'''
class RecordFile(Base):
    """ORM mapping for table ``sr_recordfile``.

    NOTE(review): only some columns are listed in these notes; the query
    helpers further down also use ``fileid``, ``filename``, ``process_state``,
    ``telnumber``, ``scoredflag``, ``registerflag`` and others, which the
    real model has to declare.
    """
    __tablename__ = "sr_recordfile"
    id = Column(Integer, primary_key=True)
    person_type = Column(Integer)
    personid = Column(String(20))
    # NOTE(review): length-less String cannot be rendered as VARCHAR by the
    # MySQL dialect (matters only if this model is used with create_all).
    remotefileid = Column(String)  # looked up by update_pushed_flag
    update_isblack_time = Column(DateTime)
    dayoffile = Column(String)
    process_state_ctime = Column(DateTime)
class RecordFileDia(Base):
    """ORM mapping for table ``sr_recordfile_dia``.

    Its columns mirror ``RecordFile``; the notes only spell out the primary
    key. The original placeholder ``pass 同上个类`` ("same as the previous
    class") was not valid Python and has been turned into a comment.
    """
    __tablename__ = "sr_recordfile_dia"
    id = Column(Integer, primary_key=True)
    # ... remaining columns: same as RecordFile (omitted in these notes).
class AlarmScores(Base):
    """ORM mapping for table ``alarm_scores``.

    Rows are created from dicts keyed by these column names and bulk-inserted
    (see ``insert_into_alarm`` further down).
    """
    __tablename__ = "alarm_scores"
    id = Column(Integer, primary_key=True)
    insert_time = Column(DateTime)
    combined_id = Column(String)
    # NOTE(review): the test_* / enroll_* pairs presumably describe the two
    # sides of a speaker comparison -- confirm.
    test_spk2utt = Column(String)
    enroll_spk2utt = Column(String)
    test_telnumber = Column(String)
    enroll_telnumber = Column(String)
    score = Column(Float)
    dayofidx = Column(String)
前面用 sqlalchemy 定义了数据库连接（engine / Session）和表的映射类，接下来是具体的数据库操作。其中 contextmanager 的用法可参考相关博客。
"""
Most queries in this module are based on the assumption that
`filename` is unique.
Thus, `filename` is used as the condition to fetch the target record(s).
"""
from contextlib import contextmanager
from functools import wraps
from os import path
from pymysql import err
from sqlalchemy import exc
from .tasktype import TaskType
from .models import Session, RecordFile as RF, AlarmScores, RecordFileDia, RecordFileDia as RFD, MultiTel
from .errors import AudioRecordNotFound, NoMatchedInfoFoundInDia
def try_again(fn):
    """Decorator: if ``fn`` fails with a MySQL connection/operational error, call it once more.

    Only the SQLAlchemy (``exc``) and PyMySQL (``err``) error types listed
    below trigger the retry. The second attempt is unconditional, and any
    error it raises propagates to the caller. From the author's experience,
    these errors rarely occur when the database is used properly.
    """

    @wraps(fn)  # keep fn's __name__ / __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        try:
            result = fn(*args, **kwargs)
        except (exc.OperationalError, exc.InternalError,
                err.InterfaceError, err.OperationalError, err.InternalError):
            print("We met a unexpected Error concerning mysql connection! Trying again...")
            result = fn(*args, **kwargs)
        return result

    return wrapper
'''
# 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行:
user = session.query(User).filter(User.id=='5').one()
'''
@contextmanager
def session_scope():
    """Provide a transactional scope around a series of operations.

    Yields a fresh ``Session``. When the ``with`` body completes normally the
    session is committed. If the body, or the commit itself, raises anything,
    the session is rolled back and the exception is re-raised. The session
    is closed in every case.
    """
    session = Session()
    try:
        yield session
        session.commit()
    except BaseException:  # any failure (even KeyboardInterrupt): undo, then propagate
        session.rollback()
        raise
    finally:
        session.close()
@try_again
def select_all(task_type: TaskType):  # the annotation documents the expected type; it is not enforced
    """Map ``"<personid>-<filename>"`` to ``fileid`` for unprocessed records.

    Only ``sr_recordfile`` rows with ``process_state == 0`` and a non-empty
    ``fileid`` are considered. Currently only available for AM and CU:
    AM additionally requires ``person_type == 2`` and ``checkflag == 1``,
    and CU requires ``person_type == 4``.

    Raises:
        TypeError: for any other ``task_type``.
    """
    with session_scope() as s:
        pending = (
            s.query(RF.fileid, RF.personid, RF.filename)
            .filter(RF.process_state == 0)
            .filter(RF.fileid != "")
        )
        if task_type == TaskType.am:
            pending = pending.filter(RF.person_type == 2).filter(RF.checkflag == 1)
        elif task_type == TaskType.cu:
            pending = pending.filter(RF.person_type == 4)
        else:
            raise TypeError("task type: %r is not supported here" % task_type)
        result = {}
        for row in pending:
            result["{}-{}".format(row.personid, row.filename)] = row.fileid
        return result
@try_again
def select_black_list(task_type: TaskType):
    """Return ``"<telnumber>-<fileid>"`` ids of blacklisted, unprocessed FQ records.

    Reads ``sr_recordfile_dia`` rows with ``process_state == 0``,
    ``person_type == 1`` and ``isblack == 1``. Only ``TaskType.fq`` is supported.

    Raises:
        ValueError: for any other ``task_type``.
    """
    with session_scope() as s:
        if task_type != TaskType.fq:
            raise ValueError("person_type of {} has not yet declared with blacklist".format(task_type))
        blacklisted = (
            s.query(RFD.telnumber, RFD.filename, RFD.fileid)
            .filter(RFD.process_state == 0)
            .filter(RFD.person_type == 1)
            .filter(RFD.isblack == 1)
        )
        # NOTE(review): `filename` is selected but never used, and the id is
        # built from `fileid` while select_all/fileid2utt use `filename` -- confirm intent.
        return ['{}-{}'.format(row.telnumber, row.fileid) for row in blacklisted]
@try_again
def fileid2utt(fileid, task_type):
    """Return the utterance id ``"<prefix>-<filename>"`` of the record with ``fileid``.

    - ``TaskType.fq``: looked up in ``sr_recordfile_dia``; the prefix is ``telnumber``.
    - ``TaskType.am`` / ``TaskType.cu``: looked up in ``sr_recordfile``; the
      prefix is ``personid``.

    Only the first matching row is used.

    Raises:
        TypeError: ``task_type`` is none of fq, am, cu.
        ValueError: no row with this ``fileid`` exists in the table that was queried.
    """
    if task_type == TaskType.fq:
        model, prefix_column, table = RFD, RFD.telnumber, "sr_recordfile_dia"
    elif task_type == TaskType.am or task_type == TaskType.cu:
        model, prefix_column, table = RF, RF.personid, "sr_recordfile"
    else:
        raise TypeError("task type error")

    with session_scope() as s:
        row = s.query(prefix_column, model.filename).filter(model.fileid == fileid).first()
        if row is None:
            # Name the table that was actually searched; the old message always
            # said `sr_recordfile`, which was wrong for fq.
            msg = "fileid `{}` does not exist in db `{}`".format(fileid, table)
            raise ValueError(msg)
        prefix, filename = row[0], row[1]
        return "{}-{}".format(prefix, filename)
@try_again
def update_state(filename, state, dealfiletodir=None):
    """Set ``process_state`` (and optionally ``dealfiletodir``) on the record named ``filename``.

    ``dealfiletodir`` is written only when it is truthy, so ``None`` or ``""``
    leaves the stored value unchanged. Returns ``True`` on success.

    Raises:
        AudioRecordNotFound: no ``sr_recordfile`` record has this filename.
    """
    with session_scope() as s:
        record = s.query(RF).filter(RF.filename == filename).first()
        if not record:
            raise AudioRecordNotFound("There is no such record whose filename is `{}`".format(filename))
        record.process_state = state
        if dealfiletodir:
            record.dealfiletodir = dealfiletodir
        return True
@try_again
def update_by_filename(filename, name, value):
    """Find the record named ``filename`` and set its attribute ``name`` to ``value``.

    Returns ``True`` on success; the change is committed when the session
    scope exits.

    Raises:
        AudioRecordNotFound: no ``sr_recordfile`` record has this filename.
    """
    with session_scope() as s:
        record = s.query(RF).filter(RF.filename == filename).first()
        if not record:
            raise AudioRecordNotFound("There is no such record whose filename is `{}`".format(filename))
        # NOTE(review): setattr with a name that is not a mapped column only sets
        # a plain Python attribute, and nothing is written -- consider validating `name`.
        setattr(record, name, value)
        return True
@try_again
def find_unscored(task_type: TaskType):
    """Find registered, unprocessed records that have not been scored yet.

    Matches ``sr_recordfile`` rows with ``registerflag == 1``,
    ``scoredflag == 0`` and ``process_state == 0`` whose ``person_type`` is
    1 for ``TaskType.fq``, or ``task_type.person_type`` for AM/CU.

    NOTICE: besides finding unscored records, this also marks every matched
    record as being scored by setting ``scoredflag = -1``. The change is
    committed when the session scope exits.

    Returns:
        A list of ``(telnumber, filename)`` for fq, or ``(personid, filename)``
        for AM/CU. It can be empty, so callers should check before further
        processing.

    Raises:
        TypeError: for any other ``task_type``.
    """
    with session_scope() as s:
        if task_type == TaskType.fq:
            person_type, id_attr = 1, "telnumber"
        elif task_type == TaskType.am or task_type == TaskType.cu:
            person_type, id_attr = task_type.person_type, "personid"
        else:
            raise TypeError("Unsupported task type %r" % task_type)

        unscored = (
            s.query(RF)
            .filter(RF.person_type == person_type)
            .filter(RF.registerflag == 1)
            .filter(RF.scoredflag == 0)
            .filter(RF.process_state == 0)
        )
        marked = []
        for record in unscored:
            record.scoredflag = -1  # mark as "being scored"
            marked.append((getattr(record, id_attr), record.filename))
        return marked
@try_again
def find_fq_tasks():
    """List downloaded FQ records (``person_type == 1``) that are at ``process_state == 10``.

    Only rows with ``downloaded == 1`` and a non-empty ``downloadtodir`` are returned.

    Returns:
        A list of ``(downloadtodir, filename, telnumber)`` tuples.
    """
    with session_scope() as s:
        ready = (
            s.query(RF.downloadtodir, RF.filename, RF.telnumber)
            .filter(RF.person_type == 1)
            .filter(RF.process_state == 10)
            .filter(RF.downloadtodir != "")
            .filter(RF.downloaded == 1)
        )
        tasks = [(row.downloadtodir, row.filename, row.telnumber) for row in ready]
        return tasks
@try_again
def update_pushed_flag(remote_fileid, value):
    """Set ``pushedflag`` to ``value`` on the record whose ``remotefileid`` matches.

    Returns ``None``; the change is committed when the session scope exits.

    Raises:
        AudioRecordNotFound: no record has this ``remotefileid``.
    """
    with session_scope() as s:
        record = s.query(RF).filter(RF.remotefileid == remote_fileid).first()
        if not record:
            msg = "There is no such record whose remote_fileid is `{}`".format(remote_fileid)
            raise AudioRecordNotFound(msg)
        record.pushedflag = value
@try_again
def insert_into_alarm(items):
    """Bulk-insert rows into ``alarm_scores``.

    Args:
        items: Iterable of dicts, one per row, mapping ``AlarmScores``
            column names to values.
    """
    records = [AlarmScores(**row) for row in items]
    with session_scope() as s:
        # Batch insert, see https://www.jianshu.com/p/87ac06124fe1
        s.bulk_save_objects(records)
# Columns copied out of an `sr_recordfile` record by fetch_from_old_tab.
# NOTE: the real list is longer ("and many more attributes") and is abbreviated
# in these notes. The placeholder text must not be inside the string; otherwise
# it becomes a field name and getattr(record, ...) raises AttributeError.
required_field_str = """person_type, personid, num, callid, caller,
called, telnumber, recordtime, skilltime, piecesid"""
# Split on commas, strip surrounding whitespace/newlines, and skip empty
# entries (e.g. produced by a trailing comma).
required_fields = [field.strip() for field in required_field_str.split(',') if field.strip()]
# `Field` needs to be modified: fileid, fileid_parent, process_state
@try_again
def fetch_from_old_tab(filename):
    """Return the ``required_fields`` of the ``sr_recordfile`` record named ``filename``.

    Returns:
        A dict mapping each name in ``required_fields`` to the record's value.

    Raises:
        AudioRecordNotFound: no such record.
    """
    with session_scope() as s:
        record = s.query(RF).filter(RF.filename == filename).first()
        if not record:
            raise AudioRecordNotFound("There is no such record whose filename is `{}`".format(filename))
        return {field: getattr(record, field) for field in required_fields}
@try_again
def insert_into_recordfile_dia(rfd_list):
    """Bulk-insert rows into ``sr_recordfile_dia``.

    Args:
        rfd_list: Iterable of dicts, one per row, mapping ``RecordFileDia``
            column names to values.
    """
    # Batch insert, see https://blog.csdn.net/dongyouyuan/article/details/79236673
    new_rows = [RecordFileDia(**row_fields) for row_fields in rfd_list]
    with session_scope() as s:
        s.bulk_save_objects(new_rows)
@try_again
def fullfill_alarm_score_by_filename(filename):
    """Look up ``person_type``, ``fileid`` and ``telnumber`` in ``sr_recordfile_dia`` by filename.

    Returns:
        A dict with keys ``person_type``, ``fileid`` and ``telnumber``.

    Raises:
        NoMatchedInfoFoundInDia: no such record.
    """
    wanted = ("person_type", "fileid", "telnumber")
    with session_scope() as s:
        row = s.query(*(getattr(RFD, column) for column in wanted)).filter(RFD.filename == filename).first()
        if not row:
            raise NoMatchedInfoFoundInDia(
                'In `sr_recordfile_dia`, no such record with filename as: `{}`'.format(filename))
        return dict(zip(wanted, row))
@try_again
def fetch_info_by_fileid(fileid):
    """Look up a ``sr_recordfile_dia`` record by ``fileid``.

    Returns:
        A dict with keys ``person_type``, ``filename``, ``telnumber``,
        ``piecesid`` and ``caller``.

    Raises:
        NoMatchedInfoFoundInDia: no such record.
    """
    wanted = ("person_type", "filename", "telnumber", "piecesid", "caller")
    with session_scope() as s:
        row = s.query(*(getattr(RFD, column) for column in wanted)).filter(RFD.fileid == fileid).first()
        if not row:
            raise NoMatchedInfoFoundInDia(
                'In `sr_recordfile_dia`, no such record with fileid as: `{}`'.format(fileid))
        return dict(zip(wanted, row))
@try_again
def fetch_info_by_filename(filename):
    """Look up ``fileid`` and ``telnumber`` in ``sr_recordfile_dia`` by filename.

    Returns:
        A dict with keys ``fileid`` and ``telnumber``.

    Raises:
        NoMatchedInfoFoundInDia: no such record.
    """
    with session_scope() as s:
        row = s.query(RFD.telnumber, RFD.fileid).filter(RFD.filename == filename).first()
        if not row:
            raise NoMatchedInfoFoundInDia(
                'In `sr_recordfile_dia`, no such record with filename as: `{}`'.format(filename))
        return {"fileid": row.fileid, "telnumber": row.telnumber}
@try_again
def fetch_fp_by_fileid(fileid):
    """Fetch the file path of a ``sr_recordfile_dia`` record by ``fileid``.

    Returns:
        A tuple ``(path.join(dealfiletodir, filename), telnumber)``.

    Raises:
        ValueError: no record has this ``fileid``.
    """
    with session_scope() as s:
        row = s.query(RFD.dealfiletodir, RFD.filename, RFD.telnumber).filter(RFD.fileid == fileid).first()
        if not row:
            raise ValueError("fileid: %s does not exist" % fileid)
        full_path = path.join(row.dealfiletodir, row.filename)
        return full_path, row.telnumber
@try_again  # like every other DB accessor here, retry once on connection errors
def get_number_pairs():
    """Fetch all number pairs that belong to the same person.

    Reads every ``(numbera, numberb)`` row of the ``MultiTel`` model.

    Returns:
        A list of ``(numbera, numberb)`` tuples, or ``None`` when there are no
        rows (kept for backward compatibility, so callers should test
        truthiness before iterating).
    """
    with session_scope() as s:
        rows = s.query(MultiTel.numbera, MultiTel.numberb).all()  # fetch everything at once
        if not rows:
            return None
        return [(row.numbera, row.numberb) for row in rows]
########## For testing ###################
def get_by_filename(filename, name):
    """Test helper: read attribute ``name`` of the ``sr_recordfile`` record named ``filename``.

    Raises:
        AudioRecordNotFound: no such record.
    """
    with session_scope() as s:
        record = s.query(RF).filter(RF.filename == filename).first()
        if not record:
            raise AudioRecordNotFound
        return getattr(record, name)