  • Notes on a Storm-to-HBase write performance problem

    1. Cluster Storm version:

       Output of the storm version command:

    Storm 0.10.0.2.3.0.0-2557
    URL git@github.com:hortonworks/storm.git -r 38fad7c05bd00ac4ca61b68abf7411d9abc6189c
    Branch (no branch)
    Compiled by jenkins on 2015-07-14T14:45Z
    From source with checksum 43c4b3baaad6a0bca88145356d46327

    Local Storm version: apache-storm-0.10.1. Note that this does not match the cluster version.

         storm-hbase jar version:

        <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hbase</artifactId>
        <version>0.10.0</version>
        </dependency>

    HBase version:

    [root@node3 tmp]# hbase version
    2016-07-08 14:02:14,137 INFO  [main] util.VersionInfo: HBase 1.1.1.2.3.0.0-2557
    2016-07-08 14:02:14,140 INFO  [main] util.VersionInfo: Source code repository git://ip-10-0-0-89.ec2.internal/grid/0/jenkins/workspace/HDP-dal-centos6/bigtop/build/hbase/rpm/BUILD/hbase-1.1.1.2.3.0.0 revision=6a55f21850cfccf19fa651b9e2c74c7f99bbd4f9
    2016-07-08 14:02:14,140 INFO  [main] util.VersionInfo: Compiled by jenkins on Tue Jul 14 09:41:13 EDT 2015
    2016-07-08 14:02:14,140 INFO  [main] util.VersionInfo: From source with checksum 8f076e3255b10e166a73c2436c2b1706

    2. Testing writes to HBase in local mode. The topology definition is below; it uses the bundled HBaseBolt class:

    public class PersistTopology {
    
        private static final String KAFKA_SPOUT = "KAFKA_SPOUT";
        private static final String HBASE_BOLT = "HBASE_BOLT";
        
        public static void main(String[] args) throws Exception {
            
            /* define spout */
            KafkaSpout kafkaSpout = new KafkaSpout();
     
            System.setProperty("hadoop.home.dir", "E:\\eclipse\\");
            
            /* define HBASE Bolt */
            HBaseMapper mapper = new MyHBaseMapper();
            HBaseBolt hbaseBolt = new HBaseBolt("testhbasebolt", mapper).withConfigKey("hbase.conf");
            
            /* define topology*/
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(KAFKA_SPOUT, kafkaSpout);
            builder.setBolt(HBASE_BOLT, hbaseBolt, 2).shuffleGrouping(KAFKA_SPOUT);
    
            Config conf = new Config();
            conf.setDebug(true);
    
            Map<String, Object> hbConf = new HashMap<String, Object>();  
            conf.put("hbase.conf", hbConf);

            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
    
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
    
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", conf, builder.createTopology());
                Utils.sleep(60000000);
                cluster.killTopology("test");
                cluster.shutdown();
            }
        }
    }
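The MyHBaseMapper class referenced above is not shown in the post. An HBaseMapper implementation only has to supply rowKey(Tuple) and columns(Tuple). The mapping logic can be sketched with plain Java types standing in for Storm's Tuple and ColumnList (all field names here are illustrative, not from the original code):

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapperSketch {
    // Stand-in for mapper.rowKey(tuple): one tuple field becomes the HBase row key.
    static byte[] rowKey(Map<String, String> tuple) {
        return tuple.get("word").getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for mapper.columns(tuple): remaining fields become cf:qualifier -> value.
    static Map<String, byte[]> columns(Map<String, String> tuple) {
        Map<String, byte[]> cols = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : tuple.entrySet()) {
            if (!e.getKey().equals("word")) {
                cols.put("cf:" + e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
        return cols;
    }

    public static void main(String[] args) {
        Map<String, String> tuple = new LinkedHashMap<>();
        tuple.put("word", "storm");
        tuple.put("count", "42");
        System.out.println(new String(rowKey(tuple), StandardCharsets.UTF_8)); // storm
        System.out.println(columns(tuple).keySet()); // [cf:count]
    }
}
```

In a real topology the storm-hbase jar also ships a ready-made SimpleHBaseMapper, which covers this common one-field-as-rowkey case without writing a custom class.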

    Test result: 5.2+ million records took several hours to write, averaging only a few hundred rows per second.

    Cause: reading this version's HBaseBolt source shows that it does not batch at all. As shown below, execute is invoked once per tuple, and each call immediately sends that single tuple's mutations via batchMutate:

    @Override
        public void execute(Tuple tuple) {
            byte[] rowKey = this.mapper.rowKey(tuple);
            ColumnList cols = this.mapper.columns(tuple);
            List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
    
            try {
                this.hBaseClient.batchMutate(mutations);
            } catch(Exception e){
                this.collector.reportError(e);
                this.collector.fail(tuple);
                return;
            }
    
            this.collector.ack(tuple);
        }
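The performance gap follows directly from this: with per-tuple flushing, every tuple pays a full batchMutate round trip to HBase. A trivial dependency-free model (the batch size of 1000 is a hypothetical value, not from the post) makes the difference in round-trip counts concrete:

```java
public class FlushCount {
    public static void main(String[] args) {
        long tuples = 5_200_000L;   // roughly the data volume from the test
        int batchSize = 1000;       // hypothetical batch size

        // Old bolt: one batchMutate round trip per tuple.
        long perTupleFlushes = tuples;

        // Batched bolt: one round trip per full batch (rounding up for a final partial batch).
        long batchedFlushes = (tuples + batchSize - 1) / batchSize;

        System.out.println(perTupleFlushes); // 5200000
        System.out.println(batchedFlushes);  // 5200
    }
}
```

Three orders of magnitude fewer round trips is consistent with the roughly 15x wall-clock speedup reported later, since each round trip also carries fixed RPC and WAL overhead.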

    3. Downloading the apache-storm-1.0.1 source shows that the execute implementation has become true batch sending:

        @Override
        public void execute(Tuple tuple) {
            boolean flush = false;
            try {
                if (TupleUtils.isTick(tuple)) {
                    LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize);
                    collector.ack(tuple);
                    flush = true;
                } else {
                    byte[] rowKey = this.mapper.rowKey(tuple);
                    ColumnList cols = this.mapper.columns(tuple);
                    List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
                    batchMutations.addAll(mutations);
                    tupleBatch.add(tuple);
                    if (tupleBatch.size() >= batchSize) {
                        flush = true;
                    }
                }
    
                if (flush && !tupleBatch.isEmpty()) {
                    this.hBaseClient.batchMutate(batchMutations);
                    LOG.debug("acknowledging tuples after batchMutate");
                    for(Tuple t : tupleBatch) {
                        collector.ack(t);
                    }
                    tupleBatch.clear();
                    batchMutations.clear();
                }
            } catch(Exception e){
                this.collector.reportError(e);
                for (Tuple t : tupleBatch) {
                    collector.fail(t);
                }
                tupleBatch.clear();
                batchMutations.clear();
            }
        }
    batchSize is configurable: once the buffer holds that many tuples, they are written to HBase in one batch. The if (TupleUtils.isTick(tuple)) branch is a timer-like mechanism: Storm delivers a tick tuple to the bolt at a fixed interval, which guarantees that buffered data is still flushed promptly even when fewer than batchSize tuples have accumulated. The tick interval can be set either in code or in the storm.yaml config file. In code:
            conf.put("hbase.conf", hbConf);

//          conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
//          conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
//          conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
//          conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);

    Setting it via the config file is shown below; a code-level setting should take precedence, though I have not verified this:

    [root@node1 conf]# more storm.yaml |grep tuple
    topology.tick.tuple.freq.secs : 1

    Instead of upgrading Storm, I copied the optimized code from the newer version into the current one and tested it on the cluster.
    Test result: same data volume, but a huge improvement. Data that previously took more than five hours to write now finished in roughly twenty minutes.
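The backported logic boils down to one pattern: buffer incoming mutations, and flush either when the buffer reaches batchSize or when a tick tuple arrives. Stripped of the Storm and HBase types, the pattern can be sketched as a small self-contained class (class and method names are illustrative, not from the Storm source):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the size-or-tick flush pattern used by the newer HBaseBolt.
public class BatchBuffer {
    private final int batchSize;
    private final List<String> batch = new ArrayList<>();
    private int flushCount = 0;

    public BatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    // Corresponds to receiving a data tuple: buffer it, flush when the batch is full.
    public void add(String mutation) {
        batch.add(mutation);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    // Corresponds to receiving a tick tuple: flush whatever has accumulated so far.
    public void tick() {
        if (!batch.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        // Here the real bolt calls hBaseClient.batchMutate(batchMutations)
        // and acks every buffered tuple; we just count flushes.
        flushCount++;
        batch.clear();
    }

    public int getFlushCount() { return flushCount; }
    public int pending() { return batch.size(); }

    public static void main(String[] args) {
        BatchBuffer buf = new BatchBuffer(3);
        buf.add("a"); buf.add("b"); buf.add("c"); // batch full -> flush #1
        buf.add("d");                             // buffered, waiting
        buf.tick();                               // tick fires -> flush #2
        System.out.println(buf.getFlushCount());  // 2
        System.out.println(buf.pending());        // 0
    }
}
```

Note that the real bolt must also ack or fail the buffered tuples only after the flush succeeds or fails, as the storm-1.0.1 code above does, otherwise at-least-once delivery is lost when a batch write throws.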
  • Original post: https://www.cnblogs.com/zhengchunhao/p/5653534.html