zoukankan      html  css  js  c++  java
  • hbase MapReduce程序样例入门

    1、先看一个标准的hbase作为数据读取源和输出源的样例:

    1
    2
    3
    4
    5
    6
    7
    8
    
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "job name ");
    job.setJarByClass(test.class);
    Scan scan = new Scan();
    TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,
    		Writable.class, Writable.class, job);
    TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job);
    job.waitForCompletion(true);

    首先创建配置信息和作业对象,设置作业的类。这些和正常的mapreduce一样,唯一不一样的就是数据源的说明部分,TableMapReduceUtil的initTableMapperJob和initTableReducerJob方法来实现。

    用如上代码:
    数据输入源是hbase的inputTable表,执行mapper.class进行map过程,输出的key/value类型是 ImmutableBytesWritable和Put类型,最后一个参数是作业对象。需要指出的是需要声明一个扫描读入对象scan,进行表扫描读取数 据用,其中scan可以配置参数,这里为了例子简单不再详述。
    数据输出目标是hbase的outputTable表,输出执行的reduce过程是reducer.class类,操作的作业目标是job。与map比 缺少输出类型的标注,因为他们不是必要的,看过源代码就知道mapreduce的TableRecordWriter中write(key,value) 方法中,key值是没有用到的。value只能是Put或者Delete两种类型,write方法会自行判断并不用用户指明。

    接下来就是mapper类:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
    public class mapper extends
    		TableMapper<KEYOUT, VALUEOUT> {
     
    	public void map(Writable key, Writable value, Context context)
    			throws IOException, InterruptedException {
    		        //mapper逻辑
    			context.write(key, value);
    		}
     
    	}
    }

    继承的是hbase中提供的TableMapper类,其实这个类也是继承的MapReduce类。后边跟的两个泛型参数指定类型是mapper输 出的数据类型,该类型必须继承自Writable类,例如可能用到的put和delete就可以。需要注意的是要和initTableMapperJob 方法指定的数据类型一直。该过程会自动从指定hbase表内一行一行读取数据进行处理。

    然后reducer类:

    1
    2
    3
    4
    5
    6
    7
    8
    
    public class countUniteRedcuer extends
    		TableReducer<KEYIN, VALUEIN, KEYOUT> {
    	public void reduce(Text key, Iterable<VALUEIN> values, Context context)
    			throws IOException, InterruptedException {
                    //reducer逻辑
    		context.write(null, put or delete);
    	}
    }

    reducer继承的是TableReducer类。后边指定三个泛型参数,前两个必须对应map过程的输出key/value类型,第三个必须是 put或者delete。write的时候可以把key写null,它是不必要的。这样reducer输出的数据会自动插入outputTable指定的 表内。

    2、有时候我们需要数据源是hdfs的文本,输出对象是hbase。这时候变化也很简单:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "job name ");
    job.setJarByClass(test.class);
     
    job.setMapperClass(mapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    FileInputFormat.setInputPaths(job, path);
     
    TableMapReduceUtil.initTableReducerJob(tableName,
    				reducer.class, job);

    你会发现只需要像平常的mapreduce的作业声明过程一样,指定mapper的执行类和输出key/value类型,指定 FileInputFormat.setInputPaths的数据源路径,输出声明不变。便完成了从hdfs文本读取数据输出到hbase的命令声明过 程。 mapper和reducer如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    
    public class mapper extends Mapper<LongWritable,Writable,Writable,Writable> {
    	public void map(LongWritable key, Text line, Context context) {
    		 //mapper逻辑
    		 context.write(k, one);
    	}
    }
    public class redcuer extends
    		TableReducer<KEYIN, VALUEIN, KEYOUT> {
    	public void reduce(Writable key, Iterable<Writable> values, Context context)
    			throws IOException, InterruptedException {
                    //reducer逻辑
    		context.write(null, put or delete);
    	}
    }

    mapper还依旧继承原来的MapReduce类中的Mapper即可。同样注意这前后数据类型的key/value一直性。

    3、最后就是从hbase中的表作为数据源读取,hdfs作为数据输出,简单的如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "job name ");
    job.setJarByClass(test.class);
    Scan scan = new Scan();
    TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,
    		Writable.class, Writable.class, job);
    job.setOutputKeyClass(Writable.class);
    job.setOutputValueClass(Writable.class);
    FileOutputFormat.setOutputPath(job, Path);
    job.waitForCompletion(true);

    mapper和reducer简单如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    
    public class mapper extends
    		TableMapper<KEYOUT, VALUEOUT>{
     
    	public void map(Writable key, Writable value, Context context)
    			throws IOException, InterruptedException {
    		        //mapper逻辑
    			context.write(key, value);
    		}
     
    	}
    }
     
    public class reducer extends
    		Reducer<Writable,Writable,Writable,Writable>  {
     
    	public void reducer(Writable key, Writable value, Context context)
    			throws IOException, InterruptedException {
    		        //reducer逻辑
    			context.write(key, value);
    		}
    	}
    }

    最后说一下TableMapper和TableReducer的本质,其实这俩类就是为了简化一下书写代码,因为传入的4个泛型参数里都会有固定的参数类型,所以是Mapper和Reducer的简化版本,本质他们没有任何区别。源码如下:

    1
    2
    3
    4
    5
    6
    7
    
    public abstract class TableMapper<KEYOUT, VALUEOUT>
    extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
    }
     
    public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
    extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
    }

    好了,可以去写第一个wordcount的hbase mapreduce程序了。

    ps:另外详细的读写优化可以看之后的一篇,更为详尽的内容看这里

  • 相关阅读:
    那些离不开的 Chrome 扩展插件
    Spring Boot 实战 —— 入门
    Maven 学习笔记
    Linux lvm 分区知识笔记
    Linux 双向 SSH 免密登录
    CentOS Yum 源搭建
    Ubuntu 系统学习
    iOS 测试三方 KIF 的那些事
    Swift 网络请求数据与解析
    iOS Plist 文件的 增 删 改
  • 原文地址:https://www.cnblogs.com/end/p/2814819.html
Copyright © 2011-2022 走看看