zoukankan      html  css  js  c++  java
  • 十九、Hadoop学记笔记————Hbase和MapReduce

    概要:

    hadoop和hbase导入环境变量:

    要运行Hbase中自带的MapReduce程序,需要运行如下指令,可在官网中找到:

    如果遇到如下问题,则说明Hadoop的MapReduce没有权限访问Hbase的jar包:

    参考官网可解决:

    运行后解决:

    导入数据运行指令:

    tsv是指以制表符为分隔符的文件

    先创建测试数据,创建user文件:

    上传至hdfs,并且启动hbase shell:

    创建表:

    之后导入数据:

    还有一些其他的方法,比如rowcounter统计行数:

    接下来演示用sqoop将mysql数据考入hbase,构建测试数据:

    使用import,需要先配置hbase环境变量:

    Hbase表数据的迁移:

    之后编写MapReduce程序,代码如下:

    package com.tyx.hbase.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    public class Tab2TabMapReduce extends Configured implements Tool {
    
        // mapper class
        public static class TabMapper extends TableMapper<Text, Put> {
            private Text rowkey = new Text();
            
            @Override
            protected void map(ImmutableBytesWritable key, Result value,Context context)
                    throws IOException, InterruptedException {
                byte[] bytes = key.get();
                rowkey.set(Bytes.toString(bytes));
                
                Put put = new Put(bytes);
                
                for (Cell cell : value.rawCells()) {
                    // add cell
                    if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                        if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                            put.add(cell);
                        }
                    }
                }
                
                context.write(rowkey, put);
            }
        }
        
        // reduce class
        public static class TabReduce extends TableReducer<Text,Put, ImmutableBytesWritable> {
            @Override
            protected void reduce(Text key, Iterable<Put> values,Context context)
                    throws IOException, InterruptedException {
                for (Put put : values) {
                    context.write(null, put);
                }
                
            }
        }
        
        @Override
        public int run(String[] args) throws Exception {
            //create job
            Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
            
            // set run class
            job.setJarByClass(this.getClass());
            
            Scan scan = new Scan();
            scan.setCaching(500);
            scan.setCacheBlocks(false);
            
            // set mapper
            TableMapReduceUtil.initTableMapperJob(
                    "tab1", // input table
                    scan , // scan instance   
                    TabMapper.class,  // set mapper class
                    Text.class, // mapper output key 
                    Put.class, //mapper output value 
                    job // set job
            );
            
            TableMapReduceUtil.initTableReducerJob(
                    "tab2" , // output table 
                    TabReduce.class, // set reduce class 
                    job // set job
            );
            
            job.setNumReduceTasks(1);
            
            boolean b = job.waitForCompletion(true);
            
            if(!b) {
                System.err.print("error with job!!!");
            }
            
            
            
            
            
            
            
            
            
            
            return 0;
        }
        
        public static void main(String[] args) throws Exception {
            
            //create config
            Configuration config = HBaseConfiguration.create();
            
            //submit job
            int status = ToolRunner.run(config, new Tab2TabMapReduce(), args);
            
            //exit
            System.exit(status);
        }
    
    }

    运行指令:

    接下来是hdfs中文件导入Hbase:

    构造数据:

    然后编写MapReduce程序:

    package com.jkxy.hbase.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class HDFS2TabMapReduce extends Configured implements Tool{
        
        public static class HDFS2TabMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
            
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
            
            @Override
            protected void map(LongWritable key, Text value,Context context)
                    throws IOException, InterruptedException {
                
                String[] words = value.toString().split("	");
                //rk0001    zhangsan    33
                
                Put put = new Put(Bytes.toBytes(words[0]));
                put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(words[1]));
                put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(words[2]));
                
                rowkey.set(Bytes.toBytes(words[0]));
                
                context.write(rowkey, put);
            }
        }
        
        @Override
        public int run(String[] args) throws Exception {
            
            // create job
            Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
            
            // set class
            job.setJarByClass(this.getClass());
            
            // set path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            
            //set mapper
            job.setMapperClass(HDFS2TabMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            
            // set reduce
            TableMapReduceUtil.initTableReducerJob(
                    "user", // set table
                    null, 
                    job);
            job.setNumReduceTasks(0);
            
            boolean b = job.waitForCompletion(true);
            
            if(!b) {
                throw new IOException("error with job!!!");
            }
            
            return 0;
        }
        
        public static void main(String[] args) throws Exception {
            //get configuration
            Configuration conf = HBaseConfiguration.create();
            
            //submit job
            int status = ToolRunner.run(conf, new HDFS2TabMapReduce(), args);
            
            //exit
            System.exit(status);
        }
    
    }

    运行指令

    接下来演示使用BulkLaod将数据从Hdfs导入Hbase,使用该方式可以绕过WAL,memstor等步骤,加快海量数据的效率,代码如下:

    package com.jkxy.hbase.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class HFile2TabMapReduce extends Configured implements Tool {
    
        public static class HFile2TabMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
            
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
            
            @Override
            protected void map(LongWritable key, Text value,Context context)
                    throws IOException, InterruptedException {
                
                String[] words = value.toString().split("	");
                
                Put put = new Put(Bytes.toBytes(words[0]));
                put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(words[1]));
                put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(words[2]));
                rowkey.set(Bytes.toBytes(words[0]));
                
                context.write(rowkey, put);
            }
        }
        
        
        @Override
        public int run(String[] args) throws Exception {
            
            //create job
            Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
            
            // set run jar class
            job.setJarByClass(this.getClass());
            
            // set input . output
            FileInputFormat.addInputPath(job, new Path(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            
            // set map
            job.setMapperClass(HFile2TabMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            
            // set reduce
            job.setReducerClass(PutSortReducer.class);
            
            HTable table = new HTable(getConf(), args[0]);
            // set hfile output
            HFileOutputFormat2.configureIncrementalLoad(job, table );
            
            // submit job
            boolean b = job.waitForCompletion(true);
            if(!b) {
                throw new IOException(" error with job !!!");
            }
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(getConf());
            // load hfile
            loader.doBulkLoad(new Path(args[2]), table);
            
            
            
            return 0;
        }
        
        public static void main(String[] args) throws Exception {
            // get configuration
            Configuration conf = HBaseConfiguration.create();
            
            //run job
            int status = ToolRunner.run(conf, new HFile2TabMapReduce(), args);
            
            // exit
            System.exit(status);
            
        }
    
    }

    使用如下指令:

    人生苦短,远离IT脱离苦海
  • 相关阅读:
    QEMU裸机开发之S模式中断设置
    ARM64 的 memcpy 优化与实现
    RISCV from scratch 4: Creating a function prologue for our UART driver (2 / 3)
    RISCV MCU堆栈机制
    riscv 中断处理
    Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security问题解决
    每日学习
    每日学习
    每日学习
    每日学习
  • 原文地址:https://www.cnblogs.com/liuxiaopang/p/8039237.html
Copyright © 2011-2022 走看看