zoukankan      html  css  js  c++  java
  • 【分布式事务】spring cloud集成lcn解决分布式事务

    参考地址:https://blog.csdn.net/u010882691/article/details/82256587

    参考地址:https://blog.csdn.net/oyh1203/article/details/82189445

    参考地址:https://blog.csdn.net/small_to_large/article/details/77836672 Spring Cloud Ribbon和Spring Cloud Feign

    参考地址:https://blog.csdn.net/5iasp/article/details/79881691

    事务等级:https://blog.csdn.net/gududedabai/article/details/82993700

    目前,在Spring cloud 中服务之间通过restful方式调用有两种方式 
    - restTemplate+Ribbon 
    - feign

    从实践上看,采用feign的方式更优雅(feign内部也使用了ribbon做负载均衡)。

    zuul也有负载均衡的功能,它是针对外部请求做负载,那客户端ribbon的负载均衡又是怎么一回事?

    客户端ribbon的负载均衡,解决的是服务发起方(在Eureka注册的服务)对被调用的服务的负载,比如我们查询商品服务要调用显示库存和商品明细服务,通过商品服务的接口将两个服务组合,可以减少外部应用的请求,比如手机App发起一次请求即可,可以节省网络带宽,也更省电。

    ribbon是对服务之间调用做负载,是服务之间的负载均衡,zuul是可以对外部请求做负载均衡。

    参考地址:https://blog.csdn.net/jrn1012/article/details/77837658/

    因为LCN实现分布式事务的回滚,需要在服务内部 微服务之间的 负载均衡的 请求操作,故而需要在配置文件中加上ribbon的相关配置,它不与使用feign冲突!!!

    lcn使用spring boot2.0 报错解决方案:https://www.jianshu.com/p/453741e0f28f

    lcn集成到自己到自己的spring cloud项目中:https://blog.csdn.net/zhangxing52077/article/details/81587988

    参考使用步骤1:

    https://m.wang1314.com/doc/webapp/topic/20308073.html

     修改LCN ,集成spring boot2.0

    注意:LCN 4.1.0版本 目前不支持spring boot 2.x的版本,所以需要进行更改!!

    【【因为我已经更改完成,打包了jar了,jar可以在百度网盘下载,然后直接走这一步的上传第三方jar包到本地maven仓库,然后在项目中直接引用即可】】

    第一步:

    先在lcn官网【http://www.txlcn.org/】 找到GitHub 地址【https://github.com/codingapi/tx-lcn】,拷下所有的源码

    第二步:

    解压下载的zip,放置在一个目录下,用IDEA打开【注意打开父层项目】

     导入完整的jar包,然后下面就要开始更改源码中不支持spring boot 2.X的部分

    第三步:

    修改

    transaction-springcloud 项目下com.codingapi.tx.springcloud.listener包中的ServerListener.java

    源码更改为:

    package com.codingapi.tx.springcloud.listener;
    
    import com.codingapi.tx.listener.service.InitService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.ApplicationEvent;
    import org.springframework.context.ApplicationListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @Component注解会自动扫描配置文件中的server.port值
     */
    @Component
    public class ServerListener implements ApplicationListener<ApplicationEvent> {
    
        private Logger logger = LoggerFactory.getLogger(ServerListener.class);
    
        private int serverPort;
    
        @Value("${server.port}")
        private String port;
    
        @Autowired
        private InitService initService;
    
        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            //        logger.info("onApplicationEvent -> onApplicationEvent. "+event.getEmbeddedServletContainer());
            //        this.serverPort = event.getEmbeddedServletContainer().getPort();
            //TODO Spring boot 2.0.0没有EmbeddedServletContainerInitializedEvent 此处写死;modify by young
            this.serverPort = Integer.parseInt(port);
    
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    // 若连接不上txmanager start()方法将阻塞
                    initService.start();
                }
            });
            thread.setName("TxInit-thread");
            thread.start();
        }
    
        public int getPort() {
            return this.serverPort;
        }
    
        public void setServerPort(int serverPort) {
            this.serverPort = serverPort;
        }
    }
    View Code

     第四步:

    修改tx-manager项目下com.codingapi.tm.listener包中的ApplicationStartListener.java

    package com.codingapi.tm.listener;
    
    import com.codingapi.tm.Constants;
    import org.springframework.context.ApplicationEvent;
    import org.springframework.context.ApplicationListener;
    import org.springframework.stereotype.Component;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    
    @Component
    public class ApplicationStartListener implements ApplicationListener<ApplicationEvent> {
    
    
        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            //TODO Spring boot 2.0.0没有EmbeddedServletContainerInitializedEvent 此处写死;modify by young
    //        int serverPort = event.getEmbeddedServletContainer().getPort();
            String ip = getIp();
            Constants.address = ip+":48888";//写死端口号,反正TxManager端口也是配置文件配好的(●′ω`●)
        }
    
        private String getIp(){
            String host = null;
            try {
                host = InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
            return host;
        }
    }
    View Code

    第五步:

    修改

    tx-manager项目下com.codingapi.tm.manager.service.impl包中MicroServiceImpl.java类的getState()方法

    package com.codingapi.tm.manager.service.impl;
    
    import com.codingapi.tm.Constants;
    import com.codingapi.tm.config.ConfigReader;
    import com.codingapi.tm.framework.utils.SocketManager;
    import com.codingapi.tm.manager.service.MicroService;
    import com.codingapi.tm.model.TxServer;
    import com.codingapi.tm.model.TxState;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.discovery.DiscoveryClient;
    import org.springframework.stereotype.Service;
    import org.springframework.web.client.RestTemplate;
    
    import java.net.InetAddress;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    /**
     * create by lorne on 2017/11/11
     */
    @Service
    public class MicroServiceImpl implements MicroService {
    
    
        @Autowired
        private RestTemplate restTemplate;
    
        @Autowired
        private ConfigReader configReader;
    
    
        @Autowired
        private DiscoveryClient discoveryClient;
    
    
    
        private boolean isIp(String ipAddress) {
            String ip = "([1-9]|[1-9]\d|1\d{2}|2[0-4]\d|25[0-5])(\.(\d|[1-9]\d|1\d{2}|2[0-4]\d|25[0-5])){3}";
            Pattern pattern = Pattern.compile(ip);
            Matcher matcher = pattern.matcher(ipAddress);
            return matcher.matches();
        }
    
    
    
        @Override
        public TxState getState() {
            TxState state = new TxState();
            String ipAddress = "";
            //TODO Spring boot 2.0.0没有discoveryClient.getLocalServiceInstance() 用InetAddress获取host;modify by young
            //String ipAddress = discoveryClient.getLocalServiceInstance().getHost();
            try {
                ipAddress = InetAddress.getLocalHost().getHostAddress();
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (!isIp(ipAddress)) {
                ipAddress = "127.0.0.1";
            }
            state.setIp(ipAddress);
            state.setPort(Constants.socketPort);
            state.setMaxConnection(SocketManager.getInstance().getMaxConnection());
            state.setNowConnection(SocketManager.getInstance().getNowConnection());
            state.setRedisSaveMaxTime(configReader.getRedisSaveMaxTime());
            state.setTransactionNettyDelayTime(configReader.getTransactionNettyDelayTime());
            state.setTransactionNettyHeartTime(configReader.getTransactionNettyHeartTime());
            state.setNotifyUrl(configReader.getCompensateNotifyUrl());
            state.setCompensate(configReader.isCompensateAuto());
            state.setCompensateTryTime(configReader.getCompensateTryTime());
            state.setCompensateMaxWaitTime(configReader.getCompensateMaxWaitTime());
            state.setSlbList(getServices());
            return state;
        }
    
    
        private List<String> getServices(){
            List<String> urls = new ArrayList<>();
            List<ServiceInstance>  serviceInstances = discoveryClient.getInstances(tmKey);
            for (ServiceInstance instanceInfo : serviceInstances) {
                urls.add(instanceInfo.getUri().toASCIIString());
            }
            return urls;
        }
    
        @Override
        public TxServer getServer() {
            List<String> urls= getServices();
            List<TxState> states = new ArrayList<>();
            for(String url:urls){
                try {
                    TxState state = restTemplate.getForObject(url + "/tx/manager/state", TxState.class);
                    states.add(state);
                } catch (Exception e) {
                }
    
            }
            if(states.size()<=1) {
                TxState state = getState();
                if (state.getMaxConnection() > state.getNowConnection()) {
                    return TxServer.format(state);
                } else {
                    return null;
                }
            }else{
                //找默认数据
                TxState state = getDefault(states,0);
                if (state == null) {
                    //没有满足的默认数据
                    return null;
                }
                return TxServer.format(state);
            }
        }
    
        private TxState getDefault(List<TxState> states, int index) {
            TxState state = states.get(index);
            if (state.getMaxConnection() == state.getNowConnection()) {
                index++;
                if (states.size() - 1 >= index) {
                    return getDefault(states, index);
                } else {
                    return null;
                }
            } else {
                return state;
            }
        }
    
    }
    View Code

    第六步:

     修改

    tx-client下的com.codingapi.tx.aop.service.impl下的TransactionServerFactoryServiceImpl.java

    修改这一截代码:

    //分布式事务已经开启,业务进行中 **/
            if (info.getTxTransactionLocal() != null || StringUtils.isNotEmpty(info.getTxGroupId())) {
                //检查socket通讯是否正常 (第一次执行时启动txRunningTransactionServer的业务处理控制,然后嵌套调用其他事务的业务方法时都并到txInServiceTransactionServer业务处理下)
                if (SocketManager.getInstance().isNetState()) {
                    if (info.getTxTransactionLocal() != null) {
                        return txDefaultTransactionServer;
                    } else {
    //                    if(transactionControl.isNoTransactionOperation() // 表示整个应用没有获取过DB连接
    //                        || info.getTransaction().readOnly()) { //无事务业务的操作
    //                        return txRunningNoTransactionServer;
    //                    }else {
    //                        return txRunningTransactionServer;
    //                    }
                        if(!transactionControl.isNoTransactionOperation()) { //TODO 有事务业务的操作 
                            return txRunningTransactionServer;
                        }else {
                            return txRunningNoTransactionServer;
                        }
                    }
                } else {
                    logger.warn("tx-manager not connected.");
                    return txDefaultTransactionServer;
                }
            }
            //分布式事务处理逻辑*结束***********/
    View Code

    第七步:

    现在都更改完成了,然后需要将所有的项目打包,注意我将本组项目中所有pom文件中所有的的4.2.0-SNAPSHOT 都更改成了4.2.0,注意  是所有

    改成了

    然后点击右侧maven插件对每一个ms挨个进行打包【打包可能报错,解决方案:https://www.cnblogs.com/sxdcgaq8080/p/9841635.html   https://www.cnblogs.com/sxdcgaq8080/p/9841701.html

    按照报错后的两个解决方案,进行打包完成后,可以看到

    这些ms都已经打包完成了

    第八步:

    最后需要将所有修改完成的打包好的jar上传到自己本地的maven仓库中 【操作地址:https://www.cnblogs.com/sxdcgaq8080/p/7583767.html 最下方可以进行第三方jar包上传到自己的maven仓库中】

    【jar可以在百度网盘下载,然后直接走这一步的上传第三方jar包到本地maven仓库,然后在项目中直接引用即可】【最新jar的使用参见https://www.cnblogs.com/sxdcgaq8080/p/7583767.html最下方】

     【这里把上传第三方jar到本地仓库的命令给出来,也就是这两个jar】

    mvn deploy:deploy-file -DgroupId=com.codingapi -DartifactId=transaction-springcloud -Dversion=4.2.0 -Dpackaging=jar -Dfile=D:documentIdeaProjectsmyTestDocumentjar	ransaction-springcloud-4.2.0.jar -Durl=http://localhost:8081/repository/myself_hosted/ -DrepositoryId=myself_hosted
    
    
    mvn deploy:deploy-file -DgroupId=com.codingapi -DartifactId=tx-plugins-db -Dversion=4.2.0 -Dpackaging=jar -Dfile=D:documentIdeaProjectsmyTestDocumentjar	x-plugins-db-4.2.0.jar -Durl=http://localhost:8081/repository/myself_hosted/ -DrepositoryId=myself_hosted

    上传完了以后的位置如下:

    注意 这里的版本都修改成了4.2.0

    所以在引用的时候,在服务中引用的版本是4.2.0

     这次启动引用了这两个jar的spring boot2.0的微服务,就可以成功了 

     

    将tx-Manager服务加入项目组,并启用【此时是eureka服务已经启动的情况下了,也就是说,微服务组引入修改以后的LCN jar依赖,已经可以成功启动的情况下】

    1.同上面的第一步一样,进入官网,进入GitHub,拷贝所有源码

    先在lcn官网【http://www.txlcn.org/】 找到GitHub 地址【https://github.com/codingapi/tx-lcn】,拷下所有的源码

    2.解压缩,取出tx-manager服务,拷贝至项目组根目录下

     3.将tx-Manager修改为本微服务组可以识别的子模块module

    导入更新

    4.更改tx-manager的application.properties,修改eureka的配置和redis的相关配置

    #######################################txmanager-start#################################################
    #服务端口
    server.port=7000
    
    #tx-manager不得修改
    spring.application.name=tx-manager
    
    spring.mvc.static-path-pattern=/**
    spring.resources.static-locations=classpath:/static/
    #######################################txmanager-end#################################################
    
    
    #zookeeper地址
    #spring.cloud.zookeeper.connect-string=127.0.0.1:2181
    #spring.cloud.zookeeper.discovery.preferIpAddress = true
    
    #eureka 地址
    eureka.client.service-url.defaultZone=http://127.0.0.1:8000/eureka/
    eureka.instance.prefer-ip-address=true
    
    #######################################redis-start#################################################
    #redis 配置文件,根据情况选择集群或者单机模式
    
    ##redis 集群环境配置
    ##redis cluster
    #spring.redis.cluster.nodes=127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003
    #spring.redis.cluster.commandTimeout=5000
    
    ##redis 单点环境配置
    #redis
    #redis主机地址
    spring.redis.host=127.0.0.1
    #redis主机端口
    spring.redis.port=6379
    #redis链接密码
    spring.redis.password=
    spring.redis.pool.maxActive=10
    spring.redis.pool.maxWait=-1
    spring.redis.pool.maxIdle=5
    spring.redis.pool.minIdle=0
    spring.redis.timeout=0
    #####################################redis-end###################################################
    
    
    
    
    #######################################LCN-start#################################################
    #业务模块与TxManager之间通讯的最大等待时间(单位:秒)
    #通讯时间是指:发起方与响应方之间完成一次的通讯时间。
    #该字段代表的是Tx-Client模块与TxManager模块之间的最大通讯时间,超过该时间未响应本次请求失败。
    tm.transaction.netty.delaytime = 5
    
    #业务模块与TxManager之间通讯的心跳时间(单位:秒)
    tm.transaction.netty.hearttime = 15
    
    #存储到redis下的数据最大保存时间(单位:秒)
    #该字段仅代表的事务模块数据的最大保存时间,补偿数据会永久保存。
    tm.redis.savemaxtime=30
    
    #socket server Socket对外服务端口
    #TxManager的LCN协议的端口
    tm.socket.port=9999
    
    #最大socket连接数
    #TxManager最大允许的建立连接数量
    tm.socket.maxconnection=100
    
    #事务自动补偿 (true:开启,false:关闭)
    # 说明:
    # 开启自动补偿以后,必须要配置 tm.compensate.notifyUrl 地址,仅当tm.compensate.notifyUrl 在请求补偿确认时返回success或者SUCCESS时,才会执行自动补偿,否则不会自动补偿。
    # 关闭自动补偿,当出现数据时也会 tm.compensate.notifyUrl 地址。
    # 当tm.compensate.notifyUrl 无效时,不影响TxManager运行,仅会影响自动补偿。
    tm.compensate.auto=false
    
    #事务补偿记录回调地址(rest api 地址,post json格式)
    #请求补偿是在开启自动补偿时才会请求的地址。请求分为两种:1.补偿决策,2.补偿结果通知,可通过通过action参数区分compensate为补偿请求、notify为补偿通知。
    #*注意当请求补偿决策时,需要补偿服务返回"SUCCESS"字符串以后才可以执行自动补偿。
    #请求补偿结果通知则只需要接受通知即可。
    #请求补偿的样例数据格式:
    #{"groupId":"TtQxTwJP","action":"compensate","json":"{"address":"133.133.5.100:8081","className":"com.example.demo.service.impl.DemoServiceImpl","currentTime":1511356150413,"data":"C5IBLWNvbS5leGFtcGxlLmRlbW8uc2VydmljZS5pbXBsLkRlbW9TZXJ2aWNlSW1wbAwSBHNhdmUbehBqYXZhLmxhbmcuT2JqZWN0GAAQARwjeg9qYXZhLmxhbmcuQ2xhc3MYABABJCo/cHVibGljIGludCBjb20uZXhhbXBsZS5kZW1vLnNlcnZpY2UuaW1wbC5EZW1vU2VydmljZUltcGwuc2F2ZSgp","groupId":"TtQxTwJP","methodStr":"public int com.example.demo.service.impl.DemoServiceImpl.save()","model":"demo1","state":0,"time":36,"txGroup":{"groupId":"TtQxTwJP","hasOver":1,"isCompensate":0,"list":[{"address":"133.133.5.100:8899","isCompensate":0,"isGroup":0,"kid":"wnlEJoSl","methodStr":"public int com.example.demo.service.impl.DemoServiceImpl.save()","model":"demo2","modelIpAddress":"133.133.5.100:8082","channelAddress":"/133.133.5.100:64153","notify":1,"uniqueKey":"bc13881a5d2ab2ace89ae5d34d608447"}],"nowTime":0,"startTime":1511356150379,"state":1},"uniqueKey":"be6eea31e382f1f0878d07cef319e4d7"}"}
    #请求补偿的返回数据样例数据格式:
    #SUCCESS
    #请求补偿结果通知的样例数据格式:
    #{"resState":true,"groupId":"TtQxTwJP","action":"notify"}
    tm.compensate.notifyUrl=http://ip:port/path
    
    #补偿失败,再次尝试间隔(秒),最大尝试次数3次,当超过3次即为补偿失败,失败的数据依旧还会存在TxManager下。
    tm.compensate.tryTime=30
    
    #各事务模块自动补偿的时间上限(毫秒)
    #指的是模块执行自动超时的最大时间,该最大时间若过段会导致事务机制异常,该时间必须要模块之间通讯的最大超过时间。
    #例如,若模块A与模块B,请求超时的最大时间是5秒,则建议改时间至少大于5秒。
    tm.compensate.maxWaitTime=5000
    #######################################LCN-end#################################################
    
    
    
    
    logging.level.com.codingapi=debug
    View Code

    5.启动启动类,即可访问tx-manager主页面

    地址:http://localhost:7000/

    ========

  • 相关阅读:
    研究人员用数据统计的方法来做文学研究
    导致大数据项目失败的4大痛点及应对策略
    导致大数据项目失败的4大痛点及应对策略
    excel怎么制作三维圆环图表
    excel怎么制作三维圆环图表
    ios开发之Swift新手入门
    ZOJ3629 Treasure Hunt IV(找规律,推公式)
    nginx源代码分析--进程间通信机制 &amp; 同步机制
    &lt;LeetCode OJ&gt; 326. Power of Three
    二进制整数的乘除运算
  • 原文地址:https://www.cnblogs.com/sxdcgaq8080/p/9776695.html
Copyright © 2011-2022 走看看