zoukankan      html  css  js  c++  java
  • [原创]HBase学习笔记(4)- 数据导入

    需要分别从Oracle和文本文件往HBase中导入数据,这里介绍几种数据导入方案。

    1.使用importTSV导入HBase

    importTSV支持增量导入。新数据插入,已存在数据则修改。

    1.1.首先将待导入文本test_import.txt放到hdfs集群

    文本格式如下(从网上找的虚拟话单数据)。逗号分隔,共13个字段,其中第1个字段作为rowkey。

    1,12026546272,2013/10/19,20:52,33分18秒,被叫,13727310234,北京市,省际,0,32.28,0.4,全球通商旅88套餐
    2,12026546272,2013/10/19,20:23,33分18秒,被叫,13727310234,北京市,省际,0,32.28,0.4,全球通商旅88套餐
    3,16072996404,2013/10/19,20:52,10分52秒,主叫,19271253211,北京市,省际,0,2.8,1.9,全球通商旅88套餐
    4,10023895821,2013/10/19,20:52,09分20秒,被叫,15115468122,绵阳市,省内,0,45.91,5.26,全球通商旅88套餐
    5,13381653644,2013/10/19,20:53,06分00秒,被叫,10991482287,北京市,省际,0,54.79,7.16,全球通商旅88套餐
    6,18695195919,2013/10/19,21:37,27分00秒,主叫,14858652217,绵阳市,省内,0,36.27,6.68,全球通商旅88套餐
    7,11396010469,2013/10/19,21:37,27分02秒,主叫,12939968466,绵阳市,省内,0,65.63,4.45,全球通商旅88套餐
    8,15109754362,2013/10/19,21:37,05分00秒,被叫,14240771580,绵阳市,省内,0,66.86,5.75,全球通商旅88套餐
    9,13845944798,2013/10/19,21:37,13分50秒,被叫,13648619896,广州市,省际,0,60.71,3.39,全球通商旅88套餐
    10,17883953443,2013/10/19,21:38,37分54秒,被叫,10110778698,广州市,省际,0,55.14,1.45,全球通商旅88套餐
    11,19643495044,2013/10/19,21:38,49分34秒,主叫,14581482419,广州市,省际,0,16.84,1.36,全球通商旅88套餐

    1.2.在HBase中创建表:create ‘test_import’, ‘cf’

    1.3.使用importTSV导入

    执行命令:

    $hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf:field1,cf:field2,cf:field3,cf:field4,cf:field5,cf:field6,cf:field7,cf:field8,cf:field9,cf:field10,cf:field11,cf:field12 test_import /test_import.txt

    其中:

    -Dimporttsv.separator指定分隔符,只支持单字节分隔符。

    -Dimporttsv.columns指定导入的列。HBASE_ROW_KEY是关键字,导入数据时必须指定rowkey。

    其它可选参数:

    执行后自动提交MapReduce任务进行导入:

    1.4.用scan命令查看hbase中test_import表的内容

    一共11条记录。

    1.5.使用get查看记录

     

    2.使用importTSV+bulkload导入HBase

    先使用importTSV生成HFile文件,再使用bulkload导入HBase

    这种方式对RegionServer更友好一些,加载数据几乎不占用RegionServer的计算资源,只是在HDFS上移动HFile文件,然后通过HMaster将该RegionServer的一个或多个Region上线。

    2.1.生成HFile文件

    使用importTSV生成HFile文件。与上一中方法略有不同的是,在importTSV执行时指定-Dimporttsv.bulk.output参数,则是生成HFile文件到指定文件,而不会直接导入HBase表。

    $hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=","  -Dimporttsv.columns=HBASE_ROW_KEY,cf:field1,cf:field2,cf:field3,cf:field4,cf:field5,cf:field6,cf:field7,cf:field8,cf:field9,cf:field10,cf:field11,cf:field12 -Dimporttsv.bulk.output=/test_import_outputdir/ test_import /test_import.txt

    查看/test_import_outputdir/数据:

    2.2.导入HBase表

    使用bulkload导入HBase表。执行速度非常快。

    执行命令:

    $hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /test_import_outputdir/ test_import 

    2.3.使用get查看记录

     

    3.支持多字节分隔符导入HBase

    importTSV只支持单字节分隔符,不支持多字节分隔符,比如“|@|”如果要实现多字节分隔符,需要自己编写MapReduce作业生成HFile文件,或者导入HBase。

    3.1.数据格式

    待导入数据如下。|@|作为分隔符,第1个字段是主键。

    1|@|12026546272|@|2013/10/19|@|20:52|@|33分18秒|@|被叫|@|13727310234|@|北京市|@|省际|@|0|@|32.28|@|0.4|@|全球通商旅88套餐
    2|@|12026546272|@|2013/10/19|@|20:23|@|33分18秒|@|被叫|@|13727310234|@|北京市|@|省际|@|0|@|32.28|@|0.4|@|全球通商旅88套餐
    ......
    ......
    ...... 

    3.2.编写Mapper生成HFile

    我们只要使用1个Mapper生成HFile即可,不需要额外写Reducer。示例代码如下。

    public class HfileGenMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    
        protected void map(LongWritable key, Text value,
                           Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
                throws IOException, InterruptedException {
            // 第一个字段作为rowkey
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
                    value.toString().split("\|@\|")[0].getBytes());
            List<KeyValue> list = createKeyValue(value.toString());
            Iterator<KeyValue> itor = list.iterator();
            while (itor.hasNext()) {
                KeyValue kv = itor.next();
                if (kv != null) {
                    context.write(rowkey, kv);
                }
            }
        }
    
        // 解析一行记录,得到多个KeyValue对象。
        private List<KeyValue> createKeyValue(String line) {
            List<KeyValue> list = new ArrayList<KeyValue>();
            String[] fields = line.split("\|@\|");
            String rowkey = fields[0];
            String columnFamily = "cf";
            for (int i = 1; i < fields.length; i++) {
                String qualifyName = "field" + String.valueOf(i);
                String value = fields[i];
                KeyValue kv = new KeyValue(rowkey.getBytes(), columnFamily.getBytes(),
                        qualifyName.getBytes(), System.currentTimeMillis(), value.getBytes());
                list.add(kv);
            }
            return list;
        }
    }

    3.3.创建作业实例

    创建job实例,填写相关配置。指定输入输出数据,设置Mapper和Reducer,并提交作业。

    public class BulkLoadHFileJob extends Configured implements Tool {
    
        public static void main(String[] args) throws Exception {
            int status = ToolRunner.run(new BulkLoadHFileJob(), args);
            System.exit(status);
        }
    
        @Override
        public int run(String[] args) throws Exception {
            this.setConf(HBaseConfiguration.create(this.getConf()));
            getConf().set("hbase.zookeeper.property.clientPort", "2181");
            getConf().set("hbase.zookeeper.quorum", "W122PC04VM07,W122PC05VM07,W122PC06VM07");
            String inputPath = "hdfs://cluster1/test_import2.txt";
            String outputPath = "hdfs://cluster1/test_import2_outputdir";
    
            Job job = Job.getInstance(getConf(), "Hdfs2HFile test");
            job.setJarByClass(BulkLoadHFileJob.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HfileGenMapper.class);
            job.setReducerClass(KeyValueSortReducer.class);
    
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);
            job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    
            FileInputFormat.addInputPath(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
    
            Connection connection = ConnectionFactory.createConnection(getConf());
            TableName tableName = TableName.valueOf("table_test");
            HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }

    3.4.提交作业

    使用hadoop客户端提交作业。

    hadoop jar ./hbasetest.jar com.hbase.test.hdfs2hbase.BulkLoadHFileJob

    3.5.导入HBase

    使用bulkload将生成的HFile导入HBase,速度非常快。具体操作步骤参考2.2节。

  • 相关阅读:
    DDD实战进阶第一波(三):开发一般业务的大健康行业直销系统(搭建支持DDD的轻量级框架二)
    DDD实战进阶第一波(二):开发一般业务的大健康行业直销系统(搭建支持DDD的轻量级框架一)
    01-JavaScript之变量
    18-TypeScript模板方法模式
    17-TypeScript代理模式
    16-TypeScript装饰器模式
    15-TypeScript策略模式
    真的可以「 人人都是产品经理 」吗
    如何从程序员到架构师?
    除代码之外,程序员还有哪些能力也非常的关键?
  • 原文地址:https://www.cnblogs.com/simplestupid/p/6610541.html
Copyright © 2011-2022 走看看