zoukankan      html  css  js  c++  java
  • 多线程事务控制

    spring无法对多线程进行事务控制,原因是:

    多线程底层连接数据库的时候,是使用的线程变量(TheadLocal),所以,开多少线程理论上就会建立多少个连接,每个线程有自己的连接,事务肯定不是同一个了。
    解决办法:我强制手动把每个线程的事务状态放到一个同步集合里面。然后如果有单个异常,循环回滚每个线程。

    假如service中的一个方法由以下逻辑构成:

    1.前面的是调用多线程前的操作

    2.调用多线程的操作

    假设其中任何一个与数据库的更新操作发生了异常,想要整体回滚怎么办?那么就要用到以下的方式了:

    List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
     // 在每组逻辑操作之前加入以下代码
    // 使用这种方式将事务状态都放在同一个事务里面
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
    TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态

    详细DEMO:

    TestServiceImpl:

    package com.test.impl;
    
    
    import com.test.entity.User2;
    import com.test.entity.User3;
    import com.test.mapper.User2Mapper;
    import com.test.mapper.User3Mapper;
    import com.test.service.TestBService;
    import com.test.service.TestService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.annotation.Propagation;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.transaction.interceptor.TransactionAspectSupport;
    import org.springframework.transaction.support.DefaultTransactionDefinition;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    /**
     * Created by liuyachao on 2018/9/3.
     */
    @Slf4j
    @Service
    public class TestServiceImpl implements TestService {
        @Autowired
        private User2Mapper user2Mapper;
    
        @Autowired
        private User3Mapper user3Mapper;
    
        @Autowired
        private TestBService testBService;
        @Autowired
        private PlatformTransactionManager transactionManager;
    
        List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
    
        int count = 112;
    
        static int countTest = 0;
    
        @Override
        @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
        public int saveUser2(User2 user2) {
            Integer result = 0;
            try{
                result = user2Mapper.insertSelective(user2);
                //int i = 1/0;
                if(user2.getId() == 114){
                    int i = 1/0;
                }
            }catch (Exception e){
                log.error("插入异常",e);
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                return result;
            }
            return result;
        }
    
        @Override
        public User3 getUser3List(User3 user3) {
            User3 result =user3Mapper.selectByPrimaryKey(user3.getId());
            return result;
        }
    
        @Override
        @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
        public void threadMethod(){
            User2 user1 = new User2();
            user1.setId(111);
            user1.setPassword("1");
            user1.setUsername("1");
            try{
                // 使用这种方式将事务状态都放在同一个事务里面
                DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
                TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
                transactionStatuses.add(status);
                testBService.saveUser2(user1);
            }catch (Exception e){
                e.printStackTrace();
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            }
    
            System.out.println("main insert is over");
            try{
                for(int a=0 ;a<3;a++){
                    ThreadOperation threadOperation= new ThreadOperation();
                    Thread innerThread = new Thread(threadOperation);
                    /*innerThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                        @Override
                        public void uncaughtException(Thread t, Throwable e){
                            *//*throw new RuntimeException();
                            log.error("###内部线程发生异常");
                            e.printStackTrace();*//*
                            // 这边回滚不好使,需要用逻辑删除处理增加的数据
                            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                        }
                    });*/
                    innerThread.start();
                }
            }catch (Exception e){
                log.error("###线程异常");
                e.printStackTrace();
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            }
        }
    
        public class ThreadOperation implements Runnable {
            @Override
            public void run() {
                try{
                    // 使用这种方式将事务状态都放在同一个事务里面
                    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
                    TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
                    transactionStatuses.add(status);
                    User2 user2 = new User2();
                    user2.setId(count++);
                    user2.setPassword("10");
                    user2.setUsername("10");
                    /**
                     * 1.这里如果用其他类的saveUser2方法,在这个线程内事务生效,其他线程不受影响
                     * 2.如果是用本类的方法,这个线程内的事务不生效,其他线程也不受影响
                     */
                    testBService.saveUser2(user2); // testBService.
                    System.out.println("thread insert is over");
                }catch (Exception e){
                    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                    //throw new RuntimeException();
                    // 事务回滚不管用
                    /*TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                    throw new RuntimeException();*/
                    /*for (TransactionStatus transactionStatus:transactionStatuses) {
                        transactionStatus.setRollbackOnly();
                    }*/
                }
            }
        }
    
        /**
         * 多线程争夺全局资源
         * @param args
         */
        public static void main(String[] args){
            for(int a=0 ;a<100;a++){
                ThreadOperation2 threadOperation2 = new ThreadOperation2();
                Thread innerThread = new Thread(threadOperation2);
                innerThread.start();
            }
            System.out.println(countTest);
        }
    
        public static class ThreadOperation2 implements Runnable {
            @Override
            public void run() {
                countTest++;
            }
        }
    
    }

    TestService:

    package com.test.service;
    
    import com.test.entity.User2;
    import com.test.entity.User3;
    
    /**
     * Created by liuyachao on 2018/9/3.
     */
    public interface TestService {
        int saveUser2(User2 user2);
    
        User3 getUser3List(User3 user3);
    
        void threadMethod();
    }

    TestBService:

    package com.test.service;
    
    import com.test.entity.User2;
    import com.test.entity.User3;
    
    /**
     * Created by liuyachao on 2018/9/3.
     */
    public interface TestBService {
        int saveUser2 (User2 user2);
    
        User3 getUser3List(User3 user3);
    }

    TestBServiceImpl:

    package com.test.impl;
    
    
    import com.test.entity.User2;
    import com.test.entity.User3;
    import com.test.mapper.User2Mapper;
    import com.test.mapper.User3Mapper;
    import com.test.service.TestBService;
    import com.test.service.TestService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Propagation;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.transaction.interceptor.TransactionAspectSupport;
    
    /**
     * Created by liuyachao on 2018/9/3.
     */
    @Slf4j
    @Service
    public class TestBServiceImpl implements TestBService {
        @Autowired
        private User2Mapper user2Mapper;
    
        @Autowired
        private User3Mapper user3Mapper;
    
        int count = 112;
    
        static int countTest = 0;
    
        @Override
        @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
        public int saveUser2(User2 user2){
            Integer result = 0;
            /*try{*/
                result = user2Mapper.insertSelective(user2);
                if(user2.getId() == 114){
                    int i = 1/0;
                }
            /*}catch (Exception e){
                log.error("插入异常",e);
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                return result;
            }*/
            return result;
        }
    
        @Override
        public User3 getUser3List(User3 user3) {
            User3 result =user3Mapper.selectByPrimaryKey(user3.getId());
            return result;
        }
    
    }

    User2:

    package com.test.entity;
    
    import lombok.Data;
    
    import java.io.Serializable;
    
    @Data
    public class User2 implements Serializable{
        private static final long serialVersionUID = 9085886691811169694L;
        private Integer id;
    
        private String username;
    
        private String password;
    }


    具体的mapper等方法自己可以做一个属于自己的demo来验证事务是否整体回滚:

    此demo操作均为新增数据的操作,调用多线程前、调用多线程均为新增数据。

    在多线程中的testBService.saveUser2(user2); 中saveUser2方法中模拟一个异常如:int i = 1/0;,来验证当其中一个线程满足条件下发生异常的时候,事务整体回滚,数据库中并没有新增数据

  • 相关阅读:
    Vue请求参数转换(qs的使用---对象序列化)
    Vue中使用async/await解决异步请求问题
    数据结构:set
    数据结构:map (不是数组的map方法)
    数组方法-reduce 和 ES6扩展运算符
    数据分析相关
    Hadoop委任和解除节点
    Oozie调度Sqoop报错
    Oozie调度Sqoop的两种方式
    MySql数据表直接到Hive表操作
  • 原文地址:https://www.cnblogs.com/super-chao/p/11177558.html
Copyright © 2011-2022 走看看