标签:
今天在从文件中读取数据在写入到hbase的时候,使用到了TableMapReduceUtil工具类,使用过程只需要简单的设置之后工具类会帮我们生成写入到HBase的任务,工作类封装了许多MapReduce写入到HBase的操作,无需我们自己再去设置,下面大致看看内部的实现机制,对TableMapReduceUtil有个比较深入的了解
使用过程:在map端生成了<ImmutableBytesWritable , put>的输出类型,key和value分别为key和put对象,然后使用如下设置
1 TableMapReduceUtil.initTableReducerJob("Test_MR_HBase", // output table
2 null, // reducer class
3 job); // TableMapReduceUtil是HBase提供的工具类,会自动设置mapreuce提交到hbase任务的各种配置,封装了操作
4 job.setNumReduceTasks(0);// 设置reduce过程,这里由map端的数据直接提交,不要使用reduce类,因而设置成null,并设置reduce的个数为0
5 FileInputFormat.addInputPath(job, new Path(args[0]));// 设置输入文件路径
追踪方法的具体细节,查看具体的实现过程,发现最核心的任务设置的方法
1 public static void initTableReducerJob(String table,
2 Class<? extends TableReducer> reducer, Job job,
3 Class partitioner, String quorumAddress, String serverClass,
4 String serverImpl, boolean addDependencyJars) throws IOException {
5
6 Configuration conf = job.getConfiguration();
7 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
8 job.setOutputFormatClass(TableOutputFormat.class);//可见输出使用的TableOutputFormat类,结果会通过网络请求到HBase集群
9 if (reducer != null) job.setReducerClass(reducer);//设置reudce
10 conf.set(TableOutputFormat.OUTPUT_TABLE, table);//设置输出hbase的表
11 conf.setStrings("io.serializations", conf.get("io.serializations"),
12 MutationSerialization.class.getName(), ResultSerialization.class.getName());
13 // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
14 if (quorumAddress != null) {
15 // Calling this will validate the format
16 ZKUtil.transformClusterKey(quorumAddress);
17 conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
18 }
19 if (serverClass != null && serverImpl != null) {
20 conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
21 conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
22 }
23 job.setOutputKeyClass(ImmutableBytesWritable.class);//设置输出key的类型
24 job.setOutputValueClass(Writable.class);//输出的value类型
25 if (partitioner == HRegionPartitioner.class) {
26 job.setPartitionerClass(HRegionPartitioner.class);//设置partitioner算法,partition算法会决定map输出的结果输出到哪一个reduce上
27 int regions = MetaReader.getRegionCount(conf, table);//获得region的数量,并根据region的数量设置reduce的个数
28 if (job.getNumReduceTasks() > regions) {//将reudce的数目设置成region的个数
29 job.setNumReduceTasks(regions);
30 }
31 } else if (partitioner != null) {
32 job.setPartitionerClass(partitioner);
33 }
34
35 if (addDependencyJars) {
36 addDependencyJars(job);
37 }
38
39 initCredentials(job);
40 }
值得注意的一点是工具类最终的实现是通过HBase的put方法通过网络请求提交数据,在大批量写入时需要考虑对hbase带来的负载
driver程序如下:
1 private int excuteHFile(String[] args) throws Exception {
2 Configuration conf = HBaseConfiguration.create();// 任务的配置设置,configuration是一个任务的配置对象,封装了任务的配置信息
3
4 Job job = Job.getInstance(conf, "HFile bulk load test");// 生成一个新的任务对象并设置dirver类
5
6 job.setJarByClass(HfileToHBaseDriver.class);
7 job.setMapperClass(HfileToHBaseMapper.class); // 设置任务的map类和 ,map类输出结果是ImmutableBytesWritable和put类型
8
9 TableMapReduceUtil.initTableReducerJob("Test_MR_HBase", // output table
10
11 null, // reducer class
12
13 job);// TableMapReduceUtil是HBase提供的工具类,会自动设置mapreuce提交到hbase任务的各种配置,封装了操作,只需要简单的设置即可
14
15 job.setNumReduceTasks(0);// 设置reduce过程,这里由map端的数据直接提交,不要使用reduce类,因而设置成null,并设置reduce的个数为0
16
17 FileInputFormat.addInputPath(job, new Path(args[0]));// 设置输入文件路径
18
19 return (job.waitForCompletion(true) ? 0 : -1);
20 }