在Spark的RDD中引入过lineage这一概念。指的是RDD之间的依赖。而Alluxio则使用lineage来表示文件之间的依赖。在代码层面,指的是fileID之间的依赖。
代码中的注释指出:
* A lineage tracks the dependencies imposed by a job, including the input files the job depends on,
* and the output files the job generates.
内部数据结构:
@NotThreadSafe public final class Lineage implements JournalEntryRepresentable { private final long mId; private final List<Long> mInputFiles; private final List<Long> mOutputFiles; private final Job mJob; private final long mCreationTimeMs; }
有了lineage之后,如何使用lineage来实现文件的容错呢?
在源码中,有2个关键类解决了这个问题,分别是LineageMaster和RecomputeExecutor。
有几个问题:
1. 什么时候启动LineageMaster?
按道理,在Alluxio集群启动时,LineageMaster进程就应该已经“启动”了。
查找LineageMaster的调用情况,可以发现在alluxio.master.AlluxioMaster对象中,main方法里会调用master.start();
顺着这个思路,找到startMasters方法,就可以发现如下代码:
if (LineageUtils.isLineageEnabled(MasterContext.getConf())) { mLineageMaster.start(isLeader); }
也就是说,在lineage使能的情况下,才会启动mLineageMaster.
LineageMaster的依赖:
LineageMaster的启动过程是这样的:
@Override public void start(boolean isLeader) throws IOException { super.start(isLeader); if (isLeader) { mCheckpointExecutionService = getExecutorService() .submit(new HeartbeatThread(HeartbeatContext.MASTER_CHECKPOINT_SCHEDULING, new CheckpointSchedulingExcecutor(this, mFileSystemMaster), mConfiguration.getInt(Constants.MASTER_LINEAGE_CHECKPOINT_INTERVAL_MS))); mRecomputeExecutionService = getExecutorService() .submit(new HeartbeatThread(HeartbeatContext.MASTER_FILE_RECOMPUTATION, new RecomputeExecutor(new RecomputePlanner(mLineageStore, mFileSystemMaster), mFileSystemMaster), mConfiguration.getInt(Constants.MASTER_LINEAGE_RECOMPUTE_INTERVAL_MS))); } }
先分析LineageMaster.start()除了super.start(isLeader)的代码。
根据这部分代码,可以分析出,LineageMaster只会在leader上启动,而不会在standby master上启动。
启动包含2个Executor服务的运行,运行在这2个Executor服务上的是2个心跳线程(HeartBeatThread)。
这2个心跳线程所做的事情分别是:
checkpoint执行服务所做的是Master Checkpoint Scheduling。executor是CheckpointSchedulingExcecutor。对应的Timer的interval是由“alluxio.master.lineage.checkpoint.interval.ms”所设定。
而RecomputeExecutionService所做的事情则是文件的重计算(MASTER_FILE_RECOMPUTATION).
在启动这2个服务之前,还会调用父类的start方法:
先了解下AbstractMaster类的功能:
* This is the base class for all masters, and contains common functionality. Common functionality
* mostly consists of journal operations, like initialization, journal tailing when in standby mode,
* or journal writing when the master is the leader.
分析start方法的代码,首先会起一个固定线程数的线程池。然后,如果是leader,那么就会做如下事情:
1.判断对应的日志是不是读写日志;
2.将所有的日志标记为“Complete”;
3.执行所有的日志文件中的Entry;
4.初始化日志,把所有完成日志的状态写入检查点文件。
AbstractMaster在此处会调用子类的streamToJournalCheckpoint方法。
LineageMaster实现了JournalCheckpointStreamable接口,对应的streamToJournalCheckpoint方法调用LineStore的streamToJournalCheckpoint方法,将依赖转换成日志条目写入JournalOutputStream.
2.RecomputeExecutor已经在LineageMaster中例化过了,那么它的作用是什么?
RecomputeExecutor包含一个内部类:RecomputeLauncher。
这个内部类实现了runnable接口,这意味着这个类的实例将会被一个线程执行。
该类的run方法做的事情是根据RecomputePlan类对象的依赖,循环地执行对应的任务:
a 初始化文件
b getJob().run()
总结,可以看出,lineage的实现与通常的预期较为相符。Master提供了稳定的重算服务,重算服务则由包含有RecomputeExecutor功能的HeartbeatThread实现。
Master通用的功能在AbstractMaster中提出甚至实现。从当前的线索出发,alluxio更多的方面可以挖掘出来。