    Custom HBase-MapReduce (Part 1)

    Goal: migrate part of the data in the fruit table (the name column only) into the fruit_mr table via MapReduce.
    Step-by-step:

    1. Build a FruitMapper class to read data from the fruit table

    package com.atlxl.mr1;
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    
    public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put>{
    
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    
            //Build the Put object keyed by the current row key
            Put put = new Put(key.get());
    
            //Iterate over the cells and keep only the name column
            Cell[] cells = value.rawCells();
            for (Cell cell : cells) {
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                put.add(cell); //carry the original cell over unchanged (Put.add(Cell), HBase 1.x API)
                }
            }
    
            //Write out the row key and the filtered Put
            context.write(key, put);
    
        }
    }
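    Note that the filter above matches on the qualifier alone, so a "name" qualifier under any column family would be copied. A stricter drop-in for the loop (my variant, assuming the data lives in the "info" family as elsewhere in this post) also checks the family via CellUtil.cloneFamily:

    //Stricter filter (drop-in for the loop above): match family and qualifier
    for (Cell cell : value.rawCells()) {
        if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))
                && "name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
            put.add(cell);
        }
    }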
     
     

    2. Build a FruitReducer class to write the rows read from the fruit table into the fruit_mr table

    package com.atlxl.mr1;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.NullWritable;
    
    import java.io.IOException;
    
    public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable>{
    
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
    
            //Forward every Put unchanged to the output table
            for (Put value : values) {
                context.write(NullWritable.get(), value);
            }
    
        }
    }
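    Since this reducer only forwards each Put unchanged, HBase also ships org.apache.hadoop.hbase.mapreduce.IdentityTableReducer, which does essentially the same pass-through. If its generic types fit your job, the driver below could (as a possible substitution, not what the original code does) wire it in instead:

    //Possible alternative in the driver: use HBase's bundled identity reducer
    TableMapReduceUtil.initTableReducerJob("fruit_mr", IdentityTableReducer.class, job);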
     
     
     
     

    3. Build a FruitDriver class (extends Configured implements Tool) to assemble and run the Job

    package com.atlxl.mr1;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class FruitDriver extends Configured implements Tool {
    
        private Configuration configuration = null;
    
        public int run(String[] strings) throws Exception {
    
            //Create the Job from the configuration injected by ToolRunner
            Job job = Job.getInstance(configuration);
    
            //Set the jar by the driver class
            job.setJarByClass(FruitDriver.class);
    
            //Set the mapper: scan the fruit table
            TableMapReduceUtil.initTableMapperJob("fruit", new Scan(), FruitMapper.class, ImmutableBytesWritable.class, Put.class, job);
    
            //Set the reducer: write into the fruit_mr table
            TableMapReduceUtil.initTableReducerJob("fruit_mr", FruitReducer.class, job);
    
            //Submit the job and wait for completion
            boolean b = job.waitForCompletion(true);
    
            return b ? 0 : 1;
        }
    
        public void setConf(Configuration conf) {
            this.configuration = conf;
        }
    
        public Configuration getConf() {
            return configuration;
        }

    //4. Call and run the Job from the main method

    
        public static void main(String[] args) throws Exception {
    
            Configuration configuration = HBaseConfiguration.create();
    
            int i = ToolRunner.run(configuration, new FruitDriver(), args);

            System.exit(i);
        }
    }
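    A possible refinement (not in the original code): instead of shipping every cell to the mapper and filtering there, the Scan passed to initTableMapperJob can restrict the read to info:name on the server side, along with the usual MR scan settings. A sketch, assuming the same org.apache.hadoop.hbase.util.Bytes import as in the mapper:

    //Sketch: a tuned Scan for initTableMapperJob (server-side column restriction)
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); //only fetch info:name
    scan.setCaching(500);        //fetch rows in bigger batches
    scan.setCacheBlocks(false);  //commonly recommended for MR scans
    TableMapReduceUtil.initTableMapperJob("fruit", scan, FruitMapper.class,
            ImmutableBytesWritable.class, Put.class, job);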

    5. Package the jar and run the job

    Tip: before running the job, create the destination table if it does not already exist.
    Tip: maven package command: -P local clean package, or -P dev clean package install (to bundle
    third-party jars you need the maven-shade-plugin).
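    If you would rather have the driver guarantee the destination table itself, a minimal sketch with the HBase 1.x client API (imports from org.apache.hadoop.hbase and org.apache.hadoop.hbase.client; table and family names as used in this post; configuration is the driver's Configuration) could create fruit_mr when it is missing:

    //Sketch: create the target table if it does not exist yet
    try (Connection conn = ConnectionFactory.createConnection(configuration);
         Admin admin = conn.getAdmin()) {
        TableName table = TableName.valueOf("fruit_mr");
        if (!admin.tableExists(table)) {
            HTableDescriptor desc = new HTableDescriptor(table);
            desc.addFamily(new HColumnDescriptor("info")); //single "info" family
            admin.createTable(desc);
        }
    }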

    1) Copy the built jar into the hbase directory

    2) Create the table that will receive the data

    hbase(main):005:0>  create 'fruit_mr','info'

    3) Run the jar

    [lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr1.FruitDriver

    4) Inspect the imported data

    hbase(main):006:0> scan "fruit_mr"
    ROW                        COLUMN+CELL                                                                
     1001                      column=info:name, timestamp=1560441335521, value=Apple                     
     1002                      column=info:name, timestamp=1560441335521, value=Pear                      
     1003                      column=info:name, timestamp=1560441335521, value=Pineapple                 
    3 row(s) in 0.1330 seconds

    Custom HBase-MapReduce (Part 2)

    Goal: write data from HDFS into an HBase table.
    Step-by-step:

    1. Build an HDFSMapper class to read file data from HDFS

    package com.atlxl.mr2;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class HDFSMapper extends Mapper<LongWritable, Text, NullWritable, Put> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //Grab one line of input
            String line = value.toString();

            //Split on tabs
            String[] split = line.split("\t");

            //Build the Put: rowkey, then the info:name and info:color columns
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));

            //Write out with a null key; the TableReducer only needs the Put
            context.write(NullWritable.get(), put);
        }
    }
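    For reference, this mapper expects tab-separated lines of rowkey, name, color. A fruit.tsv along these lines would parse cleanly (names taken from the scan output above; the colors are made-up sample values):

    1001	Apple	Red
    1002	Pear	Yellow
    1003	Pineapple	Yellow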

    2. Build an HDFSReducer class

    package com.atlxl.mr2;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.NullWritable;
    
    import java.io.IOException;
    
    public class HDFSReducer extends TableReducer<NullWritable, Put,NullWritable>{
    
        @Override
        protected void reduce(NullWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
    
            //Forward every Put unchanged to the fruit2 table
            for (Put value : values) {
                context.write(NullWritable.get(), value);
            }
    
        }
    }
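    Because this reducer is a pure pass-through as well, a map-only variant is possible: initTableReducerJob accepts null for the reducer class, and with zero reduce tasks the mapper's Puts go straight to TableOutputFormat. A sketch of how the driver in the next step could be changed (my variant, not the original):

    //Sketch: skip the reduce phase entirely
    TableMapReduceUtil.initTableReducerJob("fruit2", null, job); //only configures TableOutputFormat
    job.setNumReduceTasks(0); //map output is written directly to the table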

    3. Create an HDFSDriver class to assemble the Job

    package com.atlxl.mr2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class HDFSDriver extends Configured implements Tool {
    
        private Configuration configuration = null;
    
        public int run(String[] args) throws Exception {
    
            //Create the Job object
            Job job = Job.getInstance(configuration);
    
            //Set the jar by the driver class
            job.setJarByClass(HDFSDriver.class);
    
            //Set the mapper and its output key/value types
            job.setMapperClass(HDFSMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Put.class);
    
            //Set the reducer: write into the fruit2 table
            TableMapReduceUtil.initTableReducerJob("fruit2", HDFSReducer.class, job);
    
            //Set the HDFS input path (first program argument)
            FileInputFormat.setInputPaths(job, args[0]);
    
            //Submit the job and wait for completion
            boolean result = job.waitForCompletion(true);
    
            return result ? 0 : 1;
        }
    
        public void setConf(Configuration conf) {
            configuration = conf;
        }
    
        public Configuration getConf() {
            return configuration;
        }

    //4. Invoke and run the Job from the main method

    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            int i = ToolRunner.run(configuration, new HDFSDriver(), args);
    
            System.exit(i);
    
        }
    
    }

    5. Package and run

    Input path on HDFS:

    [lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr2.HDFSDriver /fruit.tsv

    Input path as a local file under the hbase directory:

    [lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr2.HDFSDriver file:///opt/module/hbase/fruit.tsv 
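    Either way, a quick scan of the target table in the hbase shell verifies the import, for example:

    hbase(main):001:0> scan 'fruit2'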