zoukankan      html  css  js  c++  java
  • SQLAlchemy的使用

    sqlalchemy和django中的ORM框架作比较

    SQLAlchemy 是一个独立的 ORM 框架,可以在任何需要操作 SQL 数据库的 Python 项目中使用;
    而 Django 的 ORM 与 Django 框架绑定,只能在 Django 项目中使用

    SQLAlchemy的使用:

    单表操作:

    创建数据表
    from sqlalchemy.ext.declarative import declarative_base
    
    #基类,和django中的models.Model一样
    BaseModel = declarative_base()
    
    from sqlalchemy import Column,Integer,String
    
    #像django中创建表结构一样
    class User(BaseModel):
        """ORM model mapped to the ``user`` table.

        Declared much like a Django model: every ``Column`` attribute becomes
        a column of the table.
        """

        # By convention the table name is the lower-cased class name.
        __tablename__ = 'user'
        # Fixed: the original spelled this ``Cloumn``, a NameError at class creation.
        id = Column(Integer, primary_key=True, autoincrement=True)
        username = Column(String(32), nullable=False, unique=True, index=True)
        password = Column(String(32), nullable=False)

    from sqlalchemy.engine import create_engine
    # Create the engine. URL format:
    #   dialect+driver://db_user:db_password@host:port/database?charset=utf8
    # (this explanatory line was previously not a comment, which is a syntax error)
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s18?charset=utf8")
    # Emit CREATE TABLE for every model registered on BaseModel.
    BaseModel.metadata.create_all(engine)

    单表的增删改查(C增R查U改D删)

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.engine import create_engine
    
    engine=create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s18?charset=utf8")
    
    # Build a session factory bound to the engine.
    db = sessionmaker(engine)
    # Open a session (comparable to opening a new query window).
    db_session = db()
    # Create a model instance. Keyword names must match the mapped columns:
    # User declares ``username`` and ``password`` (both NOT NULL), so the
    # original ``User(name='alex')`` raised TypeError.
    u = User(username='alex', password='123')

    # Queue the new row for insertion.
    db_session.add(u)
    # Several objects can be queued at once with db_session.add_all(u_list).
    # Any modification of the database has to be committed.
    db_session.commit()
    # Call db_session.close() once the session is no longer needed.
    
    # Fetch every row. The result is a list of User objects whose columns are
    # read as plain attributes.
    db_session.query(User).all()

    # Conditional query returning only the first match.
    # Conditions must be built from the mapped attribute (User.id): the bare
    # name ``id`` is Python's builtin function, so ``id == 1`` is just False.
    db_session.query(User).filter(User.id == 1).first()

    # Comma-separated conditions are combined with AND (not OR).
    db_session.query(User).filter(User.id == 1, User.username == 'alex').first()

    # Update: the dict maps mapped attributes to their new values.
    db_session.query(User).filter(User.id == 1).update({User.username: 'wusir'})
    db_session.commit()

    # Delete
    db_session.query(User).filter(User.id == 2).delete()
    db_session.commit()

    from sqlalchemy.sql import and_,or_

    # and_: every condition has to hold.
    db_session.query(User).filter(and_(User.id >= 3, User.username == 'alex')).all()
    # or_: a single matching condition is enough.
    db_session.query(User).filter(or_(User.id > 3, User.username == 'wusir')).all()
    # Both can be nested inside each other.
    db_session.query(User).filter(
        or_(
            User.id == 1,
            User.username == '佩琪',
            and_(User.id == 2, User.username == 'alex'),
        )
    ).all()

    # Keyword-style conditions go through filter_by(); filter() only takes
    # expressions and rejects keyword arguments with a TypeError.
    db_session.query(User).filter_by(id=1).first()

     # 字符串匹配方式筛选条件 并使用 order_by进行排序
     r6 = db_session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='DragonFire').order_by(User.id).all()

     #原生SQL查询
     r7 = db_session.query(User).from_statement(text("SELECT * FROM User where name=:name")).params(name='DragonFire').all()

     # 筛选查询列
     # query的时候我们不在使用User ORM对象,而是使用User.name来对内容进行选取
     user_list = db_session.query(User.name).all()
     print(user_list)
     for row in user_list:
     print(row.name)

     # 别名映射 name as nick
     user_list = db_session.query(User.name.label("nick")).all()
     print(user_list)
     for row in user_list:
     print(row.nick) # 这里要写别名了

     # 筛选条件格式
     user_list = db_session.query(User).filter(User.name == "DragonFire").all()
     user_list = db_session.query(User).filter(User.name == "DragonFire").first()
     user_list = db_session.query(User).filter_by(name="DragonFire").first()
     for row in user_list:
     print(row.nick)

     # 复杂查询
     from sqlalchemy.sql import text
     user_list = db_session.query(User).filter(text("id<:value and name=:name")).params(value=3,name="DragonFire")

     # 查询语句
     from sqlalchemy.sql import text
     user_list = db_session.query(User).filter(text("select * from User id<:value and name=:name")).params(value=3,name="DragonFire")

     # 排序 :
     user_list = db_session.query(User).order_by(User.id).all()
     user_list = db_session.query(User).order_by(User.id.desc()).all()
     for row in user_list:
     print(row.name,row.id)

     #其他查询条件
     """
     ret = session.query(User).filter_by(name='DragonFire').all()
     ret = session.query(User).filter(User.id > 1, User.name == 'DragonFire').all()
     ret = session.query(User).filter(User.id.between(1, 3), User.name == 'DragonFire').all() # between 是闭区间: id 大于等于1且小于等于3
     ret = session.query(User).filter(User.id.in_([1,3,4])).all() # in_([1,3,4]) 只查询id等于1,3,4的
     ret = session.query(User).filter(~User.id.in_([1,3,4])).all() # ~xxxx.in_([1,3,4]) 查询不等于1,3,4的
     ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='DragonFire'))).all() # 子查询
     from sqlalchemy import and_, or_
     ret = session.query(User).filter(and_(User.id > 3, User.name == 'DragonFire')).all()
     ret = session.query(User).filter(or_(User.id < 2, User.name == 'DragonFire')).all()
     ret = session.query(User).filter(
     or_(
     User.id < 2,
     and_(User.name == 'eric', User.id > 3),
     User.extra != ""
     )).all()
     # select * from User where id<2 or (name="eric" and id>3) or extra != ""

     # 通配符
     ret = db_session.query(User).filter(User.name.like('e%')).all()
     ret = db_session.query(User).filter(~User.name.like('e%')).all() 

     # 限制
     ret = db_session.query(User)[1:2]

     # 排序
     ret = db_session.query(User).order_by(User.name.desc()).all()
     ret = db_session.query(User).order_by(User.name.desc(), User.id.asc()).all()

     # 分组
     from sqlalchemy.sql import func

     ret = db_session.query(User).group_by(User.extra).all()
     ret = db_session.query(
     func.max(User.id),
     func.sum(User.id),
     func.min(User.id)).group_by(User.name).all()

     ret = db_session.query(
     func.max(User.id),
     func.sum(User.id),
     func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all()
     """

     # 关闭连接
     db_session.close()

    一对多查询

    from sqlalchemy.ext.declarative import declarative_base
    
    basemodel = declarative_base()
    
    from sqlalchemy import Column, ForeignKey, Integer, String
    from sqlalchemy.orm import relationship
    
    class School(basemodel):
        """ORM model mapped to the ``school`` table (the "one" side)."""

        __tablename__ = 'school'
        # Fixed: ``Cloumn`` -> ``Column`` and ``nummable`` -> ``nullable``.
        id = Column(Integer, primary_key=True)
        name = Column(String(10), nullable=False)
    
    class Student(basemodel):
        """ORM model mapped to the ``student`` table (the "many" side)."""

        __tablename__ = 'student'
        id = Column(Integer, primary_key=True)
        # MySQL cannot create a VARCHAR column without an explicit length.
        name = Column(String(32), nullable=False)
        # Foreign key pointing at school.id.
        sch_id = Column(Integer, ForeignKey('school.id'))

        # Key part: ``stu2sch`` navigates Student -> School, and the backref
        # adds ``sch2stu`` on School for School -> [Student].
        stu2sch = relationship('School', backref='sch2stu')

    from sqlalchemy.engine import create_engine
    # The URL scheme is "dialect+driver"; the original "mysql_pymysql" is not
    # a valid scheme and create_engine() cannot resolve it.
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s18?charset=utf8")

    # Emit CREATE TABLE for every model registered on basemodel.
    basemodel.metadata.create_all(engine)

    from sqlalchemy.orm import sessionmaker

    db = sessionmaker(engine)
    db_session = db()

    # Add data from the forward side (Student -> School): the related School
    # is created together with the Student. Class names are case-sensitive;
    # the original lowercase ``student(...)`` was a NameError.
    s = Student(name='alex', stu2sch=School(name='oldboy'))
    db_session.add(s)
    db_session.commit()

    # A closed session can be used again; it starts a new transaction.
    db_session.close()
    # Add data from the reverse side (School -> students) through the backref.
    sch = School(name='oldboy')
    sch.sch2stu = [
        Student(name='li'),
        Student(name='hua'),
    ]
    db_session.add(sch)
    db_session.commit()

    # Forward side: load students (first row, then all rows).
    ret = db_session.query(Student).first()
    ret = db_session.query(Student).all()
    # Reverse side: load every school and walk its students via the backref.
    ret = db_session.query(School).all()
    for sch in ret:
        for stu in sch.sch2stu:
            print(sch.name, stu.name)

    #修改和删除都是和上面的相同,在这里我们就不写了

    多对多查询 

    from sqlalchemy.ext.declarative import declarative_base
    
    BaseModel = declarative_base()
    
    from sqlalchemy import Column, Integer, String,ForeignKey
    from sqlalchemy.orm import relationship
    
    class Boy(BaseModel):
        """ORM model mapped to the ``boy`` table."""

        __tablename__ = "boy"

        id = Column(Integer, primary_key=True)
        name = Column(String(32), nullable=False)
    
    class Girl(BaseModel):
        """ORM model mapped to the ``girl`` table."""

        __tablename__ = "girl"

        id = Column(Integer, primary_key=True)
        name = Column(String(32), nullable=False)

        # The essential part: a relationship to Boy routed through the
        # ``hotel`` table, with ``byg`` added on Boy as the backref.
        gyb = relationship(
            'Boy',
            secondary='hotel',
            backref='byg',
        )
    
    class Hotel(BaseModel):
        """ORM model mapped to the ``hotel`` table, holding boy/girl id pairs."""

        __tablename__ = "hotel"

        id = Column(Integer, primary_key=True)
        # Foreign keys to the two tables being linked.
        b_id = Column(Integer, ForeignKey("boy.id"))
        g_id = Column(Integer, ForeignKey("girl.id"))
    
    
    from sqlalchemy.engine import create_engine
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s18sqlalchemy?charset=utf8")
    
    BaseModel.metadata.create_all(engine)

    from sqlalchemy.orm import sessionmaker

    db = sessionmaker(engine)
    db_session = db()

     1.增加数据 relationship 正向添加
     g = Girl(name="赵丽颖",gyb=[Boy(name="冯绍峰"),Boy(name="DragonFire")])
     db_session.add(g)
     db_session.commit()

    
    

     2.增加数据 relationship 反向添加
     b = Boy(name="Alexander.DSB.Li")
     b.byg = [Girl(name="娟儿"),Girl(name="罗玉凤")]
     db_session.add(b)
     db_session.commit()

    
    


     3.查询数据 relationship 正向
     res = db_session.query(Girl).first()
     for b in res.gyb:
         print(res.name,b.name)

    
    

     4.查询数据 relationship 反向
     res = db_session.query(Boy).all()
     for b in res:
        for g in b.byg:
            print(b.name,g.name)

     

     Flask中使用sqlalchemy

    在新建的flask项目中

    views.py

    from flask import Blueprint,render_template
    from app01.models import User,db
    
    user = Blueprint("user",__name__)
    
    
    @user.route("/add_user", methods=["POST", "GET"])
    def add_user():
        """Insert a hard-coded demo user, then render the add-user template."""
        new_user = User(name="123456")
        session = db.session
        session.add(new_user)
        session.commit()
        return render_template("add_user.html")
    
    
    @user.route("/login_user", methods=["POST", "GET"])
    def login_user():
        """Look up the hard-coded demo user and print it.

        Returns "200 OK" when the user exists and a 404 response otherwise.
        """
        user_name = "123456"
        user_info = User.query.filter(User.name == user_name).first()
        if user_info is None:
            # first() returns None when nothing matches; reading ``.name`` on
            # it would raise AttributeError and surface as a 500 error.
            return "404 Not Found", 404
        print(user_info, user_info.name)
        return "200 OK"
    
    
    @user.route("/del_user", methods=["POST", "GET"])
    def del_user():
        """Delete the hard-coded demo user and print what delete() returned."""
        target_name = "123456"
        matching = User.query.filter(User.name == target_name)
        deleted = matching.delete()
        db.session.commit()
        print(deleted)
        return "200 OK"

    __init__.py中

    from flask import Flask
    from flask_session import Session
    from flask_cors import CORS
    
    
    from flask_sqlalchemy import SQLAlchemy
    db = SQLAlchemy()
    
    from app01.views import users
    
    
    
    def create_app():
        """Application factory: build the Flask app, configure it, attach the
        extensions and register the ``user`` blueprint.

        Returns:
            The configured Flask application.
        """
        app = Flask(__name__)
        app.config.update(
            DEBUG=True,
            SQLALCHEMY_DATABASE_URI="mysql+pymysql://root:123@127.0.0.1:3306/s18sqlalchemy?charset=utf8",
            SQLALCHEMY_TRACK_MODIFICATIONS=False,
        )

        # Extensions are attached after the configuration is in place.
        Session(app)
        CORS(app)
        db.init_app(app)

        app.register_blueprint(users.user)

        return app

    models.py中

    # from sqlalchemy.ext.declarative import declarative_base
    # BaseModel=declarative_base()
    from app01 import db
    
    class User(db.Model):
        """Flask-SQLAlchemy model mapped to the ``user`` table."""

        __tablename__ = "user"
        __table_args__ = {}

        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(32), nullable=False)
    
    
    if __name__ == '__main__':
        # Rebuild the schema from scratch: drop every table, then create them.
        from app01 import create_app
        app = create_app()
        # create_app() already calls db.init_app(app), so it is not repeated
        # here. Running inside an application context replaces the ``app=``
        # argument, which newer Flask-SQLAlchemy releases no longer accept.
        with app.app_context():
            db.drop_all()
            db.create_all()
  • 相关阅读:
    JAVA的反射理解
    网络编程-小结
    JAVA多线程的总结
    Mysql基本语句的总结
    IO流
    JAVA集合介绍
    时间复杂度
    JAVA面向对象-多态的理解
    求A的B次方
    最大公约数
  • 原文地址:https://www.cnblogs.com/zty1304368100/p/11116642.html
Copyright © 2011-2022 走看看