zoukankan      html  css  js  c++  java
  • spring boot:shardingsphere+druid整合seata分布式事务(spring boot 2.3.3)

    一,shardingshpere为什么要整合seata?

    分库分表是数据库扩展中最常用的处理方法,

    shardingshpere作为使用最广泛的分表中间件,

    如果不支持分布式事务,则它的数据一致性就会打很大的折扣了

    shardingsphere实现了对分布式事务seata的支持,

    对于数据有高要求的应用来说,当然需要整合seata

    说明:刘宏缔的架构森林是一个专注架构的博客,地址:https://www.cnblogs.com/architectforest

             对应的源码可以访问这里获取: https://github.com/liuhongdi/

    说明:作者:刘宏缔 邮箱: 371125307@qq.com

    二,演示项目的相关信息

    1,项目地址:

    https://github.com/liuhongdi/shardingseata3

    2,项目功能说明:

            演示了shardingsphere整合seata,

           数据源使用了druid

    3,项目结构:如图:

    三,配置文件说明

    1,pom.xml

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
               <!--exclude log-->
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-logging</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!--mybatis begin-->
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>2.1.3</version>
            </dependency>
            <!--druid begin-->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.1.23</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-log4j2</artifactId>
            </dependency>
            <dependency>
                <groupId>com.lmax</groupId>
                <artifactId>disruptor</artifactId>
                <version>3.4.2</version>
            </dependency>
            <!--mysql begin-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <scope>runtime</scope>
            </dependency>
            <!--pagehelper begin-->
            <dependency>
                <groupId>com.github.pagehelper</groupId>
                <artifactId>pagehelper-spring-boot-starter</artifactId>
                <version>1.3.0</version>
            </dependency>
            <!--thymeleaf begin-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-thymeleaf</artifactId>
            </dependency>
            <!--seata begin-->
            <dependency>
                <groupId>io.seata</groupId>
                <artifactId>seata-all</artifactId>
                <version>1.3.0</version>
            </dependency>
            <!--shardingsphere begin-->
            <dependency>
                <groupId>org.apache.shardingsphere</groupId>
                <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
                <version>4.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.shardingsphere</groupId>
                <artifactId>sharding-jdbc-spring-namespace</artifactId>
                <version>4.1.1</version>
            </dependency>
                    <dependency>
                        <groupId>org.apache.shardingsphere</groupId>
                        <artifactId>sharding-transaction-base-seata-at</artifactId>
                        <version>4.1.1</version>
                    </dependency>
            <!--aop begin-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-aop</artifactId>
            </dependency>

    2,application.properties

    #error
    server.error.include-stacktrace=always
    #error
    logging.level.org.springframework.web=trace
    #thymeleaf
    spring.thymeleaf.cache=false
    spring.thymeleaf.encoding=UTF-8
    spring.thymeleaf.mode=HTML
    spring.thymeleaf.prefix=classpath:/templates/
    spring.thymeleaf.suffix=.html
    
    #shardingsphere
    spring.shardingsphere.datasource.names=store,saleorder01,saleorder02
    
    spring.shardingsphere.datasource.store.type=com.alibaba.druid.pool.DruidDataSource
    spring.shardingsphere.datasource.store.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.shardingsphere.datasource.store.url=jdbc:mysql://127.0.0.1:3306/store?characterEncoding=utf-8
    spring.shardingsphere.datasource.store.username=root
    spring.shardingsphere.datasource.store.password=lhddemo
    spring.shardingsphere.datasource.store.filters=stat,wall,log4j2
    spring.shardingsphere.datasource.store.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    
    spring.shardingsphere.datasource.saleorder01.type=com.alibaba.druid.pool.DruidDataSource
    spring.shardingsphere.datasource.saleorder01.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.shardingsphere.datasource.saleorder01.url=jdbc:mysql://127.0.0.1:3306/saleorder01?characterEncoding=utf-8
    spring.shardingsphere.datasource.saleorder01.username=root
    spring.shardingsphere.datasource.saleorder01.password=lhddemo
    spring.shardingsphere.datasource.saleorder01.filters=stat,wall,log4j2
    spring.shardingsphere.datasource.saleorder01.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    
    spring.shardingsphere.datasource.saleorder02.type=com.alibaba.druid.pool.DruidDataSource
    spring.shardingsphere.datasource.saleorder02.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.shardingsphere.datasource.saleorder02.url=jdbc:mysql://127.0.0.1:3306/saleorder02?characterEncoding=utf-8
    spring.shardingsphere.datasource.saleorder02.username=root
    spring.shardingsphere.datasource.saleorder02.password=lhddemo
    spring.shardingsphere.datasource.saleorder02.filters=stat,wall,log4j2
    spring.shardingsphere.datasource.saleorder02.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    
    spring.shardingsphere.sharding.default-data-source-name=store
    spring.shardingsphere.sharding.default-database-strategy.standard.sharding-column=orderId
    spring.shardingsphere.sharding.default-database-strategy.standard.precise-algorithm-class-name=com.shardingseata3.demo.algorithm.DatabasePreciseShardingAlgorithm
    
    spring.shardingsphere.sharding.binding-tables=t_order
    spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=saleorder0$->{1..1}.t_order_$->{1..2},saleorder0$->{2..2}.t_order_$->{3..4}
    spring.shardingsphere.sharding.tables.t_order.table-strategy.standard.sharding-column=orderId
    spring.shardingsphere.sharding.tables.t_order.table-strategy.standard.precise-algorithm-class-name=com.shardingseata3.demo.algorithm.OrderTablePreciseShardingAlgorithm
    spring.shardingsphere.props.sql.show=true
    
    #mybatis
    mybatis.mapper-locations=classpath:/mapper/*Mapper.xml
    mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl

    3,数据库:

     共3个:salesorder01/saleorder02/store

    建表sql:

    我们创建两个库:saleorder01,saleorder02

    然后在各个库内各创建两个数据表:

    saleorder01库包括t_order_1,t_order_2

    saleorder02库包括t_order_3,t_order_4

    CREATE TABLE `t_order_4` (
     `orderId` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
     `goodsName` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT 'name',
     PRIMARY KEY (`orderId`)
    ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='order4'

    store库的goods表:

    CREATE TABLE `goods` (
     `goodsId` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
     `goodsName` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT 'name',
     `subject` varchar(200) NOT NULL DEFAULT '' COMMENT '标题',
     `price` decimal(15,2) NOT NULL DEFAULT '0.00' COMMENT '价格',
     `stock` int(11) NOT NULL DEFAULT '0' COMMENT 'stock',
     PRIMARY KEY (`goodsId`)
    ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='商品表'

    插入一条示例数据:

    INSERT INTO `goods` (`goodsId`, `goodsName`, `subject`, `price`, `stock`) VALUES
    (3, '100分电动牙刷', '好用到让你爱上刷牙', '59.00', 96);

    4,file.conf

    transport {
      # tcp udt unix-domain-socket
      type = "TCP"
      #NIO NATIVE
      server = "NIO"
      #enable heartbeat
      heartbeat = true
      #thread factory for netty
      thread-factory {
        boss-thread-prefix = "NettyBoss"
        worker-thread-prefix = "NettyServerNIOWorker"
        server-executor-thread-prefix = "NettyServerBizHandler"
        share-boss-worker = false
        client-selector-thread-prefix = "NettyClientSelector"
        client-selector-thread-size = 1
        client-worker-thread-prefix = "NettyClientWorkerThread"
        # netty boss thread size,will not be used for UDT
        boss-thread-size = 1
        #auto default pin or 8
        worker-thread-size = 8
      }
    }
    service {
      vgroupMapping.my_test_tx_group = "default"
      #only support when registry.type=file, please don't set multiple addresses
      default.grouplist = "127.0.0.1:8091"
      #degrade, current not support
      enableDegrade = false
      #disable seata
      disableGlobalTransaction = false
    }
    
    client {
      async.commit.buffer.limit = 10000
      lock {
        retry.internal = 10
        retry.times = 30
      }
    }

    5,registry.conf

    registry {
      # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
      type = "file"
    
      nacos {
        application = "seata-server"
        serverAddr = "127.0.0.1:8848"
        group = "SEATA_GROUP"
        namespace = ""
        cluster = "default"
        username = ""
        password = ""
      }
      eureka {
        serviceUrl = "http://localhost:8761/eureka"
        application = "default"
        weight = "1"
      }
      redis {
        serverAddr = "localhost:6379"
        db = 0
        password = ""
        cluster = "default"
        timeout = 0
      }
      zk {
        cluster = "default"
        serverAddr = "127.0.0.1:2181"
        sessionTimeout = 6000
        connectTimeout = 2000
        username = ""
        password = ""
      }
      consul {
        cluster = "default"
        serverAddr = "127.0.0.1:8500"
      }
      etcd3 {
        cluster = "default"
        serverAddr = "http://localhost:2379"
      }
      sofa {
        serverAddr = "127.0.0.1:9603"
        application = "default"
        region = "DEFAULT_ZONE"
        datacenter = "DefaultDataCenter"
        cluster = "default"
        group = "SEATA_GROUP"
        addressWaitTime = "3000"
      }
      file {
        name = "file.conf"
      }
    }
    
    config {
      # file、nacos 、apollo、zk、consul、etcd3
      type = "file"
    
      nacos {
        serverAddr = "127.0.0.1:8848"
        namespace = ""
        group = "SEATA_GROUP"
        username = ""
        password = ""
      }
      consul {
        serverAddr = "127.0.0.1:8500"
      }
      apollo {
        appId = "seata-server"
        apolloMeta = "http://192.168.1.204:8801"
        namespace = "application"
      }
      zk {
        serverAddr = "127.0.0.1:2181"
        sessionTimeout = 6000
        connectTimeout = 2000
        username = ""
        password = ""
      }
      etcd3 {
        serverAddr = "http://localhost:2379"
      }
      file {
        name = "file.conf"
      }
    }

    6,seata.conf

    client {
        application.id = my_test_tx
        transaction.service.group = my_test_tx_group
    }

    四,java代码说明

    1,DruidConfig.java

    @Configuration
    public class DruidConfig {
        /**
         * 配置Druid监控
         * 后台管理Servlet
         * @return
         */
        @Bean
        public ServletRegistrationBean statViewServlet(){
            ServletRegistrationBean bean = new ServletRegistrationBean(new StatViewServlet(), "/druid/*");
            Map<String,String> initParams = new HashMap<>();//这是配置的druid监控的登录密码
            initParams.put("loginUsername","root");
            initParams.put("loginPassword","root");
            //默认就是允许所有访问
            initParams.put("allow","");
            initParams.put("deny","192.168.15.21");
            //黑名单的IP
            bean.setInitParameters(initParams);
            return bean;
        }
    
        /**
         * 配置web监控的filter
         */
        @Bean
        public FilterRegistrationBean webStatFilter(){
            FilterRegistrationBean bean = new FilterRegistrationBean();
            bean.setFilter(new WebStatFilter());
            Map<String,String> initParams = new HashMap<>();
            initParams.put("exclusions","/static/*,*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");//过滤掉需要监控的文件
            bean.setInitParameters(initParams);
            bean.setUrlPatterns(Arrays.asList("/*"));
            return  bean;
        }
    }

    配置druid

    2,SeataConfig.java

    @Aspect
    @Configuration
    public class SeataConfig {
        private static final String AOP_POINTCUT_EXPRESSION = "@annotation(io.seata.spring.annotation.GlobalTransactional)";
    
        @Bean
        public GlobalTransactionalInterceptor globalTransactionalInterceptor(){
            GlobalTransactionalInterceptor globalTransactionalInterceptor = new GlobalTransactionalInterceptor(null);
            return globalTransactionalInterceptor;
        }
    
        @Bean
        public Advisor seataAdviceAdvisor() {
            AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
            pointcut.setExpression(AOP_POINTCUT_EXPRESSION);
            return new DefaultPointcutAdvisor(pointcut,globalTransactionalInterceptor());
        }
    }

    配置seata

    3,SeataFilter.java

    @Component
    public class SeataFilter implements Filter {
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
        }
        @Override
        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
            HttpServletRequest req = (HttpServletRequest) servletRequest;
            String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
            System.out.println("xid:"+xid);
            boolean isBind = false;
            if (StringUtils.isNotBlank(xid)) {
                //如果xid不为空,则RootContext需要绑定xid,供给seata识别这是同一个分布式事务
                RootContext.bind(xid);
                isBind = true;
            }
            try {
                filterChain.doFilter(servletRequest, servletResponse);
            } finally {
                if (isBind) {
                    RootContext.unbind();
                }
            }
        }
        @Override
        public void destroy() {
        }
    }

    接收传递的全局事务标志xid

    4,GoodsController.java

    @RestController
    @RequestMapping("/goods")
    public class GoodsController {
    
        private static final String SUCCESS = "SUCCESS";
        private static final String FAIL = "FAIL";
    
        @Resource
        private GoodsMapper goodsMapper;
    
        //更新商品库存 参数:商品id
        @RequestMapping("/goodsstock/{goodsId}/{count}")
        @ResponseBody
        public String goodsStock(@PathVariable Long goodsId,
                                @PathVariable int count) {
    
            TransactionTypeHolder.set(TransactionType.BASE);
             int res = goodsMapper.updateGoodsStock(goodsId,count);
             System.out.println("res:"+res);
    
             if (res>0) {
                 return SUCCESS;
             } else {
                 return FAIL;
             }
        }
        //商品详情 参数:商品id
        @GetMapping("/goodsinfo")
        @ResponseBody
        public Goods goodsInfo(@RequestParam(value="goodsid",required = true,defaultValue = "0") Long goodsId) {
            Goods goods = goodsMapper.selectOneGoods(goodsId);
            return goods;
        }
    }

    5,HomeController.java

    @Controller
    @RequestMapping("/home")
    public class HomeController {
    
        private static final String SUCCESS = "SUCCESS";
        private static final String FAIL = "FAIL";
    
        @Resource
        private OrderShardingMapper orderShardingMapper;
    
        @Resource
        private GoodsMapper goodsMapper;
    
        //订单列表,列出分库分表的数据
        @GetMapping("/orderlist")
        public String list(Model model, @RequestParam(value="currentPage",required = false,defaultValue = "1") Integer currentPage){
            PageHelper.startPage(currentPage, 5);
            List<OrderSharding> orderList = orderShardingMapper.selectAllOrder();
            model.addAttribute("orderlist",orderList);
            PageInfo<OrderSharding> pageInfo = new PageInfo<>(orderList);
            model.addAttribute("pageInfo", pageInfo);
            System.out.println("------------------------size:"+orderList.size());
            return "order/list";
        }
    
        //添加一个订单,访问一个数据库和分库分表的两个数据库
        @GetMapping("/addorder")
        @ResponseBody
        @GlobalTransactional(timeoutMills = 300000,rollbackFor = Exception.class)
        public String addOrder(@RequestParam(value="orderid",required = true,defaultValue = "0") Long orderId,
                @RequestParam(value="isfail",required = true,defaultValue = "0") int isFail
                               )  throws SQLException, IOException {
            String goodsId = "3";
            String goodsNum = "1";
            String goodsName = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date());
            OrderSharding orderOne = new OrderSharding();
            orderOne.setOrderId(orderId);
            orderOne.setGoodsName(goodsName);
            TransactionTypeHolder.set(TransactionType.BASE);
            int resIns = orderShardingMapper.insertOneOrder(orderOne);
            System.out.println("orderId:"+orderOne.getOrderId());
            TransactionTypeHolder.set(TransactionType.BASE);
            int count = -1;
            int res = goodsMapper.updateGoodsStock(Long.parseLong(goodsId),count);
            System.out.println("res:"+res);
            if (isFail == 1) {
                int divide = 0;
                int resul = 100 / divide;
            }
            if (res>0) {
                return SUCCESS;
            } else {
                return FAIL;
            }
        }
    
    
        //添加一个订单,访问一个数据库和分库分表的两个数据库,rest方式
        @GetMapping("/addorderrest")
        @ResponseBody
        @GlobalTransactional(timeoutMills = 300000,rollbackFor = Exception.class)
        public String addOrderrest(@RequestParam(value="orderid",required = true,defaultValue = "0") Long orderId,
                               @RequestParam(value="isfail",required = true,defaultValue = "0") int isFail
        )  throws SQLException, IOException {
            String goodsId = "3";
            String goodsNum = "1";
            String goodsName = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date());
            OrderSharding orderOne = new OrderSharding();
            orderOne.setOrderId(orderId);
            orderOne.setGoodsName(goodsName);
            TransactionTypeHolder.set(TransactionType.BASE);
            int resIns = orderShardingMapper.insertOneOrder(orderOne);
            System.out.println("orderId:"+orderOne.getOrderId());
            RestTemplate restTemplate = new RestTemplate();
            String xid = RootContext.getXID();
            System.out.println("xid before send:"+xid);
            HttpHeaders headers = new HttpHeaders();
            headers.add(RootContext.KEY_XID, xid);
            String goodsUPNum = "-1";
            String urlUpStock = "http://127.0.0.1:8080/goods/goodsstock/"+goodsId+"/"+goodsUPNum+"/";
            String resultUp = restTemplate.postForObject(urlUpStock,new HttpEntity<String>(headers),String.class);
            if (!SUCCESS.equals(resultUp)) {
                throw new RuntimeException();
            }
            if (isFail == 1) {
                int divide = 0;
                int resul = 100 / divide;
            }
            return SUCCESS;
        }
    }

    6,DatabasePreciseShardingAlgorithm.java

    @Component
    public class DatabasePreciseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
        @Override
        public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
            System.out.println("------------------select database name");
            Long curValue = shardingValue.getValue();
            String curBase = "";
            if (curValue > 0 && curValue<=200) {
                curBase = "saleorder01";
            } else {
                curBase = "saleorder02";
            }
            return curBase;
        }
    }

    分库算法

    7,OrderTablePreciseShardingAlgorithm.java

    @Component
    public class OrderTablePreciseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
        @Override
        public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
            Long curValue = shardingValue.getValue();
            String curTable = "";
            if (curValue > 0 && curValue<=100) {
                curTable = "t_order_1";
            } else if (curValue > 100 && curValue<=200) {
                curTable = "t_order_2";
            } else if (curValue > 200 && curValue<=300) {
                curTable = "t_order_3";
            } else {
                curTable = "t_order_4";
            }
            return curTable;
        }
    }

    分表算法

    8,其他非关键代码可以从github查看

    五,测试效果

    1,查看shardingjdbc查询数据库的效果:

    访问:

    http://127.0.0.1:8080/home/orderlist

    返回:

    可以确定shardingsphere工作正常

    2,测试同一个数据源(shardingsphere)不同数据库之间的分布式事务

    store库,goods表:设置goodsid为3的商品stock值为100

     store库,undolog表的下一个自增值:50

     saleorder02库,undo_log表的下一个自增值:39

     访问url:

    http://127.0.0.1:8080/home/addorder?orderid=400

    返回:

    SUCCESS

    查看数据库:saleorder02库t_order_4表

    查看数据库:store库goods表

     store库undo_log的自增值:

    saleorder02库undo_log的自增值:

     可见事务是生效的

    测试发生异常时事务的回滚:

    访问:

    http://127.0.0.1:8080/home/addorder?orderid=401&isfail=1

    返回:发生了除0错:

     查看数据库:saleorder02:

    t_order_4表:

    找不到orderid为401的记录

    查看undo_log表的下一个自增值:

    查看数据库:store:

    goods表中查看stock:

     未改变

    查看undo_log表的下一个自增值:

    可见事务已生效:

    查看控制台的输出:

     可以看到:

    Begin new global transaction。。。

    Branch Rollbacked result: PhaseTwo_Rollbacked 。。。

    Branch Rollbacked result: PhaseTwo_Rollbacked 。。。

    有两次rollback,因为提交到了两个库

    3,测试通过resttemplate访问url的分布式事务

    访问不同url的分布式事务涉及到xid的传递:

    可以用以下两个url进行测试,效果同上,不再一一贴出:

    http://127.0.0.1:8080/home/addorderrest?orderid=402
    http://127.0.0.1:8080/home/addorderrest?orderid=403&isfail=1

    4,访问druid的ui,可以看到连接到了3个数据库:

    六,查看spring boot的版本:

      .   ____          _            __ _ _
     /\ / ___'_ __ _ _(_)_ __  __ _    
    ( ( )\___ | '_ | '_| | '_ / _` |    
     \/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::        (v2.3.3.RELEASE)
  • 相关阅读:
    Matlab画图-非常具体,非常全面
    PostgreSQL代码分析,查询优化部分,pull_ands()和pull_ors()
    Windows内核
    [WebGL入门]十,矩阵计算和外部库
    HOG特征-理解篇
    hdu 5035 概率论
    Hibernate对象持久化框架
    Thinkpad X200 屏幕备案
    64地点 Windows 8/7 根据系统 32地点PLSQL 耦合 64 地点 Oracle 11g
    阐述php(四) 流量控制
  • 原文地址:https://www.cnblogs.com/architectforest/p/13639203.html
Copyright © 2011-2022 走看看