zoukankan      html  css  js  c++  java
  • 使用MapReduce查询Hbase表指定列簇的全部数据输出到HDFS(一)

    package com.bank.service;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * 查询hbase表指定列簇的全部数据输出到HDFS上
     * @author mengyao
     *
     */
    public class ReadHbase extends Configured implements Tool {

        private static String tableName;
        private static String outputDir;

        static class ReadHbaseMapper extends TableMapper<Text, Text> {
            private static Text k = new Text();
            private static Text v = new Text();
            @Override
            protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
                StringBuffer sb = new StringBuffer("");
                for(java.util.Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){
                    String str = new String(val.getValue());
                    if (str != null) {
                        sb.append(new String(val.getKey()));
                        sb.append(":");
                        sb.append(str);
                        sb.append(",");
                    }
                }
                String line = sb.toString();
                k.set(key.get());
                v.set(new String(line.substring(0,line.length()-1)));
                context.write(k, v);
            }
        }

        static class ReadHbaseReduce extends Reducer<Text, Text, Text, Text> {
            private Text result = new Text();
            @Override
            protected void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
                for (Text val : value) {
                    result.set(val);
                    context.write(key, result);
                }
            }
        }
        
        @Override
        public int run(String[] arg0) throws Exception {
            tableName = arg0[0];
            outputDir = arg0[1];
            Job job = Job.getInstance(getConf(), ReadHbase.class.getSimpleName());
            job.setJarByClass(ReadHbase.class);
            job.setReducerClass(ReadHbaseReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path(outputDir));
            TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), ReadHbaseMapper.class, Text.class, Text.class, job);
            TableMapReduceUtil.addDependencyJars(job);
            return job.waitForCompletion(true) ? 0 : 1;
        }
        
        public static void main(String[] args) throws Exception {
            String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println(" Usage:" + ReadHbase.class.getSimpleName() + " <tableName> <outputDir> ");
                System.exit(2);
            }
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("dfs.socket.timeout", "3600000");
            int status = ToolRunner.run(conf, new ReadHbase(), otherArgs);
            System.exit(status);
        }

    }

  • 相关阅读:
    Python使用pymysql模块插入数据报错
    layui的select标签样式没有加载出来
    Python计算平均数
    Django获取小时内的数据
    mysql5.0忘记登录密码
    数据库介绍
    Linux基础
    测试理论
    计算机基础
    chrome 70 一下载文件就卡死
  • 原文地址:https://www.cnblogs.com/mengyao/p/4249164.html
Copyright © 2011-2022 走看看