zoukankan      html  css  js  c++  java
  • HBase学习之路 (五)MapReduce操作Hbase

    MapReduce从HDFS读取数据存储到HBase中

    现有HDFS中有一个student.txt文件,每行是一条以逗号分隔的学生记录,字段依次为学号、姓名、性别、年龄、院系,格式如下

    95002,刘晨,女,19,IS
    95017,王风娟,女,18,IS
    95018,王一,女,19,IS
    95013,冯伟,男,21,CS
    95014,王小丽,女,19,CS
    95019,邢小丽,女,19,IS
    95020,赵钱,男,21,IS
    95003,王敏,女,22,MA
    95004,张立,男,19,IS
    95012,孙花,女,20,CS
    95010,孔小涛,男,19,CS
    95005,刘刚,男,18,MA
    95006,孙庆,男,23,CS
    95007,易思玲,女,19,MA
    95008,李娜,女,18,CS
    95021,周二,男,17,MA
    95022,郑明,男,20,MA
    95001,李勇,男,20,CS
    95011,包小柏,男,18,MA
    95009,梦圆圆,女,18,MA
    95015,王君,男,18,MA

    将HDFS上的这个文件里面的数据写入到HBase的student表中(学号作为行键,其余字段写入列簇info)

    MapReduce实现代码如下

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * MapReduce job that loads comma-separated student records from HDFS into
     * the HBase table {@code student}.
     *
     * <p>Each input line has the form {@code 95015,王君,男,18,MA}. The first field
     * becomes the row key; the remaining four are stored in column family
     * {@code info} under the qualifiers {@code name}, {@code sex}, {@code age}
     * and {@code department}.
     *
     * <p>All text is encoded explicitly as UTF-8 so the stored bytes do not
     * depend on the default charset of the JVM running the reduce task.
     */
    public class ReadHDFSDataToHbaseMR extends Configured implements Tool {

        /** Number of comma-separated fields a well-formed record must have. */
        private static final int FIELD_COUNT = 5;

        private static final byte[] FAMILY = utf8("info");
        private static final byte[] QUALIFIER_NAME = utf8("name");
        private static final byte[] QUALIFIER_SEX = utf8("sex");
        private static final byte[] QUALIFIER_AGE = utf8("age");
        private static final byte[] QUALIFIER_DEPARTMENT = utf8("department");

        /** Counter group / name used to report input lines that were skipped. */
        private static final String COUNTER_GROUP = "ReadHDFSDataToHbaseMR";
        private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new ReadHDFSDataToHbaseMR(), args);
            System.exit(exitCode);
        }

        /**
         * Configures and runs the import job, blocking until it finishes.
         *
         * @param args command-line arguments left over after generic option parsing (unused)
         * @return {@code 0} if the job succeeded, {@code 1} otherwise
         * @throws Exception if the job cannot be configured or submitted
         */
        @Override
        public int run(String[] args) throws Exception {
            // Must be set before any Hadoop code resolves the current user.
            System.setProperty("HADOOP_USER_NAME", "hadoop");

            // Start from the configuration ToolRunner handed us so generic options
            // (-D key=value, -conf ...) are honoured; fall back to a fresh one when
            // run() is invoked without ToolRunner.
            Configuration base = getConf();
            Configuration conf = (base == null)
                    ? HBaseConfiguration.create()
                    : HBaseConfiguration.create(base);
            conf.set("fs.defaultFS", "hdfs://myha01/");
            conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");

            Job job = Job.getInstance(conf);
            job.setJarByClass(ReadHDFSDataToHbaseMR.class);

            job.setMapperClass(HDFSToHbaseMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            // Installs the reducer and the HBase table output format for table "student".
            TableMapReduceUtil.initTableReducerJob(
                    "student", HDFSToHbaseReducer.class, job, null, null, null, null, false);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Put.class);

            // Output goes to HBase, not to HDFS, so no file output directory is
            // configured (and none is deleted) here.
            FileInputFormat.addInputPath(job, new Path("/student/input/"));

            return job.waitForCompletion(true) ? 0 : 1;
        }

        /** Encodes {@code s} as UTF-8 bytes. */
        private static byte[] utf8(String s) {
            return s.getBytes(StandardCharsets.UTF_8);
        }

        /**
         * Passes every input line through unchanged as the map output key.
         * Identical lines therefore collapse into a single reduce call.
         */
        public static class HDFSToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                context.write(value, NullWritable.get());
            }

        }

        /**
         * Turns one record line, e.g. {@code 95015,王君,男,18,MA}, into a {@link Put}
         * keyed by the first field.
         *
         * <p>Lines with fewer than five fields or an empty first field are skipped
         * and counted instead of failing the whole job.
         */
        public static class HDFSToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable> {

            @Override
            protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {

                // trim() drops a trailing '\r' from Windows line endings; limit -1
                // keeps trailing empty fields so "a,b,c,d," still yields 5 fields.
                String[] fields = key.toString().trim().split(",", -1);

                if (fields.length < FIELD_COUNT || fields[0].isEmpty()) {
                    context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                    return;
                }

                Put put = new Put(utf8(fields[0]));
                put.addColumn(FAMILY, QUALIFIER_NAME, utf8(fields[1]));
                put.addColumn(FAMILY, QUALIFIER_SEX, utf8(fields[2]));
                put.addColumn(FAMILY, QUALIFIER_AGE, utf8(fields[3]));
                put.addColumn(FAMILY, QUALIFIER_DEPARTMENT, utf8(fields[4]));

                context.write(NullWritable.get(), put);
            }

        }

    }

    MapReduce从HBase读取数据计算平均年龄并存储到HDFS中

    import java.io.IOException;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    
    public class ReadHbaseDataToHDFS extends Configured implements Tool{
    
        public static void main(String[] args) throws Exception {
            
            int run = ToolRunner.run(new ReadHbaseDataToHDFS(), args);
            System.exit(run);
            
        }
    
        @Override
        public int run(String[] arg0) throws Exception {
    
            Configuration conf = HBaseConfiguration.create();
            conf.set("fs.defaultFS", "hdfs://myha01/");
            conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
            System.setProperty("HADOOP_USER_NAME", "hadoop");
            FileSystem fs = FileSystem.get(conf);
    //        conf.addResource("config/core-site.xml");
    //        conf.addResource("config/hdfs-site.xml");
            
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(ReadHbaseDataToHDFS.class);
            
            
            // 取对业务有用的数据 info,age
            Scan scan = new Scan();
            scan.addColumn("info".getBytes(), "age".getBytes());
            
            TableMapReduceUtil.initTableMapperJob(
                    "student".getBytes(), // 指定表名
                    scan, // 指定扫描数据的条件
                    HbaseToHDFSMapper.class, // 指定mapper class
                    Text.class,     // outputKeyClass mapper阶段的输出的key的类型
                    IntWritable.class, // outputValueClass mapper阶段的输出的value的类型
                    job, // job对象
                    false
                    );
        
    
            job.setReducerClass(HbaseToHDFSReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            
            Path outputPath = new Path("/student/avg/");
            
            if(fs.exists(outputPath)) {
                fs.delete(outputPath,true);
            }
            
            FileOutputFormat.setOutputPath(job, outputPath);
            
            boolean isDone = job.waitForCompletion(true);
            
            return isDone ? 0 : 1;
        }
        
        public static class HbaseToHDFSMapper extends TableMapper<Text, IntWritable>{
            
            Text outKey = new Text("age");
            IntWritable outValue = new IntWritable();
            // key是hbase中的行键
            // value是hbase中的所行键的所有数据
            @Override
            protected void map(ImmutableBytesWritable key, Result value,Context context)
                    throws IOException, InterruptedException {
                
                boolean isContainsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
            
                if(isContainsColumn) {
                    
                    List<Cell> listCells = value.getColumnCells("info".getBytes(), "age".getBytes());
                    System.out.println("listCells:	"+listCells);
                    Cell cell = listCells.get(0);
                    System.out.println("cells:	"+cell);
                    
                    byte[] cloneValue = CellUtil.cloneValue(cell);
                    String ageValue = Bytes.toString(cloneValue);
                    outValue.set(Integer.parseInt(ageValue));
                    
                    context.write(outKey,outValue);
                    
                }
                
            }
            
        }
        
        public static class HbaseToHDFSReducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{
            
            DoubleWritable outValue = new DoubleWritable();
            
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                    throws IOException, InterruptedException {
                
                int count = 0;
                int sum = 0;
                for(IntWritable value : values) {
                    count++;
                    sum += value.get();
                }
                
                double avgAge = sum * 1.0 / count;
                outValue.set(avgAge);
                context.write(key, outValue);
            }
            
        }
        
    }
  • 相关阅读:
    变焦与对焦的区别
    教你在Zuul中增加Filter过滤请求
    FEIGN开启对HYSTRIX支持
    Feign使用Hystrix
    Spring Boot中使用Redis小结
    Spring 中的事件机制 ApplicationEventPublisher
    SpringBoot+EasyExcel实现Excel的导出
    SpringBoot–集成验证码kaptcha实现验证码功能
    Spring Cloud使用Feign调用服务接口
    Spring cloud ReadTimeout 问题解决
  • 原文地址:https://www.cnblogs.com/qingyunzong/p/8681490.html
Copyright © 2011-2022 走看看