Preface
While reading HBase data through Spark I ran into InputFormat. This article is centered on InputFormat and touches on Spark, MapReduce and HBase along the way.
InputFormat
InputFormat is the data-source format interface provided by MapReduce. In other words, by implementing this interface you can read all kinds of data sources (file systems, databases, and so on) and feed them into a MapReduce computation.
With that concept in mind, let's look at the InputFormat source.
public abstract class InputFormat<K, V> {

  /*
   * Get the split information for the data source; each split is wrapped in an
   * InputSplit and the method returns a List<InputSplit>.
   * Note that these splits are logical partitions: for example, a file of 100
   * characters split at 10 characters per partition yields 10 partitions.
   */
  public abstract List<InputSplit> getSplits(JobContext context
                                             ) throws IOException, InterruptedException;

  /*
   * Given split information, build a RecordReader. A RecordReader is essentially a
   * beefed-up iterator, except that it returns key/value pairs. Note that it takes
   * a single InputSplit, i.e. one partition, so the iteration happens inside a partition.
   */
  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                                                        TaskAttemptContext context
                                                        ) throws IOException, InterruptedException;
}
That roughly pins down what this interface is for: one method answers how to define the partitions, the other how to get data out of a partition. Next, let's ground this in the Spark use case.
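To make the contract concrete, here is a minimal, single-threaded sketch of how a caller would combine the two methods: ask for the splits, then drain each split with its own RecordReader. This is not how MapReduce or Spark actually runs things (a real engine ships each split to a separate task), and readAll plus the contexts passed in are just placeholders:

import scala.collection.JavaConverters._
import org.apache.hadoop.mapreduce.{InputFormat, JobContext, TaskAttemptContext}

// Hypothetical helper: read every record of every split sequentially.
def readAll[K, V](format: InputFormat[K, V],
                  jobContext: JobContext,
                  taskContext: TaskAttemptContext): List[(K, V)] = {
  format.getSplits(jobContext).asScala.toList.flatMap { split =>      // "how to define partitions"
    val reader = format.createRecordReader(split, taskContext)        // "how to read one partition"
    reader.initialize(split, taskContext)
    val buf = scala.collection.mutable.ArrayBuffer.empty[(K, V)]
    while (reader.nextKeyValue()) {
      // NOTE: many RecordReaders reuse the key/value objects; a real caller
      // would copy them before buffering. Good enough for a sketch.
      buf += ((reader.getCurrentKey, reader.getCurrentValue))
    }
    reader.close()
    buf.toList
  }
}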
TableInputFormat
The Spark Part
Reading HBase data through Spark's MapReduce-compatible API always involves code like the following:
// hbaseConfig             an HBaseConfiguration
// TableInputFormat        a subclass of InputFormat, i.e. the input data source
// ImmutableBytesWritable  the key type of the data source
// Result                  the value type of the data source
// If you have written MapReduce jobs, this looks just like a job's input configuration;
// since the output is an RDD, no output types need to be declared.
val hBaseRDD = sc.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
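For completeness, here is a hedged, minimal setup for the hbaseConfig used above. TableInputFormat reads the table name (and optionally a serialized Scan) from the Configuration; "zk-host" and "my_table" are placeholders, not values from the original post:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConfig = HBaseConfiguration.create()
hbaseConfig.set("hbase.zookeeper.quorum", "zk-host")      // where the cluster's ZooKeeper lives
hbaseConfig.set(TableInputFormat.INPUT_TABLE, "my_table") // the table getSplits() will enumerate
// Optionally narrow the read by serializing a Scan into the config, e.g.:
// hbaseConfig.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))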
So what does the flow actually look like?
First, SparkContext creates an RDD:
new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
And then it's over…
This is Spark's scheduling model at work: a job is only really submitted when an action is triggered, which I won't go into here. Skipping past that, look straight at NewHadoopRDD. Its two key methods, compute() and getPartitions(), correspond one-to-one with the two methods of InputFormat.
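As a small, hedged illustration of that laziness (reusing the hBaseRDD built above; Bytes is the HBase utility class), nothing touches HBase until an action runs:

import org.apache.hadoop.hbase.util.Bytes

// hBaseRDD is the RDD built earlier; at this point no region has been scanned.
val rowKeys = hBaseRDD.map { case (key, _) => Bytes.toString(key.copyBytes()) } // transformation: still lazy
rowKeys.count() // action: only now do getPartitions() and then compute() actually run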
·getPartitions()
override def getPartitions: Array[Partition] = {
  // Instantiate the InputFormat we passed in, i.e. TableInputFormat here
  // (it could be any other InputFormat, this is just the example at hand)
  val inputFormat = inputFormatClass.newInstance
  inputFormat match {
    case configurable: Configurable =>
      configurable.setConf(_conf)
    case _ =>
  }
  val jobContext = new JobContextImpl(_conf, jobId)
  // Get all the splits
  val rawSplits = inputFormat.getSplits(jobContext).toArray
  // Take the total split count and translate it into Spark's terms
  val result = new Array[Partition](rawSplits.size)
  for (i <- 0 until rawSplits.size) {
    // Wrap each split as a partition
    result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
  }
  result
}
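A hedged way to see this on the driver, again using the hBaseRDD from earlier: just asking for the partition array already goes through getSplits(), even though no job has been submitted yet.

// Evaluated on the driver: this already calls TableInputFormat.getSplits under the hood.
println(hBaseRDD.partitions.length) // one partition per InputSplit
println(hBaseRDD.getNumPartitions)  // same number, via the convenience method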
·compute()
The full method is too long to be comfortable reading, so here are just the key pieces:
// Same as before: instantiate the InputFormat
private val format = inputFormatClass.newInstance
format match {
  case configurable: Configurable =>
    configurable.setConf(conf)
  case _ =>
}
// Satisfy everything MapReduce requires...
private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
private var finished = false
private var reader =
  try {
    // Get the all-important RecordReader
    val _reader = format.createRecordReader(
      split.serializableHadoopSplit.value, hadoopAttemptContext)
    _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
    _reader
  } catch {
    case e: IOException if ignoreCorruptFiles =>
      logWarning(
        s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
        e)
      finished = true
      null
  }

// The good old hasNext and next
override def hasNext: Boolean = {
  if (!finished && !havePair) {
    try {
      finished = !reader.nextKeyValue
    } catch {
      case e: IOException if ignoreCorruptFiles =>
        logWarning(
          s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
          e)
        finished = true
    }
    if (finished) {
      // Close and release the reader here; close() will also be called when the task
      // completes, but for tasks that read from many files, it helps to release the
      // resources early.
      close()
    }
    havePair = !finished
  }
  !finished
}

override def next(): (K, V) = {
  if (!hasNext) {
    throw new java.util.NoSuchElementException("End of stream")
  }
  havePair = false
  if (!finished) {
    inputMetrics.incRecordsRead(1)
  }
  if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
    updateBytesRead()
  }
  (reader.getCurrentKey, reader.getCurrentValue)
}
A lot of code is elided, but in essence compute() wraps the RecordReader into an Iterator (couldn't this exasperating MapReduce API have just exposed an Iterator in the first place?).
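Stripped of the metrics, corrupt-file handling and task-completion hooks, the wrapping boils down to something like this hypothetical helper (assuming the reader has already been initialized):

import org.apache.hadoop.mapreduce.RecordReader

// Adapt an already-initialized RecordReader into a Scala Iterator, the same
// idea NewHadoopRDD.compute implements.
def asIterator[K, V](reader: RecordReader[K, V]): Iterator[(K, V)] = new Iterator[(K, V)] {
  private var finished = false
  private var havePair = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      if (finished) reader.close() // release the reader as soon as the split is exhausted
      havePair = !finished
    }
    !finished
  }

  override def next(): (K, V) = {
    if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
    havePair = false
    (reader.getCurrentKey, reader.getCurrentValue)
  }
}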
That is roughly Spark's share of the work; the rest is done by HBase.
The HBase Part
TableInputFormat is the class HBase provides for MapReduce compatibility; who would have guessed that Spark, that honest-looking fellow, would be the one to hijack it.
Cutting straight to the chase, here is the key code in TableInputFormat:
·getSplits()
RegionSizeCalculator sizeCalculator =
    new RegionSizeCalculator(getRegionLocator(), getAdmin());
TableName tableName = getTable().getName();
Pair<byte[][], byte[][]> keys = getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
  HRegionLocation regLoc =
      getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
  if (null == regLoc) {
    throw new IOException("Expecting at least one region.");
  }
  List<InputSplit> splits = new ArrayList<>(1);
  // Get the size of this region; each region backs exactly one split, and hence one partition
  long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
  // Create the TableSplit, i.e. the InputSplit
  TableSplit split = new TableSplit(tableName, scan,
      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
      .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
  splits.add(split);
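Since one region maps to one split (the snippet above is just the degenerate single-region branch), the RDD's partition count should match the table's region count. A hedged way to check, assuming an already-open HBase Connection named connection and the placeholder table my_table:

import org.apache.hadoop.hbase.TableName

// Hypothetical sanity check: one HBase region == one Spark partition.
// getRegions(TableName) is the HBase 2.x Admin API; older clients expose getTableRegions instead.
val regionCount = connection.getAdmin.getRegions(TableName.valueOf("my_table")).size()
println(s"regions = $regionCount, partitions = ${hBaseRDD.getNumPartitions}")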
·createRecordReader()
final TableRecordReader trr =
    this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setTable(getTable());
return new RecordReader<ImmutableBytesWritable, Result>() {

  @Override
  public void close() throws IOException {
    trr.close();
    closeTable();
  }

  @Override
  public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
    return trr.getCurrentKey();
  }

  @Override
  public Result getCurrentValue() throws IOException, InterruptedException {
    return trr.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return trr.getProgress();
  }

  @Override
  public void initialize(InputSplit inputsplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    trr.initialize(inputsplit, context);
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return trr.nextKeyValue();
  }
};
This one should be fairly clear: a somewhat roundabout way of building a RecordReader. The delegate TableRecordReader runs a Scan bounded by the split's start and stop rows, so each partition reads only its own region.
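In other words, what each partition ends up doing is roughly equivalent to a plain HBase client scan over one region's key range. A hedged sketch of that equivalence, where connection, the table name and the key range are placeholders:

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Scan}
import org.apache.hadoop.hbase.util.Bytes

// Roughly what one partition's RecordReader does: scan a single region's key range.
def scanRange(connection: Connection, table: String,
              startRow: Array[Byte], stopRow: Array[Byte]): Unit = {
  val t = connection.getTable(TableName.valueOf(table))
  val scan = new Scan()
  scan.setStartRow(startRow) // the same calls TableInputFormat makes on its per-split Scan
  scan.setStopRow(stopRow)
  val scanner = t.getScanner(scan)
  try {
    scanner.asScala.foreach(result => println(Bytes.toString(result.getRow)))
  } finally {
    scanner.close()
    t.close()
  }
}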
Summary
To stay compatible with MapReduce, Spark exposes hadoopRDD()-style interfaces (newAPIHadoopRDD() is the one used here), and HBase, for the same reason, exposes TableInputFormat and friends. Put together, that is how Spark can read data from HBase; it is, of course, not the only way.