zoukankan      html  css  js  c++  java
  • 分布式事务_02_2PC框架raincat源码解析-启动过程

    一、前言

    上一节已经将raincat demo工程运行起来了,这一节来分析下raincat启动过程的源码

    主要包括:

    事务协调者启动过程
    事务参与者启动过程

    二、协调者启动过程

    主要就是在启动类中通过如下代码来启动 netty 服务端

    nettyService.start()
    

    三、参与者启动过程概览

    参与者在启动过程中,主要做了如下5件事:

    (1)保存SpringContext上下文
    (2)通过加载spi,来使用用户自定义配置(序列化方式、日志存储方式)
    (3)启动Netty客户端,与txManager进行连接,并且维持心跳。
    (4)启动事务补偿日志,建表,定时补偿。
    (5)启动事务日志事件生产者。将事务补偿日志放入disruptor的环形队列中,由disruptor去异步执行。

    时序图如下:
    在这里插入图片描述

    InitServiceImpl

        @Override
        public void initialization(final TxConfig txConfig) {
            try {
                loadSpi(txConfig);
                nettyClientService.start(txConfig);
                txCompensationService.start(txConfig);
                txTransactionEventPublisher.start(txConfig.getBufferSize());
            } catch (Exception e) {
                throw new TransactionRuntimeException("tx transaction ex:{}:" + e.getMessage());
            }
            LogUtil.info(LOGGER, () -> "tx transaction init success!");
    
        }
    

    四、保存Spring上下文

    源码见 SpringBeanUtils 类

    • 设置Spring 上下文
    • 提供spring bean 的注册与获取方法。
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.raincat.core.helper;
    
    import org.springframework.context.ConfigurableApplicationContext;
    
    /**
     * SpringBeanUtils.
     * @author xiaoyu
     */
    public final class SpringBeanUtils {
    
        private static final SpringBeanUtils INSTANCE = new SpringBeanUtils();
    
        private ConfigurableApplicationContext cfgContext;
    
        private SpringBeanUtils() {
            if (INSTANCE != null) {
                throw new Error("error");
            }
        }
    
        /**
         * get SpringBeanUtils.
         * @return SpringBeanUtils
         */
        public static SpringBeanUtils getInstance() {
            return INSTANCE;
        }
    
        /**
         * acquire spring bean.
         *
         * @param type type
         * @param <T>  class
         * @return bean
         */
        public <T> T getBean(final Class<T> type) {
            return cfgContext.getBean(type);
        }
    
        /**
         * register bean in spring ioc.
         * @param beanName bean name
         * @param obj bean
         */
        public void registerBean(final String beanName, final Object obj) {
            cfgContext.getBeanFactory().registerSingleton(beanName, obj);
        }
    
        /**
         * set application context.
         * @param cfgContext application context
         */
        public void setCfgContext(final ConfigurableApplicationContext cfgContext) {
            this.cfgContext = cfgContext;
        }
    }
    

    五、加载spi

    1.主要操作

    1. 获取序列化方式
    2. 获取 TransactionRecoverRepository(事务恢复的存储方式,示例中其实现是 JdbcTransactionRecoverRepository),并设置其序列化方式。
    3. 将TransactionRecoverRepository注入Spring容器,以便在事务补偿器中使用

    InitServiceImpl

    /**
         * load spi.
         *
         * @param txConfig {@linkplain TxConfig}
         */
        private void loadSpi(final TxConfig txConfig) {
            //spi  serialize
            final SerializeProtocolEnum serializeProtocolEnum
                    = SerializeProtocolEnum.acquireSerializeProtocol(txConfig.getSerializer());
            final ServiceLoader<ObjectSerializer> objectSerializers
                    = ServiceBootstrap.loadAll(ObjectSerializer.class);
            final ObjectSerializer serializer =
                    StreamSupport.stream(objectSerializers.spliterator(), false)
                            .filter(s -> Objects.equals(s.getScheme(), serializeProtocolEnum.getSerializeProtocol()))
                            .findFirst().orElse(new KryoSerializer());
    
            //spi  RecoverRepository support
            final CompensationCacheTypeEnum compensationCacheTypeEnum
                    = CompensationCacheTypeEnum.acquireCompensationCacheType(txConfig.getCompensationCacheType());
    
            final ServiceLoader<TransactionRecoverRepository> recoverRepositories
                    = ServiceBootstrap.loadAll(TransactionRecoverRepository.class);
            final TransactionRecoverRepository repository =
                    StreamSupport.stream(recoverRepositories.spliterator(), false)
                            .filter(r -> Objects.equals(r.getScheme(), compensationCacheTypeEnum.getCompensationCacheType()))
                            .findFirst().orElse(new JdbcTransactionRecoverRepository());
            //将compensationCache实现注入到spring容器
            repository.setSerializer(serializer);
            SpringBeanUtils.getInstance().registerBean(TransactionRecoverRepository.class.getName(), repository);
        }
    
    

    2. 作用

    SPI的全名为Service Provider Interface,该机制其实就是为接口寻找服务实现类

    3. 如何使用

    当服务的提供者,提供了服务接口的一种实现之后,在jar包的META-INF/services/目录里同时创建一个以服务接口命名的文件
    该文件里就是实现该服务接口的具体实现类
    而当外部程序装配这个模块的时候,就能通过该jar包META-INF/services/里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。

    4. 优点

    基于这样一个约定就能很好的找到服务接口的实现类,而不需要再代码里制定。

    5. 示例

    服务接口命名文件

    事务补偿存储

    六、启动netty客户端

    1 主要操作

    Netty客户端启动过程中,主要做了以下几件事:

    • 定期更新TxMagager的信息

    启动一个单线程的调度线程池,定期向TxMagager发送post请求,获取Eureka服务注册表中TxMagager的信息(appName,instanceId,homepageUrl)

    • 设置 Bootstrap

    安装ChannelInitializer,并将一个ChannelHandler加入管道中

    • 连接Netty服务端

    (1)获取前面更新的TxManager的信息(appName,instanceId,homepageUrl)
    (2)向TxManager发送Post请求,获取Netty服务器连接信息(host,port)
    (3)连接到Netty服务端
    (4)通过ChannelFutureListener监听连接成功或失败的事件,若连接失败则定期重试。

    2 事件监听

    客户端除了连接服务端成功或失败事件监听,还有监听了以下事件。

    前面向Netty管道中填充了一个ChannelHandler,这样就能通过ChannelHandler监控Netty生命周期中的消息入站事件:

    channelRead
    exceptionCaught
    channelInactive
    channelActive
    userEventTriggered

    3.Netty客户端启动时序图

    在这里插入图片描述

    七、事务补偿日志启动

    1.主要操作

    事务补偿日志启动过程中,主要做了以下几件事:

    • 事务补偿日志数据库准备

    创建用来存储日志的数据表

    • 定时进行事务补偿

    开启线程池,进行定时事务补偿
    (1)获取到所有的事务补偿日志,并进行遍历
    (2)根据每个日志的事务组ID,向协调者获取到对应的事务组信息
    (3)如果整个事务组状态是提交的,而事务参与者自己不是提交的,则进行补偿。----不确定
    (4)事务补偿:
    反射执行事务参与者的事务,然后向事务协调者发送事务完成消息,最后事务参与者提交事务。

    2.时序图

    在这里插入图片描述

    八、事件生产者启动

    这里主要使用 disruptor 作为一个高性能环形缓存队列。
    补偿日志是异步的,先把日志扔到环形队列,然后由disruptor 的事件消费者进行事务日志补偿的增删改和补偿操作

    1.disruptor中的角色

    角色 描述 raincat中对应角色
    Event 事件 TxTransactionEvent
    EventFactory 事件工厂 TxTransactionEventFactory
    EventHandler 事件消费者 TxTransactionEventHandler
    EventProducer 事件生产者 TxTransactionEventPublisher
    EventTranslatorOneArg 3.0版本的Translator,可用来填充RingBuffer的事件槽 TxTransactionEventTranslator

    2.主要操作

    事件生产者启动过程是一个标准的 disruptor 启动过程,主要是设置事件工厂、事件消费者、设置线程池,然后启动disruptor

       /**
         * disruptor start.
         *
         * @param bufferSize this is disruptor buffer size.
         */
        public void start(final int bufferSize) {
            disruptor = new Disruptor<>(new TxTransactionEventFactory(), bufferSize, r -> {
                AtomicInteger index = new AtomicInteger(1);
                return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement());
            }, ProducerType.MULTI, new YieldingWaitStrategy());
            disruptor.handleEventsWith(txTransactionEventHandler);
            disruptor.setDefaultExceptionHandler(new ExceptionHandler<TxTransactionEvent>() {
                @Override
                public void handleEventException(Throwable ex, long sequence, TxTransactionEvent event) {
                    LogUtil.error(LOGGER, () -> "Disruptor handleEventException:"
                            + event.getType() + event.getTransactionRecover().toString());
                }
    
                @Override
                public void handleOnStartException(Throwable ex) {
                    LogUtil.error(LOGGER, () -> "Disruptor start exception");
                }
    
                @Override
                public void handleOnShutdownException(Throwable ex) {
                    LogUtil.error(LOGGER, () -> "Disruptor close Exception ");
                }
            });
            executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    TxTransactionThreadFactory.create("raincat-log-disruptor", false),
                    new ThreadPoolExecutor.AbortPolicy());
            disruptor.start();
        }
    

    3.事件消费者

    这里的事件消费者主要是监听事件,进行事务日志补偿的增删改和补偿操作。

    /*
     *
     *  * Licensed to the Apache Software Foundation (ASF) under one or more
     *  * contributor license agreements.  See the NOTICE file distributed with
     *  * this work for additional information regarding copyright ownership.
     *  * The ASF licenses this file to You under the Apache License, Version 2.0
     *  * (the "License"); you may not use this file except in compliance with
     *  * the License.  You may obtain a copy of the License at
     *  *
     *  *     http://www.apache.org/licenses/LICENSE-2.0
     *  *
     *  * Unless required by applicable law or agreed to in writing, software
     *  * distributed under the License is distributed on an "AS IS" BASIS,
     *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     *  * See the License for the specific language governing permissions and
     *  * limitations under the License.
     *
     */
    
    package com.raincat.core.disruptor.handler;
    
    import com.lmax.disruptor.EventHandler;
    import com.raincat.common.enums.CompensationActionEnum;
    import com.raincat.core.compensation.TxCompensationService;
    import com.raincat.core.disruptor.event.TxTransactionEvent;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Disroptor handler.
     *
     * @author xiaoyu(Myth)
     */
    @Component
    public class TxTransactionEventHandler implements EventHandler<TxTransactionEvent> {
    
        @Autowired
        private TxCompensationService txCompensationService;
    
        @Override
        public void onEvent(final TxTransactionEvent txTransactionEvent, final long sequence, final boolean endOfBatch) {
            if (txTransactionEvent.getType() == CompensationActionEnum.SAVE.getCode()) {
                txCompensationService.save(txTransactionEvent.getTransactionRecover());
            } else if (txTransactionEvent.getType() == CompensationActionEnum.DELETE.getCode()) {
                txCompensationService.remove(txTransactionEvent.getTransactionRecover().getId());
            } else if (txTransactionEvent.getType() == CompensationActionEnum.UPDATE.getCode()) {
                txCompensationService.update(txTransactionEvent.getTransactionRecover());
            } else if (txTransactionEvent.getType() == CompensationActionEnum.COMPENSATE.getCode()) {
                txCompensationService.compensation(txTransactionEvent.getTransactionRecover());
            }
            txTransactionEvent.clear();
        }
    }
    
    

    4.时序图

    在这里插入图片描述

    九、参考资料

    1. Raincat-github地址
    2. Raincat-源码解析视频
    3. Java之SPI机制
    4. Disruptor_学习_00_资源帖
  • 相关阅读:
    在eclipse中安装 Activiti Designer插件
    Maven settings.xml配置(指定本地仓库、阿里云镜像设置)
    unity调用MMBilling_2.4.2 Android SDK.
    unity与Android相互调用
    Unity3D研究院之与Android相互传递消息
    Unity3D研究院之打开Activity与调用JAVA代码传递参数
    Objec c 字符串比较
    判断不同IOS设备
    Unity3D研究院之IOS本地消息通知LocalNotification的使用
    【Unity3D】iOS 推送实现
  • 原文地址:https://www.cnblogs.com/shirui/p/10751895.html
Copyright © 2011-2022 走看看