zoukankan      html  css  js  c++  java
  • Python操作MySQL

    Python 操作 MySQL 的方式

    • 通过 pymysql 模块操作
    • 通过 sqlalchemy ORM 框架操作

    pymysql

    pymysql 是 python 下操作 mysql 的模块之一,其操作的方法与 MySQLdb 类似

    1、下载安装

    pip3 install pymysql

    2、操作

    import pymysql
    
    # 创建连接
    connection = pymysql.connect(host='10.211.55.5', port=3306, user='wenchong', password='111111', db='test')
    # 创建游标
    cursor = connection.cursor()
    
    # 执行SQL语句,effect_row 返回受影响的行数
    effect_row1 = cursor.execute("insert into user (username, group_id) values ('testuser', 5)")
    effect_row2 = cursor.execute("update user set username = 'test3' where id = %s", (13,))
    effect_row3 = cursor.executemany("insert into user (username, group_id) values (%s,%s)", [('test1',3),('test2',4)])
    
    # 获取新添加的最后一条数据中自增的 ID
    new_id = cursor.lastrowid
    
    # 执行查询语句
    cursor.execute("select * from user")
    
    # 获取第一条数据
    row_1 = cursor.fetchone()
    print(row_1)
    
    # 获取 N 条数据
    row_2 = cursor.fetchmany(5)
    print(row_2)
    
    # 获取所有的数据
    row_3 = cursor.fetchall()
    print(row_3)
    
    # 提交,否则无法保存新建或修改的数据
    connection.commit()
    
    # 关闭游标和连接
    cursor.close()
    connection.close()

    通过 fetch 获取 select 的数据时按顺序获取,可以通过 cursor.scroll(num,mode) 来移动游标位置,如:

    • cursor.scroll(1,mode='relative')  # 相对当前位置移动
    • cursor.scroll(2,mode='absolute') # 相对绝对位置移动

    通过 fetch 获取数据时默认是元组类型,如果想要字典类型的数据需要将游标设置为字典类型

    connection = pymysql.connect(host='10.211.55.5', port=3306, user='wenchong', password='111111', db='test')
    
    # 设置游标为字典类型
    cursor = connection.cursor(cursor=pymysql.cursors.DictCursor)
    
    cursor.execute("select * from user")
    
    row = cursor.fetchmany(5)
    print(row)
    
    connection.commit()
    
    cursor.close()
    connection.close()

    SQLAlchemy

    SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

    一、安装

    pip3 install sqlalchemy

    SQLAlchemy 本身无法操作数据库,必须依赖 pymysql 等第三方的模块,Dialect 用于和 DBAPI 进行交流,根据配置文件的不同,从而选择不同的 DBAPI 对数据库进行操作

    如:

    MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
       
    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
       
    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
       
    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
       
    更多详见:http://docs.sqlalchemy.org/en/rel_1_1/core/engines.html

    二、内部处理

    使用 engine/ConnectionPooling/Dialect 进行数据库操作,engine 通过 ConnectionPooling 连接数据库,再通过 Dialect 对数据进行操作

    # /user/bin/env python
    __author__ = 'wenchong'
    
    from sqlalchemy import create_engine
    
    # max_overflow 最大连接池
    engine = create_engine("mysql+pymysql://wenchong:111111@10.211.55.5:3306/test", max_overflow=5)
    
    # 插入数据
    cur = engine.execute("insert into user (username, group_id) values ('test4', 3)")
    cur1 = engine.execute("insert into user (username, group_id) values (%s,%s), [('test5', 2),('test6', 3)]")
    
    # 获取最后插入的行的自增 id
    new_id = cur1.lastrowid
    
    # 执行查询语句
    cur2 = engine.execute("select * from user")
    
    # 获取查询结果
    row1 = cur2.fetchone()
    row2 = cur2.fetchmany(5)
    row3 = cur2.fetchall()

    三、ORM 功能使用

    1、创建表

    # /user/bin/env python
    __author__ = 'wenchong'
    
    
    from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
    from sqlalchemy.ext.declarative import declarative_base
    
    engine = create_engine("mysql+pymysql://wenchong:111111@10.211.55.5:3306/test", max_overflow=5)
    
    Base = declarative_base()
    
    # 创建单表
    class MyTable(Base):
        __tablename__ = 'myTable'
    
        id = Column(Integer, primary_key=True)
        username = Column(String(32))
    
        # 当打印一个 Users 对象时,返回该方法
        def __repr__(self):
            return self.username
    
    
    # 一对多
    class Groups(Base):
        __tablename__ = 'groups'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(32))
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        username = Column(String(32))
        group_id = Column(Integer, ForeignKey('groups.id'))  # 创建主键约束
    
    
    # 多对多
    class Servers(Base):
        __tablename__ = 'servers'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(32), nullable=False, unique=True)
        port = Column(Integer, default=22)
    
    class Products(Base):
        __tablename__ = 'products'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(64), nullable=False, unique=True)
    
    class ServerToProduct(Base):
        __tablename__ = 'servertogroup'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        server_id = Column(Integer, ForeignKey('servers.id'))
        product_id = Column(Integer, ForeignKey('products.id'))
    
    
    def create_table():
        """创建上面的所有表"""
        Base.metadata.create_all(engine)
    
    def drop_table():
        """删除上面的所有表"""
        Base.metadata.drop_all(engine)


    create_table()

    2、添加数据

    from sqlalchemy.orm import sessionmaker
    
    Session
    = sessionmaker(bind=engine) session = Session() # 添加一条数据 obj = MyTable(username='wen1') session.add(obj) # 添加多条数据 session.add_all([ MyTable(username='wen2'), MyTable(username='wen3'), MyTable(username='wen4'), ]) session.commit()

    3、删除数据

    # 删除 MyTable 表中 id 大于 2 的所有行
    session.query(MyTable).filter(MyTable.id > 2).delete()
    session.commit()

    4、修改数据

    session.query(MyTable).filter(MyTable.username == 'wen1').update({'username':'WenChong'})
    session.commit()

    5、查数据

    # MyTable 表中的所有数据,返回值为 list
    session.query(MyTable).all()
    
    # 返回指定的列,返回值为 list
    session.query(MyTable.id, MyTable.username).all()
    
    # MyTable 表中 id 大于 2 的所有数据,返回值为 list
    session.query(MyTable).filter(MyTable.id > 2).all()
    
    # MyTable 表中 id 大于 2 的第一条数据,返回值为 MyTable 对象
    session.query(MyTable).filter(MyTable.id > 2).first()
    # 条件
    ret1 = session.query(MyTable.id, MyTable.username).filter(MyTable.id > 2).all()
    ret2 = session.query(MyTable).filter(MyTable.id.between(2,5)).all()
    ret3 = session.query(MyTable).filter(MyTable.id.in_([3,4,5])).all()
    ret4_not_in = session.query(MyTable).filter(~MyTable.id.in_([3,4,5])).all()
    ret4 = session.query(MyTable).filter(MyTable.id.notin_([3,4,5])).all()
    ret5 = session.query(MyTable).filter(MyTable.id.in_(session.query(MyTable.id).filter(MyTable.username == 'wen2'))).all()
    
    from sqlalchemy import and_, or_
    ret6 = session.query(MyTable.id, MyTable.username).filter(and_(MyTable.id == 5, MyTable.username == 'wen2')).all()
    ret7 = session.query(MyTable.id, MyTable.username).filter(or_(MyTable.id == 5, MyTable.username == 'wen2')).all()
    ret8 = session.query(MyTable.id, MyTable.username).filter(
        or_(
            MyTable.id == 6,
            and_(MyTable.username == 'wen2', MyTable.id > 10),
        )
    ).all()
    
    # 通配符
    ret1 = session.query(MyTable).filter(MyTable.username.like("wen%")).all()
    ret2 = session.query(MyTable).filter(MyTable.username.like("wen_")).all()
    ret3 = session.query(MyTable).filter(MyTable.username.notlike("wen%")).all()
    
    # 限制
    ret1 = session.query(MyTable)[0:5]
    
    # 排序
    ret1 = session.query(MyTable).order_by(MyTable.id.desc()).all()
    ret2 = session.query(MyTable).order_by(MyTable.id.desc(), MyTable.username.asc()).all()
    
    # 分组
    from sqlalchemy import func
    
    ret1 = session.query(MyTable).group_by(MyTable.username).all()
    ret2 = session.query(
        func.sum(MyTable.id),
        func.max(MyTable.id),
        func.min(MyTable.id),
        func.count(MyTable.id),
        MyTable.username).group_by(MyTable.username).all()
    
    ret3 = session.query(
        func.sum(MyTable.id),
        func.max(MyTable.id),
        func.min(MyTable.id),
        func.count(MyTable.id),
        MyTable.username).group_by(MyTable.username).having(func.min(MyTable.id) > 6).all()
    
    # 连表
    ret1 = session.query(Users.username,Groups.name).filter(Users.group_id == Groups.id).all()
    ret2 = session.query(Users.username,Groups.name).join(Users).all()
    ret3 = session.query(Users.username,Groups.name).join(Groups, isouter=True).all()
    
    # 组合
    q1 = session.query(Users.username)
    q2 = session.query(Groups.name)
    ret1 = q1.union(q2).all()
    ret2 = q1.union_all(q2).all()

    6、配置联系

    relationship 用于定义一张表到另外一张表的关系

    """一对多"""
    
    from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker, relationship
    
    engine = create_engine("mysql+pymysql://wenchong:111111@10.211.55.5:3306/test", max_overflow=5)
    
    Base = declarative_base()
    
    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(50))
        addresses = relationship("Address", backref="user")  # 配置关系, backref=user,为在 Address 中也添加 User 的关系
    
    class Address(Base):
        __tablename__ = 'addresses'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        email = Column(String(50))
        user_id = Column(Integer, ForeignKey('users.id'))  # 设置主键约束
    
    # 创建表
    Base.metadata.create_all(engine)
    
    # 创建 session
    Session =sessionmaker(bind=engine)
    session = Session()
    
    # 添加数据
    eric = User(
        name='eric',
        addresses = [
            Address(email='eric1@126.com'),
            Address(email='eric2@126.com'),
        ]
        )
    
    session.add(eric)
    session.commit()
    
    # 使用 join 查询数据
    # 通过 address 查 user
    ret = session.query(Address).join(User).filter(Address.user_id == User.id).filter(
        User.name == 'eric').all()
    for item in ret:
        print(item.email)
    
    # 通过 user 查 email
    ret1 = session.query(User).join(Address).filter(Address.user_id == User.id).filter(
        Address.email.like('eric1%')).all()
    for item in ret1:
        print(item.name)
    
    # 使用关系查询数据
    # 通过 user 查看 email
    user_obj = session.query(User).filter(User.name == 'eric').one()
    for obj in user_obj.addresses:
        print(obj.email)
    
    # 通过 email 查看 user
    email_obj = session.query(Address).filter(Address.email.like('eric1%')).one()
    print(email_obj.user.name)
    """多对多"""
    
    from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, ForeignKeyConstraint, UniqueConstraint
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker, relationship
    
    engine = create_engine("mysql+pymysql://wenchong:111111@10.211.55.5:3306/test", max_overflow=5)
    
    Base = declarative_base()
    
    
    class PageToTag(Base):
        __tablename__ = 'pagetotag'
        id = Column(Integer, primary_key=True, autoincrement=True)
        page_id = Column(Integer, ForeignKey('page.id'))
        tag_id = Column(Integer, ForeignKey('tag.id'))
    
    
    class Page(Base):
        __tablename__ = 'page'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(64))
        tags = relationship('Tag', secondary=PageToTag.__table__)
    
    
    class Tag(Base):
        __tablename__ = 'tag'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(32))
        pages = relationship('Page', secondary=PageToTag.__table__)
    
    # 创建表
    Base.metadata.bind = engine
    Base.metadata.create_all()
    
    # 创建session
    Session =sessionmaker(bind=engine)
    session = Session()
    
    # 添加数据
    page = Page(name='Python API Page')
    python_tag = Tag(name='Python')
    api_tag = Tag(name='API')
    
    page.tags.append(api_tag)
    page.tags.append(python_tag)
    
    session.add(page)
    session.commit()
    
    # 查询数据
    page_obj = session.query(Page).filter(Page.name == 'Python API Page').one()
    print([t.name for t in page_obj.tags])

    7、aliases

    当查询时需要涉及到多个表,当同一张表出现多次时,为了避免冲突,使用 aliases 为表创建一个别名

    from sqlalchemy.orm import aliased
    
    aliases1 = aliased(Address)
    aliases2 = aliased(Address)
    
    for u, e1, e2 in session.query(User.name, aliases1.email, aliases2.email).join(
            aliases1, User.addresses).join(
            aliases2, User.addresses).filter(
                    aliases1.email == 'eric1@126.com').filter(
                    aliases2.email == 'eric2@126.com'):
        print(u, e1, e2)
  • 相关阅读:
    [mysql练习]多行结果合并问题练习
    【Python】Python多进程练习
    【mysql练习】转置,总计,分组
    【Mysql】HDFS文件上传流程
    [Jmeter][基础]Jmeter连接IMPALA
    【Linux】 -bash-4.2#问题和Cannot allocate memory
    微服务学习之路
    好的东西一定要收藏-持续更新
    Python日期的加减等操作
    NGINX动态增加模块,平滑升级
  • 原文地址:https://www.cnblogs.com/wenchong/p/5974351.html
Copyright © 2011-2022 走看看