  • Hadoop: converting a text file into an HFile

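    The MapReduce job below reads tab-separated text from HDFS, where each line carries a
    numeric row key in the first field and a value in the second, and writes the data out in
    HBase's HFile format so it can be bulk-loaded into the index_ip table. As a hypothetical
    illustration (keys and values invented; the key here is an IP address encoded as a long),
    an input file might look like:

        3232235777	id_001,id_002
        3232235778	id_003
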
    package dataimport;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    
    /**
     * MapReduce job that turns tab-separated text into HFiles for
     * bulk-loading into the HBase table "index_ip".
     */
    public class ip_to_hfile2 {
    	/**
    	 * Emits one (row key, value) pair per input line: the first tab-separated
    	 * field, parsed as a long, becomes the row key, and the second field
    	 * becomes the value.
    	 */
    	public static class HBaseHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {
    		private final ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
    
    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			String[] strs = value.toString().split("\t");
    			// Skip malformed lines that lack both a key and a value.
    			if (strs.length < 2)
    				return;
    			// The first field must be a parseable long (e.g. an IP encoded as a number).
    			immutableBytesWritable.set(Bytes.toBytes(Long.parseLong(strs[0])));
    			context.write(immutableBytesWritable, new Text(strs[1]));
    		}
    	}
    
    
    	/**
    	 * Writes one KeyValue per row key into column family "ids". Only the
    	 * first value seen for each key is kept; any duplicates are dropped.
    	 */
    	public static class HBaseHFileReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
    		@Override
    		protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
    				throws IOException, InterruptedException {
    			String rt = values.iterator().next().toString();
    			// Row key, family "ids", null qualifier, timestamp 0, value.
    			KeyValue kv = new KeyValue(key.get(), Bytes.toBytes("ids"), null, 0, Bytes.toBytes(rt));
    			context.write(key, kv);
    		}
    	}
    
    	public static void main(String[] args) throws IOException,
    			InterruptedException, ClassNotFoundException {
    		Configuration conf = HBaseConfiguration.create();
    		// Must match fs.default.name in conf/core-site.xml.
    		conf.set("fs.default.name", "hdfs://h1:9000");
    		// Must match mapred.job.tracker in conf/mapred-site.xml.
    		conf.set("mapred.job.tracker", "h1:9001");
    		conf.set("hbase.zookeeper.quorum", "h2");
    		conf.set("hbase.zookeeper.property.clientPort", "2181");
    
    		Job job = new Job(conf, "index_ip_hfile_test");
    		job.setJarByClass(ip_to_hfile2.class);
    		job.setMapperClass(HBaseHFileMapper.class);
    		job.setReducerClass(HBaseHFileReducer.class);
    		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    		job.setMapOutputValueClass(Text.class);
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setOutputFormatClass(HFileOutputFormat.class);
    		FileInputFormat.addInputPath(job, new Path("hdfs://h1:9000/user/hadoop/data/index_ip/"));
    		HFileOutputFormat.setOutputPath(job, new Path("hdfs://h1:9000/user/hadoop/data/index_ip_hfile"));
    		// Configures the partitioner and the reduce count so that the generated
    		// HFiles line up with the regions of the target table.
    		HTable htable = new HTable(conf, "index_ip");
    		HFileOutputFormat.configureIncrementalLoad(job, htable);
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }
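
    The job above only generates HFiles under hdfs://h1:9000/user/hadoop/data/index_ip_hfile;
    a second step is still needed to hand them to the region servers. Below is a minimal
    sketch of that step, assuming the same cluster settings as above and the HBase-0.9x-era
    LoadIncrementalHFiles API (the class name HFileBulkLoad is invented for illustration):

        package dataimport;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.client.HTable;
        import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

        // Hypothetical follow-up class; the name and paths are assumptions.
        public class HFileBulkLoad {
            public static void main(String[] args) throws Exception {
                Configuration conf = HBaseConfiguration.create();
                // Same ZooKeeper settings as the MapReduce job above.
                conf.set("hbase.zookeeper.quorum", "h2");
                conf.set("hbase.zookeeper.property.clientPort", "2181");

                HTable table = new HTable(conf, "index_ip");
                // Moves each generated HFile into the region that owns its key range.
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(new Path("hdfs://h1:9000/user/hadoop/data/index_ip_hfile"), table);
                table.close();
            }
        }

    In HBase releases of this vintage the same step can also be run from the command line via
    the completebulkload tool bundled in the HBase jar, e.g.
    hadoop jar hbase-<version>.jar completebulkload <hfile-dir> index_ip.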

  • Original article: https://www.cnblogs.com/java20130726/p/3218277.html