zoukankan      html  css  js  c++  java
  • 三、全局事务begin请求GlobalBeginRequest

    所有文章

    https://www.cnblogs.com/lay2017/p/12485081.html

    正文

    上一篇文章中,我们知道了DefaultCoordinator作为分布式事务的协调者承担了Server端的大部分功能实现。

    那么,本文将阅读一下全局事务的begin请求,首先打开TCInboundHandler

    public interface TCInboundHandler {
        // 处理全局事务的begin请求
        GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);
    
        // ...
    }

    入口在这里,我们向下找到AbstractTCInboundHandler对它的实现

    @Override
    public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                try {
                    // 内部begin方法
                    doGlobalBegin(request, response, rpcContext);
                } catch (StoreException e) {
                    // ...
                }
            }
        }, request, response);
        return response;
    }

    AbstractTCInboundHandler做了一层方法装饰,主要的抽象实现交付给了doGlobalBegin,我们跟进它

    doGlobalBegin

    doGlobalBegin方法是一个抽象类,我们再向下找到DefaultCoordinator对它的实现

    private Core core = CoreFactory.get();
    
    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout()));
    }

    委托给了Core来处理,我们看看CoreFactory的get方法会返回一个怎样的实现

    private static class SingletonHolder {
        private static Core INSTANCE = new DefaultCore();
    }
    
    public static final Core get() {
        return SingletonHolder.INSTANCE;
    }

    默认是一个单例实现,DefaultCore。

    DefaultCore

    既然如此,我们就看看DefaultCore这个默认实现是怎么实现全局事务的begin方法的

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
        // 创建一个session
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // begin这个session
        session.begin();
    
        // 发送事件
        eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
    
        return session.getXid();
    }

    begin方法中最核心的就是创建一个GlobalSession以及session的begin。注意,这里addSessionLifecycleListener添加了一个监听器

    先跟进createGlobalSession,看看一个GlobalSession是怎么创建的

    public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
        GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout);
        return session;
    }

    没有什么特别的就是简单构造了一个GlobalSession的实例,再跟进GlobalSession的begin方法

    @Override
    public void begin() throws TransactionException {
        // 状态设置为begin
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        // 触发监听器
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }

    除了基本的状态变化之类的数据修改,就是触发上面我们提到过的监听器。

    找到AbstractSessionManager的onBegin方法,看看监听到begin事件后怎么处理

    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }

    顾名思义,就是添加到一个统一管理的位置,跟进addGlobalSession

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        writeSession(LogOperation.GLOBAL_ADD, session);
    }

    这里我们注意:DefaultSessionManager继承了AbstractSessionManager并且实现了addGlobalSession方法。我们看看DefaultSessionManager做了什么扩展

    protected Map<String, GlobalSession> sessionMap = new ConcurrentHashMap<>();
    
    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        super.addGlobalSession(session);
        sessionMap.put(session.getXid(), session);
    }

    有着一个内存Map对象,保存了GlobalSession,KEY就是XID

    再回到AbstractSessionManager的addGlobalSession方法,继续跟进writeSession方法

    protected TransactionStoreManager transactionStoreManager;
    
    private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
        if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
            // ...
        }
    }

    TransactionStoreManager将负责存储以及操作session,默认实现是FileBasedSessionManager,FileBasedSessionManager继承了DefaultSessionManager。我们跟进writeSession

    @Override
    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
        writeSessionLock.lock();
        long curFileTrxNum;
        try {
            if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
                return false;
            }
            lastModifiedTime = System.currentTimeMillis();
            curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
            if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
                return saveHistory();
            }
        } catch (Exception exx) {
            return false;
        } finally {
            writeSessionLock.unlock();
        }
        flushDisk(curFileTrxNum, currFileChannel);
        return true;
    }

    writeDataFile将会把session写入到文件当中,默认的文件地址在用户目录的sessionStore下,比如我的本机地址

    在/Users/lay/sessionStore下有一个默认的root.data文件

    总结

    GlobalBeginRequest主要就是创建了一个GlobalSession然后进行持久化存储,默认是采用File的方式存储。当然你也可以选择db的方式等。

  • 相关阅读:
    Compression algorithm (deflate)
    tcpip数据包编码解析(chunk and gzip)_space of Jialy_百度空间
    What's the difference between the "gzip" and "deflate" HTTP 1.1 encodings?
    gzip压缩算法: gzip 所使用压缩算法的基本原理
    Decompressing a GZip Stream with Zlib
    Frequently Asked Questions about zlib
    how to decompress gzip stream with zlib
    自己动手写web服务器四(web服务器是如何通过压缩数据,web服务器的gzip模块的实现)
    What's the difference between the "gzip" and "deflate" HTTP 1.1 encodings?
    C语言抓http gzip包并解压 失败 C/C++ ChinaUnix.net
  • 原文地址:https://www.cnblogs.com/lay2017/p/12498190.html
Copyright © 2011-2022 走看看