zoukankan      html  css  js  c++  java
  • HBase:从 HBase 表读取数据并写入到 HDFS

    Mapper

     1 package cn.hbase.mapreduce.hb2hdfs;
     2 
     3 import java.io.IOException;
     4 import org.apache.hadoop.hbase.client.Result;
     5 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     6 import org.apache.hadoop.hbase.mapreduce.TableMapper;
     7 
     8 /**
     9  *
    10  * @author Tele
    11  *
    12  * 输出key 行键 输出out 读出的一行数据
    13  */
    14 
    15 public class ReadFruitFromHbMapper extends TableMapper<ImmutableBytesWritable, Result> {
    16 
    17     @Override
    18     protected void map(ImmutableBytesWritable key, Result value, Context context)
    19             throws IOException, InterruptedException {
    20         context.write(key, value);
    21     }
    22 }

    Reducer

     1 package cn.hbase.mapreduce.hb2hdfs;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.hbase.Cell;
     6 import org.apache.hadoop.hbase.CellScanner;
     7 import org.apache.hadoop.hbase.CellUtil;
     8 import org.apache.hadoop.hbase.client.Result;
     9 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    10 import org.apache.hadoop.hbase.util.Bytes;
    11 import org.apache.hadoop.io.NullWritable;
    12 import org.apache.hadoop.io.Text;
    13 import org.apache.hadoop.mapreduce.Reducer;
    14 
    15 /**
    16  *
    17  * @author Tele
    18  *
    19  */
    20 
    21 public class WriteFruit2HdfsReducer extends Reducer<ImmutableBytesWritable, Result, NullWritable, Text> {
    22     @Override
    23     protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context)
    24             throws IOException, InterruptedException {
    25         for (Result result : values) {
    26             CellScanner scanner = result.cellScanner();
    27             while (scanner.advance()) {
    28                 Cell cell = scanner.current();
    29                 Text text = new Text();
    30                 // 封装数据
    31                 String row = Bytes.toString(CellUtil.cloneRow(cell)) + "	";
    32                 String cf = Bytes.toString(CellUtil.cloneFamily(cell)) + "	";
    33                 String cn = Bytes.toString(CellUtil.cloneQualifier(cell)) + "	";
    34                 String value = Bytes.toString(CellUtil.cloneValue(cell)) + "	";
    35 
    36                 StringBuffer buffer = new StringBuffer();
    37                 buffer.append(row).append(cf).append(cn).append(value);
    38                 text.set(buffer.toString());
    39 
    40                 // 写出
    41                 context.write(NullWritable.get(), text);
    42             }
    43 
    44         }
    45 
    46     }
    47 }

    Runner

     1 package cn.hbase.mapreduce.hb2hdfs;
     2 
     3 import org.apache.hadoop.conf.Configuration;
     4 import org.apache.hadoop.conf.Configured;
     5 import org.apache.hadoop.fs.Path;
     6 import org.apache.hadoop.hbase.HBaseConfiguration;
     7 import org.apache.hadoop.hbase.client.Result;
     8 import org.apache.hadoop.hbase.client.Scan;
     9 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    10 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    11 import org.apache.hadoop.io.NullWritable;
    12 import org.apache.hadoop.io.Text;
    13 import org.apache.hadoop.mapreduce.Job;
    14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    15 import org.apache.hadoop.util.Tool;
    16 import org.apache.hadoop.util.ToolRunner;
    17 
    18 /**
    19  *
    20  * @author Tele
    21  *
    22  */
    23 
    24 public class FruitRunner extends Configured implements Tool {
    25 
    26     public int run(String[] args) throws Exception {
    27 
    28         System.setProperty("HADOOP_USER_NAME", "tele");
    29         // 实例化job
    30         Job job = Job.getInstance(this.getConf());
    31 
    32         // 设置jar
    33         job.setJarByClass(FruitRunner.class);
    34 
    35         // 设置缓存行键
    36         Scan scan = new Scan();
    37         scan.setCaching(300);
    38       
    39         // 组装mapper
    40         TableMapReduceUtil.initTableMapperJob("fruit", scan, ReadFruitFromHbMapper.class, ImmutableBytesWritable.class,
    41                 Result.class, job);
    42         // 组装reuder
    43         job.setReducerClass(WriteFruit2HdfsReducer.class);
    44         job.setOutputKeyClass(NullWritable.class);
    45         job.setOutputValueClass(Text.class);
    46 
    47         FileOutputFormat.setOutputPath(job, new Path("/outputfruit"));
    48 
    49         // reduce个数
    50         job.setNumReduceTasks(1);
    51 
    52         // 提交
    53         return job.waitForCompletion(true) ? 0 : 1;
    54     }
    55 
    56     public static void main(String[] args) throws Exception {
    57         Configuration conf = HBaseConfiguration.create();
    58         ToolRunner.run(conf, new FruitRunner(), args);
    59     }
    60 
    61 }
  • 相关阅读:
    [JavaEE] Hibernate ORM
    [PHP] htaccess 探秘
    [JavaEE] SSH框架搭建所需要的包
    博客园使用技巧
    vs快捷键
    算法:递归、循环、迭代、哈希表、查找、内排序、外排序
    【译】.NET中六个重要的概念:栈、堆、值类型、引用类型、装箱和拆箱 --转载
    .NET框架与开发语言:相关框架、共用部分、开发语言、一些疑问
    c#原理:c#代码是怎么运行的、实例化时发生了什么、静态对象(类、方法、变量、属性)的原理
    EA:UML建模-流程图、时序图、部署图
  • 原文地址:https://www.cnblogs.com/tele-share/p/9989375.html
Copyright © 2011-2022 走看看