zoukankan      html  css  js  c++  java
  • 使用MapReduce将HDFS数据导入到HBase(一)

    package com.bank.service;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * 使用MapReduce批量导入Hbase(没有Reduce函数的MapReduce)
     * @author mengyao
     *
     */
    public class DataImportToHbase extends Configured implements Tool {

        static class DataImportToHbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
            private static String familyName = "info";
            private static String[] qualifiers = {"gzh", "currency", "version", "valuta", "qfTime", "flag", "machineID"};
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] values = line.split(" ");
                if (values.length == 7 && values.length == qualifiers.length) {
                    String row = values[0]+"_"+values[1]+"_"+values[2]+"_"+values[3];
                    long timestamp = System.currentTimeMillis();
                    ImmutableBytesWritable immutable = new ImmutableBytesWritable(Bytes.toBytes(row));
                    Put put = new Put(Bytes.toBytes(row));
                    for (int i = 0; i < values.length; i++) {
                        String qualifier = qualifiers[i];
                        String val = values[i];
                        put.add(Bytes.toBytes(familyName), Bytes.toBytes(qualifier), timestamp, Bytes.toBytes(val));
                    }
                    context.write(immutable, put);
                } else {
                    System.err.println(" ERROR: value length must equale qualifier length ");
                }
            }
        }

        @Override
        public int run(String[] arg0) throws Exception {
            Job job = Job.getInstance(getConf(), DataImportToHbase.class.getSimpleName());
            job.setJarByClass(DataImportToHbase.class);
            
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.setInputPaths(job, new Path(arg0[0]));
            
            job.setMapperClass(DataImportToHbaseMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            
            TableMapReduceUtil.initTableReducerJob(arg0[1], null, job);        
            job.setNumReduceTasks(0);
            TableMapReduceUtil.addDependencyJars(job);
            
            return job.waitForCompletion(true) ? 0 : 1;
        }
        
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("dfs.socket.timeout", "3600000");
            String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println(" ERROR: <dataInputDir> <tableName>");
                System.exit(2);
            }
            int status = ToolRunner.run(conf, new DataImportToHbase(), otherArgs);
            System.exit(status);
        }
    }

  • 相关阅读:
    JS document.execCommand实现复制功能(带你出坑)
    jquery动态添加删除一行数据示例
    SpringBoot SpEL表达式注入漏洞-分析与复现
    Fastjson 1.2.22-24 反序列化漏洞分析
    udf提权原理详解
    ZZCMS v8.2 前台Insert注入+任意文件删除
    安恒杯 3月线上个人赛WriteUp
    SQLI LABS Stacked Part(38-53) WriteUp
    【转】Ubuntu16.04安装docker
    安装部署k8s-版本-1.13
  • 原文地址:https://www.cnblogs.com/mengyao/p/4231064.html
Copyright © 2011-2022 走看看