zoukankan      html  css  js  c++  java
  • python mymsql sqlalchemy

    python 操作数据库有两种方法。

    1. pymysql

    方法1. pymysql 模块
    	import pymysql
    	db = pymysql.connect(user = 'root',password='password',host = '127.0.0.1',database='tmpdb')
    	with db.cursor() as cursor:
    		cursor.execue(sql)  # 具体的sql语句
    		db.commit() # 涉及到增删改的时候需要执行
    	db.close()
    

    2. 调用 sqlalchemy 模块。 数据库的增删改查--> 简书权威指南sqlalchemysqlalchemy指南sqlalchemy底层示意图

    数据库表是个二维表,包含多行多列,每一行其实可以认为是个object, 以下代码就是传说中的ORM技术:Object-Relational Mapping,把关系数据库的表结构映射到对象上,首先通过easy_install或者pip安装SQLAlchemy:

    class User(object):
        def __init__(self, id, name):
            self.id = id
            self.name = name
    
    [
        User('1', 'Michael'),
        User('2', 'Bob'),
        User('3', 'Adam')
    ]
    from sqlalchemy import create_engine, Column, String, Integer, DateTime, Float #  sqlalchemy 中数据类型
    from sqlalchemy.ext.declarative import declarative_base 
    from sqlalchemy.orm import sessionmaker # sessionmaker生成的是数据库会话类,这个类的实例session可用于操作数据库 
    
    from deal.conf import MYSQL
    
    
    engine = create_engine("mysql+pymysql://{}:{}@{}/{}".format(MYSQL.user, MYSQL.pw, MYSQL.host, MYSQL.db), echo=False, pool_recycle=60*60*7)
    #    echo参数为True时,会显示每条执行的SQL语句,可以关闭,create_engine()返回一个Engine的实例,并且它表示通过数据库语法处理细节的核心接口,在这种情况下,数据库语法将会被解释称Python的类方法。
    #    账号 密码 host  database # pool_recycle 是闲置连接自动断开的时间  https://blog.csdn.net/u013673976/article/details/45939297
    
    Session = sessionmaker(bind=engine)
    
    Base = declarative_base() # 在Base  基类基础上创建 新的class
    
    '''
    在使用ORM技术时,
    	1. 构造进程首先描述数据的表,
    	2.定义我们用来映射那些表的类。
    
    	其中12一般一起执行,通过使用Declarative方法,我们可以创建一些包含描述要被映射的实际数据库表的准则的映射类。使用Declarative方法定义的映射类依据一个基类,这个基类是维系类和数据表关系的目录——我们所说的Declarative base class。在一个普通的模块入口中,应用通常只需要有一个base的实例。我们通过declarative_base()功能创建一个基类:
    	from sqlalchemy.ext.declarative import declarative_base
    	Base = declarative_base()
    '''
    
    class RecordFile(Base):
    
        __tablename__ = "sr_recordfile"
        
        id = Column(Integer, primary_key=True)
        person_type = Column(Integer)
        personid = Column(String(20))
        remotefileid = Column(String)
        update_isblack_time = Column(DateTime)
        dayoffile = Column(String)
        process_state_ctime = Column(DateTime)
    
    class RecordFileDia(Base):
    
        __tablename__ = "sr_recordfile_dia"
        
        id = Column(Integer, primary_key=True)
        pass 同上个类
    
    
    class AlarmScores(Base):
    
        __tablename__ = "alarm_scores"
    
        id = Column(Integer, primary_key=True)
        insert_time = Column(DateTime)
        combined_id = Column(String)
    
        test_spk2utt = Column(String)
        enroll_spk2utt = Column(String)
        test_telnumber = Column(String)
        enroll_telnumber = Column(String)
        score = Column(Float)
        dayofidx = Column(String)
    
        

    前面是用sqlalchemy定义了连接,然后接下来是最直接的操作了。其中contextmanager使用方法---> blog

    """
    Most query in this module is base on the assumption:
        `filename` is unique.
    Thus, filename is used as condition to fetch target record/records.
    """
    
    from contextlib import contextmanager
    from functools import wraps
    from os import path
    
    from pymysql import err
    from sqlalchemy import exc
    
    from .tasktype import TaskType
    from .models import Session, RecordFile as RF, AlarmScores, RecordFileDia, RecordFileDia as RFD, MultiTel
    from .errors import AudioRecordNotFound, NoMatchedInfoFoundInDia
    
    def try_again(fn):
        """
        For MySQL connection error or operational error, try again
        From my experience, if being used properly, there is little 
        chance for raising exception.
        """
        @wraps(fn)   # https://blog.csdn.net/yuyexiaohan/article/details/82860807
        def inner(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except (exc.OperationalError, exc.InternalError, err.InterfaceError, err.OperationalError,err.InternalError) as e:
                print("We met a unexpected Error concerning mysql connection! Trying again...")
                return fn(*args, **kwargs)
        return inner
    
    '''
    # 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行:
    user = session.query(User).filter(User.id=='5').one()
    
    '''
    
    @contextmanager
    def session_scope():
        s = Session()
        try:
            yield s #  返回 s
            s.commit()
        except:
            s.rollback()
            raise
        finally:
            s.close()
    
    
    @try_again
    def select_all(task_type: TaskType):  #  python3  有object:class 的 传参提醒功能。
        """
        Currently only avaiable for fetching AM and CU
        """
        with session_scope() as s:
            query = s.query(RF.fileid, RF.personid, RF.filename).filter(RF.process_state==0).filter(RF.fileid != "")
            if task_type == TaskType.am:
                query = query.filter(RF.person_type==2).filter(RF.checkflag==1)
            elif task_type == TaskType.cu:
                query = query.filter(RF.person_type==4)
            else:
                raise TypeError("task type: %r is not supported here" % task_type)
            
            d = {"{}-{}".format(q.personid, q.filename) : q.fileid for q in query}
            return d
    
    
    @try_again
    def select_black_list(task_type: TaskType):
        with session_scope() as s:
            if task_type == TaskType.fq:
                query = s.query(RFD.telnumber, RFD.filename, RFD.fileid).filter(RFD.process_state==0).filter(RFD.person_type==1).filter(RFD.isblack==1)
                return ['{}-{}'.format(q.telnumber, q.fileid) for q in query]
            else:
                msg = "person_type of {} has not yet declared with blacklist".format(task_type)
                raise ValueError(msg)
    
    
    
    @try_again
    def fileid2utt(fileid, task_type):  # 根据fileid 查询 tel-*.wav, 用到了first 
        with session_scope() as s:
            if task_type == TaskType.fq:
                query = s.query(RFD.telnumber, RFD.filename).filter(RFD.fileid==fileid).first()
                if query is not None:
                    return "{}-{}".format(query.telnumber, query.filename)
            elif task_type == TaskType.am or task_type == TaskType.cu:
                query = s.query(RF.personid, RF.filename).filter(RF.fileid==fileid).first()
                if query is not None:
                    return '{}-{}'.format(query.personid, query.filename)
            else:
                raise TypeError("task type error")
            
            if query is None:
                msg = "fileid `{}` does not exist in db `sr_recordfile`".format(fileid)
                raise ValueError(msg)
    
    
    @try_again  
    def update_state(filename, state, dealfiletodir=None): # update 
        with session_scope() as s:
            query = s.query(RF).filter(RF.filename==filename).first()
            if query:
                query.process_state = state
                if dealfiletodir:
                    query.dealfiletodir = dealfiletodir
                return True
            else:
                msg = "There is no such record whose filename is `{}`".format(filename)
                raise AudioRecordNotFound(msg)
    
    
    @try_again
    def update_by_filename(filename, name, value): #  首先根据filename 查询到某条记录,然后 将对应的name 设定为对应的value
        with session_scope() as s:
            query = s.query(RF).filter(RF.filename==filename).first()
            if query:
                setattr(query, name, value) # http://www.runoob.com/python/python-func-setattr.html  python 自带 函数 setattr()
                return True
            else:
                msg = "There is no such record whose filename is `{}`".format(filename)
                raise AudioRecordNotFound(msg)
                
    
    @try_again
    def find_unscored(task_type: TaskType):  # 查询到未打分的数据,
        """
        NOTICE: except for find unscored records, here we also mark 
        these records as being scored by setting `scoredflag` equal to -1.
    
        Return of this fuction can be an empty list, thus the caller should
        determine if return value is empty before further processing.
        """
        with session_scope() as s:
            if task_type == TaskType.fq:
                query = s.query(RF).filter(RF.person_type==1).filter(RF.registerflag==1).filter(RF.scoredflag==0).filter(RF.process_state==0)
                rv = []
                for q in query:
                    q.scoredflag = -1
                    rv.append((q.telnumber, q.filename))
                return rv
                # return [(q.telnumber, q.filename) for q in query]
            elif task_type == TaskType.am or task_type == TaskType.cu:
                query = s.query(RF).filter(RF.person_type==task_type.person_type).filter(RF.registerflag==1).filter(RF.scoredflag==0).filter(RF.process_state==0)
                rv = []
                for q in query:
                    q.scoredflag = -1
                    rv.append((q.personid, q.filename))
                return rv
                # return [(q.personid, q.filename) for q in query]
            else:
                raise TypeError("Unsupported task type %r" % task_type)
    
    
    @try_again
    def find_fq_tasks():
        with session_scope() as s:
            query = s.query(RF.downloadtodir, RF.filename, RF.telnumber) .filter(RF.person_type==1).filter(RF.process_state==10).filter(RF.downloadtodir != "").filter(RF.downloaded==1)
            return [(q.downloadtodir, q.filename, q.telnumber) for q in query]
    
    
    @try_again
    def update_pushed_flag(remote_fileid, value):
        with session_scope() as s:
            query = s.query(RF).filter(RF.remotefileid==remote_fileid).first()
            if query:
                query.pushedflag = value  # 更新数据
            else:
                msg = "There is no such record whose remote_fileid is `{}`".format(remote_fileid)
                raise AudioRecordNotFound(msg)
    
    
    @try_again
    def insert_into_alarm(items):  # items 是个list, 每个item 是一行数据 dict形式
        items = [AlarmScores(**i) for i in items]
        with session_scope() as s:
            s.bulk_save_objects(items)   # https://www.jianshu.com/p/87ac06124fe1    bulk_save_objects 批量插入数据
    
    
    required_field_str = """person_type, personid, num, callid, caller,
    called, telnumber, recordtime, skilltime, piecesid,等众多属性"""
    required_fields = [i.strip() for i in required_field_str.split(',')]  #众多属性
    
    # `Field` needs to be modified: fileid, fileid_parent, process_state 
        
    @try_again
    def fetch_from_old_tab(filename):
        with session_scope() as s:
            q = s.query(RF).filter(RF.filename==filename).first()
            if q:
                rv = {}
                for key in required_fields:
                    rv[key] = getattr(q, key)
                return rv  # 获取道数据的众多属性
            else:
                msg = "There is no such record whose filename is `{}`".format(filename)
                raise AudioRecordNotFound(msg)
    
    @try_again
    def insert_into_recordfile_dia(rfd_list): # 将各种数据写入到dia 表中
        items = [RecordFileDia(**i) for i in rfd_list]
        # https://blog.csdn.net/dongyouyuan/article/details/79236673 批量插入
        with session_scope() as s:
            s.bulk_save_objects(items)  # 同 insert_into_alarm  list  中多个item  每个item 包含多个dict
    
    
    @try_again
    def fullfill_alarm_score_by_filename(filename):
        with session_scope() as s:
            query = s.query(RFD.person_type, RFD.fileid, RFD.telnumber).filter(RFD.filename==filename).first()
            if query:
                return dict(person_type=query.person_type,fileid=query.fileid,telnumber=query.telnumber)
            else:
                msg = 'In `sr_recordfile_dia`, no such record with filename as: `{}`'.format(filename)
                raise NoMatchedInfoFoundInDia(msg)
    
    
    @try_again
    def fetch_info_by_fileid(fileid):
        with session_scope() as s:
            query = s.query(RFD.person_type, RFD.filename,RFD.telnumber, RFD.piecesid, RFD.caller) .filter(RFD.fileid==fileid).first()
            if query:
                return dict(person_type=query.person_type,filename=query.filename,telnumber=query.telnumber,piecesid=query.piecesid,caller=query.caller)
            else:
                msg = 'In `sr_recordfile_dia`, no such record with fileid as: `{}`'.format(fileid)
                raise NoMatchedInfoFoundInDia(msg)
    
    
    @try_again
    def fetch_info_by_filename(filename):
        with session_scope() as s:
            query = s.query(RFD.telnumber, RFD.fileid).filter(RFD.filename==filename).first()
            if query:
                return dict(fileid=query.fileid,telnumber=query.telnumber)
            else:
                msg = 'In `sr_recordfile_dia`, no such record with filename as: `{}`'.format(filename)
                raise NoMatchedInfoFoundInDia(msg)
    
    
    @try_again
    def fetch_fp_by_fileid(fileid):
        """
        fetch `dealfiletodir` from `sr_recordfile_dia` by `fileid`
        NOTE: also return telnumber
        """
        with session_scope() as s:
            query = s.query(RFD.dealfiletodir, RFD.filename, RFD.telnumber).filter(RFD.fileid==fileid).first()
            if query:
                return path.join(query.dealfiletodir, query.filename), query.telnumber
            else:
                raise ValueError("fileid: %s does not exist" % fileid)
    
    
    def get_number_pairs():
        """Fetch all number pairs that belongs to a same person"""
        with session_scope() as s:
            query = s.query(MultiTel.numbera, MultiTel.numberb).all() # 直接获取全部,
            if query:
                return [(q.numbera, q.numberb) for q in query]
            else:
                return 
    
    
    ########## For testing ###################
    def get_by_filename(filename, name):
        with session_scope() as s:
            query = s.query(RF).filter(RF.filename==filename).first()
            if query:
                return getattr(query, name)
            else:
                raise AudioRecordNotFound
                        
    关注公众号 海量干货等你
  • 相关阅读:
    Access操作必须使用一个可更新的查询
    SAP资料学习好地方
    Access关键词大全
    WPF零散笔记
    WPF:如何实现单实例的应用程序(Single Instance)
    WPF应用程序启动显示图片资源
    Drawable、Bitmap、Canvas和Paint的关系以及部分使用方法
    C#中一种可调用的异常处理方法
    easyui datagrid 点击列表头排序出现错乱的原因
    easyui datagrid 没数据时显示滚动条的解决方法
  • 原文地址:https://www.cnblogs.com/sowhat1412/p/12734309.html
Copyright © 2011-2022 走看看