  • Writing HDFS data into an HBase table

    Goal: write data stored on HDFS into an HBase table.

    To load Hadoop MapReduce output into HBase, the most efficient route is usually to write HFiles first and then bulk-load them, because HFile is HBase's native on-disk format, so the import step itself is very fast. The example in this post takes the simpler direct route instead, writing Put mutations straight to the table through TableOutputFormat; a sketch of the HFile variant follows for comparison. Let's walk through the concrete steps.
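    For reference, here is a minimal sketch of the HFile/bulk-load route, assuming the HBase 1.x API (HFileOutputFormat2 and LoadIncrementalHFiles). The class name, the staging directory, and the assumed id/name/age input layout are illustrative, not from the original post:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class Hdfs2HFile { // illustrative class name

        // HFileOutputFormat2 expects <ImmutableBytesWritable, Put> from the mapper.
        static class PutMapper
                extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] f = value.toString().split("\t"); // assumed layout: id \t name \t age
                Put put = new Put(f[0].getBytes());
                put.addColumn("cf".getBytes(), "name".getBytes(), f[1].getBytes());
                put.addColumn("cf".getBytes(), "age".getBytes(), f[2].getBytes());
                context.write(new ImmutableBytesWritable(put.getRow()), put);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            TableName tn = TableName.valueOf("t1");
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(tn);
                 RegionLocator locator = conn.getRegionLocator(tn);
                 Admin admin = conn.getAdmin()) {
                Job job = Job.getInstance(conf, "hdfs2hfile");
                job.setJarByClass(Hdfs2HFile.class);
                job.setMapperClass(PutMapper.class);
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                job.setMapOutputValueClass(Put.class);
                FileInputFormat.setInputPaths(job, "hdfs://192.168.1.123:9000/mytest/*");
                Path staging = new Path("/tmp/hfiles"); // illustrative staging dir
                FileOutputFormat.setOutputPath(job, staging);
                // Wires in the partitioner/sorter so each reducer writes a
                // region-aligned, sorted HFile.
                HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
                if (job.waitForCompletion(true)) {
                    // Moves the finished HFiles into the table's regions;
                    // this step is what makes the import fast.
                    new LoadIncrementalHFiles(conf).doBulkLoad(staging, admin, table, locator);
                }
            }
        }
    }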

    1. Suppose we have a text file on HDFS:
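      (The original screenshot of the file is not preserved. Judging from how the reducer parses fields in step 3, each line is tab-separated into an id, a name, and an age, with an optional fourth addr field that is commented out in the code. A hypothetical example:)

    1001	zhangsan	23
    1002	lisi	35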

      

    2. In HBase, create a table named t1

      Create statement: create 't1','cf'
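      Equivalently, the table can be created from Java. A minimal sketch using the HBase 1.x Admin API; the class name is illustrative, and the connection settings are assumed to match the job configuration below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class CreateT1 { // illustrative class name
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.1.124,192.168.1.125,192.168.1.126");
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin()) {
                // Same schema as the shell command: one column family 'cf'.
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t1"));
                desc.addFamily(new HColumnDescriptor("cf"));
                if (!admin.tableExists(desc.getTableName())) {
                    admin.createTable(desc);
                }
            }
        }
    }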

    3. Write the MR job

      

    package cn.tendency.wenzhouhbase.hadoop;

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class Hadoop2Hbase {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // HBase connection settings: ZooKeeper quorum, client port,
            // master port, and the HBase root directory on HDFS.
            conf.set("hbase.zookeeper.quorum", "192.168.1.124,192.168.1.125,192.168.1.126");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.master.port", "60000");
            conf.set("hbase.rootdir", "hdfs://192.168.1.122:9000/hbase");
            // Target table for TableOutputFormat.
            conf.set(TableOutputFormat.OUTPUT_TABLE, "t1");

            Job job = Job.getInstance(conf, Hadoop2Hbase.class.getSimpleName());
            // Ship the HBase jars with the job so cluster nodes can load them.
            TableMapReduceUtil.addDependencyJars(job);
            job.setJarByClass(Hadoop2Hbase.class);

            job.setMapperClass(HbaseMapper.class);
            job.setReducerClass(HbaseReducer.class);

            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);

            // Read plain text from HDFS, write Put mutations to HBase.
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TableOutputFormat.class);

            FileInputFormat.setInputPaths(job, "hdfs://192.168.1.123:9000/mytest/*");
            job.waitForCompletion(true);
        }

        static class HbaseMapper extends
                Mapper<LongWritable, Text, LongWritable, Text> {
            // One formatter per task instead of one per record.
            private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                String[] split = value.toString().split("\t");
                // Emit: (first field + current timestamp) as the future row key,
                // followed by the whole original line; the reducer re-splits on tabs.
                context.write(
                        key,
                        new Text(split[0] + sdf.format(Calendar.getInstance().getTime())
                                + "\t" + value.toString()));
            }
        }

        static class HbaseReducer extends
                TableReducer<LongWritable, Text, NullWritable> {
            @Override
            protected void reduce(
                    LongWritable key,
                    Iterable<Text> values,
                    Reducer<LongWritable, Text, NullWritable, Mutation>.Context context)
                    throws IOException, InterruptedException {
                for (Text text : values) {
                    // Layout after the mapper: rowkey \t id \t name \t age [\t addr]
                    String[] split = text.toString().split("\t");
                    Put put = new Put(split[0].getBytes());
                    // Keep the full line in one column, plus one column per field.
                    put.addColumn("cf".getBytes(), "oneColumn".getBytes(),
                            text.toString().getBytes());
                    put.addColumn("cf".getBytes(), "id".getBytes(),
                            split[1].getBytes());
                    put.addColumn("cf".getBytes(), "name".getBytes(),
                            split[2].getBytes());
                    put.addColumn("cf".getBytes(), "age".getBytes(),
                            split[3].getBytes());
                    // Uncomment if the input has a fourth addr field:
                    // put.addColumn("cf".getBytes(), "addr".getBytes(),
                    //         split[4].getBytes());
                    context.write(NullWritable.get(), put);
                }
            }
        }
    }
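    To run the job, package the class into a jar and submit it with hadoop jar, then verify the result in the HBase shell; the jar name below is illustrative:

    hadoop jar hadoop2hbase.jar cn.tendency.wenzhouhbase.hadoop.Hadoop2Hbase
    # then, in the hbase shell:
    scan 't1'

    A successful run shows one row per input line. Note that the row key is the first input field plus a second-resolution timestamp (yyyyMMddHHmmss), so two lines sharing the same first field and processed within the same second would collide on one row key.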
  • Original article: https://www.cnblogs.com/kwzblog/p/7525431.html