Prerequisite: the target table has already been created in HBase, and the source data already exists on HDFS.

1、Use MapReduce to write the HDFS data out as HFiles

package com.wyh.parctise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HDFStoHFile {

    /**
     * Mapper: turn each input line into KeyValues keyed by the row key.
     */
    public static class HdfsToHFileMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            String[] split = v1.toString().split(",");
            String id = split[0];
            // the output key is the row key
            ImmutableBytesWritable key = new ImmutableBytesWritable(id.getBytes());
            // one KeyValue per column, all in the "info" column family
            KeyValue name = new KeyValue(id.getBytes(), "info".getBytes(), "name".getBytes(), split[1].getBytes());
            KeyValue age = new KeyValue(id.getBytes(), "info".getBytes(), "age".getBytes(), split[2].getBytes());
            KeyValue gender = new KeyValue(id.getBytes(), "info".getBytes(), "gender".getBytes(), split[3].getBytes());
            KeyValue clazz = new KeyValue(id.getBytes(), "info".getBytes(), "clazz".getBytes(), split[4].getBytes());
            // emit the KeyValues
            context.write(key, name);
            context.write(key, age);
            context.write(key, gender);
            context.write(key, clazz);
        }
    }

    public static void main(String[] args) throws Exception {
        // create the HBase configuration and the job
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJobName("HDFStoHfile");
        job.setJarByClass(HDFStoHFile.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);

        // set the mapper
        job.setMapperClass(HdfsToHFileMap.class);
        // set the reducer; KeyValueSortReducer is provided by HBase
        job.setReducerClass(KeyValueSortReducer.class);

        // open the target table and let HBase configure the job for incremental (bulk) load
        HTable stu4 = new HTable(conf, "stu4");
        HFileOutputFormat2.configureIncrementalLoad(job, stu4);

        // input file on HDFS and output directory for the generated HFiles
        FileInputFormat.addInputPath(job, new Path("/data/students.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/data/hfile1"));

        // submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
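The mapper assumes each line of /data/students.txt has five comma-separated fields in the order id,name,age,gender,clazz, for example 1500100001,zhangsan,22,male,class01 (the values here are purely illustrative). If the stu4 table does not exist yet, it can be created with the same old-style client API used above. The sketch below is not part of the original post; the class name is hypothetical, and only the table name stu4 and the column family info are taken from the code above.

package com.wyh.parctise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// hypothetical helper, not part of the original post
public class CreateStu4Table {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("stu4")) {
            // the mapper writes every column into the "info" family
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("stu4"));
            desc.addFamily(new HColumnDescriptor("info"));
            admin.createTable(desc);
        }
        admin.close();
    }
}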
2、Bulk-load the generated HFiles into the HBase table
package com.wyh.parctise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class LoadHfileToHbase {
    public static void main(String[] args) throws Exception {
        // load the HBase configuration and point at the ZooKeeper quorum
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "testmaster:2181,testnode1:2181,testnode2:2181,testnode3:2181");

        // open the target table and bulk-load the HFiles produced by the previous job
        HTable stu4 = new HTable(conf, "stu4");
        LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(conf);
        loadIncrementalHFiles.doBulkLoad(new Path("/data/hfile1"), stu4);
    }
}
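Once doBulkLoad returns, the HFiles under /data/hfile1 have been moved into the table's region directories and the rows are immediately readable. As a quick sanity check (not part of the original post; the class name and row key below are illustrative, assuming the id column of students.txt is the row key as in the mapper above), the data can be read back with the same client API:

package com.wyh.parctise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// hypothetical check, only for illustration
public class CheckStu4 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable stu4 = new HTable(conf, "stu4");
        // read one row back; replace the row key with an id that exists in students.txt
        Result result = stu4.get(new Get(Bytes.toBytes("1500100001")));
        System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
        stu4.close();
    }
}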
Note: both programs are run by packaging them into a jar. Package the whole project together with its dependencies; otherwise the HBase classes will be missing from the classpath in the Hadoop environment and the job will fail. Add the following to pom.xml (this goes into the build section, it is not a dependency):
<build>
<plugins>
<!-- compiler plugin: set the JDK version -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<!-- assembly plugin: build a jar with dependencies -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Finally, package the project and run the resulting jar with dependencies in the Hadoop environment via hadoop jar, specifying the fully qualified class name to execute: first com.wyh.parctise.HDFStoHFile to generate the HFiles, then com.wyh.parctise.LoadHfileToHbase to bulk-load them into the table.