FixedBatchSpout 继承自 IBatchSpout
IBatchSpout 方法
public interface IBatchSpout extends Serializable { void open(Map conf, TopologyContext context); void emitBatch(long batchId, TridentCollector collector); void ack(long batchId); void close(); Map getComponentConfiguration(); Fields getOutputFields(); }
FixedBatchSpout代码 package storm.trident.testing; import backtype.storm.Config; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import java.util.List; import java.util.Map; import storm.trident.operation.TridentCollector; import storm.trident.spout.IBatchSpout; public class FixedBatchSpout implements IBatchSpout { Fields fields; List<Object>[] outputs; int maxBatchSize; public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) { this.fields = fields; // 输出字段 this.outputs = outputs; // 保存至本地, 每个对象都是一个List<Object> this.maxBatchSize = maxBatchSize; // 该批次最大发射次数,但是不是唯一决定元素 } int index = 0; boolean cycle = false; public void setCycle(boolean cycle) { this.cycle = cycle; } @Override public void open(Map conf, TopologyContext context) { index = 0; }
// trident调用 @Override public void emitBatch(long batchId, TridentCollector collector) { //Utils.sleep(2000); if(index>=outputs.length && cycle) { index = 0; // 超过下标后,让index归零, 继续循环发送 } // 在不超过outputs大小的情况下,每次发射一个List<Object> for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) { collector.emit(outputs[index]); } } @Override public void ack(long batchId) { } @Override public void close() { } @Override public Map getComponentConfiguration() { Config conf = new Config(); conf.setMaxTaskParallelism(1); // 最大并行度,默认是1. 好像没提供接口来修改, 很奇怪。 return conf; } @Override public Fields getOutputFields() { return fields ; // 输出字段 } }
外部使用
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1,
new Values("ab ab ab ab ab ab ab ab ab ab")); // 这里设置为1,表示每批只发送一个List<Value>,但是设置更大,也不会出错,参见上面的代码注释,它要同时满足不超过数组大小,所以不会越界。
spout.setCycle(true); // 设置则表示会一直发送,如果不用它一直发射, 可以注释掉。
其他就是trident内部调用。
如分析有误,请指出,谢谢。。