zoukankan      html  css  js  c++  java
  • HBase MapReduce 使用

    项目中需要用 MapReduce 来读取或写入 HBase，直接使用 HBase 自带的 API 可以节省大量开发时间。

    HBase 自身的 jar 包里就提供了这样的 API，以下是我从官网上找到的一些资料，和大家分享一下。

    原文地址:http://hbase.apache.org/book/mapreduce.example.html

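    总体说明：TableMapper 主要用于读取 HBase 数据，TableReducer 主要用于写入 HBase 数据。两者可以结合使用，也可以分开使用；例如例（二）没有编写 Reducer，只通过 TableMapReduceUtil.initTableReducerJob 指定输出表并把 reduce 任务数设为 0，由 Mapper 直接写入 HBase。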

    (一) 读 HBase 实例

    public static class MyMapper extends TableMapper<Text, Text> {
    
      public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
        // process data for the row from the Result instance.
       }
    }
    // Configure and run a job that scans an HBase table with MyMapper and writes no output.
    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "ExampleRead");
    job.setJarByClass(MyReadJob.class);     // class that contains mapper

    Scan scan = new Scan();
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false);  // don't set to true for MR jobs
    // set other scan attrs (e.g. column families, filters) here before initTableMapperJob

    TableMapReduceUtil.initTableMapperJob(
      tableName,        // input HBase table name
      scan,             // Scan instance to control CF and attribute selection
      MyMapper.class,   // mapper
      null,             // mapper output key
      null,             // mapper output value
      job);
    job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper

    boolean b = job.waitForCompletion(true);
    if (!b) {
      throw new IOException("error with job!");
    }

    (二) 读写实例

    public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put>  {
    
    	public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    		// this example is just copying the data from the source table...
       		context.write(row, resultToPut(row,value));
       	}
    
      	private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
      		Put put = new Put(key.get());
     		for (KeyValue kv : result.raw()) {
    			put.add(kv);
    		}
    		return put;
       	}
    }
        
    // Map-only copy job: MyMapper turns each row of sourceTable into a Put, and
    // initTableReducerJob (with no reducer class) points the job's output at targetTable.
    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config,"ExampleReadWrite");
    job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

    Scan scan = new Scan();
    scan.setCacheBlocks(false);  // don't set to true for MR jobs
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    // set other scan attrs

    TableMapReduceUtil.initTableMapperJob(sourceTable, scan, MyMapper.class, null, null, job);
    TableMapReduceUtil.initTableReducerJob(targetTable, null, job);  // output table only, no reducer
    job.setNumReduceTasks(0);   // no reduce phase: map output goes straight to the output format

    if (!job.waitForCompletion(true)) {
        throw new IOException("error with job!");
    }

    (三) 做统计实例

    public static class MyMapper extends TableMapper<Text, IntWritable>  {
    
    	private final IntWritable ONE = new IntWritable(1);
       	private Text text = new Text();
    
       	public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
            	String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1")));
              	text.set(val);     // we can only emit Writables...
    
            	context.write(text, ONE);
       	}
    }
      
    public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {
    
     	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        		int i = 0;
        		for (IntWritable val : values) {
        			i += val.get();
        		}
        		Put put = new Put(Bytes.toBytes(key.toString()));
        		put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));
    
        		context.write(null, put);
       	}
    }
    // Summary job: MyMapper emits (attr1 value, 1) per row of sourceTable, and
    // MyTableReducer sums them per value and writes the totals into targetTable.
    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config,"ExampleSummary");
    job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

    Scan scan = new Scan();
    scan.setCacheBlocks(false);  // don't set to true for MR jobs
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    // set other scan attrs

    // mapper output: Text key -> IntWritable value
    TableMapReduceUtil.initTableMapperJob(sourceTable, scan, MyMapper.class,
        Text.class, IntWritable.class, job);
    TableMapReduceUtil.initTableReducerJob(targetTable, MyTableReducer.class, job);
    job.setNumReduceTasks(1);   // at least one, adjust as required

    if (!job.waitForCompletion(true)) {
        throw new IOException("error with job!");
    }

    (四) 混合实例：读取 HBase，结果写入 HDFS 文件（Mapper 复用例（三）中的 MyMapper）

     public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {
    
    	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    		int i = 0;
    		for (IntWritable val : values) {
    			i += val.get();
    		}
    		context.write(key, new IntWritable(i));
    	}
    }
      
    // Summary-to-file job: reads sourceTable with a Text/IntWritable MyMapper, sums per key
    // with MyReducer, and writes the results under the output path below.
    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config,"ExampleSummaryToFile");
    job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer

    Scan scan = new Scan();
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false);  // don't set to true for MR jobs
    // set other scan attrs

    TableMapReduceUtil.initTableMapperJob(
      sourceTable,        // input table
      scan,               // Scan instance to control CF and attribute selection
      MyMapper.class,     // mapper class
      Text.class,         // mapper output key
      IntWritable.class,  // mapper output value
      job);
    job.setReducerClass(MyReducer.class);    // reducer class
    // initTableMapperJob was given only the mapper output types; declare the final
    // (reducer) output types too, so output formats that record them get Text/IntWritable.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(1);    // at least one, adjust as required
    // NOTE(review): FileOutputFormat normally rejects an output directory that already exists.
    FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required

    boolean b = job.waitForCompletion(true);
    if (!b) {
      throw new IOException("error with job!");
    }
      
  • 相关阅读:
    JAVA for(i = 0; i<a.length; i++) 解析
    3.2.2多维数组 3.3 排序
    3.2数组
    字符串和数组
    2.7.3与程序转移有关的跳转语句
    2.7.2 循环语句
    读书共享 Primer Plus C-part 4
    Linux 批量修改文件名
    关于/usr/local/lib/libz.a(zutil.o): relocation R_X86_64_32 against `.rodata.str1.1' can not be used when making a shared object; recompile with -fPIC解决办法
    做一个有深度的程序猿
  • 原文地址:https://www.cnblogs.com/bluecoder/p/3889614.html
Copyright © 2011-2022 走看看