zoukankan      html  css  js  c++  java
  • SQLAlchemy 嵌套事务的解决方案

     sqlachemy 是python的orm框架,在使用一段时间后,我们通常会出现事务嵌套的情况,看到很多人写代码的时候,居然是session到处传递,这无疑是加大了代码之间的耦合度。
        案例:
        def save(session):
            # TODO
    
        def update(session):
            # TODO
    
        def service():
            session = getSession();
            try:
                save(session);
                update(session);
                session.commit();
            except Exception as e:
                session.rollback();
            finally:
                if not session:
                    session.close();
    
        假设save和update是同一个事务,但是上述的实践缺强制了save和update的 session相耦合来达成,好的实践应该是:save和update无任何关系,只是在实现业务逻辑时,组合到一个事务,确保事务性即可,即:
    
        def service():
            try:
                save();
                update();
            except Exception as e:
                # TODO 
        因此如何解决这个问题是本文需要阐述的主题。
    

    解决方案是:

    ACID是事务的四个基本特征,通常我们的理解事务是一个操作单元,要么一起成功要么一起失败(原子性);通过一个例子来直接说明如何解决的。
        场景:注册用户(添加一个用户,会未用户送10个积分)

    创建表

    create table user(
            id int not null auto_increment,
            name varchar(255) not null default '' comment '用户名',
            created_at datetime not null default current_timestamp comment '创建时间',
            updated_at datetime not null default current_timestamp comment '更新时间',
            primary key (`id`)
        )engine innodb charset=utf8 comment '用户表';  
    
        create table user_credits(
            id int not null auto_increment,
            user_id int not null default 0 comment '用户ID',
            user_name varchar(255) not null default '' comment '用户名',
            score int not null default 0 comment '积分',
            created_at datetime not null default current_timestamp comment '创建时间',
            updated_at datetime not null default current_timestamp comment '更新时间',
            primary key (`id`),
            unique key `uk_user_id` (`user_id`)
        )engine innodb charset=utf8 comment '用户积分表';

    sqlalchemy 实现

    # -*- coding:utf-8 -*-
    
    import datetime
    from sqlalchemy import create_engine, Column, Integer, String, DateTime
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker, scoped_session
    
    engine = create_engine("mysql+pymysql://root:root@localhost:3306/csdn", echo=True)
    
    # 必须使用scoped_session,域session可以将session进行共享
    DBSession = scoped_session(sessionmaker(bind=engine))
    
    BaseModel = declarative_base()
    
    
    # ----------- Relation Model Object---------------- #
    
    class User(BaseModel):
    
        __tablename__ = "user"
    
        id = Column(Integer, primary_key=True)
        name = Column(String)
        created_at = Column(DateTime, default=datetime.datetime.now)
        updated_at = Column(DateTime, default=datetime.datetime.now)
    
    
    class UserCredits(BaseModel):
    
        __tablename__ = "user_credits"
    
        id = Column(Integer, primary_key=True)
        user_id = Column(Integer)
        user_name = Column(String)
        score = Column(Integer)
        created_at = Column(DateTime, default=datetime.datetime.now)
        updated_at = Column(DateTime, default=datetime.datetime.now)
    
    # ----------- Service implements---------------- #
    
    
    def add_user(user):
        " 添加用户 "
        session = DBSession()
        try:
            session.add(user)
            session.commit()
        except Exception as e:
            session.rollback()
            print("AddUser: ======={}=======".format(e))
        finally:
            if not session:
                session.close()
    
    
    def add_user_credits(userCredits, interrupt=True):
        " 添加用户积分记录 "
        session = DBSession()
        try:
            if interrupt:
                raise Exception("--- interrupt ---")
    
            session.add(userCredits)
            session.commit()
        except Exception as e:
            session.rollback()
            print("AddUserCredits: ======={}=======".format(e))
        finally:
            if not session:
                session.close()
    
    
    def regist_user():
    
        session = DBSession()
        try:
            # 开启子事务
            session.begin(subtransactions=True)
    
            # TODO Service
            user = User(name='wangzhiping')
            add_user(user)
            add_user_credits(UserCredits(
                user_id=user.id,
                user_name=user.name,
                score=10
            ), False)
    
            session.commit()
        except Exception as e:
            session.rollback()
            print("AddUserCredits: ======={}=======".format(e))
        finally:
            if not session:
                session.close()
    
    # ---------- exec -----------
    regist_user()
     1,设置session时,需要指定为scoped_session,目的是session可以共享(ThreadLocal);
        2,session.begin(subtransactions=True) 开启子事务管理;
        这是实际上regist_user是在同一个线程中的session,这是add_user,add_user_credits实际上session是同一个,所以可以实现。其实这个可以更进一步扩展,把事务隔离级别,传播属性,这里不做介绍
    
    ---------------------
    作者:紫守笨 
    来源:CSDN 
    原文:https://blog.csdn.net/program_red/article/details/55194130?utm_source=copy 
    版权声明:本文为博主原创文章,转载请附上博文链接!

    转载:https://blog.csdn.net/program_red/article/details/55194130

  • 相关阅读:
    sharepoint 2013 configure my site
    格式化xml
    斗罗大陆
    spring的beans.xml的配置
    jdom学习:读取xml文件
    java中加载xml文件方法
    struts2中IOC控制反转应用
    struts2.xml的配置与技巧
    struts2中的路径问题
    struts.xml详细配置
  • 原文地址:https://www.cnblogs.com/1a2a/p/9766548.html
Copyright © 2011-2022 走看看