1. Cluster Storm version:
Output of the storm version command:
Storm 0.10.0.2.3.0.0-2557 URL git@github.com:hortonworks/storm.git -r 38fad7c05bd00ac4ca61b68abf7411d9abc6189c Branch (no branch) Compiled by jenkins on 2015-07-14T14:45Z From source with checksum 43c4b3baaad6a0bca88145356d46327
Local Storm version: apache-storm-0.10.1. Note that this does not match the cluster version.
storm-hbase jar version:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>0.10.0</version>
</dependency>
HBase version:
[root@node3 tmp]# hbase version
2016-07-08 14:02:14,137 INFO [main] util.VersionInfo: HBase 1.1.1.2.3.0.0-2557
2016-07-08 14:02:14,140 INFO [main] util.VersionInfo: Source code repository git://ip-10-0-0-89.ec2.internal/grid/0/jenkins/workspace/HDP-dal-centos6/bigtop/build/hbase/rpm/BUILD/hbase-1.1.1.2.3.0.0 revision=6a55f21850cfccf19fa651b9e2c74c7f99bbd4f9
2016-07-08 14:02:14,140 INFO [main] util.VersionInfo: Compiled by jenkins on Tue Jul 14 09:41:13 EDT 2015
2016-07-08 14:02:14,140 INFO [main] util.VersionInfo: From source with checksum 8f076e3255b10e166a73c2436c2b1706
2. Testing writes to HBase in local mode. The topology is defined as follows, using the built-in HBaseBolt class:
public class PersistTopology {
    private static final String KAFKA_SPOUT = "KAFKA_SPOUT";
    private static final String HBASE_BOLT = "HBASE_BOLT";

    public static void main(String[] args) throws Exception {
        /* define spout */
        KafkaSpout kafkaSpout = new KafkaSpout();
        // Backslashes must be escaped for this path literal to compile
        System.setProperty("hadoop.home.dir", "E:\\eclipse\\");

        /* define HBase bolt */
        HBaseMapper mapper = new MyHBaseMapper();
        HBaseBolt hbaseBolt = new HBaseBolt("testhbasebolt", mapper).withConfigKey("hbase.conf");

        /* define topology */
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT, kafkaSpout);
        builder.setBolt(HBASE_BOLT, hbaseBolt, 2).shuffleGrouping(KAFKA_SPOUT);

        Config conf = new Config();
        conf.setDebug(true);
        Map<String, Object> hbConf = new HashMap<String, Object>();
        conf.put("hbase.conf", hbConf);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(60000000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
Test result: about 5.2 million rows took several hours to write, averaging only a few hundred rows per second.
Root cause: reading the HBaseBolt source in this version shows that it does not batch writes. As shown below, execute() is called once per incoming tuple and immediately calls batchMutate to send that tuple's mutations on their own:
@Override
public void execute(Tuple tuple) {
    byte[] rowKey = this.mapper.rowKey(tuple);
    ColumnList cols = this.mapper.columns(tuple);
    List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols,
            writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
    try {
        this.hBaseClient.batchMutate(mutations);
    } catch (Exception e) {
        this.collector.reportError(e);
        this.collector.fail(tuple);
        return;
    }
    this.collector.ack(tuple);
}
3. Downloading the apache-storm-1.0.1 source shows that execute() has been reimplemented to do real batching:
@Override
public void execute(Tuple tuple) {
    boolean flush = false;
    try {
        if (TupleUtils.isTick(tuple)) {
            LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize);
            collector.ack(tuple);
            flush = true;
        } else {
            byte[] rowKey = this.mapper.rowKey(tuple);
            ColumnList cols = this.mapper.columns(tuple);
            List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols,
                    writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
            batchMutations.addAll(mutations);
            tupleBatch.add(tuple);
            if (tupleBatch.size() >= batchSize) {
                flush = true;
            }
        }
        if (flush && !tupleBatch.isEmpty()) {
            this.hBaseClient.batchMutate(batchMutations);
            LOG.debug("acknowledging tuples after batchMutate");
            for (Tuple t : tupleBatch) {
                collector.ack(t);
            }
            tupleBatch.clear();
            batchMutations.clear();
        }
    } catch (Exception e) {
        this.collector.reportError(e);
        for (Tuple t : tupleBatch) {
            collector.fail(t);
        }
        tupleBatch.clear();
        batchMutations.clear();
    }
}
batchSize is configurable: once the number of buffered tuples reaches it, all accumulated mutations are written to HBase in one batch. The if (TupleUtils.isTick(tuple)) branch handles tick tuples, a built-in Storm timer mechanism: at a configurable interval the bolt receives a special tick tuple. This guarantees that even when fewer than batchSize tuples have accumulated, the buffer is still flushed promptly. The tick interval can be set either in code or in storm.yaml. In code:
conf.put("hbase.conf", hbConf);
// conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
// conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
// conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
// conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
In the config file (the code-level setting should take priority, though I have not verified this):
[root@node1 conf]# more storm.yaml | grep tuple
topology.tick.tuple.freq.secs : 1
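The flush-on-size-or-tick pattern above can be distilled into a few lines of plain Java, with no Storm or HBase dependencies. This is only an illustrative sketch (the Batcher class and its flush counter are my own names, not part of the Storm API); it shows why batching cuts the number of round trips to the backing store so drastically:

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch of flush-on-size-or-tick batching (illustrative, not Storm's actual class). */
public class Batcher {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private int flushCount = 0; // how many times we hit the backing store

    public Batcher(int batchSize) { this.batchSize = batchSize; }

    /** Called once per data item, like execute() receiving a normal tuple. */
    public void add(String item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Called periodically, like execute() receiving a tick tuple. */
    public void tick() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        // In the real bolt this is where hBaseClient.batchMutate(batchMutations) runs
        flushCount++;
        buffer.clear();
    }

    public int getFlushCount() { return flushCount; }

    public static void main(String[] args) {
        Batcher b = new Batcher(100);
        for (int i = 0; i < 250; i++) {
            b.add("row-" + i);
        }
        b.tick(); // the tick flushes the remaining 50 items
        // 250 items with batchSize 100: two full batches plus one tick flush = 3 writes,
        // versus 250 writes with the old per-tuple implementation
        System.out.println(b.getFlushCount()); // prints 3
    }
}
```

The 0.10.0 execute() shown earlier is effectively the degenerate case batchSize = 1: every tuple costs one round trip to HBase, which matches the few-hundred-rows-per-second ceiling observed in the test.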
Rather than upgrading Storm, I back-ported the improved code from the newer version into the current one and tested it on the cluster.
Test result: with the same data volume, throughput improved enormously: data that previously took over five hours to write now finished in about twenty minutes.
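A quick sanity check of those figures in plain Java (the row count and durations are rounded from the measurements above, so the rates are approximate):

```java
public class ThroughputCheck {
    public static void main(String[] args) {
        long rows = 5_200_000L;               // ~5.2 million rows written in the test
        double beforeSecs = 5 * 3600.0;       // over five hours with the per-tuple bolt
        double afterSecs = 20 * 60.0;         // about twenty minutes with the batched bolt

        double beforeRate = rows / beforeSecs;   // ~289 rows/s, i.e. "a few hundred per second"
        double afterRate = rows / afterSecs;     // ~4333 rows/s
        double speedup = beforeSecs / afterSecs; // ~15x

        System.out.printf("before: %.0f rows/s, after: %.0f rows/s, speedup: %.0fx%n",
                beforeRate, afterRate, speedup);
    }
}
```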