官方手册:http://hbase.apache.org/book.html#mapreduce.example
一个简单的示例:通过 MapReduce 作业将 HBase 表中的数据导出到 HDFS 上的文本文件中。
RunJob 源码:
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.FileSystem; 3 import org.apache.hadoop.fs.Path; 4 import org.apache.hadoop.hbase.HBaseConfiguration; 5 import org.apache.hadoop.hbase.client.Scan; 6 import org.apache.hadoop.hbase.mapreduce.TableInputFormat; 7 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 12 import org.apache.hadoop.util.Tool; 13 import org.apache.hadoop.util.ToolRunner; 14 15 /** 16 * Created by Edward on 2016/6/29. 17 */ 18 public class RunJob implements Tool { 19 20 private Configuration conf = null; 21 22 @Override 23 public int run(String[] strings) throws Exception { 24 25 Configuration conf = this.getConf(); 26 27 FileSystem fs = FileSystem.get(conf); 28 29 Job job = Job.getInstance(conf,"etl"); 30 job.setJarByClass(RunJob.class); 31 32 job.setInputFormatClass(TableInputFormat.class); 33 job.setOutputFormatClass(TextOutputFormat.class); 34 job.setOutputKeyClass(TextOutputFormat.class); 35 36 Scan scan = new Scan(); 37 scan.setCaching(1024); 38 scan.setCacheBlocks(false); 39 40 TableMapReduceUtil.initTableMapperJob("test1", 41 scan, 42 MyMapper.class, 43 Text.class, 44 Text.class, 45 job); 46 47 Path path = new Path("/hbase_out"); 48 if(fs.exists(path)) 49 { 50 fs.delete(path,true); 51 } 52 53 FileOutputFormat.setOutputPath(job, new Path("/hbase_out")); 54 55 boolean b = job.waitForCompletion(true); 56 if(b) 57 { 58 System.out.println("执行成功"); 59 } 60 return 0; 61 } 62 63 @Override 64 public void setConf(Configuration configuration) { 65 66 System.setProperty("HADOOP_USER_NAME","root"); 67 configuration.set("hbase.zookeeper.quorum","node1,node2,node3"); 68 configuration.set("mapred.jar","D:\etl.jar"); 69 70 this.conf = HBaseConfiguration.create(configuration); 71 } 72 73 @Override 74 public 
Configuration getConf() { 75 return this.conf; 76 } 77 78 public static void main(String[] args) 79 { 80 try { 81 ToolRunner.run(new Configuration(), new RunJob(), args); 82 } catch (Exception e) { 83 e.printStackTrace(); 84 } 85 } 86 }
MyMapper代码:
1 import org.apache.hadoop.hbase.client.Result; 2 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 3 import org.apache.hadoop.hbase.mapreduce.TableMapper; 4 import org.apache.hadoop.io.Text; 5 6 import java.io.IOException; 7 8 /** 9 * Created by Edward on 2016/6/29. 10 */ 11 public class MyMapper extends TableMapper<Text, Text>{ 12 @Override 13 protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { 14 15 String val = new String(value.getValue("info".getBytes(),"name".getBytes())); 16 String row = new String(value.getRow()); 17 context.write(new Text(row), new Text(val)); 18 } 19 }
MyReducer代码(注意:RunJob 中并未调用 job.setReducerClass 注册该类,作业实际使用的是 Hadoop 默认的恒等 Reducer,输出结果相同):
1 import org.apache.hadoop.io.Text; 2 import org.apache.hadoop.mapreduce.Reducer; 3 4 import java.io.IOException; 5 6 /** 7 * Created by Edward on 2016/6/29. 8 */ 9 public class MyReducer extends Reducer<Text,Text,Text,Text>{ 10 11 @Override 12 protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 13 for(Text t:values) { 14 context.write(key, t); 15 } 16 } 17 }