zoukankan      html  css  js  c++  java
  • hbase 从hdfs上读取数据到hbase中

     <!-- HBase 2.0.2 modules used by this job: hbase-client (Put, Bytes),
          hbase-server (MR runtime support), and hbase-mapreduce, which in
          HBase 2.x provides TableMapReduceUtil / TableReducer used below. -->
     1 <dependencies>
     2     <dependency>
     3         <groupId>org.apache.hbase</groupId>
     4         <artifactId>hbase-client</artifactId>
     5         <version>2.0.2</version>
     6     </dependency>
     7     <dependency>
     8         <groupId>org.apache.hbase</groupId>
     9         <artifactId>hbase-server</artifactId>
    10         <version>2.0.2</version>
    11     </dependency>
    12     <dependency>
    13         <groupId>org.apache.hbase</groupId>
    14         <artifactId>hbase-mapreduce</artifactId>
    15         <version>2.0.2</version>
    16     </dependency>
    17   </dependencies>

    Mapper

     1 package cn.hbase.mapreduce.hdfs;
     2 
     3 import java.io.IOException;
     4 import java.nio.ByteBuffer;
     5 import java.util.ArrayList;
     6 import java.util.HashMap;
     7 import java.util.Iterator;
     8 import java.util.List;
     9 import java.util.Map;
    10 import java.util.Map.Entry;
    11 import java.util.Set;
    12 
    13 import org.apache.hadoop.hbase.client.Put;
    14 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    15 import org.apache.hadoop.hbase.util.Bytes;
    16 import org.apache.hadoop.io.LongWritable;
    17 import org.apache.hadoop.io.Text;
    18 import org.apache.hadoop.mapreduce.Mapper;
    19 
    20 /**
    21  *
    22  * @author Tele 输入key hdfs上的文本的行号 输入value 文本 输出key 行键 输出value 将插入hbase的一行数据,需要行键
    23  *
    24  */
    25 
    26 public class ReadFruitFromHdfsMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    27 
    28     @Override
    29     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    30         // 读取
    31         String line = value.toString();
    32 
    33         // 切割
    34         /**
    35          * 1001 apple red 1002 pear yellow 1003 pineapple yellow
    36          */
    37         String[] fields = line.split("	");
    38 
    39         // 每个列族对应多个列
    40         Map<String, Object> map = new HashMap<String, Object>();
    41 
    42         // 封装列族下需要的列
    43         List<String> infoCNList = new ArrayList<String>();
    44         infoCNList.add("name");// 值对应field[1]
    45         infoCNList.add("color");// 值对应field[2]
    46         map.put("info", infoCNList);
    47 
    48         String row = fields[0];
    49 
    50         // 封装
    51         Put put = new Put(Bytes.toBytes(row));
    52 
    53         // 遍历map,封装每个列族下的列
    54         Set<Entry<String, Object>> entrySet = map.entrySet();
    55         Iterator<Entry<String, Object>> iterator = entrySet.iterator();
    56         while (iterator.hasNext()) {
    57             Entry<String, Object> entry = iterator.next();
    58             String cf = entry.getKey();
    59             List<String> cnList = (List<String>) entry.getValue();
    60 
    61             // 遍历list
    62             for (int i = 0; i < cnList.size(); i++) {
    63                 put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cnList.get(i)), Bytes.toBytes(fields[i + 1]));
    64             }
    65         }
    66 
    67         // 行键
    68         ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(fields[0]));
    69 
    70         // 写出
    71         context.write(immutableBytesWritable, put);
    72 
    73     }
    74 
    75 }

    Reducer

     1 package cn.hbase.mapreduce.hdfs;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.hbase.client.Mutation;
     6 import org.apache.hadoop.hbase.client.Put;
     7 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     8 import org.apache.hadoop.hbase.mapreduce.TableReducer;
     9 import org.apache.hadoop.io.NullWritable;
    10 import org.apache.hadoop.mapreduce.Reducer;
    11 
    12 /** 
    13  *
    14  *@author Tele
    15  *
    16  *对hbase上的表操作,继承tablereducer即可
    17  *
    18  */
    19 
    20 public class WriteFruitReducer extends TableReducer<ImmutableBytesWritable,Put,NullWritable> {
    21     
    22     @Override
    23     protected void reduce(ImmutableBytesWritable key, Iterable<Put> value,Context context) throws IOException, InterruptedException {
    24         for (Put put : value) {
    25             context.write(NullWritable.get(), put);
    26         }
    27     }
    28     
    29 }

    Runner

     1 package cn.hbase.mapreduce.hdfs;
     2 
     3 import org.apache.hadoop.conf.Configuration;
     4 import org.apache.hadoop.conf.Configured;
     5 import org.apache.hadoop.fs.Path;
     6 import org.apache.hadoop.hbase.HBaseConfiguration;
     7 import org.apache.hadoop.hbase.client.Put;
     8 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     9 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    10 import org.apache.hadoop.mapreduce.Job;
    11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    12 import org.apache.hadoop.util.Tool;
    13 import org.apache.hadoop.util.ToolRunner;
    14 
    15 /**
    16  *
    17  * @author Tele
    18  *
    19  */
    20 
    21 public class FruitRunner extends Configured implements Tool {
    22 
    23     public int run(String[] args) throws Exception {
    24         // 实例化job
    25         Job job = Job.getInstance(this.getConf());
    26 
    27         // 设置jar包路径
    28         job.setJarByClass(FruitRunner.class);
    29 
    30         // 组装mapper
    31         job.setMapperClass(ReadFruitFromHdfsMapper.class);
    32         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    33         job.setMapOutputValueClass(Put.class);
    34 
    35         // 设置数据来源
    36         FileInputFormat.addInputPath(job, new Path("/input_fruit"));
    37 
    38         // 组装reducer
    39         TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitReducer.class, job);
    40 
    41         // 设置reduce个数
    42         job.setNumReduceTasks(1);
    43 
    44         // 提交
    45 
    46         return job.waitForCompletion(true) ? 0 : 1;
    47     }
    48 
    49     public static void main(String[] args) throws Exception {
    50         Configuration conf = HBaseConfiguration.create();
    51         ToolRunner.run(new FruitRunner(), args);
    52 
    53     }
    54 
    55 }

     ps:需要预先在 HBase 中创建目标表 fruit_mr(含列族 info),否则作业写入时会失败

  • 相关阅读:
    loaded some nib but the view outlet was not set
    指标评比
    IOS DEVELOP FOR DUMMIES
    软件测试题二
    javascript select
    DOM节点类型详解
    mysql操作
    UVA 10055
    solutions for 'No Suitable Driver Found For Jdbc'
    解决git中文乱码问题
  • 原文地址:https://www.cnblogs.com/tele-share/p/9979569.html
Copyright © 2011-2022 走看看