  • MapReduce and HBase Integration (Apache and CDH versions)

    Project source code: https://github.com/cw1322311203/hbasedemo/tree/master/hbase-mr

    Through HBase's Java API we can build MapReduce jobs that operate on HBase — for example, using MapReduce to import data from the local file system into an HBase table, or reading raw data out of HBase and analyzing it with MapReduce.

    1. Official HBase-MapReduce

    1. View the jars required to run HBase's MapReduce tasks

      $ bin/hbase mapredcp
      
    2. Import the environment variables

      1. Import them for the current session (temporary; run on the command line)
      $ export HBASE_HOME=/opt/module/hbase-1.3.1
      $ export HADOOP_HOME=/opt/module/hadoop-2.7.2
      $ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
      
      2. Make them permanent: configure in /etc/profile
      export HBASE_HOME=/opt/module/hbase-1.3.1
      export HADOOP_HOME=/opt/module/hadoop-2.7.2
      

      Also add the following in hadoop-env.sh (note: place it after the for loop):

      export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
      
    3. Run an official MapReduce task

      Case 1: count how many rows are in the student table

      $ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
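
      Note that rowcounter counts rows, not cells: a row with several columns still counts once. The idea can be sketched in plain Java (an illustration only, not the actual rowcounter code; the sample data is hypothetical):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RowCountSketch {

    // Count distinct row keys, the way rowcounter reports rows
    // rather than individual cells.
    static int countRows(List<String[]> cells) {
        Set<String> rowKeys = new LinkedHashSet<>();
        for (String[] cell : cells) {
            rowKeys.add(cell[0]); // cell[0] is the row key
        }
        return rowKeys.size();
    }

    public static void main(String[] args) {
        // Two cells share row key 1001, so only two rows are counted
        List<String[]> cells = List.of(
                new String[]{"1001", "info:name", "Apple"},
                new String[]{"1001", "info:color", "Red"},
                new String[]{"1002", "info:name", "Pear"});
        System.out.println(countRows(cells)); // 2
    }
}
```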
      

      Case 2: use MapReduce to import local data into HBase

      1. Create a TSV file locally: fruit.tsv

        1001	Apple	Red
        1002	Pear	Yellow
        1003	Pineapple	Yellow
        
      2. Create the HBase table

        hbase(main):001:0> create 'fruit','info'
        
      3. Create the input_fruit directory in HDFS and upload fruit.tsv

        $ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
        $ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
        
      4. Run the MapReduce job to load the data into HBase's fruit table

        $ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
        -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
        hdfs://hadoop102:9000/input_fruit
        
        # CDH version of the command
        $ /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase/lib/hbase-server-1.2.0-cdh5.16.2.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit hdfs://cm1.cdh.com:8020/input_fruit
        
      5. Use the scan command to view the imported data

        hbase(main):001:0> scan 'fruit' 
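
      The column mapping that importtsv applies above (-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color) can be sketched in plain Java — a simplified illustration of the idea, not the actual ImportTsv implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ImportTsvSketch {

    // Simplified stand-in for the importtsv.columns mapping:
    // the field listed as HBASE_ROW_KEY becomes the row key,
    // and the other fields map to the listed family:qualifier columns.
    static Map<String, String> parseLine(String line, String[] columns) {
        String[] fields = line.split("\t");
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.length; i++) {
            row.put(columns[i], fields[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        String[] columns = {"HBASE_ROW_KEY", "info:name", "info:color"};
        Map<String, String> row = parseLine("1001\tApple\tRed", columns);
        System.out.println(row.get("HBASE_ROW_KEY")); // 1001
        System.out.println(row.get("info:name"));     // Apple
        System.out.println(row.get("info:color"));    // Red
    }
}
```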
        

    2. HBase and MapReduce Integration in a CDH Environment

    1. Configure the environment variables (on every machine)

      vim /etc/profile
      
      export HBASE_HOME=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase
      export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop
      
      source /etc/profile
      
    2. Modify hadoop-env.sh

      vim /etc/hadoop/conf/hadoop-env.sh
      
      # add this on every machine
      
      export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase/lib/*
      
      source /etc/hadoop/conf/hadoop-env.sh
      
    3. Test

      Restart the cluster first, then run the following commands

      cd /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8
      
      /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase/lib/hbase-server-1.2.0-cdh5.16.2.jar rowcounter student
      

    3. Custom HBase-MapReduce (Part 1)

    Goal: use MapReduce to import data from HDFS into an HBase table

    Step-by-step implementation:

    1. Build the FruitMapper class to read the input data

      package com.cw.bigdata.mr1;
      
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      
      import java.io.IOException;
      
      public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
      
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              context.write(key, value);
          }
      }
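
      The mapper above is a pure pass-through: with the default TextInputFormat, the key is the byte offset at which each line starts and the value is the line itself. How those offset keys come about can be sketched with stdlib-only Java (illustrative, not Hadoop's actual input-format code):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class OffsetSketch {

    // For each line of the input, record the byte offset at which it starts —
    // this is what TextInputFormat hands the mapper as its LongWritable key.
    static List<Long> keyOffsets(String content) {
        List<Long> offsets = new ArrayList<>();
        long pos = 0;
        for (String line : content.split("\n", -1)) {
            offsets.add(pos);
            pos += line.getBytes(StandardCharsets.UTF_8).length + 1; // line bytes + '\n'
        }
        return offsets;
    }

    public static void main(String[] args) {
        // "1001\tApple\tRed" is 14 bytes, plus '\n', so line 2 starts at offset 15
        List<Long> offsets = keyOffsets("1001\tApple\tRed\n1002\tPear\tYellow");
        System.out.println(offsets.get(0)); // 0
        System.out.println(offsets.get(1)); // 15
    }
}
```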
      
      
    2. Build the FruitReducer class to write the data it reads into the fruit1 table in HBase

      package com.cw.bigdata.mr1;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.mapreduce.TableReducer;
      import org.apache.hadoop.hbase.util.Bytes;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      
      import java.io.IOException;
      
      public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {
      
          @Override
          protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      
              // 1. Iterate over the values, e.g. "1001  Apple  Red"
              for (Text value : values) {

                  // 2. Split each line into fields on the tab character
                  String[] fields = value.toString().split("\t");

                  // 3. Build the Put object with the row key
                  Put put = new Put(Bytes.toBytes(fields[0]));

                  // 4. Populate the Put object
                  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
                  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));

                  // 5. Write out
                  context.write(NullWritable.get(), put);
              }
          }
      }
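
      The reducer above splits each line on a single tab. A doubled separator silently produces an empty field and shifts all subsequent columns, so the input file must contain exactly one tab between fields — a quick stdlib check of that pitfall:

```java
public class TabSplitCheck {

    // Split a TSV line on single tabs, as the reducer does.
    static String[] splitTsv(String line) {
        return line.split("\t");
    }

    public static void main(String[] args) {
        String[] ok = splitTsv("1002\tPear\tYellow");
        String[] bad = splitTsv("1002\t\tPear\tYellow"); // doubled tab

        System.out.println(ok.length);        // 3
        System.out.println(bad.length);       // 4
        System.out.println(bad[1].isEmpty()); // true — "name" would be written as ""
    }
}
```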
      
      
    3. Build FruitDriver (implements Tool) to assemble and run the job

      package com.cw.bigdata.mr1;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      public class FruitDriver implements Tool {
      
          // A Configuration object for the job
          private Configuration configuration = null;
      
          public int run(String[] args) throws Exception {
      
              // 1. Get a Job instance
              Job job = Job.getInstance(configuration);
      
              // 2. Set the driver class
              job.setJarByClass(FruitDriver.class);
      
              // 3. Set the Mapper class and its output KV types
              job.setMapperClass(FruitMapper.class);
              job.setMapOutputKeyClass(LongWritable.class);
              job.setMapOutputValueClass(Text.class);
      
              // 4. Set the Reducer class
              TableMapReduceUtil.initTableReducerJob(args[1],
                      FruitReducer.class,
                      job);
      
              // 5. Set the input path (the output is the HBase table set above)
              FileInputFormat.setInputPaths(job, new Path(args[0]));
      
              // 6. Submit the job
              boolean result = job.waitForCompletion(true);
      
              return result ? 0 : 1;
          }
      
          public void setConf(Configuration conf) {
              configuration = conf;
          }
      
          public Configuration getConf() {
              return configuration;
          }
      
          public static void main(String[] args) {
              try {
                  Configuration configuration = new Configuration();
                  int run = ToolRunner.run(configuration, new FruitDriver(), args);
      
                  System.exit(run);
                  
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      
    4. Package and run the job

      $ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.cw.bigdata.mr1.FruitDriver /input_fruit/fruit.tsv fruit1
      
      # CDH version of the command
      # fruit1 is the HBase table name; /input_fruit/fruit.tsv is the input data on HDFS
      $ /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /root/jars/hbaseapi-1.0.jar com.cw.bigdata.mr1.FruitDriver /input_fruit/fruit.tsv fruit1
      

      Note: if the target table does not exist, create it before running the job.

      Note: Maven package command: -P local clean package or -P dev clean package install (bundles third-party jars; requires the maven-shade-plugin).

    4. Custom HBase-MapReduce (Part 2)

    Goal: use MapReduce to migrate part of the data in HBase table fruit1 (the name column) into HBase table fruit2.

    1. Build the Fruit2Mapper class to read data from the fruit1 table

      package com.cw.bigdata.mr2;
      
      import org.apache.hadoop.hbase.Cell;
      import org.apache.hadoop.hbase.CellUtil;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.client.Result;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableMapper;
      import org.apache.hadoop.hbase.util.Bytes;
      
      import java.io.IOException;
      
      public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {
          @Override
          protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
      
              // Build a Put object for this row key
              Put put = new Put(key.get());

              // 1. Iterate over the raw cells of the row
              for (Cell cell : value.rawCells()) {

                  // 2. Check whether the current cell holds the "name" column
                  if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                      // 3. Add the cell to the Put object
                      put.add(cell);
                  }
              }

              // 4. Write out
              context.write(key, put);
          }
      }
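
      The filtering the mapper performs — keep only cells whose qualifier is "name" — can be illustrated with a plain-Java stand-in for a row's (qualifier, value) pairs (hypothetical sample data; the HBase Cell classes are deliberately left out):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class QualifierFilterSketch {

    // Keep only the entries whose qualifier matches, mirroring the
    // CellUtil.cloneQualifier comparison in Fruit2Mapper.
    static Map<String, String> keepQualifier(Map<String, String> row, String qualifier) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : row.entrySet()) {
            if (qualifier.equals(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("name", "Apple");
        row.put("color", "Red");
        System.out.println(keepQualifier(row, "name")); // {name=Apple}
    }
}
```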
      
      
    2. Build the Fruit2Reducer class to write the data read from fruit1 into the fruit2 table

      package com.cw.bigdata.mr2;
      
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableReducer;
      import org.apache.hadoop.io.NullWritable;
      
      import java.io.IOException;
      
      public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
          @Override
          protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
      
              // Iterate over the Puts and write them out
              for (Put put : values) {
                  context.write(NullWritable.get(), put);
              }
          }
      }
      
      
    3. Build Fruit2Driver (implements Tool) to assemble and run the job

      package com.cw.bigdata.mr2;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.hbase.HBaseConfiguration;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.client.Scan;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      public class Fruit2Driver implements Tool {
      
          // A Configuration object for the job
          private Configuration configuration = null;
      
          public int run(String[] args) throws Exception {
      
              // 1. Get a Job instance
              Job job = Job.getInstance(configuration);
      
              // 2. Set the driver class
              job.setJarByClass(Fruit2Driver.class);
      
              // 3. Set the Mapper and its output KV types
              TableMapReduceUtil.initTableMapperJob("fruit1",
                      new Scan(),
                      Fruit2Mapper.class,
                      ImmutableBytesWritable.class,
                      Put.class,
                      job);
      
              // 4. Set the Reducer and the output table
              TableMapReduceUtil.initTableReducerJob("fruit2",
                      Fruit2Reducer.class,
                      job);
      
              // 5. Submit the job
              boolean result = job.waitForCompletion(true);
      
              return result ? 0 : 1;
          }
      
          public void setConf(Configuration conf) {
              configuration = conf;
          }
      
          public Configuration getConf() {
              return configuration;
          }
      
          public static void main(String[] args) {
              try {
                  Configuration configuration = HBaseConfiguration.create();
                  ToolRunner.run(configuration, new Fruit2Driver(), args);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      
    4. Package and run the job

      $ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.cw.bigdata.mr2.Fruit2Driver
      
      # CDH version of the command
      # reads the name column from fruit1 and writes it into fruit2
      $ /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /root/jars/hbaseapi-1.0.jar com.cw.bigdata.mr2.Fruit2Driver
      

      Note: if the target table does not exist, create it before running the job.

      Note: Maven package command: -P local clean package or -P dev clean package install (bundles third-party jars; requires the maven-shade-plugin).

  • Original article: https://www.cnblogs.com/chenxiaoge/p/13335436.html