zoukankan      html  css  js  c++  java
  • SQLAlchemy 嵌套事务的解决方案

     sqlachemy 是python的orm框架,在使用一段时间后,我们通常会出现事务嵌套的情况,看到很多人写代码的时候,居然是session到处传递,这无疑是加大了代码之间的耦合度。
        案例:
        def save(session):
            # TODO
    
        def update(session):
            # TODO
    
        def service():
            session = getSession();
            try:
                save(session);
                update(session);
                session.commit();
            except Exception as e:
                session.rollback();
            finally:
                if not session:
                    session.close();
    
        假设save和update是同一个事务,但是上述的实践缺强制了save和update的 session相耦合来达成,好的实践应该是:save和update无任何关系,只是在实现业务逻辑时,组合到一个事务,确保事务性即可,即:
    
        def service():
            try:
                save();
                update();
            except Exception as e:
                # TODO 
        因此如何解决这个问题是本文需要阐述的主题。
    

    解决方案是:

    ACID是事务的四个基本特征,通常我们的理解事务是一个操作单元,要么一起成功要么一起失败(原子性);通过一个例子来直接说明如何解决的。
        场景:注册用户(添加一个用户,会未用户送10个积分)

    创建表

    create table user(
            id int not null auto_increment,
            name varchar(255) not null default '' comment '用户名',
            created_at datetime not null default current_timestamp comment '创建时间',
            updated_at datetime not null default current_timestamp comment '更新时间',
            primary key (`id`)
        )engine innodb charset=utf8 comment '用户表';  
    
        create table user_credits(
            id int not null auto_increment,
            user_id int not null default 0 comment '用户ID',
            user_name varchar(255) not null default '' comment '用户名',
            score int not null default 0 comment '积分',
            created_at datetime not null default current_timestamp comment '创建时间',
            updated_at datetime not null default current_timestamp comment '更新时间',
            primary key (`id`),
            unique key `uk_user_id` (`user_id`)
        )engine innodb charset=utf8 comment '用户积分表';

    sqlalchemy 实现

    # -*- coding:utf-8 -*-
    
    import datetime
    from sqlalchemy import create_engine, Column, Integer, String, DateTime
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker, scoped_session
    
    engine = create_engine("mysql+pymysql://root:root@localhost:3306/csdn", echo=True)
    
    # 必须使用scoped_session,域session可以将session进行共享
    DBSession = scoped_session(sessionmaker(bind=engine))
    
    BaseModel = declarative_base()
    
    
    # ----------- Relation Model Object---------------- #
    
    class User(BaseModel):
    
        __tablename__ = "user"
    
        id = Column(Integer, primary_key=True)
        name = Column(String)
        created_at = Column(DateTime, default=datetime.datetime.now)
        updated_at = Column(DateTime, default=datetime.datetime.now)
    
    
    class UserCredits(BaseModel):
    
        __tablename__ = "user_credits"
    
        id = Column(Integer, primary_key=True)
        user_id = Column(Integer)
        user_name = Column(String)
        score = Column(Integer)
        created_at = Column(DateTime, default=datetime.datetime.now)
        updated_at = Column(DateTime, default=datetime.datetime.now)
    
    # ----------- Service implements---------------- #
    
    
    def add_user(user):
        " 添加用户 "
        session = DBSession()
        try:
            session.add(user)
            session.commit()
        except Exception as e:
            session.rollback()
            print("AddUser: ======={}=======".format(e))
        finally:
            if not session:
                session.close()
    
    
    def add_user_credits(userCredits, interrupt=True):
        " 添加用户积分记录 "
        session = DBSession()
        try:
            if interrupt:
                raise Exception("--- interrupt ---")
    
            session.add(userCredits)
            session.commit()
        except Exception as e:
            session.rollback()
            print("AddUserCredits: ======={}=======".format(e))
        finally:
            if not session:
                session.close()
    
    
    def regist_user():
    
        session = DBSession()
        try:
            # 开启子事务
            session.begin(subtransactions=True)
    
            # TODO Service
            user = User(name='wangzhiping')
            add_user(user)
            add_user_credits(UserCredits(
                user_id=user.id,
                user_name=user.name,
                score=10
            ), False)
    
            session.commit()
        except Exception as e:
            session.rollback()
            print("AddUserCredits: ======={}=======".format(e))
        finally:
            if not session:
                session.close()
    
    # ---------- exec -----------
    regist_user()
     1,设置session时,需要指定为scoped_session,目的是session可以共享(ThreadLocal);
        2,session.begin(subtransactions=True) 开启子事务管理;
        这是实际上regist_user是在同一个线程中的session,这是add_user,add_user_credits实际上session是同一个,所以可以实现。其实这个可以更进一步扩展,把事务隔离级别,传播属性,这里不做介绍
    
    ---------------------
    作者:紫守笨 
    来源:CSDN 
    原文:https://blog.csdn.net/program_red/article/details/55194130?utm_source=copy 
    版权声明:本文为博主原创文章,转载请附上博文链接!

    转载:https://blog.csdn.net/program_red/article/details/55194130

  • 相关阅读:
    leetcode Super Ugly Number
    leetcode Find Median from Data Stream
    leetcode Remove Invalid Parentheses
    leetcode Range Sum Query
    leetcode Range Sum Query
    leetcode Minimum Height Trees
    hdu 3836 Equivalent Sets
    hdu 1269 迷宫城堡
    hud 2586 How far away ?
    poj 1330 Nearest Common Ancestors
  • 原文地址:https://www.cnblogs.com/1a2a/p/9766548.html
Copyright © 2011-2022 走看看