package dataimport;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * MapReduce job that converts tab-separated text records ("ip&lt;TAB&gt;ids")
 * into HBase HFiles, partitioned to match the regions of the "index_ip"
 * table so they can be bulk-loaded afterwards.
 *
 * <p>Input:  text files under hdfs://h1:9000/user/hadoop/data/index_ip/
 * <br>Output: HFiles under hdfs://h1:9000/user/hadoop/data/index_ip_hfile
 */
public class ip_to_hfile2 {

    /**
     * Parses each input line as "numericIp&lt;TAB&gt;value" and emits the ip
     * (encoded as an 8-byte big-endian long) as the rowkey with the value text.
     * Malformed lines (fewer than 2 columns, or a non-numeric first column)
     * are silently skipped rather than failing the task.
     */
    public static class HBaseHFileMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {

        // Reused across records to avoid per-record allocation.
        private final ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (fields.length < 2) {
                return; // malformed line: not enough columns
            }
            long ip;
            try {
                ip = Long.parseLong(fields[0]);
            } catch (NumberFormatException e) {
                return; // malformed line: rowkey column is not numeric — skip it
            }
            rowKey.set(Bytes.toBytes(ip));
            outValue.set(fields[1]);
            context.write(rowKey, outValue);
        }
    }

    /**
     * Emits exactly one {@link KeyValue} per rowkey into column family "ids"
     * (null qualifier, timestamp 0).
     */
    public static class HBaseHFileReducer
            extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {

        private static final byte[] FAMILY = Bytes.toBytes("ids");

        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Only the first value for a rowkey is written: emitting several
            // cells with identical (row, family, qualifier, timestamp)
            // coordinates into one HFile would produce colliding cells.
            String first = values.iterator().next().toString();
            KeyValue kv = new KeyValue(key.get(), FAMILY, null, 0, Bytes.toBytes(first));
            context.write(key, kv);
        }
    }

    /**
     * Configures and runs the HFile-generation job. Exits with 0 on success,
     * 1 on failure.
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.default.name", "hdfs://h1:9000");   // must match conf/core-site.xml
        conf.set("mapred.job.tracker", "h1:9001");       // must match mapred-site.xml
        conf.set("hbase.zookeeper.quorum", "h2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = new Job(conf, "index_ip_hfile_test");
        job.setJarByClass(ip_to_hfile2.class);
        job.setMapperClass(HBaseHFileMapper.class);
        job.setReducerClass(HBaseHFileReducer.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);

        FileInputFormat.addInputPath(job,
                new Path("hdfs://h1:9000/user/hadoop/data/index_ip/"));
        HFileOutputFormat.setOutputPath(job,
                new Path("hdfs://h1:9000/user/hadoop/data/index_ip_hfile"));

        // configureIncrementalLoad sets up total-order partitioning to match the
        // target table's current region boundaries; the table handle is only
        // needed during job setup, so close it before the job runs.
        HTable htable = new HTable(conf, "index_ip");
        try {
            HFileOutputFormat.configureIncrementalLoad(job, htable);
        } finally {
            htable.close();
        }

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}