zoukankan      html  css  js  c++  java
  • 用mapreduce读取hdfs数据到hbase上

    hdfs数据到hbase过程

     

    将HDFS上的文件中的数据导入到hbase中

    实现上面的需求也有两种办法,一种是自定义mr,一种是使用hbase提供好的import工具

     

    先在hbase中创建好表(表名需与代码中的 NNTB 一致,列族为 info):  create 'NNTB','info'

    下面是实现代码:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    /**
     * 用于HDFS的数据读取,写入到hbase中,
     * hbase里预先创建好表:create 'NNTB','info'
     * */
    public class HdfsToHBase {
        public static void main(String[] args) throws Exception{
            System.setProperty("hadoop.home.dir", "D:\hadoop-2.7.6");//这行我是本地运行所需指定的hadoop home
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "202.168.27.196:2181");//ip乱写的,端口默认2181
            conf.set(TableOutputFormat.OUTPUT_TABLE, "NNTB");
            Job job = Job.getInstance(conf, HdfsToHBase.class.getSimpleName());
            TableMapReduceUtil.addDependencyJars(job);
            job.setJarByClass(HdfsToHBase.class);
            
            job.setMapperClass(HdfsToHBaseMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            
            job.setReducerClass(HdfsToHBaseReducer.class);
            
            FileInputFormat.addInputPath(job, new Path("hdfs://202.168.27.196:9000/user/hadoop/gznt/gznt_bmda/*"));
            job.setOutputFormatClass(TableOutputFormat.class);
            job.waitForCompletion(true);
        }
        
        public static class HdfsToHBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
            private Text outKey = new Text();
            private Text outValue = new Text();
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] splits = value.toString().split("	");
                outKey.set(splits[0]);
                outValue.set(splits[1]+"	"+splits[2]+"	"+splits[3]+"	"+splits[4]);
                context.write(outKey, outValue);
            }
        }
        //:::   create 'NNTB','info'
        public static class HdfsToHBaseReducer extends TableReducer<Text, Text, NullWritable> {
            @Override
            protected void reduce(Text k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
                Put put = new Put(k2.getBytes());
                for (Text v2 : v2s) {
                    String[] splis = v2.toString().split("	");
                    //info,对应hbase列族名
                    if(splis[0]!=null && !"NULL".equals(splis[0])){
                        put.addColumn("info".getBytes(), "NodeCode".getBytes(),splis[0].getBytes());
                    }
                    if(splis[1]!=null && !"NULL".equals(splis[1])){
                        put.addColumn("info".getBytes(), "NodeType".getBytes(),splis[1].getBytes());
                    }
                    if(splis[2]!=null && !"NULL".equals(splis[2])){
                        put.addColumn("info".getBytes(), "NodeName".getBytes(),splis[2].getBytes());
                    }
                    if(splis[3]!=null && !"NULL".equals(splis[3])){
                        put.addColumn("info".getBytes(), "IsWarehouse".getBytes(),splis[3].getBytes());
                    }
                }
                context.write(NullWritable.get(),put);
            }
        }
    }
    

      

     

  • 相关阅读:
    Spring AOP 随记
    Java设计模式系列 — 构造器模式
    【Java线程安全】 — 常用数据结构及原理(未完结)
    【最佳实践】好用的Quartz管理器类
    Timer和时间调度
    Java9之HashMap与ConcurrentHashMap
    记一次maven的包冲突经历
    hbase高可用集群部署(cdh)
    HBase 1.2.6 完全分布式集群安装部署详细过程
    hadoop-2.7.3完全分布式部署
  • 原文地址:https://www.cnblogs.com/zmoumou/p/10181133.html
Copyright © 2011-2022 走看看