zoukankan      html  css  js  c++  java
  • 使用SQLAlchemy操作已存在的数据库的表

    需求场景:
    使用sqlalchmy从现有的表中获取数据(不是自己建表)。百度了一下,网上都是使用sqlalchemy自己先创建表,然后导入数据表的模型类进行增删改查;现在不是自己建表,该如何操作呢?

    操作方案
    通过sqlalchmey执行原生的sql语句,增删改查的原生语句携带表名,就不需要导入数据表的模型类了。

    使用的包:
    SQLAlchemy (1.3.10) + mysql-connector-python (8.0.19)

    提供以下干货:

    • 演示了向原生sql语句传递变量的用法 即动态执行sql语句 更加灵活
    • 通过执行原生的sql语句实现操作已有的表
    • 演示了sql语句根据多字段排序的方法等

    DEMO

    # -*- coding:utf-8 -*-
    from sqlalchemy import create_engine,MetaData,Table,exists
    from sqlalchemy.orm import sessionmaker, scoped_session
    from util.Log import Log
    from conf.parseConfig import parseConf
    # 从配置文件中获取mysql的配置信息
    host = parseConf.get_conf('MySQLInfo', 'host')
    port = parseConf.get_conf('MySQLInfo', 'port')
    dbname = parseConf.get_conf('MySQLInfo', 'dbname')
    usernm = parseConf.get_conf('MySQLInfo', 'usernm')
    passwd = parseConf.get_conf('MySQLInfo', 'passwd')
    
    engine_str = "mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm, passwd, host, port, dbname)
    
    
    class OpsMysql(object):
        def __init__(self, log=Log(__file__).getlog()):
            self.log = log
            self.session = None
            try:
                self.engine = create_engine(
                    engine_str,
                    max_overflow=0,  # 超过连接池大小外最多创建的连接
                    pool_size=5,  # 连接池大小
                    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                    pool_recycle=-1,  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                    # echo=True,  # 显示相应执行的 sql 指令
                    encoding='utf-8'
                )
                SessionFactory = sessionmaker(bind=self.engine)
                self.session = scoped_session(SessionFactory)
            except Exception as e:
                self.log.error(str(e))
                self.log.error("Connect {0}@{1}:{2} failed!".format(dbname, host, port))
    
        def get_session(self):
            return self.session
    
        def getEngine(self):
            return self.engine
    
        def init_db(self, base):
            base.metadata.create_all(self.engine)
    
        def drop_db(self, base):
            base.metadata.drop_all(self.engine)
    
    
    if __name__ == "__main__":
        log = Log(__file__).getlog()
        tt = OpsMysql(log)
        session = tt.get_session()
        
        if session:
            # 通过执行原生的sql语句实现操作已有的表
            # 此处演示了向原生sql语句传递变量的用法 即动态执行sql语句 更加灵活
            mail_id = 1
            res = session.execute('select * from tbl_mail_addr where mail_id='" + mail_id + "' and mail_tp="c"')
            res01 = res.fetchall() # 结果是列表
            print(res01[0])
            # (1, 'c', 1, 'XX@u163.com')
            
            mail_id = '1'
            mail_type = 'c'
            # 查询id为1,类型是c的邮箱信息,并按mail_tp降序,addr_no升序排列,限制查询数量100
            sql = "select * from tbl_mail_addr where tbl_mail_addr.oper_flag='1' and tbl_mail_addr.mail_id='" + mail_id + "' and tbl_mail_addr.mail_tp='" + mail_type + "' order by tbl_mail_addr.mail_tp, tbl_mail_addr.addr_no ASC limit 100"
            result = session.execute(sql)
            value_list = result.fetchall()
            print(value_list)
            # [(1, 'c', 1, 'XX@163.com'), (1, 'c', 2, 'XX@qq.com), (1, 'c', 3, 'XX@qq.com')]
    
        session.close()
    
  • 相关阅读:
    [题解] [JSOI2015] 圈地
    [题解] [JSOI2015] 最小表示
    [题解] [JSOI2015] 套娃
    [题解] [JSOI2015] 非诚勿扰
    [题解] [JSOI2015] 送礼物
    [题解] [JSOI2015] 送礼物
    [题解] [JSOI2015] 子集选取
    [题解] [JSOI2015] salesman
    AC自动机学习笔记
    [题解] [JSOI2014] 矩形并
  • 原文地址:https://www.cnblogs.com/We612/p/12357769.html
Copyright © 2011-2022 走看看