zoukankan      html  css  js  c++  java
  • SQLAlchemy基本操作和常用技巧

    点击打开链接

    Python的ORM框架SQLAlchemy基本操作和常用技巧,包含大量实例,非常好的一个学习SQLAlchemy的教程,需要的朋友可以参考下

    python编程语言下的一款开源软件。提供了SQL工具包及对象关系映射(ORM)工具,使用MIT许可证发行。

    MySQL InnoDB 使用,所以使用其他数据库的也不能完全照搬本文。

    mysql

    [python] view plain copy
    1. apt-get install mysql-server  
    2. apt-get install mysql-client  
    3. apt-get install libmysqlclient15-dev  
    [python] view plain copy
    1. apt-get install python-mysqldb  
    •   
    • python ez_setup.py  
    [python] view plain copy
    1. easy_install MySQL-Python  
    [python] view plain copy
    1. easy_install SQLAlchemy  


    操作系统,遇到问题就 Google 一下吧。我是在 Mac OS X 上开发的,途中也遇到些问题,不过当时没记下来……
    值得一提的是我用了 MySQL-Python 来连 MySQL,因为不支持异步调用,所以和 Tornado 不是很搭。不过性能其实很好,因此以后再去研究下其他方案吧……

    import create_engine  

    • from sqlalchemy.orm import sessionmaker  
    •   
    • DB_CONNECT_STRING = 'mysql+mysqldb://root:123@localhost/ooxx?charset=utf8'  
    • engine = create_engine(DB_CONNECT_STRING, echo=True)  
    • DB_Session = sessionmaker(bind=engine)  
    • session = DB_Session()  


    •     def initialize(self):  
    •         self.session = models.DB_Session()  
    •    
    •   
    •     def on_finish(self):  
    •         self.session.close()  

    • 拿到 session 后,就可以执行 SQL 了:

      [python] view plain copy
      1. session.execute('create database abc')  
      2. print session.execute('show databases').fetchall()  
      3. session.execute('use abc')  
      4. # 建 user 表的过程略  
      5. print session.execute('select * from user where id = 1').first()  
      6. print session.execute('select * from user where id = :id', {'id': 1}).first()  

      于是来定义一个表:

      [python] view plain copy
      1. from sqlalchemy import Column  
      2. from sqlalchemy.types import CHAR, Integer, String  
      3. from sqlalchemy.ext.declarative import declarative_base  
      4.   
      5. BaseModel = declarative_base()  
      6.   
      7. def init_db():  
      8.     BaseModel.metadata.create_all(engine)  
      9.   
      10. def drop_db():  
      11.     BaseModel.metadata.drop_all(engine)  
      12.   
      13.   
      14. class User(BaseModel):  
      15.     __tablename__ = 'user'  
      16.   
      17.     id = Column(Integer, primary_key=True)  
      18.     name = Column(CHAR(30)) # or Column(String(30))  
      19.   
      20. init_db()  

      接着就开始使用这个表吧:

      [python] view plain copy
      1. from sqlalchemy import func, or_, not_  
      2.   
      3. user = User(name='a')  
      4. session.add(user)  
      5. user = User(name='b')  
      6. session.add(user)  
      7. user = User(name='a')  
      8. session.add(user)  
      9. user = User()  
      10. session.add(user)  
      11. session.commit()  
      12.   
      13. query = session.query(User)  
      14. print query # 显示SQL 语句  
      15. print query.statement # 同上  
      16. for user in query: # 遍历时查询  
      17.     print user.name  
      18. print query.all() # 返回的是一个类似列表的对象  
      19. print query.first().name # 记录不存在时,first() 会返回 None  
      20. # print query.one().name # 不存在,或有多行记录时会抛出异常  
      21. print query.filter(User.id == 2).first().name  
      22. print query.get(2).name # 以主键获取,等效于上句  
      23. print query.filter('id = 2').first().name # 支持字符串  
      24.   
      25. query2 = session.query(User.name)  
      26. print query2.all() # 每行是个元组  
      27. print query2.limit(1).all() # 最多返回 1 条记录  
      28. print query2.offset(1).all() # 从第 2 条记录开始返回  
      29. print query2.order_by(User.name).all()  
      30. print query2.order_by('name').all()  
      31. print query2.order_by(User.name.desc()).all()  
      32. print query2.order_by('name desc').all()  
      33. print session.query(User.id).order_by(User.name.desc(), User.id).all()  
      34.   
      35. print query2.filter(User.id == 1).scalar() # 如果有记录,返回第一条记录的第一个元素  
      36. print session.query('id').select_from(User).filter('id = 1').scalar()  
      37. print query2.filter(User.id > 1, User.name != 'a').scalar() # and  
      38. query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and  
      39. query3 = query3.filter(User.name != 'a')  
      40. print query3.scalar()  
      41. print query2.filter(or_(User.id == 1, User.id == 2)).all() # or  
      42. print query2.filter(User.id.in_((1, 2))).all() # in  
      43.   
      44. query4 = session.query(User.id)  
      45. print query4.filter(User.name == None).scalar()  
      46. print query4.filter('name is null').scalar()  
      47. print query4.filter(not_(User.name == None)).all() # not  
      48. print query4.filter(User.name != None).all()  
      49.   
      50. print query4.count()  
      51. print session.query(func.count('*')).select_from(User).scalar()  
      52. print session.query(func.count('1')).select_from(User).scalar()  
      53. print session.query(func.count(User.id)).scalar()  
      54. print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表  
      55. print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == # 可以用 limit() 限制 count() 的返回数  
      56. print session.query(func.sum(User.id)).scalar()  
      57. print session.query(func.now()).scalar() # func 后可以跟任意函数名,只要该数据库支持  
      58. print session.query(func.current_timestamp()).scalar()  
      59. print session.query(func.md5(User.name)).filter(User.id == 1).scalar()  
      60.   
      61. query.filter(User.id == 1).update({User.name: 'c'})  
      62. user = query.get(1)  
      63. print user.name  
      64.   
      65. user.name = 'd'  
      66. session.flush() # 写数据库,但并不提交  
      67. print query.get(1).name  
      68.   
      69. session.delete(user)  
      70. session.flush()  
      71. print query.get(1)  
      72.   
      73. session.rollback()  
      74. print query.get(1).name  
      75. query.filter(User.id == 1).delete()  
      76. session.commit()  
      77. print query.get(1)  

      下面开始介绍一些进阶的知识。

       

      [python] view plain copy
      1. session.execute(  
      2.     User.__table__.insert(),  
      3.     [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]  
      4. )  
      5. session.commit()  
      如何让执行的 SQL 语句增加前缀?
      使用 query 对象的 prefix_with() 方法:

       

    • session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})   1, name='ooxx')  
    • session.merge(user)  
    • session.commit()  
    • 如何使用无符号整数?
      可以使用 MySQL 的方言:

      import INTEGER  

    • id = Column(INTEGER(unsigned=True), primary_key=True)  
    • 'from', CHAR(10))  
      [python] view plain copy
      1. User.name.property.columns[0].type.length  
      [python] view plain copy
      1. class User(BaseModel):  
      2.     __table_args__ = {  
      3.         'mysql_engine': 'InnoDB',  
      4.         'mysql_charset': 'utf8'  
      5.     }  

      不建议全用 utf8mb4 代替 utf8,因为前者更慢,索引会占用更多空间。

      import randint  

    • from sqlalchemy import ForeignKey  
    •   
    • class User(BaseModel):  
    •     __tablename__ = 'user'  
    •     id = Column(Integer, primary_key=True)  
    •     age = Column(Integer)  
    •   
    • class Friendship(BaseModel):  
    •     __tablename__ = 'friendship'  
    •     id = Column(Integer, primary_key=True)  
    •     user_id1 = Column(Integer, ForeignKey('user.id'))  
    •     user_id2 = Column(Integer, ForeignKey('user.id'))  
    •   
    • for i in xrange(100):  
    •     session.add(User(age=randint(1, 100)))  
    • session.flush() # 或 session.commit(),执行完后,user 对象的 id 属性才可以访问(因为 id 是自增的)  
    • for i in xrange(100):  
    •     session.add(Friendship(user_id1=randint(1, 100), user_id2=randint(1, 100)))  
    • session.commit()  
    • session.query(User).filter(User.age < 50).delete()  
    • 执行这段代码时,你应该会遇到一个错误:

      1451, 'Cannot delete or update a parent row: a foreign key constraint fails (`ooxx`.`friendship`, CONSTRAINT `friendship_ibfk_1` FOREIGN KEY (`user_id1`) REFERENCES `user` (`id`))') 'DELETE FROM user WHERE user.age < %s' (50,)  原因是删除 user 表的数据,可能会导致 friendship 的外键不指向一个真实存在的记录。在默认情况下,MySQL 会拒绝这种操作,也就是 RESTRICT。InnoDB 还允许指定 ON DELETE 为 CASCADE 和 SET NULL,前者会删除 friendship 中无效的记录,后者会将这些记录的外键设为 NULL。
      除了删除,还有可能更改主键,这也会导致 friendship 的外键失效。于是相应的就有 ON UPDATE 了。其中 CASCADE 变成了更新相应的外键,而不是删除。
      而在 SQLAlchemy 中是这样处理的:

      [python] view plain copy
      1. class Friendship(BaseModel):  
      2.     __tablename__ = 'friendship'  
      3.     id = Column(Integer, primary_key=True)  
      4.     user_id1 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))  
      5.     user_id2 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))  
      如何连接表?
      [python] view plain copy
      1. from sqlalchemy import distinct  
      2. from sqlalchemy.orm import aliased  
      3.   
      4. Friend = aliased(User, name='Friend')  
      5. print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).all() # 所有有朋友的用户  
      6. print session.query(distinct(User.id)).join(Friendship, User.id == Friendship.user_id1).all() # 所有有朋友的用户(去掉重复的)  
      7. print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).distinct().all() # 同上  
      8. print session.query(Friendship.user_id2).join(User, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 所有被别人当成朋友的用户  
      9. print session.query(Friendship.user_id2).select_from(User).join(Friendship, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 同上,join 的方向相反,但因为不是 STRAIGHT_JOIN,所以 MySQL 可以自己选择顺序  
      10. print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).all() # 用户及其朋友  
      11. print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).filter(User.id < 10).all() # id 小于 10 的用户及其朋友  
      12. print session.query(User.id, Friend.id).join(Friendship, User.id == Friendship.user_id1).join(Friend, Friend.id == Friendship.user_id2).all() # 两次 join,由于使用到相同的表,因此需要别名  
      13. print session.query(User.id, Friendship.user_id2).outerjoin(Friendship, User.id == Friendship.user_id1).all() # 用户及其朋友(无朋友则为 None,使用左连接)  
      这里我没提到 relationship,虽然它看上去很方便,但需要学习的内容实在太多,还要考虑很多性能上的问题,所以干脆自己 join 吧。

      为什么无法删除 in 操作查询出来的记录?

      [python] view plain copy
      1. session.query(User).filter(User.id.in_((1, 2, 3))).delete()  

      抛出这样的异常:

      not evaluate current criteria in Python.  Specify 'fetch' or False for the synchronize_session parameter.  但这样是没问题的:

      1, User.id == 2, User.id == 3)).delete()  搜了下找到《Sqlalchemy delete subquery》这个问题,提到了 delete 的一个注意点:删除记录时,默认会尝试删除 session 中符合条件的对象,而 in 操作估计还不支持,于是就出错了。解决办法就是删除时不进行同步,然后再让 session 里的所有实体都过期:

      [python] view plain copy
      1. session.query(User).filter(User.id.in_((1, 2, 3))).delete(synchronize_session=False)  
      2. session.commit() # or session.expire_all()  
      此外,update 操作也有同样的参数,如果后面立刻提交了,那么加上 synchronize_session=False 参数会更快。
    •     @classmethod  
    •     def get_by_id(cls, session, id, columns=None, lock_mode=None):  
    •         if hasattr(cls, 'id'):  
    •             scalar = False  
    •             if columns:  
    •                 if isinstance(columns, (tuple, list)):  
    •                     query = session.query(*columns)  
    •                 else:  
    •                     scalar = True  
    •                     query = session.query(columns)  
    •             else:  
    •                 query = session.query(cls)  
    •             if lock_mode:  
    •                 query = query.with_lockmode(lock_mode)  
    •             query = query.filter(cls.id == id)  
    •             if scalar:  
    •                 return query.scalar()  
    •             return query.first()  
    •         return None  
    •     BaseModel.get_by_id = get_by_id  
    •     @classmethod  
    •     def get_all(cls, session, columns=None, offset=None, limit=None, order_by=None, lock_mode=None):  
    •         if columns:  
    •             if isinstance(columns, (tuple, list)):  
    •                 query = session.query(*columns)  
    •             else:  
    •                 query = session.query(columns)  
    •                 if isinstance(columns, str):  
    •                     query = query.select_from(cls)  
    •         else:  
    •             query = session.query(cls)  
    •         if order_by is not None:  
    •             if isinstance(order_by, (tuple, list)):  
    •                 query = query.order_by(*order_by)  
    •             else:  
    •                 query = query.order_by(order_by)  
    •         if offset:  
    •             query = query.offset(offset)  
    •         if limit:  
    •             query = query.limit(limit)  
    •         if lock_mode:  
    •             query = query.with_lockmode(lock_mode)  
    •         return query.all()  
    •     BaseModel.get_all = get_all  
    •     @classmethod  
    •     def count_all(cls, session, lock_mode=None):  
    •         query = session.query(func.count('*')).select_from(cls)  
    •         if lock_mode:  
    •             query = query.with_lockmode(lock_mode)  
    •         return query.scalar()  
    •     BaseModel.count_all = count_all  
    •     @classmethod  
    •     def exist(cls, session, id, lock_mode=None):  
    •         if hasattr(cls, 'id'):  
    •             query = session.query(func.count('*')).select_from(cls).filter(cls.id == id)  
    •             if lock_mode:  
    •                 query = query.with_lockmode(lock_mode)  
    •             return query.scalar() > 0  
    •         return False  
    •     BaseModel.exist = exist  
    •     @classmethod  
    •     def set_attr(cls, session, id, attr, value):  
    •         if hasattr(cls, 'id'):  
    •             session.query(cls).filter(cls.id == id).update({  
    •                 attr: value  
    •             })  
    •             session.commit()  
    •     BaseModel.set_attr = set_attr  
    •     @classmethod  
    •     def set_attrs(cls, session, id, attrs):  
    •         if hasattr(cls, 'id'):  
    •             session.query(cls).filter(cls.id == id).update(attrs)  
    •             session.commit()  
    •     BaseModel.set_attrs = set_attrs  
    • 虽然很拙劣,但确实能用。顺便还附送了一些有用的玩意,你懂的。
      2.设置 declarative_base() 的 cls 参数:
      [python] view plain copy
      1. BaseModel = declarative_base(cls=ModelMixin)  
      这种方法不需要执行“BaseModel.get_by_id = get_by_id”之类的代码。不足之处就是 PyCharm 仍然无法找到这些方法的位置。
      3.设置 __abstract__ 属性:
      [python] view plain copy
      1. class BaseModel(BaseModel):  
      2.     __abstract__ = True  
      3.     __table_args__ = { # 可以省掉子类的 __table_args__ 了  
      4.         'mysql_engine': 'InnoDB',  
      5.         'mysql_charset': 'utf8'  
      6.     }  
      7.     # ...  
      这种方法最简单,也可以继承出多个类。

      如何正确使用事务?

      假设有一个简单的银行系统,一共两名用户:

      [python] view plain copy
      1. class User(BaseModel):  
      2.     __tablename__ = 'user'  
      3.     id = Column(Integer, primary_key=True)  
      4.     money = Column(DECIMAL(10, 2))  
      5. class TanseferLog(BaseModel):  
      6.     __tablename__ = 'tansefer_log'  
      7.     id = Column(Integer, primary_key=True)  
      8.     from_user = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))  
      9.     to_user = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))  
      10.     amount = Column(DECIMAL(10, 2))  
      11. user = User(money=100)  
      12. session.add(user)  
      13. user = User(money=0)  
      14. session.add(user)  
      15. session.commit()  
      然后开两个 session,同时进行两次转账操作:
    • session2 = DB_Session()  
    • user1 = session1.query(User).get(1)  
    • user2 = session1.query(User).get(2)  
    • if user1.money >= 100:  
    •     user1.money -= 100  
    •     user2.money += 100  
    •     session1.add(TanseferLog(from_user=1, to_user=2, amount=100))  
    • user1 = session2.query(User).get(1)  
    • user2 = session2.query(User).get(2)  
    • if user1.money >= 100:  
    •     user1.money -= 100  
    •     user2.money += 100  
    •     session2.add(TanseferLog(from_user=1, to_user=2, amount=100))  
    • session1.commit()  
    • session2.commit()  
    • 现在看看结果:

      > user1.money  

    • Decimal('0.00')  
    • >>> user2.money  
    • Decimal('100.00')  
    • >>> session.query(TanseferLog).count()  
    • 2L  

    两次转账都成功了,但是只转走了一笔钱,这明显不科学。

    可见 MySQL InnoDB 虽然支持事务,但并不是那么简单的,还需要手动加锁。
    首先来试试读锁:

    [python] view plain copy
    1. user1 = session1.query(User).with_lockmode('read').get(1)  
    2. user2 = session1.query(User).with_lockmode('read').get(2)  
    3. if user1.money >= 100:  
    4.     user1.money -= 100  
    5.     user2.money += 100  
    6.     session1.add(TanseferLog(from_user=1, to_user=2, amount=100))  
    7. user1 = session2.query(User).with_lockmode('read').get(1)  
    8. user2 = session2.query(User).with_lockmode('read').get(2)  
    9. if user1.money >= 100:  
    10.     user1.money -= 100  
    11.     user2.money += 100  
    12.     session2.add(TanseferLog(from_user=1, to_user=2, amount=100))  
    13. session1.commit()  
    14. session2.commit()  

    现在在执行 session1.commit() 的时候,因为 user1 和 user2 都被 session2 加了读锁,所以会等待锁被释放。超时以后,session1.commit() 会抛出个超时的异常,如果捕捉了的话,或者 session2 在另一个进程,那么 session2.commit() 还是能正常提交的。这种情况下,有一个事务是肯定会提交失败的,所以那些更改等于白做了。

    接下来看看写锁,把上段代码中的 'read' 改成 'update' 即可。这次在执行 select 的时候就会被阻塞了:
    user1 = session2.query(User).with_lockmode('update').get(1)
    这样只要在超时期间内,session1 完成了提交或回滚,那么 session2 就能正常判断 user1.money >= 100 是否成立了。
    由此可见,如果需要更改数据,最好加写锁。

    那么什么时候用读锁呢?如果要保证事务运行期间内,被读取的数据不被修改,自己也不去修改,加读锁即可。
    举例来说,假设我查询一个用户的开支记录(同时包含余额和转账记录),可以直接把 user 和 tansefer_log 做个内连接。
    但如果用户的转账记录特别多,我在查询前想先验证用户的密码(假设在 user 表中),确认相符后才查询转账记录。而这两次查询的期间内,用户可能收到了一笔转账,导致他的 money 字段被修改了,但我在展示给用户时,用户的余额仍然没变,这就不正常了。
    而如果我在读取 user 时加了读锁,用户是无法收到转账的(因为无法被另一个事务加写锁来修改 money 字段),这就保证了不会查出额外的 tansefer_log 记录。等我查询完两张表,释放了读锁后,转账就可以继续进行了,不过我显示的数据在当时的确是正确和一致的。

    另外要注意的是,如果被查询的字段没有加索引的话,就会变成锁整张表了:

    [python] view plain copy
    1. session1.query(User).filter(User.id > 50).with_lockmode('update').all()  
    2. session2.query(User).filter(User.id < 40).with_lockmode('update').all() # 不会被锁,因为 id 是主键  
    3. session1.rollback()  
    4. session2.rollback()  
    5. session1.query(User).filter(User.money == 50).with_lockmode('update').all()  
    6. session2.query(User).filter(User.money == 40).with_lockmode('update').all() # 会等待解锁,因为 money 上没有索引  

    要避免的话,可以这样:

    10, 2), index=True)  另一个注意点是子事务。
    InnoDB 支持子事务(savepoint 语句),可以简化一些逻辑。
    例如有的方法是用于改写数据库的,它执行时可能提交了事务,但在后续的流程中却执行失败了,却没法回滚那个方法中已经提交的事务。这时就可以把那个方法当成子事务来运行了:

    [python] view plain copy
    1. def step1():  
    2.     # ...  
    3.     if success:  
    4.         session.commit()  
    5.         return True  
    6.     session.rollback()  
    7.     return False  
    8. def step2():  
    9.     # ...  
    10.     if success:  
    11.         session.commit()  
    12.         return True  
    13.     session.rollback()  
    14.     return False  
    15. session.begin_nested()  
    16. if step1():  
    17.     session.begin_nested()  
    18.     if step2():  
    19.         session.commit()  
    20.     else:  
    21.         session.rollback()  
    22. else:  
    23.     session.rollback()  


    此外,rollback 一个子事务,可以释放这个子事务中获得的锁,提高并发性和降低死锁概率。

    如何对一个字段进行自增操作?
    最简单的办法就是获取时加上写锁:

    [python] view plain copy
    1. user = session.query(User).with_lockmode('update').get(1)  
    2. user.age += 1  
    3. session.commit()  

    如果不想多一次读的话,这样写也是可以的:

    [python] view plain copy
    1. session.query(User).filter(User.id == 1).update({  
    2.     User.age: User.age + 1  
    3. })  
    4. session.commit()  
    5. # 其实字段之间也可以做运算:  
    6. session.query(User).filter(User.id == 1).update({  
    7.     User.age: User.age + User.id  
    8. })  



     
  • 相关阅读:
    Codeforces 1265A Beautiful String
    1039 Course List for Student (25)
    1038 Recover the Smallest Number (30)
    1037 Magic Coupon (25)
    1024 Palindromic Number (25)
    1051 Pop Sequence (25)
    1019 General Palindromic Number (20)
    1031 Hello World for U (20)
    1012 The Best Rank (25)
    1011 World Cup Betting (20)
  • 原文地址:https://www.cnblogs.com/wt11/p/7164066.html
Copyright © 2011-2022 走看看