  • Integrating MapReduce with HBase (Apache and CDH versions)

    Project source code: https://github.com/cw1322311203/hbasedemo/tree/master/hbase-mr

    Through HBase's Java API we can implement MapReduce jobs that read from or write to HBase, for example using MapReduce to import data from a file system into an HBase table, or reading raw data out of HBase and analyzing it with MapReduce.

    1. The official HBase-MapReduce tools

    1. View the classpath required to run HBase MapReduce jobs

      $ bin/hbase mapredcp
      
    2. Import the environment variables

      1. Export the environment variables (temporary; effective only in the current shell; run the following on the command line)
      $ export HBASE_HOME=/opt/module/hbase-1.3.1
      $ export HADOOP_HOME=/opt/module/hadoop-2.7.2
      $ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
      
      2. Permanent: configure in /etc/profile
      export HBASE_HOME=/opt/module/hbase-1.3.1
      export HADOOP_HOME=/opt/module/hadoop-2.7.2
      

      and add the following to hadoop-env.sh (note: put it after the for loop):

      export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
      
    3. Run the official MapReduce jobs

      Case 1: count the rows in the student table (run the command from the HBase installation directory so the relative jar path resolves; a Java-client cross-check follows the command)

      $ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
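
      As a quick cross-check of the rowcounter result, the same count can be read through the HBase Java client. Below is a minimal sketch, assuming an HBase 1.x client on the classpath with hbase-site.xml reachable from it; RowCountCheck and its package are illustrative names, and a full-table client-side scan is only practical for small tables:

      package com.cw.bigdata.check;  // hypothetical package for this sketch

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.hbase.HBaseConfiguration;
      import org.apache.hadoop.hbase.TableName;
      import org.apache.hadoop.hbase.client.Connection;
      import org.apache.hadoop.hbase.client.ConnectionFactory;
      import org.apache.hadoop.hbase.client.Result;
      import org.apache.hadoop.hbase.client.ResultScanner;
      import org.apache.hadoop.hbase.client.Scan;
      import org.apache.hadoop.hbase.client.Table;
      import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

      public class RowCountCheck {

          public static void main(String[] args) throws Exception {
              Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
              try (Connection connection = ConnectionFactory.createConnection(conf);
                   Table table = connection.getTable(TableName.valueOf("student"))) {

                  // Fetch only the first cell of every row to keep the scan cheap
                  Scan scan = new Scan();
                  scan.setFilter(new FirstKeyOnlyFilter());

                  long count = 0;
                  try (ResultScanner scanner = table.getScanner(scan)) {
                      for (Result ignored : scanner) {
                          count++;
                      }
                  }
                  System.out.println("rows in student: " + count);
              }
          }
      }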
      

      Case 2: import local data into HBase with MapReduce

      1. Create a tsv-format file locally: fruit.tsv

        1001	Apple	Red
        1002	Pear	Yellow
        1003	Pineapple	Yellow
        
      2. Create the HBase table (a Java API alternative is sketched after the shell command)

        hbase(main):001:0> create 'fruit','info'
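
        The same table can also be created from the Java API instead of the shell. A minimal sketch, assuming the HBase 1.x client API (Admin / HTableDescriptor) and default table options; CreateFruitTable is an illustrative class name:

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.HColumnDescriptor;
        import org.apache.hadoop.hbase.HTableDescriptor;
        import org.apache.hadoop.hbase.TableName;
        import org.apache.hadoop.hbase.client.Admin;
        import org.apache.hadoop.hbase.client.Connection;
        import org.apache.hadoop.hbase.client.ConnectionFactory;

        public class CreateFruitTable {

            public static void main(String[] args) throws Exception {
                Configuration conf = HBaseConfiguration.create();
                try (Connection connection = ConnectionFactory.createConnection(conf);
                     Admin admin = connection.getAdmin()) {

                    TableName tableName = TableName.valueOf("fruit");
                    if (!admin.tableExists(tableName)) {
                        // Same schema as the shell command: one column family named "info"
                        HTableDescriptor descriptor = new HTableDescriptor(tableName);
                        descriptor.addFamily(new HColumnDescriptor("info"));
                        admin.createTable(descriptor);
                    }
                }
            }
        }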
        
      3. Create the input_fruit directory in HDFS and upload fruit.tsv

        $ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
        $ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
        
      4. Run the MapReduce job to import into the HBase fruit table

        $ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
        -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
        hdfs://hadoop102:9000/input_fruit
        
        # CDH version of the command
        $ /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase/lib/hbase-server-1.2.0-cdh5.16.2.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit hdfs://cm1.cdh.com:8020/input_fruit
        
      5. Use the scan command to check the imported result (a Java-client equivalent follows)

        hbase(main):001:0> scan 'fruit' 
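
        The imported rows can also be verified from the Java client. A minimal sketch, assuming the HBase 1.x client API; ScanFruit is an illustrative class name:

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.Cell;
        import org.apache.hadoop.hbase.CellUtil;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.TableName;
        import org.apache.hadoop.hbase.client.Connection;
        import org.apache.hadoop.hbase.client.ConnectionFactory;
        import org.apache.hadoop.hbase.client.Result;
        import org.apache.hadoop.hbase.client.ResultScanner;
        import org.apache.hadoop.hbase.client.Scan;
        import org.apache.hadoop.hbase.client.Table;
        import org.apache.hadoop.hbase.util.Bytes;

        public class ScanFruit {

            public static void main(String[] args) throws Exception {
                Configuration conf = HBaseConfiguration.create();
                try (Connection connection = ConnectionFactory.createConnection(conf);
                     Table table = connection.getTable(TableName.valueOf("fruit"));
                     ResultScanner scanner = table.getScanner(new Scan())) {

                    // Print every cell as: rowkey qualifier=value
                    for (Result result : scanner) {
                        for (Cell cell : result.rawCells()) {
                            System.out.println(Bytes.toString(CellUtil.cloneRow(cell))
                                    + " " + Bytes.toString(CellUtil.cloneQualifier(cell))
                                    + "=" + Bytes.toString(CellUtil.cloneValue(cell)));
                        }
                    }
                }
            }
        }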
        

    2. Integrating HBase and MapReduce in a CDH environment

    1. Configure the environment variables (on every machine)

      vim /etc/profile
      
      export HBASE_HOME=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase
      export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop
      
      source /etc/profile
      
    2. Modify hadoop-env.sh

      vim /etc/hadoop/conf/hadoop-env.sh
      
      # add on every machine
      
      export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase/lib/*
      
      source /etc/hadoop/conf/hadoop-env.sh
      
    3. Test

      Restart the cluster first, then run the following commands

      cd /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8
      
      /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hbase/lib/hbase-server-1.2.0-cdh5.16.2.jar rowcounter student
      

    3. Custom HBase-MapReduce, part 1

    Goal: use MapReduce to import data from HDFS into an HBase table

    Step-by-step implementation:

    1. Build the FruitMapper class to read the input data from HDFS

      package com.cw.bigdata.mr1;
      
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      
      import java.io.IOException;
      
      public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
      
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // Pass each input line through unchanged; the key is the line's byte offset in the file
              context.write(key, value);
          }
      }
      
      
    2. Build the FruitReducer class to write the data into the fruit1 table in HBase

      package com.cw.bigdata.mr1;
      
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.mapreduce.TableReducer;
      import org.apache.hadoop.hbase.util.Bytes;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      
      import java.io.IOException;
      
      public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {
      
          @Override
          protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      
              // 1. Iterate over the values; each one is a line such as: 1001  Apple   Red
              for (Text value : values) {

                  // 2. Split the line into its tab-separated fields
                  String[] fields = value.toString().split("\t");

                  // 3. Build the Put object with the row key (field 0)
                  Put put = new Put(Bytes.toBytes(fields[0]));

                  // 4. Populate the Put object with the name and color columns
                  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
                  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));

                  // 5. Write out
                  context.write(NullWritable.get(), put);
              }
          }
      }
      
      
    3. Build FruitDriver (implements Tool) to assemble and run the job

      package com.cw.bigdata.mr1;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      public class FruitDriver implements Tool {
      
          // The job Configuration, injected via setConf() by ToolRunner
          private Configuration configuration = null;
      
          public int run(String[] args) throws Exception {
      
              // 1. Get the Job object
              Job job = Job.getInstance(configuration);
      
              // 2. Set the driver class
              job.setJarByClass(FruitDriver.class);
      
              // 3. Set the Mapper and its output key/value types
              job.setMapperClass(FruitMapper.class);
              job.setMapOutputKeyClass(LongWritable.class);
              job.setMapOutputValueClass(Text.class);
      
              // 4. Set the Reducer and the output table (args[1])
              TableMapReduceUtil.initTableReducerJob(args[1],
                      FruitReducer.class,
                      job);
      
              // 5. Set the input path (args[0])
              FileInputFormat.setInputPaths(job, new Path(args[0]));
      
              // 6. Submit the job
              boolean result = job.waitForCompletion(true);
      
              return result ? 0 : 1;
          }
      
          public void setConf(Configuration conf) {
              configuration = conf;
          }
      
          public Configuration getConf() {
              return configuration;
          }
      
          public static void main(String[] args) {
              try {
                  Configuration configuration = new Configuration();
                  int run = ToolRunner.run(configuration, new FruitDriver(), args);
      
                  System.exit(run);
                  
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      
    4. Package and run the job

      $ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.cw.bigdata.mr1.FruitDriver /input_fruit/fruit.tsv fruit1
      
      # CDH version of the command
      # fruit1 is the target HBase table; /input_fruit/fruit.tsv is the input file on HDFS
      $ /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /root/jars/hbaseapi-1.0.jar com.cw.bigdata.mr1.FruitDriver /input_fruit/fruit.tsv fruit1
      

      Tip: before running the job, if the target table does not exist, create it first.

      Tip: Maven packaging command: -P local clean package or -P dev clean package install (bundles third-party jars into the artifact; requires the maven-shade-plugin).

    4. Custom HBase-MapReduce, part 2

    Goal: migrate part of the data in HBase's fruit1 table (the name column) into the fruit2 table via MapReduce.

    1. Build the Fruit2Mapper class to read data from the fruit1 table

      package com.cw.bigdata.mr2;
      
      import org.apache.hadoop.hbase.Cell;
      import org.apache.hadoop.hbase.CellUtil;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.client.Result;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableMapper;
      import org.apache.hadoop.hbase.util.Bytes;
      
      import java.io.IOException;
      
      public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {
          @Override
          protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
      
              // Build a Put object keyed by the current row
              Put put = new Put(key.get());

              // 1. Iterate over the cells of the current row
              for (Cell cell : value.rawCells()) {

                  // 2. Check whether the current cell belongs to the "name" column
                  if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                      // 3. Add the cell to the Put object
                      put.add(cell);
                  }
              }

              // 4. Write out; skip rows without a "name" cell, since writing an empty Put would fail
              if (!put.isEmpty()) {
                  context.write(key, put);
              }
          }
      }
      
      
    2. Build the Fruit2Reducer class to write the data read from fruit1 into the fruit2 table

      package com.cw.bigdata.mr2;
      
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableReducer;
      import org.apache.hadoop.io.NullWritable;
      
      import java.io.IOException;
      
      public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
          @Override
          protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
      
              // Iterate over the Puts and write each one out
              for (Put put : values) {
                  context.write(NullWritable.get(), put);
              }
          }
      }
      
      
    3. Build Fruit2Driver (implements Tool) to assemble and run the job

      package com.cw.bigdata.mr2;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.hbase.HBaseConfiguration;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.client.Scan;
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
      import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      public class Fruit2Driver implements Tool {
      
          // The job Configuration, injected via setConf() by ToolRunner
          private Configuration configuration = null;
      
          public int run(String[] args) throws Exception {
      
              // 1. Get the Job object
              Job job = Job.getInstance(configuration);
      
              // 2. Set the driver class
              job.setJarByClass(Fruit2Driver.class);
      
              // 3. Set the Mapper, the source table, and the Mapper output key/value types
              TableMapReduceUtil.initTableMapperJob("fruit1",
                      new Scan(),
                      Fruit2Mapper.class,
                      ImmutableBytesWritable.class,
                      Put.class,
                      job);
      
              // 4. Set the Reducer and the target table
              TableMapReduceUtil.initTableReducerJob("fruit2",
                      Fruit2Reducer.class,
                      job);
      
              // 5. Submit the job
              boolean result = job.waitForCompletion(true);
      
              return result ? 0 : 1;
          }
      
          public void setConf(Configuration conf) {
              configuration = conf;
          }
      
          public Configuration getConf() {
              return configuration;
          }
      
          public static void main(String[] args) {
              try {
                  Configuration configuration = HBaseConfiguration.create();
                  ToolRunner.run(configuration, new Fruit2Driver(), args);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      
    4. Package and run the job

      $ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.cw.bigdata.mr2.Fruit2Driver
      
      # CDH version of the command
      # The source (fruit1) and target (fruit2) tables are hard-coded in Fruit2Driver, so no arguments are needed
      $ /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/yarn jar /root/jars/hbaseapi-1.0.jar com.cw.bigdata.mr2.Fruit2Driver
      

      Tip: before running the job, if the target table does not exist, create it first.

      Tip: Maven packaging command: -P local clean package or -P dev clean package install (bundles third-party jars into the artifact; requires the maven-shade-plugin).
