zoukankan      html  css  js  c++  java
  • 三、全局事务begin请求GlobalBeginRequest

    所有文章

    https://www.cnblogs.com/lay2017/p/12485081.html

    正文

    上一篇文章中,我们知道了DefaultCoordinator作为分布式事务的协调者承担了Server端的大部分功能实现。

    那么,本文将阅读一下全局事务的begin请求,首先打开TCInboundHandler

    public interface TCInboundHandler {
        // 处理全局事务的begin请求
        GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);
    
        // ...
    }

    入口在这里,我们向下找到AbstractTCInboundHandler对它的实现

    @Override
    public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                try {
                    // 内部begin方法
                    doGlobalBegin(request, response, rpcContext);
                } catch (StoreException e) {
                    // ...
                }
            }
        }, request, response);
        return response;
    }

    AbstractTCInboundHandler做了一层方法装饰,主要的抽象实现交付给了doGlobalBegin,我们跟进它

    doGlobalBegin

    doGlobalBegin方法是一个抽象类,我们再向下找到DefaultCoordinator对它的实现

    private Core core = CoreFactory.get();
    
    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout()));
    }

    委托给了Core来处理,我们看看CoreFactory的get方法会返回一个怎样的实现

    private static class SingletonHolder {
        private static Core INSTANCE = new DefaultCore();
    }
    
    public static final Core get() {
        return SingletonHolder.INSTANCE;
    }

    默认是一个单例实现,DefaultCore。

    DefaultCore

    既然如此,我们就看看DefaultCore这个默认实现是怎么实现全局事务的begin方法的

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
        // 创建一个session
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // begin这个session
        session.begin();
    
        // 发送事件
        eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
    
        return session.getXid();
    }

    begin方法中最核心的就是创建一个GlobalSession以及session的begin。注意,这里addSessionLifecycleListener添加了一个监听器

    先跟进createGlobalSession,看看一个GlobalSession是怎么创建的

    public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
        GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout);
        return session;
    }

    没有什么特别的就是简单构造了一个GlobalSession的实例,再跟进GlobalSession的begin方法

    @Override
    public void begin() throws TransactionException {
        // 状态设置为begin
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        // 触发监听器
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }

    除了基本的状态变化之类的数据修改,就是触发上面我们提到过的监听器。

    找到AbstractSessionManager的onBegin方法,看看监听到begin事件后怎么处理

    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }

    顾名思义,就是添加到一个统一管理的位置,跟进addGlobalSession

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        writeSession(LogOperation.GLOBAL_ADD, session);
    }

    这里我们注意:DefaultSessionManager继承了AbstractSessionManager并且实现了addGlobalSession方法。我们看看DefaultSessionManager做了什么扩展

    protected Map<String, GlobalSession> sessionMap = new ConcurrentHashMap<>();
    
    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        super.addGlobalSession(session);
        sessionMap.put(session.getXid(), session);
    }

    有着一个内存Map对象,保存了GlobalSession,KEY就是XID

    再回到AbstractSessionManager的addGlobalSession方法,继续跟进writeSession方法

    protected TransactionStoreManager transactionStoreManager;
    
    private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
        if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
            // ...
        }
    }

    TransactionStoreManager将负责存储以及操作session,默认实现是FileBasedSessionManager,FileBasedSessionManager继承了DefaultSessionManager。我们跟进writeSession

    @Override
    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
        writeSessionLock.lock();
        long curFileTrxNum;
        try {
            if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
                return false;
            }
            lastModifiedTime = System.currentTimeMillis();
            curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
            if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
                return saveHistory();
            }
        } catch (Exception exx) {
            return false;
        } finally {
            writeSessionLock.unlock();
        }
        flushDisk(curFileTrxNum, currFileChannel);
        return true;
    }

    writeDataFile将会把session写入到文件当中,默认的文件地址在用户目录的sessionStore下,比如我的本机地址

    在/Users/lay/sessionStore下有一个默认的root.data文件

    总结

    GlobalBeginRequest主要就是创建了一个GlobalSession然后进行持久化存储,默认是采用File的方式存储。当然你也可以选择db的方式等。

  • 相关阅读:
    HDU 5105 Math Problem(BestCoder Round #18)
    HDU 5101 Select(BestCoder Round #17)
    HDU 5100 Chessboard(BestCoder Round #17)(找规律)
    HDU 5087 Revenge of LIS II(BestCoder Round #16)(次长上升子序列)
    HDU 5067 Harry And Dig Machine(BestCoder Round #14)
    HDU 5063 Operation the Sequence(BestCoder Round #13)
    2015ACM/ICPC亚洲区长春站-重现赛 1006 Almost Sorted Array
    2015ACM/ICPC亚洲区沈阳站-重现赛 1004 Pagodas
    Loadrunner 问题总结
    如何解决LodRunner中报错关于Error -10489
  • 原文地址:https://www.cnblogs.com/lay2017/p/12498190.html
Copyright © 2011-2022 走看看