1、拓扑(Topology):
builder.setBolt(TRANSFORM_BOLT, new TransformationBolt(), 1).shuffleGrouping(MY_SPOUT); builder.setBolt(PROCESS1_BOLT, new FirstProcessBolt(), 1).shuffleGrouping(TRANSFORM_BOLT, "StreamOne"); builder.setBolt(PROCESS2_BOLT, new SecondProcessBolt(), 1).shuffleGrouping(TRANSFORM_BOLT, "StreamTwo"); builder.setBolt(PROCESS3_BOLT, new ThirdProcessBolt(), 1).shuffleGrouping(TRANSFORM_BOLT, "StreamThree");
2、Bolt:
@Override
private void execute(Tuple input) {
// perform some logic
collector.emit("StreamOne", input, new Values(...));
collector.emit("StreamTwo", input, new Values(...));
collector.emit("StreamThree", input, new Values(...));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("StreamOne", new Fields(...));
declarer.declareStream("StreamTwo", new Fields(...));
declarer.declareStream("StreamThree", new Fields(...));
}