  • 【HBase】Bulk loading data into an HBase table with Bulkload


    Requirement

    Convert the data file at the HDFS path /hbase/input/user.txt into HFile format, then load it into the myuser2 table.
    First empty the myuser2 table: truncate 'myuser2'
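
    As a reference, a minimal setup sketch (assumptions, based on the Mapper below: myuser2 has column family f1, and user.txt is tab-separated as rowkey, name, age):

    # HBase shell: create the target table if needed, then empty it
    create 'myuser2', 'f1'
    truncate 'myuser2'

    # Upload the data file to the input path the job reads
    hdfs dfs -mkdir -p /hbase/input
    hdfs dfs -put user.txt /hbase/input/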


    Steps

    1. Develop the MapReduce job

    Define a driver class with a main method: BulkloadMain

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class BulkloadMain extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            //Get the Job object
            Job job = Job.getInstance(super.getConf(), "Bulkload");
            //Set the input format and the data input path
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.setInputPaths(job, new Path("hdfs://node01:8020/hbase/input"));
    
            //Custom Map logic
            job.setMapperClass(BulkloadMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
    
            Connection connection = ConnectionFactory.createConnection(super.getConf());
            Table table = connection.getTable(TableName.valueOf("myuser2"));
    
            //Configure the incremental (bulk) load via configureIncrementalLoad
            HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
    
            //Write HFiles with HFileOutputFormat2 (configureIncrementalLoad already sets this, so the explicit call is harmless)
            job.setOutputFormatClass(HFileOutputFormat2.class);
    
            HFileOutputFormat2.setOutputPath(job, new Path("hdfs://node01:8020/hbase/output2"));
    
            boolean b = job.waitForCompletion(true);
    
            return b ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
            int run = ToolRunner.run(configuration, new BulkloadMain(), args);
            System.exit(run);
        }
    }
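
    A note on configureIncrementalLoad: it uses the RegionLocator to read the table's region boundaries and wires the job accordingly. It sets a TotalOrderPartitioner over the region start keys, picks a sorting reducer based on the map output value type (PutSortReducer here, since the value is Put), and sets the HFile output format, which is why the explicit setOutputFormatClass call above is redundant but harmless.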
    

    Custom Map logic: define a Mapper class, BulkloadMapper

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class BulkloadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //Split the line (v1) on the tab delimiter: rowkey, name, age
            String[] split = value.toString().split("\t");
    
            //The rowkey (first field) becomes the map output key
            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
            immutableBytesWritable.set(split[0].getBytes());
    
            //Create a Put keyed on the same rowkey
            Put put = new Put(split[0].getBytes());
            //Add the row's columns to the Put
            put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
            //Emit as k2, v2
            context.write(immutableBytesWritable, put);
    
        }
    }
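
    For context, the Mapper assumes every line of user.txt has three tab-separated fields: rowkey, name, age. A hypothetical sample (not from the original post):

    0001	zhangsan	18
    0002	lisi	25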
    

    2. Package as a jar and run it on Linux

    yarn jar day12_HBaseANDMapReduce-1.0-SNAPSHOT.jar cn.itcast.mr.demo4.BulkloadMain
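
    If the job fails because HBase classes are missing from the container classpath, a common, environment-dependent fix is to expose the HBase client jars before submitting (a sketch, not from the original post):

    export HADOOP_CLASSPATH=$(hbase classpath)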
    

    The successfully generated HFile files are visible in HDFS.

    3. Two ways to load the HFile files into the HBase table

    Method 1: load via code
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    
    public class LoadData {
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
    
            Connection connection =  ConnectionFactory.createConnection(configuration);
            Admin admin = connection.getAdmin();
            Table table = connection.getTable(TableName.valueOf("myuser2"));
            LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
            //Load from the same directory the MapReduce job wrote its HFiles to
            load.doBulkLoad(new Path("hdfs://node01:8020/hbase/output2"), admin, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        }
    }
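
    On HBase 2.2 and later, LoadIncrementalHFiles is deprecated in favor of the BulkLoadHFiles tool. A minimal sketch of the equivalent loader (the class name LoadData2 is hypothetical; verify the API against your HBase version):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
    
    public class LoadData2 {
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
            //BulkLoadHFiles replaces LoadIncrementalHFiles on HBase 2.2+
            BulkLoadHFiles.create(configuration)
                    .bulkLoad(TableName.valueOf("myuser2"), new Path("hdfs://node01:8020/hbase/output2"));
        }
    }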
    
    Method 2: run via the Hadoop command line
    yarn jar /export/servers/hbase-1.2.0-cdh5.14.0/lib/hbase-server-1.2.0-cdh5.14.0.jar completebulkload /hbase/output2 myuser2
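
    completebulkload is the same LoadIncrementalHFiles tool, invoked through the hbase-server jar's driver. Note that a bulk load moves (renames) the HFiles into the table's region directories rather than copying them, so the output directory is empty once the load succeeds.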
    

    Both methods load the data successfully.

  • Original post: https://www.cnblogs.com/zzzsw0412/p/12772419.html