  • Importing and exporting between HBase and HDFS with MapReduce

    Appendix code:

    HBase---->HDFS

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class HBase2HDFS {

        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Job job = Job.getInstance(conf, HBase2HDFS.class.getSimpleName());
            job.setJarByClass(HBase2HDFS.class);
            // An ordinary MR job takes its input from FileInputFormat and the like; to read from an
            // HBase table, the helper class TableMapReduceUtil wires the table scan into the job.
            TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), HBase2HDFSMapper.class,
                                                Text.class, Text.class, job);
            // Ship the HBase dependency jars with the MR job.
            TableMapReduceUtil.addDependencyJars(job);
            job.setMapperClass(HBase2HDFSMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            //FileOutputFormat.setOutputPath(job, new Path("/t1-out"));
            job.setNumReduceTasks(0);
            job.waitForCompletion(true);
        }

        // The two type parameters of TableMapper are the map-phase output types; TableMapper is a
        // subclass of Mapper that already fixes the input types to ImmutableBytesWritable (row key)
        // and Result (the row's cells). Column family and qualifier names are passed to HBase as
        // byte arrays, which is why they appear as quoted string literals below.
        static class HBase2HDFSMapper extends TableMapper<Text, Text> {
            private Text rowKeyText = new Text();
            private Text value = new Text();

            @Override
            protected void map(
                    ImmutableBytesWritable key,
                    Result result,
                    Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                // All cells of the row are in the Result object. They used to be pulled out with
                // raw(), but that method is deprecated:
                /*
                KeyValue[] raw = result.raw();
                for (KeyValue keyValue : raw) {
                    keyValue.getValue();
                }
                */
                /*
                 * Desired output format:  1 zhangsan 13   (row key, name, age)
                 *                         2 lisi 14
                 */

                // A specific value is addressed by row key, column family, column qualifier and
                // timestamp. getColumnLatestCell returns the cell with the newest timestamp, so the
                // timestamp is effectively chosen for us; CellUtil.cloneValue extracts the value
                // bytes from the returned Cell.
                byte[] nameBytes = CellUtil.cloneValue(result.getColumnLatestCell("cf".getBytes(), "name".getBytes()));
                byte[] ageBytes = CellUtil.cloneValue(result.getColumnLatestCell("cf".getBytes(), "age".getBytes()));

                rowKeyText.set(key.get());
                value.set(new String(nameBytes) + "\t" + new String(ageBytes));
                context.write(rowKeyText, value);
                // The map output is already "rowkey name age", so no reducer is needed.
            }
        }
    }
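
    The default Scan passed to initTableMapperJob fetches every column of every row. It can be
    narrowed and tuned before it is handed to the job; the following is a minimal sketch, assuming
    the same table argument, mapper class and cf:name / cf:age columns as above (the caching
    settings are illustrative values, not from the original job):

    Scan scan = new Scan();
    scan.addColumn("cf".getBytes(), "name".getBytes()); // only fetch cf:name
    scan.addColumn("cf".getBytes(), "age".getBytes());  // and cf:age
    scan.setCaching(500);        // rows fetched per RPC to the region server
    scan.setCacheBlocks(false);  // don't fill the block cache during a full scan
    TableMapReduceUtil.initTableMapperJob(args[0], scan, HBase2HDFSMapper.class,
                                        Text.class, Text.class, job);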

    HDFS---->HBase: importing into HBase with MR

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class HDFS2HBaseImport {

        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Tell TableOutputFormat which table the Puts should go into.
            conf.set(TableOutputFormat.OUTPUT_TABLE, args[0]);

            Job job = Job.getInstance(conf, HDFS2HBaseImport.class.getSimpleName());
            job.setJarByClass(HDFS2HBaseImport.class);

            // Ship the HBase dependency jars with the MR job.
            TableMapReduceUtil.addDependencyJars(job);
            job.setMapperClass(HDFS2HBaseMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(HDFS2HBaseReducer.class);
            // The output goes to an HBase table, so the output format is TableOutputFormat,
            // not a file-based output format.
            job.setOutputFormatClass(TableOutputFormat.class);
            FileInputFormat.setInputPaths(job, args[1]);
            job.waitForCompletion(true);
        }

        static class HDFS2HBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
            private Text rowKeyText = new Text();
            private Text value = new Text();

            @Override
            protected void map(LongWritable key, Text text,
                    Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                // Each input line has the form "rowkey<TAB>name<TAB>age".
                String[] splits = text.toString().split("\t");
                rowKeyText.set(splits[0]);
                value.set(splits[1] + "\t" + splits[2]); // name\tage
                context.write(rowKeyText, value);
            }
        }

        // The reducer extends TableReducer, the counterpart of the TableMapper used in the export
        // job. Because the records go into HBase, the third type parameter (the output key) is just
        // NullWritable; the output value type is fixed to Mutation (here, a Put).
        static class HDFS2HBaseReducer extends TableReducer<Text, Text, NullWritable> {
            @Override
            protected void reduce(Text k2, Iterable<Text> v2s,
                    Reducer<Text, Text, NullWritable, Mutation>.Context context)
                    throws IOException, InterruptedException {
                // Rows are inserted into HBase through a Put object. copyBytes() returns only the
                // valid bytes of the Text (getBytes() may contain stale trailing bytes).
                Put put = new Put(k2.copyBytes());

                for (Text text : v2s) {
                    String[] splits = text.toString().split("\t");
                    // Set the columns and their values (addColumn replaces the deprecated add()).
                    put.addColumn("cf".getBytes(), "name".getBytes(), splits[0].getBytes());
                    put.addColumn("cf".getBytes(), "age".getBytes(), splits[1].getBytes());
                    // TableOutputFormat ignores the key, so write NullWritable together with the Put.
                    context.write(NullWritable.get(), put);
                }
            }
        }
    }
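
    Instead of setting TableOutputFormat.OUTPUT_TABLE and the output format class by hand,
    TableMapReduceUtil also offers initTableReducerJob, which configures the output table, the
    output format and the HBase dependency jars in one call. A minimal sketch of the same wiring,
    assuming the same args, mapper and reducer classes as above:

    Job job = Job.getInstance(HBaseConfiguration.create(), "HDFS2HBaseImport");
    job.setJarByClass(HDFS2HBaseImport.class);
    job.setMapperClass(HDFS2HBaseMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, args[1]);
    // Sets OUTPUT_TABLE, TableOutputFormat and the dependency jars for the job.
    TableMapReduceUtil.initTableReducerJob(args[0], HDFS2HBaseReducer.class, job);
    job.waitForCompletion(true);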
  • Original article: https://www.cnblogs.com/DreamDrive/p/5583135.html