  • 31-hadoop-hbase: operating on HBase from MapReduce

    There are some large files that need to be stored in HBase. The idea is to first upload the files to HDFS, read <key, value> pairs in the map phase, and then write those key-value pairs into HBase in the reduce phase.
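
    For example, each line of the input file is expected to look like "rowkey,value", which is the format the mapper below parses. As a minimal, hypothetical sketch (the local path is an assumption; the HDFS path and fs.defaultFS value are reused from the job below, which is not how the original post shows the upload), the file could be pushed to HDFS with the FileSystem API:

    // Hypothetical helper: copy a local file to the HDFS path read by the job below.
    package com.wenbronk.hbase.hbase;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class UploadToHdfs {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            config.set("fs.defaultFS", "hdfs://192.168.208.106:8020");
            try (FileSystem fs = FileSystem.get(config)) {
                // "/tmp/test.txt" is an assumed local file of "rowkey,value" lines.
                fs.copyFromLocalFile(new Path("/tmp/test.txt"), new Path("/usr/test/test.txt"));
            }
        }
    }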

    HbaseMapper:

    package com.wenbronk.hbase.hbase;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class HbaseMapper extends Mapper<LongWritable, Text, Text, Text> {
    
        // Each input line is expected to look like "rowkey,value": split on the comma
        // and emit the first field as the map key and the second as the value.
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            String k = split[0];
            String v = split[1];
            context.write(new Text(k), new Text(v));
        }
    }

    HbaseReducer:

    package com.wenbronk.hbase.hbase;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.Text;
    
    import java.io.IOException;
    
    /**
     * Extends TableReducer<KeyIn, ValueIn, KeyOut>, so the key written to HBase is an
     * ImmutableBytesWritable.
     */
    public class HbaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String k = key.toString();
            // Join all values for this rowkey into one comma-separated string.
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value.toString()).append(",");
            }
            if (sb.length() > 0) {
                sb.deleteCharAt(sb.length() - 1);
            }
    
            // Use the reduce key as the rowkey and write the joined values into
            // column family "cf1", qualifier "name".
            Put put = new Put(k.getBytes());
            put.addColumn("cf1".getBytes(), "name".getBytes(), sb.toString().getBytes());
            context.write(new ImmutableBytesWritable(k.getBytes()), put);
        }
    }

    JobTest:

    package com.wenbronk.hbase.hbase;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    
    import java.io.IOException;
    
    public class JobTest {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration config = new Configuration();
            config.set("fs.defaultFS", "hdfs://192.168.208.106:8020");
            config.set("yarn.resourcemanager.hostname", "192.168.208.106");
            config.set("mapred.job.tracker", "192.168.208.106:9001");
            config.set("ha.zookeeper.quorum", "192.168.208.106,192.168.208.107,192.168.208.108");
    
            Job job = Job.getInstance(config, "Hbase");
            job.setJarByClass(JobTest.class);
    
            Path inPath = new Path("/usr/test/test.txt");
            FileInputFormat.addInputPath(job, inPath);
            job.setInputFormatClass(TextInputFormat.class);
    
            job.setMapperClass(HbaseMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            TableMapReduceUtil.initTableReducerJob("t_user", HbaseReducer.class, job, null, null, null, null, false);
    
            boolean b = job.waitForCompletion(true);
            if (b) {
                System.out.println("mapreduce 执行成功");
            }
    
    
        }
    }
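
    Before running the job, the target table must already exist in HBase; the original post does not show how it was created. A minimal sketch using the HBase client Admin API, assuming the table name "t_user" and column family "cf1" used above (and an assumed ZooKeeper quorum reusing the same hosts), could look like this:

    // Hypothetical one-off setup: create the "t_user" table with column family "cf1".
    package com.wenbronk.hbase.hbase;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    
    public class CreateUserTable {
        public static void main(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            // ZooKeeper quorum is an assumption reusing the hosts from the job configuration above.
            config.set("hbase.zookeeper.quorum", "192.168.208.106,192.168.208.107,192.168.208.108");
            try (Connection connection = ConnectionFactory.createConnection(config);
                 Admin admin = connection.getAdmin()) {
                HTableDescriptor table = new HTableDescriptor(TableName.valueOf("t_user"));
                table.addFamily(new HColumnDescriptor("cf1"));
                if (!admin.tableExists(table.getTableName())) {
                    admin.createTable(table);
                }
            }
        }
    }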

    This series is based on the 尚学堂 (Shangxuetang) video course.

  • Original post: https://www.cnblogs.com/wenbronk/p/7407034.html