zoukankan      html  css  js  c++  java
  • MapReduce-读取HBase

    MapReduce读取HBase数据

    代码如下

    package com.hbase.mapreduce;
    
    import java.io.IOException;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * MapReduce job that reads cells from an HBase table and counts how often each
     * distinct cell value occurs (one reducer, one output line per distinct value).
     *
     * <p>Arguments: {@code <table> <columns> <outputPath>}, where {@code columns} is one
     * or more space-separated {@code family} or {@code family:qualifier} entries; an empty
     * string scans every column. When {@link #main} is launched without arguments it falls
     * back to the demo arguments in {@code DEFAULT_ARGS}.
     *
     * <p>Originally written by FengZhen, 2018-09-17.
     */
    public class AnalyzeData extends Configured implements Tool {

        private static final Logger LOG = Logger.getLogger(AnalyzeData.class.getName());

        /** ZooKeeper quorum of the HBase cluster (always written into the job config). */
        private static final String ZK_QUORUM = "HDP233,HDP232,HDP231";
        /** ZooKeeper client port of the HBase cluster. */
        private static final String ZK_PORT = "2181";

        /** Demo arguments used by {@link #main} when no command-line arguments are given. */
        private static final String[] DEFAULT_ARGS = {
            "test_table_mr", "data:info", "hdfs://fz/data/fz/output/mrReadHBase"
        };

        /**
         * Job counters: ROWS = rows passed to the mapper, COLS = cells visited,
         * VALID = cells emitted, ERROR = rows whose processing threw a RuntimeException.
         */
        public enum Counters { ROWS, COLS, VALID, ERROR }

        /** Emits (cell value decoded as a string, 1) for every cell of every scanned row. */
        static class AnalyzeMapper extends TableMapper<Text, IntWritable> {
            private static final IntWritable ONE = new IntWritable(1);
            // Reused output key: context.write serializes the key immediately, so
            // overwriting it for the next cell is safe and saves one allocation per cell.
            private final Text outKey = new Text();

            @Override
            protected void map(ImmutableBytesWritable key, Result value,
                    Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                context.getCounter(Counters.ROWS).increment(1);
                // An empty Result has nothing to emit (and its cell list/array may be null),
                // so skip it instead of letting a NullPointerException count it as an error.
                if (value.isEmpty()) {
                    return;
                }
                try {
                    for (Cell cell : value.rawCells()) {
                        context.getCounter(Counters.COLS).increment(1);
                        outKey.set(Bytes.toString(CellUtil.cloneValue(cell)));
                        context.write(outKey, ONE);
                        context.getCounter(Counters.VALID).increment(1);
                    }
                } catch (RuntimeException e) {
                    // Best effort: count and skip a row that cannot be processed.
                    // IOException and InterruptedException from context.write are deliberately
                    // not caught, so real task failures and cancellation still propagate.
                    context.getCounter(Counters.ERROR).increment(1);
                    LOG.log(Level.WARNING, "Failed to process row "
                            + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()), e);
                }
            }
        }

        /**
         * Sums the 1-counts emitted per value. Summation is associative and the input and
         * output types match, so the same class is also registered as the combiner.
         */
        static class AnalyzeReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private final IntWritable total = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                int count = 0;
                for (IntWritable partial : values) {
                    count += partial.get();
                }
                total.set(count);
                context.write(key, total);
            }
        }

        /**
         * Configures and runs the job.
         *
         * @param args {@code [table, columns, outputPath]}; {@code columns} may be empty to
         *     scan every column, or list several space-separated column specs
         * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments were invalid
         * @throws Exception if job setup or submission fails
         */
        @Override
        public int run(String[] args) throws Exception {
            if (args == null || args.length < 3) {
                System.err.println("Usage: AnalyzeData <table> <columns> <outputPath>");
                ToolRunner.printGenericCommandUsage(System.err);
                return 2;
            }
            String table = args[0];
            String columns = args[1] == null ? "" : args[1].trim();
            String outPath = args[2];

            // Start from the Tool's configuration so generic options (-D, -conf, ...) parsed
            // by ToolRunner are honoured. getConf() is null when run() is invoked directly
            // without ToolRunner/setConf.
            Configuration userConf = getConf();
            Configuration configuration = userConf == null
                    ? HBaseConfiguration.create()
                    : HBaseConfiguration.create(userConf);
            configuration.set("hbase.zookeeper.quorum", ZK_QUORUM);
            configuration.set("hbase.zookeeper.property.clientPort", ZK_PORT);
            configuration.set(TableInputFormat.INPUT_TABLE, table);
            if (!columns.isEmpty()) {
                // TableInputFormat builds its Scan from the job configuration. A Scan object
                // built locally never reaches the tasks unless it is serialized into
                // TableInputFormat.SCAN, so the column restriction is passed via SCAN_COLUMNS
                // (space-separated "family" or "family:qualifier" entries).
                configuration.set(TableInputFormat.SCAN_COLUMNS, columns);
            }

            // All keys must be set before Job.getInstance, which copies the configuration.
            Job job = Job.getInstance(configuration, "AnalyzeData");
            job.setJarByClass(AnalyzeData.class);

            job.setInputFormatClass(TableInputFormat.class);
            job.setMapperClass(AnalyzeMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // TableMapReduceUtil.initTableMapperJob(...) is deliberately not used: on the
            // author's cluster it failed with
            // java.lang.ClassNotFoundException: com.yammer.metrics.core.MetricsRegistry
            // (presumably the metrics-core jar is missing from the client classpath).

            job.setCombinerClass(AnalyzeReducer.class);
            job.setReducerClass(AnalyzeReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(1);
            FileOutputFormat.setOutputPath(job, new Path(outPath));

            return job.waitForCompletion(true) ? 0 : 1;
        }

        /**
         * Entry point. Uses the command-line arguments when any are given; otherwise falls
         * back to {@code DEFAULT_ARGS} so the original no-argument launch keeps working.
         *
         * @param args optional generic options followed by {@code <table> <columns> <outputPath>}
         * @throws Exception if the job cannot be set up or submitted
         */
        public static void main(String[] args) throws Exception {
            String[] params = (args == null || args.length == 0) ? DEFAULT_ARGS.clone() : args;
            int exitCode = ToolRunner.run(new AnalyzeData(), params);
            System.exit(exitCode);
        }
    }
  • 相关阅读:
    分布式爬虫
    前端页面展示
    filter根据value来匹配字段
    element ui 怎么去修改el-date-picker的时间
    element ui,input框输入时按enter键进行搜索
    element ui 里面的table怎么弹出一个框让表中数据点击出现弹框
    修改数据结构记录,将同级数据改成父子集数据
    h5的复制功能的使用,Clipboard.js的使用,主要是在app里面使用
    在安卓手机下按钮会悬浮在键盘上,怎么解决vue.js
    last-child为啥不生效
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/9662472.html
Copyright © 2011-2022 走看看