zoukankan      html  css  js  c++  java
  • 我的物联网项目(十一) 单数据库事务也需谨慎

    单体架构模式下的数据库基本都是单数据库,所以应用层通过spring事务控制的本质其实就是数据库对事务的支持,没有数据库的事务支持,spring是无法提供事务功能的。通过spring实现事务的方式也有声明式事务编程式事务两种,不管哪一种实现起来都比较简单。像一般的业务,类型下面这种方式编程就行:

    1.配置文件

        <!-- 事务控制 -->
    	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    		<property name="dataSource" ref="dataSource" />
    	</bean>
    
        <!--  配置事务传播特性 -->
    	<tx:advice id="txAdvice" transaction-manager="transactionManager">
    	    <tx:attributes>
    	      <tx:method name="save*" propagation="REQUIRED"/>
    	      <tx:method name="update*" propagation="REQUIRED"/>
    	      <tx:method name="delete*" propagation="REQUIRED"/>
    	      <tx:method name="*" read-only="true" />
    	    </tx:attributes>
    	</tx:advice>
    
    	<!--  配置参与事务的类 -->
    	<aop:config>
    		<aop:pointcut id="interceptorPointCuts" expression="execution(* com.orange.adminweb.service.*.*(..))"/>
    		<aop:advisor pointcut-ref="interceptorPointCuts" advice-ref="txAdvice" />
    	</aop:config>

    2.com.orange.adminweb.service包下的java代码

    public void updateBinding(String userId,String merchantId,Order order) throws Exception {
         //删除用户
         userService.delete(userId);
         //删除商家
         merchantService.delete(merchantId);
         //更新订单
         orderService.update(order);
    		
    }

    像简单事务类似上面编程只需注意两个事情:

    1.上面xml配置的save*,update*,delete*表示通配符以save,update,delete开头的方法(com.orange.adminweb.service包下)都是启用事务的。

    2.以save,update,delete开头的方法(com.orange.adminweb.service包下)必须继续将异常往外抛。

    所以,很多刚开始入行的同事基本按照这种方式写代码,也没什么问题,但是最近测试有人反映,有个业务(类似上面)数据不一致,简单来说就是几张表的数据原子性不一致,我找到方法类,打开看了,场景确实和上面的稍微不太一样,以下为模拟代码。

    private void saveSubscribe(){
    		StringBuilder clientBuilder = new StringBuilder();
    		clientBuilder.append(GlobalConstant.GROUPID);
    		clientBuilder.append("@@@");
    		clientBuilder.append("ClientID_");
    		clientBuilder.append(UuidUtil.get32UUID());
    		String clientId=clientBuilder.toString();
    		MemoryPersistence persistence = new MemoryPersistence();
    		try {
    			final MqttClient sampleClient = new MqttClient(GlobalConstant.BROKER, clientId, persistence);
                final MqttConnectOptions connOpts = new MqttConnectOptions();
                System.out.println("Connecting to broker: " + GlobalConstant.BROKER);
                String sign=MacSignature.macSignature(clientId.split("@@@")[0], GlobalConstant.SECRETKEY);
                final String[] topicFilters=new String[]{GlobalConstant.TOPIC + "/#"};
                final int[]qos={1};
                connOpts.setUserName(GlobalConstant.ACESSKEY);
                connOpts.setServerURIs(new String[] { GlobalConstant.BROKER });
                connOpts.setPassword(sign.toCharArray());
                connOpts.setCleanSession(false);
                connOpts.setKeepAliveInterval(100);
                sampleClient.setCallback(new MqttCallback() {
                    public void connectionLost(Throwable throwable) {
                        log.info("mqtt connection lost");
                        throwable.printStackTrace();
                        while(!sampleClient.isConnected()){
                            try {
                                sampleClient.connect(connOpts);
                                sampleClient.subscribe(topicFilters,qos);
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    public void messageArrived(String topic, MqttMessage mqttMessage){
                    	try {
                    		saveOrder(new String(mqttMessage.getPayload()));
    				 } catch (Exception e) {
    						e.printStackTrace();
    				 }
                    }
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        log.ifo("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                    }
                });
                sampleClient.connect(connOpts);
                sampleClient.subscribe(topicFilters,qos);
    		}catch(Exception ex){
    			ex.printStackTrace();
    		}
    	}

    操作三张表的saveOrder方法如下:

    private void saveOrder(String message) throws Exception{
         //修改用户
         userService.updateUser(......);
    
         //修改商家
         merchantService.updateMerchant(......);
         
         //下订单
         orderService.saveOrder(.......);
    }

    因为业务本身原因,当saveOrder方法里面的修改用户,修改商家,下订单任何一个方法出现异常时候,saveSubscribe方法并没有回滚数据。

    重点看saveSubscribe方法里面的代码片段:

    sampleClient.setCallback(new MqttCallback() {
                    public void connectionLost(Throwable throwable) {
                        log.info("mqtt connection lost");
                        throwable.printStackTrace();
                        while(!sampleClient.isConnected()){
                            try {
                                sampleClient.connect(connOpts);
                                sampleClient.subscribe(topicFilters,qos);
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    public void messageArrived(String topic, MqttMessage mqttMessage){
                    	try {
                    		saveOrder(new String(mqttMessage.getPayload()));
    				 } catch (Exception e) {
    						e.printStackTrace();
    				 }
                    }
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        log.ifo("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                    }
                });

    这个里面有setCallback回调,它里面的异常是没法接着往外抛的,所以不会回滚数据,简单来说saveSubscribe方法就是一个没有事务控制的方法。

    其实这种业务场景有点类似我们之前的业务需求:

    有个AService和BService都配置了事务,AService调用了BService,BService需要记录日志,但是当BService出现异常的时候,发现没有记录日志,原因是AService和BService配置事务的时候有个参数propagation,默认都配置了REQUIRED

     <tx:method name="save*" propagation="REQUIRED"/>

    使用这种策略时BService将使用Aservice的事务,所以AService回滚将整个方法体内的任何东西都回滚了。所以解决这种业务场景就需要BService配置独立的事务,不管业务逻辑的Aservice是否有异常,BService日志都应该能够记录成功。

    所以解决上面setCallback回调不抛异常出去的问题,配置修改成saveOrder配置独立事务可以解决问题。

    <tx:method name="save*" propagation="REQUIRED"/>
    <tx:method name="update*" propagation="REQUIRED"/>
    <tx:method name="delete*" propagation="REQUIRED"/>
    <tx:method name="saveOrder" propagation="REQUIRES_NEW" rollback-for="java.lang.Exception"/>

    通过这次问题的解决回顾,说到底还是对Spring事务类型并没有引起重视,具体的业务场景应该使用不同的事务类型,而并不是一味的使用REQUIRED,最后贴下Spring的七种事务传播行为类型:

    PROPAGATION_REQUIRED--支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。

    PROPAGATION_SUPPORTS--支持当前事务,如果当前没有事务,就以非事务方式执行。

    PROPAGATION_MANDATORY--支持当前事务,如果当前没有事务,就抛出异常。

    PROPAGATION_REQUIRES_NEW--新建事务,如果当前存在事务,把当前事务挂起。

    PROPAGATION_NOT_SUPPORTED--以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。

    PROPAGATION_NEVER--以非事务方式执行,如果当前存在事务,则抛出异常。

    PROPAGATION_NESTED--如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。

  • 相关阅读:
    提升工作效率的方法
    Spark Streaming 实现思路与模块概述
    Reduce Side Join实现
    File file:/data1/hadoop/yarn/local/usercache/hp/appcache/application_* does not exi
    Caused by: java.io.IOException: Filesystem closed的处理
    linux下nproc的作用
    Spark Streaming 的一些问题
    php代码审计7审计csrf漏洞
    php代码审计6审计xss漏洞
    php代码审计5审计命令执行漏洞
  • 原文地址:https://www.cnblogs.com/dgcjiayou/p/7886715.html
Copyright © 2011-2022 走看看