zoukankan      html  css  js  c++  java
  • MapReduce-从HBase读取数据处理后再写入HBase

    MapReduce-从HBase读取处理后再写入HBase

    代码如下

    package com.hbase.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
    * @author:FengZhen
    * @create:2018-09-17
    * MapReduce job that reads rows from a source HBase table, splits every
    * non-empty cell value into a "top" half and a "detail" half, and writes
    * the result as Puts into a destination HBase table.
    * Before submitting, strip jar signature files:
    * zip -d HBaseToHBase.jar 'META-INF/*.SF' 'META-INF/*.RSA' 'META-INF/*SF'
    */
    public class HBaseToHBase extends Configured implements Tool{
    
    	// ZooKeeper quorum hosts and client port of the target cluster.
    	private static String addr="HDP233,HDP232,HDP231";
    	private static String port="2181";
    	
    	/** Job counters: rows/cells processed, empty vs. non-empty values, write outcomes. */
    	public enum Counters { ROWS, COLS, VALID, ERROR, EMPTY, NOT_EMPTY}
    	
    	/**
    	 * Mapper: for each input row, splits every non-empty cell value into
    	 * "top" (first half) and "detail" (second half) qualifiers under the
    	 * column family configured via "conf.columnfamily"; empty values are
    	 * written under an "empty" qualifier.
    	 */
    	static class ParseMapper extends TableMapper<ImmutableBytesWritable, Put>{
    		private byte[] columnFamily = null;
    		@Override
    		protected void setup(Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
    				throws IOException, InterruptedException {
    			// Output column family is injected through the job configuration in run().
    			columnFamily = Bytes.toBytes(context.getConfiguration().get("conf.columnfamily"));
    		}
    		@Override
    		protected void map(ImmutableBytesWritable key, Result value,
    				Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
    				throws IOException, InterruptedException {
    			context.getCounter(Counters.ROWS).increment(1);
    			String hbaseValue = null;
    			
    			// Re-use the source row key for the destination row.
    			Put put = new Put(key.get());
    			for (Cell cell : value.listCells()) {
    				context.getCounter(Counters.COLS).increment(1);
    				hbaseValue = Bytes.toString(CellUtil.cloneValue(cell));
    				if (hbaseValue.length() > 0) {
    					String top = hbaseValue.substring(0, hbaseValue.length()/2);
    					// FIX: take the whole remainder. The original used
    					// substring(len/2, len - 1), which silently dropped the
    					// last character of every value.
    					String detail = hbaseValue.substring(hbaseValue.length()/2);
    					put.addColumn(columnFamily, Bytes.toBytes("top"), Bytes.toBytes(top));
    					put.addColumn(columnFamily, Bytes.toBytes("detail"), Bytes.toBytes(detail));
    					context.getCounter(Counters.NOT_EMPTY).increment(1);
    				}else {
    					put.addColumn(columnFamily, Bytes.toBytes("empty"), Bytes.toBytes(hbaseValue));
    					context.getCounter(Counters.EMPTY).increment(1);
    				}
    			}
    			try {
    				context.write(key, put);
    				context.getCounter(Counters.VALID).increment(1);
    			} catch (Exception e) {
    				// Count and log the failure but keep the task alive for other rows.
    				e.printStackTrace();
    				context.getCounter(Counters.ERROR).increment(1);
    			}
    		}
    	}
    	
    	/**
    	 * Identity reducer that forwards each Put unchanged. Currently unused
    	 * because run() sets the number of reduce tasks to 0 (map-only job);
    	 * kept for the reduce-enabled variant of this job.
    	 */
    	static class ParseTableReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable>{
    		@Override
    		protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
    				Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Mutation>.Context context)
    				throws IOException, InterruptedException {
    			for (Put put : values) {
    				context.write(key, put);
    			}
    		}
    	}
    	
    	/**
    	 * Configures and submits the map-only job.
    	 *
    	 * @param arg0 [0] source table, [1] column as "family" or "family:qualifier",
    	 *             [2] destination table
    	 * @return 0 on success, 1 on failure
    	 */
    	public int run(String[] arg0) throws Exception {
    		if (arg0 == null || arg0.length < 3) {
    			throw new IllegalArgumentException(
    					"Usage: HBaseToHBase <sourceTable> <family[:qualifier]> <destTable>");
    		}
    		String table = arg0[0];
    		String column = arg0[1];
    		String destTable = arg0[2];
    		
    		Configuration configuration = HBaseConfiguration.create();
    		configuration.set("hbase.zookeeper.quorum",addr);
    		configuration.set("hbase.zookeeper.property.clientPort", port);
    		
    		// Split "family:qualifier" and expose the family (and qualifier, if any)
    		// to the mapper through the configuration.
    		if (null != column) {
    			byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
    			if (colkey.length > 1) {
    				configuration.set("conf.columnfamily", Bytes.toString(colkey[0]));
    				configuration.set("conf.columnqualifier", Bytes.toString(colkey[1]));
    			}else {
    				configuration.set("conf.columnfamily", Bytes.toString(colkey[0]));
    			}
    		}
    		
    		Job job = Job.getInstance(configuration);
    		// FIX: name and jar class now match this class; the original referenced
    		// a nonexistent HBaseToHBase2.
    		job.setJobName("HBaseToHBase");
    		job.setJarByClass(HBaseToHBase.class);
    		
    		job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
    		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, destTable);
    		
    		job.setMapperClass(ParseMapper.class);
    		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    		job.setMapOutputValueClass(Put.class);
    		
    		job.setOutputKeyClass(ImmutableBytesWritable.class);
    		job.setOutputValueClass(Put.class);
    		
    		job.setInputFormatClass(TableInputFormat.class);
    		// FIX: the original built a local Scan and called
    		// TableInputFormat.addColumns(scan, ...) on it, but that Scan was never
    		// handed to the job, so the column restriction was dead code. Without
    		// TableMapReduceUtil, TableInputFormat reads its scan settings from
    		// configuration keys; SCAN_COLUMNS takes "family[:qualifier]" directly.
    		job.getConfiguration().set(TableInputFormat.SCAN_COLUMNS, column);
    		job.setOutputFormatClass(TableOutputFormat.class);
    		
    		// Map-only job: Puts go straight to TableOutputFormat.
    		job.setNumReduceTasks(0);
    		
    		// TableMapReduceUtil triggered
    		// Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.MetricsRegistry
    		// so the job is configured manually instead:
    //		TableMapReduceUtil.initTableMapperJob(table, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class, job);
    //		TableMapReduceUtil.initTableReducerJob(table, IdentityTableReducer.class, job);
    		
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    	public static void main(String[] args) throws Exception {
    		String[] params = new String[] {"test_table_mr", "data:info", "test_table_dest"};
    		// FIX: instantiate this class; the original referenced a nonexistent
    		// HBaseToHBase2.
    		int exitCode = ToolRunner.run(new HBaseToHBase(), params);
    		System.exit(exitCode);
    	}
    }
    

     打包测试

    zip -d HBaseToHBase.jar 'META-INF/*.SF' 'META-INF/*.RSA' 'META-INF/*SF'
    hadoop jar HBaseToHBase.jar com.hbase.mapreduce.HBaseToHBase

    出现的问题

    一开始使用的是 TableMapReduceUtil,但是报下面这个错

    Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/core/MetricsRegistry
    	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(TableMapReduceUtil.java:732)
    	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(TableMapReduceUtil.java:777)
    	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:212)
    	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:168)
    	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:291)
    	at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:92)
    	at com.hbase.mapreduce.HBaseToHBase.run(HBaseToHBase.java:108)
    	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
    	at com.hbase.mapreduce.HBaseToHBase.main(HBaseToHBase.java:115)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.hadoop.util.RunJar.run(RunJar.java:233)
    	at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
    Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.MetricsRegistry
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	... 16 more
    

     解决方法:不使用 TableMapReduceUtil,改为分步手动设置 job 的输入/输出格式便可解决此问题

  • 相关阅读:
    RESTful规范
    Vuex以及axios
    npm webpack vue-cli
    Vue生命周期
    Vue-Router
    Vue组件
    Vue基础以及指令
    1.JavaCC安装与测试
    10.InfluxDB-InfluxQL基础语法教程--OFFSET 和SOFFSET子句
    9.InfluxDB-InfluxQL基础语法教程--LIMIT and SLIMIT 子句
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/9716961.html
Copyright © 2011-2022 走看看