最近在做一个jstorm的程序。我的jstorm程序消费一个kafka主题,根据数据逻辑判断需要往下游哪几个kafka主题的生产者发送。
1、bolt的execute(Tuple input)方法每次接收一条,处理好对应的DTO的json数据的话需要将数据发送到下游kafka主题。我观察到producer发送数据需要的时间比较多,一般发一次都要800毫秒。所以用了一个队列ArrayBlockingQueue存储固定数量批量发送。批量处理的条数不能太大,因为一次性发送到kafka的数据条数*单条数据大小不能超过限制,我公司设置的不能超过1MB;
配置参数的计算公式:spout_pending*spout数量/ (bolt数量*批量处理条数) 等于1.2左右
2、spout数量和分区数一致,设大了没有意义;
3、jstorm有一个系统自带的发系统级别的tuple,只需要设置全局参数即可:
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,10); //表示每隔10秒storm会给Topology中的所有bolt发射一个系统级别的tuple
在execute方法中能判断是否是系统tuple: StringUtils.equals(input.getSourceComponent(), "__system");
4、一个 workers 可以理解为一个jvm进程(也就是一个节点), task 理解为运行任务的线程。task就是一个 spout 或者一个bolt