所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
在上一篇文章中,我们知道了DefaultCoordinator作为分布式事务的协调者承担了Server端的大部分功能实现。
那么,本文将阅读一下全局事务的begin请求,首先打开TCInboundHandler
public interface TCInboundHandler { // 处理全局事务的begin请求 GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext); // ... }
入口在这里,我们向下找到AbstractTCInboundHandler对它的实现
@Override public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) { GlobalBeginResponse response = new GlobalBeginResponse(); exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() { @Override public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException { try { // 内部begin方法 doGlobalBegin(request, response, rpcContext); } catch (StoreException e) { // ... } } }, request, response); return response; }
AbstractTCInboundHandler做了一层方法装饰,主要的抽象实现交付给了doGlobalBegin,我们跟进它
doGlobalBegin
doGlobalBegin方法是一个抽象类,我们再向下找到DefaultCoordinator对它的实现
private Core core = CoreFactory.get(); @Override protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout())); }
委托给了Core来处理,我们看看CoreFactory的get方法会返回一个怎样的实现
private static class SingletonHolder { private static Core INSTANCE = new DefaultCore(); } public static final Core get() { return SingletonHolder.INSTANCE; }
默认是一个单例实现,DefaultCore。
DefaultCore
既然如此,我们就看看DefaultCore这个默认实现是怎么实现全局事务的begin方法的
@Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { // 创建一个session GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); // begin这个session session.begin(); // 发送事件 eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), session.getBeginTime(), null, session.getStatus())); return session.getXid(); }
begin方法中最核心的就是创建一个GlobalSession以及session的begin。注意,这里addSessionLifecycleListener添加了一个监听器
先跟进createGlobalSession,看看一个GlobalSession是怎么创建的
public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) { GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout); return session; }
没有什么特别的就是简单构造了一个GlobalSession的实例,再跟进GlobalSession的begin方法
@Override public void begin() throws TransactionException { // 状态设置为begin this.status = GlobalStatus.Begin; this.beginTime = System.currentTimeMillis(); this.active = true; // 触发监听器 for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onBegin(this); } }
除了基本的状态变化之类的数据修改,就是触发上面我们提到过的监听器。
找到AbstractSessionManager的onBegin方法,看看监听到begin事件后怎么处理
@Override public void onBegin(GlobalSession globalSession) throws TransactionException { addGlobalSession(globalSession); }
顾名思义,就是添加到一个统一管理的位置,跟进addGlobalSession
@Override public void addGlobalSession(GlobalSession session) throws TransactionException { writeSession(LogOperation.GLOBAL_ADD, session); }
这里我们注意:DefaultSessionManager继承了AbstractSessionManager并且实现了addGlobalSession方法。我们看看DefaultSessionManager做了什么扩展
protected Map<String, GlobalSession> sessionMap = new ConcurrentHashMap<>(); @Override public void addGlobalSession(GlobalSession session) throws TransactionException { super.addGlobalSession(session); sessionMap.put(session.getXid(), session); }
有着一个内存Map对象,保存了GlobalSession,KEY就是XID
再回到AbstractSessionManager的addGlobalSession方法,继续跟进writeSession方法
protected TransactionStoreManager transactionStoreManager; private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException { if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) { // ... } }
TransactionStoreManager将负责存储以及操作session,默认实现是FileBasedSessionManager,FileBasedSessionManager继承了DefaultSessionManager。我们跟进writeSession
@Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { writeSessionLock.lock(); long curFileTrxNum; try { if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) { return false; } lastModifiedTime = System.currentTimeMillis(); curFileTrxNum = FILE_TRX_NUM.incrementAndGet(); if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) { return saveHistory(); } } catch (Exception exx) { return false; } finally { writeSessionLock.unlock(); } flushDisk(curFileTrxNum, currFileChannel); return true; }
writeDataFile将会把session写入到文件当中,默认的文件地址在用户目录的sessionStore下,比如我的本机地址
在/Users/lay/sessionStore下有一个默认的root.data文件
总结
GlobalBeginRequest主要就是创建了一个GlobalSession然后进行持久化存储,默认是采用File的方式存储。当然你也可以选择db的方式等。