Notes on a Storm-to-HBase write performance problem

    1. Cluster Storm version:

       Output of the storm version command:

    Storm 0.10.0.2.3.0.0-2557
    URL git@github.com:hortonworks/storm.git -r 38fad7c05bd00ac4ca61b68abf7411d9abc6189c
    Branch (no branch)
    Compiled by jenkins on 2015-07-14T14:45Z
    From source with checksum 43c4b3baaad6a0bca88145356d46327

    Local Storm version: apache-storm-0.10.1. Note that this does not match the cluster version.

         storm-hbase jar dependency:

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>0.10.0</version>
        </dependency>

    HBase version:

    [root@node3 tmp]# hbase version
    2016-07-08 14:02:14,137 INFO  [main] util.VersionInfo: HBase 1.1.1.2.3.0.0-2557
    2016-07-08 14:02:14,140 INFO  [main] util.VersionInfo: Source code repository git://ip-10-0-0-89.ec2.internal/grid/0/jenkins/workspace/HDP-dal-centos6/bigtop/build/hbase/rpm/BUILD/hbase-1.1.1.2.3.0.0 revision=6a55f21850cfccf19fa651b9e2c74c7f99bbd4f9
    2016-07-08 14:02:14,140 INFO  [main] util.VersionInfo: Compiled by jenkins on Tue Jul 14 09:41:13 EDT 2015
    2016-07-08 14:02:14,140 INFO  [main] util.VersionInfo: From source with checksum 8f076e3255b10e166a73c2436c2b1706

    2. Writing data to HBase in local mode. The topology definition is below, using the built-in HBaseBolt class:

    public class PersistTopology {

        private static final String KAFKA_SPOUT = "KAFKA_SPOUT";
        private static final String HBASE_BOLT = "HBASE_BOLT";

        public static void main(String[] args) throws Exception {

            /* define spout */
            KafkaSpout kafkaSpout = new KafkaSpout();

            // backslashes must be escaped in a Java string literal
            System.setProperty("hadoop.home.dir", "E:\\eclipse\\");

            /* define HBase bolt */
            HBaseMapper mapper = new MyHBaseMapper();
            HBaseBolt hbaseBolt = new HBaseBolt("testhbasebolt", mapper).withConfigKey("hbase.conf");

            /* define topology */
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(KAFKA_SPOUT, kafkaSpout);
            builder.setBolt(HBASE_BOLT, hbaseBolt, 2).shuffleGrouping(KAFKA_SPOUT);

            Config conf = new Config();
            conf.setDebug(true);

            Map<String, Object> hbConf = new HashMap<String, Object>();
            conf.put("hbase.conf", hbConf);

            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", conf, builder.createTopology());
                Utils.sleep(60000000);
                cluster.killTopology("test");
                cluster.shutdown();
            }
        }
    }

    Test result: over 5.2 million records took several hours to write, averaging only a few hundred rows per second.

    Root cause: reading the HBaseBolt source shows that this version does no batching. As the code below shows, execute() is called for every incoming tuple and immediately ships it out via batchMutate:

    @Override
    public void execute(Tuple tuple) {
            byte[] rowKey = this.mapper.rowKey(tuple);
            ColumnList cols = this.mapper.columns(tuple);
            List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
    
            try {
                this.hBaseClient.batchMutate(mutations);
            } catch(Exception e){
                this.collector.reportError(e);
                this.collector.fail(tuple);
                return;
            }
    
            this.collector.ack(tuple);
        }
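The cost of this per-tuple pattern is one HBase round-trip per record. The fix in the newer release is simply to buffer mutations and flush in bulk; the idea can be sketched standalone in plain Java (BatchBuffer and its Consumer sink are hypothetical stand-ins for illustration, not storm-hbase classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Buffers items and hands them to a sink in batches, mirroring the
 *  batching the newer HBaseBolt applies to mutations. */
class BatchBuffer<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int batchSize;
    private final Consumer<List<T>> sink;   // stands in for hBaseClient.batchMutate

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Called once per tuple; flushes only when the batch fills up. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Called on a tick tuple so partial batches are not delayed forever. */
    void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}

public class BatchBufferDemo {
    public static void main(String[] args) {
        List<Integer> flushSizes = new ArrayList<>();
        BatchBuffer<String> buf = new BatchBuffer<>(3, batch -> flushSizes.add(batch.size()));
        for (int i = 0; i < 7; i++) {
            buf.add("row-" + i);          // 7 rows, batch size 3
        }
        buf.flush();                      // simulate a tick tuple
        System.out.println(flushSizes);   // [3, 3, 1]
    }
}
```

With a batch size of 3, seven adds trigger two full flushes, and the final tick-style flush drains the remaining partial batch.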

    3. I downloaded the apache-storm-1.0.1 source and found that execute() has been reimplemented as a true batch send:

        @Override
        public void execute(Tuple tuple) {
            boolean flush = false;
            try {
                if (TupleUtils.isTick(tuple)) {
                    LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize);
                    collector.ack(tuple);
                    flush = true;
                } else {
                    byte[] rowKey = this.mapper.rowKey(tuple);
                    ColumnList cols = this.mapper.columns(tuple);
                    List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
                    batchMutations.addAll(mutations);
                    tupleBatch.add(tuple);
                    if (tupleBatch.size() >= batchSize) {
                        flush = true;
                    }
                }
    
                if (flush && !tupleBatch.isEmpty()) {
                    this.hBaseClient.batchMutate(batchMutations);
                    LOG.debug("acknowledging tuples after batchMutate");
                    for(Tuple t : tupleBatch) {
                        collector.ack(t);
                    }
                    tupleBatch.clear();
                    batchMutations.clear();
                }
            } catch(Exception e){
                this.collector.reportError(e);
                for (Tuple t : tupleBatch) {
                    collector.fail(t);
                }
                tupleBatch.clear();
                batchMutations.clear();
            }
        }
    batchSize is configurable: once the buffered tuple count reaches this value, the batch is written to HBase. The if (TupleUtils.isTick(tuple)) branch is Storm's tick-tuple mechanism: the bolt periodically receives a tick tuple, which works like a timer so that even a partial batch is flushed once the interval elapses, rather than waiting indefinitely for batchSize tuples. The tick frequency can be set either in code or in the storm.yaml config file. In code:
        conf.put("hbase.conf", hbConf);

        // conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             8);
        // conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
        // conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        // conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,    16384);
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);

    Via the config file, as follows; the code setting presumably takes precedence (I have not verified this):

    [root@node1 conf]# more storm.yaml |grep tuple
    topology.tick.tuple.freq.secs : 1
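In Storm 1.x, the 1.0.1 HBaseBolt source also appears to expose the batch size and flush interval as builder methods on the bolt itself, rather than only through the global tick-frequency config. A sketch (the withBatchSize / withFlushIntervalSecs method names are from my reading of that source, so verify them against your version):

```java
// Sketch only: assumes the Storm 1.x storm-hbase HBaseBolt builder API.
HBaseBolt hbaseBolt = new HBaseBolt("testhbasebolt", mapper)
        .withConfigKey("hbase.conf")
        .withBatchSize(1000)          // flush after 1000 buffered tuples...
        .withFlushIntervalSecs(1);    // ...or after 1 second, via tick tuples
```

A per-bolt flush interval avoids changing the topology-wide tick frequency for bolts that do not need it.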

    Instead of upgrading Storm, I copied the batching logic from the newer version into the current one and tested it on the cluster.
    Test result: same data volume, but a very large improvement — what previously took over five hours to write finished in roughly twenty minutes.
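For a rough sense of scale, taking the figures above at face value (about 5.2 million rows; five-plus hours before, about twenty minutes after — both approximate):

```java
public class ThroughputCompare {
    public static void main(String[] args) {
        long rows = 5_200_000L;                // ~5.2 million records in the test
        double beforeSecs = 5 * 3600;          // "over five hours" (lower bound)
        double afterSecs  = 20 * 60;           // "about twenty minutes"

        double beforeRate = rows / beforeSecs; // ~289 rows/s
        double afterRate  = rows / afterSecs;  // ~4333 rows/s

        System.out.printf("before: %.0f rows/s, after: %.0f rows/s, speedup: %.0fx%n",
                beforeRate, afterRate, beforeSecs / afterSecs);
    }
}
```

That is roughly a 15x speedup, consistent with amortizing one HBase RPC over many rows instead of paying it per row.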
    Original post: https://www.cnblogs.com/zhengchunhao/p/5653534.html