zoukankan      html  css  js  c++  java
  • 自定义 HBase-MapReduce1

    自定义 HBase-MapReduce1

    目标:将 fruit 表中的一部分数据(列为 name 的数据),通过 MR 迁入到 fruit_mr 表中。
    分步实现:

    1.构建 FruitMapper 类,用于读取 fruit 表中的数据

    package com.atlxl.mr1;
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Mapper that reads rows scanned from the source HBase table and emits, for
     * each row, a {@link Put} containing only the cells whose column qualifier
     * is {@code name}.
     *
     * <p>The output key is the scanned row key and the output value is the Put
     * to be applied to the target table by the reducer.
     */
    public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put>{

        /** Qualifier of the only column that is copied to the target table. */
        private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");

        /**
         * Copies every {@code name} cell of the current row into a Put and emits it.
         *
         * @param key     row key of the scanned row
         * @param value   all cells returned by the scan for this row
         * @param context MapReduce context used to emit the (row key, Put) pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

            // The Put targets the same row key as the row being read.
            Put put = new Put(key.get());

            // Keep only cells whose qualifier is "name"; compare the raw bytes so
            // no per-cell copy or String decoding is needed.
            for (Cell cell : value.rawCells()) {
                if (CellUtil.matchingQualifier(cell, NAME_QUALIFIER)) {
                    put.add(cell);
                }
            }

            // A row without any "name" cell would yield an empty Put, which the
            // HBase client refuses to write ("No columns to insert") and would
            // fail the job, so such rows are skipped.
            if (!put.isEmpty()) {
                context.write(key, put);
            }
        }
    }
     
     

    2. 构建 FruitReducer 类,用于将读取到的 fruit 表中的数据写入到 fruit_mr表中

     
    package com.atlxl.mr1;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Pass-through reducer: every {@link Put} produced by the mapper is forwarded
     * unchanged to the output table configured for the job.
     */
    public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable>{

        /**
         * Writes each Put grouped under {@code key} to the job output.
         *
         * @param key     row key shared by the grouped Puts (not used for output)
         * @param values  Puts emitted by the mapper for this row key
         * @param context MapReduce context used to emit the Puts
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            // The output key carries no information; only the Put matters.
            final NullWritable outputKey = NullWritable.get();
            for (Put mutation : values) {
                context.write(outputKey, mutation);
            }
        }
    }
     
     
     
     

    3.构建 FruitDriver extends Configured implements Tool 用于组装运行 Job任务

    package com.atlxl.mr1;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class FruitDriver extends Configuration implements Tool{
    
        private Configuration configuration = null;
    
        public int run(String[] strings) throws Exception {
    
            //获取任务对象
            Job job = Job.getInstance(configuration);
    
            //指定Driver类
            job.setJarByClass(FruitDriver.class);
    
            //指定Mapper
            TableMapReduceUtil.initTableMapperJob("fruit",new Scan(),FruitMapper.class, ImmutableBytesWritable.class,Put.class,job);
    
            //指定Reducer
            TableMapReduceUtil.initTableReducerJob("fruit_mr", FruitReducer.class, job);
    
            //提交
            boolean b = job.waitForCompletion(true);
    
            return b?0:1;
        }
    
        public void setConf(Configuration conf) {
            this.configuration = conf;
        }
    
        public Configuration getConf() {
            return configuration;
        }

    //4.主函数中调用运行该 Job 任务

    
        public static void main(String[] args) throws Exception {
    
            Configuration configuration = HBaseConfiguration.create();
    
            int i = ToolRunner.run(configuration, new FruitDriver(), args);
    
        }
    }

    5.打包运行任务

    提示:运行任务前,如果待导入数据的目标表不存在,则需要提前创建。
    提示:maven 打包命令:-P local clean package 或-P dev clean package install(将第三方 jar 包
    一同打包,需要插件:maven-shade-plugin)

    1)将打好的jar包丢到hbase目录下

    2)创建接收数据的表

    hbase(main):005:0>  create 'fruit_mr','info'

    3)运行jar包

    [lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr1.FruitDriver

    4)查看导入的数据

    hbase(main):006:0> scan "fruit_mr"
    ROW                        COLUMN+CELL                                                                
     1001                      column=info:name, timestamp=1560441335521, value=Apple                     
     1002                      column=info:name, timestamp=1560441335521, value=Pear                      
     1003                      column=info:name, timestamp=1560441335521, value=Pineapple                 
    3 row(s) in 0.1330 seconds

    自定义 HBase-MapReduce2

    目标:实现将 HDFS 中的数据写入到 HBase 表中。
    分步实现:

    1.构建 HDFSMapper 类,用于读取 HDFS 中的文件数据

    package com.atlxl.mr2;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Mapper that turns each tab-separated input line of the form
     * {@code rowKey<TAB>name<TAB>color} into a {@link Put} for column family
     * {@code info}.
     *
     * <p>Lines that do not contain at least three fields, or whose row key field
     * is empty, are skipped and counted instead of failing the task.
     */
    public class HDFSMapper extends Mapper<LongWritable, Text, NullWritable, Put> {

        /** Column family that receives both columns. */
        private static final byte[] FAMILY = Bytes.toBytes("info");
        /** Qualifier for the second field of the line. */
        private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");
        /** Qualifier for the third field of the line. */
        private static final byte[] COLOR_QUALIFIER = Bytes.toBytes("color");
        /** Minimum number of tab-separated fields a usable line must have. */
        private static final int REQUIRED_FIELDS = 3;

        /**
         * Parses one input line and emits the corresponding Put.
         *
         * @param key     byte offset of the line in the input (unused)
         * @param value   the input line
         * @param context MapReduce context used to emit the Put
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Read the line and split it on tab characters.
            String[] fields = value.toString().split("\t");

            // Blank or truncated lines would otherwise raise
            // ArrayIndexOutOfBoundsException, and an empty row key is rejected
            // by the Put constructor; skip them and record how many were dropped.
            if (fields.length < REQUIRED_FIELDS || fields[0].isEmpty()) {
                context.getCounter("HDFSMapper", "MALFORMED_LINES").increment(1);
                return;
            }

            // Build the Put: field 0 is the row key, fields 1 and 2 the values.
            Put put = new Put(Bytes.toBytes(fields[0]));
            put.addColumn(FAMILY, NAME_QUALIFIER, Bytes.toBytes(fields[1]));
            put.addColumn(FAMILY, COLOR_QUALIFIER, Bytes.toBytes(fields[2]));

            // Emit; the key carries no information.
            context.write(NullWritable.get(), put);
        }
    }

    2.构建 HDFSReducer 类

    package com.atlxl.mr2;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Pass-through reducer: forwards every {@link Put} built by the mapper to the
     * output table configured for the job.
     */
    public class HDFSReducer extends TableReducer<NullWritable, Put,NullWritable>{

        /**
         * Writes each received Put to the job output.
         *
         * @param key     the placeholder key emitted by the mapper
         * @param values  Puts emitted by the mapper
         * @param context MapReduce context used to emit the Puts
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(NullWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            // The output key is a placeholder; only the Put is meaningful.
            final NullWritable outputKey = NullWritable.get();
            for (Put mutation : values) {
                context.write(outputKey, mutation);
            }
        }
    }

    3.创建HDFSDriver 组装 Job

    package com.atlxl.mr2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class HDFSDriver extends Configuration implements Tool{
    
        private Configuration configuration = null;
    
        public int run(String[] args) throws Exception {
    
            //获取Job对象
            Job job = Job.getInstance(configuration);
    
            //设置主类
            job.setJarByClass(HDFSDriver.class);
    
            //设置Mapper
            job.setMapperClass(HDFSMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Put.class);
    
            //设置Reducer
            TableMapReduceUtil.initTableReducerJob("fruit2", HDFSReducer.class, job);
    
            //设置输入路径
            FileInputFormat.setInputPaths(job, args[0]);
    
            //提交
            boolean result = job.waitForCompletion(true);
    
            return result?0:1;
        }
    
        public void setConf(Configuration conf) {
            configuration = conf;
        }
    
        public Configuration getConf() {
            return configuration;
        }

    4.调用执行 Job

    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            int i = ToolRunner.run(configuration, new HDFSDriver(), args);
    
            System.exit(i);
    
        }
    
    }

    5.打包运行(运行前需先在 HBase 中创建目标表,例如:create 'fruit2','info')

     输入路径为:HDFS

    [lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr2.HDFSDriver /fruit.tsv

    输入路径为:本地hbase包下

    [lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr2.HDFSDriver file:///opt/module/hbase/fruit.tsv 
  • 相关阅读:
    使用C#调用C++类库
    C# IntPtr类型
    C# 调用C++ dll string类型返回
    C# try、catch、finally语句
    C语言 char *、char []、const char *、string的区别与相互转换
    C# 字符串string与char数组互转!
    C#如何调用C++(进阶篇)
    Springboot通过过滤器实现对请求头的修改
    【spring事务】
    命令行参数库:McMaster.Extensions.CommandLineUtils【转】
  • 原文地址:https://www.cnblogs.com/LXL616/p/11018793.html
Copyright © 2011-2022 走看看