  • Using CombineFileInputFormat to import the Netflix data set into HBase

    Copyright notice: this is the blogger's original article and may not be reposted without the blogger's permission. https://blog.csdn.net/xiewenbo/article/details/25637931
    package com.mr.test;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    
    public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> {
    
    	@Override
    	public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
    
    		CombineFileSplit combineFileSplit = (CombineFileSplit) split;
    		CombineFileRecordReader<LongWritable, BytesWritable> recordReader = new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class);
    		try {
    			recordReader.initialize(combineFileSplit, context);
    		} catch (InterruptedException e) {
    			// propagate the failure instead of silently discarding the exception
    			throw new RuntimeException("Failed to initialize CombineSmallfileRecordReader.", e);
    		}
    		return recordReader;
    	}
    
    }
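
    One practical note: CombineFileInputFormat puts no upper bound on split size by default, so all of the small Netflix files may be packed into one split and handled by a single map task. Below is a minimal sketch of a tuned subclass; the class name, the 128 MB cap, and the isSplitable override are assumptions, not part of the original code.

    package com.mr.test;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    
    public class CombineSmallfileInputFormatTuned extends CombineSmallfileInputFormat {
    
    	public CombineSmallfileInputFormatTuned() {
    		super();
    		setMaxSplitSize(128L * 1024 * 1024); // cap each combined split at roughly 128 MB (assumed value)
    	}
    
    	@Override
    	protected boolean isSplitable(JobContext context, Path file) {
    		return false; // keep every small file whole inside its combined split
    	}
    }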


    package com.mr.test;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    
    public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> {
    
    	private CombineFileSplit combineFileSplit;
    	private LineRecordReader lineRecordReader = new LineRecordReader();
    	private Path[] paths;
    	private int totalLength;
    	private int currentIndex;
    	private float currentProgress = 0;
    	private LongWritable currentKey;
    	private BytesWritable currentValue = new BytesWritable();
    
    	public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index) throws IOException {
    		super();
    		this.combineFileSplit = combineFileSplit;
    		this.currentIndex = index; // index of the small-file block within the CombineFileSplit that this reader handles
    	}
    
    	@Override
    	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    		this.combineFileSplit = (CombineFileSplit) split;
    		// Read one small-file block of the CombineFileSplit; because a LineRecordReader is used, a FileSplit must be constructed before the data can be read
    		FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
    		lineRecordReader.initialize(fileSplit, context);
    
    		this.paths = combineFileSplit.getPaths();
    		totalLength = paths.length;
    		context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());
    	}
    
    	@Override
    	public LongWritable getCurrentKey() throws IOException, InterruptedException {
    		currentKey = lineRecordReader.getCurrentKey();
    		return currentKey;
    	}
    
    	@Override
    	public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    		System.out.println("lineRecordReader:"+lineRecordReader.getCurrentValue().toString());
    		byte[] content = lineRecordReader.getCurrentValue().toString().getBytes();
    		System.out.println("content:"+new String(content));
    		currentValue = new BytesWritable();
    		currentValue.set(content, 0, content.length);
    		System.out.println("currentValue:"+new String(currentValue.getBytes()));
    		return currentValue;
    	}
    	// Quick demo: BytesWritable.getBytes() returns the full backing array (its
    	// capacity), so bytes from a previous, longer value can trail the current
    	// content unless setCapacity(0) is called before reusing the object.
    	public static void main(String args[]){
    		BytesWritable cv = new BytesWritable();
    		String str1 = "1234567";
    		String str2 = "123450";
    		cv.set(str1.getBytes(), 0, str1.getBytes().length);
    		System.out.println(new String(cv.getBytes()));
    		
    		cv.setCapacity(0);
    		
    		cv.set(str2.getBytes(), 0, str2.getBytes().length);
    		System.out.println(new String(cv.getBytes()));
    	}
    	@Override
    	public boolean nextKeyValue() throws IOException, InterruptedException {
    		if (currentIndex >= 0 && currentIndex < totalLength) {
    			return lineRecordReader.nextKeyValue();
    		} else {
    			return false;
    		}
    	}
    
    	@Override
    	public float getProgress() throws IOException {
    		if (currentIndex >= 0 && currentIndex < totalLength) {
    			currentProgress = (float) currentIndex / totalLength;
    			return currentProgress;
    		}
    		return currentProgress;
    	}
    
    	@Override
    	public void close() throws IOException {
    		lineRecordReader.close();
    	}
    }
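
    The main() demo above shows the pitfall that the mapper later has to work around: BytesWritable.getBytes() returns the whole backing array, which is usually longer than getLength(), so the value carries trailing padding (and, if the object is reused, stale bytes). A small standalone sketch of the padding-free conversions, assuming Hadoop 2.x where copyBytes() is available:

    package com.mr.test;
    
    import java.nio.charset.StandardCharsets;
    
    import org.apache.hadoop.io.BytesWritable;
    
    public class BytesWritableDemo {
    
    	public static void main(String[] args) {
    		BytesWritable bw = new BytesWritable();
    		byte[] payload = "1234567".getBytes(StandardCharsets.UTF_8);
    		bw.set(payload, 0, payload.length);
    
    		// Only the first getLength() bytes are valid data.
    		String viaLength = new String(bw.getBytes(), 0, bw.getLength(), StandardCharsets.UTF_8);
    
    		// copyBytes() returns a correctly sized copy of the valid bytes.
    		String viaCopy = new String(bw.copyBytes(), StandardCharsets.UTF_8);
    
    		System.out.println(viaLength); // 1234567
    		System.out.println(viaCopy);   // 1234567
    	}
    }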

    package com.mr.test;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class BulkImportData {
    
    	public static class TokenizerMapper extends
    			Mapper<Object, BytesWritable, Text, Text> {
    		public Text _key = new Text();
    		public Text _value = new Text();
    		public void map(Object key, BytesWritable value, Context context)
    				throws IOException, InterruptedException {
    			_value.set(value.getBytes());
    			String tmp = _value.toString().trim();
    			System.out.println(tmp);
    			tmp = tmp.replace("\u0000", ""); // strip the NUL padding exposed by BytesWritable.getBytes() ("\x00" is not a valid Java escape)
    			_value.set(tmp);
    			String filename = context.getConfiguration().get("map.input.file.name");
    			String[] splits = _value.toString().split(",");
    			if(splits.length==3){
    				filename = filename.replace("mv_", "");
    				filename = filename.replace(".txt", "");
    				_key.set(splits[0]+"_"+filename);
    				context.write(_key, _value);
    			}
    		}
    	}
    	public static class IntSumReducer extends
    			TableReducer<Text, Text, ImmutableBytesWritable> {
    		public void reduce(Text key, Iterable<Text> values,
    				Context context) throws IOException, InterruptedException {
    			Iterator<Text> itr = values.iterator();
    			while(itr.hasNext()){
    				Text t = itr.next();
    				String[] strs = t.toString().split(",");
    				if(strs.length!=3)continue;
    				// Use only the valid bytes of the Text key; Text.getBytes() may return a padded buffer.
    				byte[] rowKey = Bytes.toBytes(key.toString());
    				Put put = new Put(rowKey);
    				put.add(Bytes.toBytes("content"), Bytes.toBytes("score"), Bytes.toBytes(strs[1].trim()));
    				put.add(Bytes.toBytes("content"), Bytes.toBytes("date"), Bytes.toBytes(strs[2].trim()));
    				context.write(new ImmutableBytesWritable(rowKey), put);
    			}
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		String tablename = "ntf_data";
    		Configuration conf = HBaseConfiguration.create();
    		HBaseAdmin admin = new HBaseAdmin(conf);
    		if (admin.tableExists(tablename)) {
    			admin.disableTable(tablename);
    			admin.deleteTable(tablename);
    		}
    		HTableDescriptor htd = new HTableDescriptor(tablename);
    		HColumnDescriptor hcd = new HColumnDescriptor("content");
    		htd.addFamily(hcd);
    		admin.createTable(htd);
    		String[] otherArgs = new GenericOptionsParser(conf, args)
    				.getRemainingArgs();
    		if (otherArgs.length != 1) {
    			System.err.println("Usage: BulkImportData <input path> (got " + otherArgs.length + " arguments)");
    			System.exit(2);
    		}
    		Job job = new Job(conf, "h");
    		job.setMapperClass(TokenizerMapper.class);
    		job.setJarByClass(BulkImportData.class);
    		job.setInputFormatClass(CombineSmallfileInputFormat.class);
    		job.setNumReduceTasks(5);
    		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    		TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class,
    				job);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }
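
    The driver above is written against the older HBase client API (HBaseAdmin, HTableDescriptor, Put.add), which matches the HBase 0.9x era of the post. For reference, here is a hedged sketch of the same table setup using the HBase 2.x client; the classes and methods below come from the newer API and are not part of the original code.

    package com.mr.test;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    
    public class CreateNtfDataTable {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = HBaseConfiguration.create();
    		TableName table = TableName.valueOf("ntf_data");
    		// try-with-resources closes the connection and admin handle
    		try (Connection conn = ConnectionFactory.createConnection(conf);
    				Admin admin = conn.getAdmin()) {
    			if (admin.tableExists(table)) {
    				admin.disableTable(table);
    				admin.deleteTable(table);
    			}
    			admin.createTable(TableDescriptorBuilder.newBuilder(table)
    					.setColumnFamily(ColumnFamilyDescriptorBuilder.of("content"))
    					.build());
    		}
    	}
    }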



  • Original post: https://www.cnblogs.com/ldxsuanfa/p/10714698.html