zoukankan      html  css  js  c++  java
  • Storm系列(十)聚流示例

    功能:将多个数据源的数据汇集到一个处理单元进行集中分类处理;

    入口类TestMain

    public class TestMain {
     
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("random1", new RandomWordSpout1(), 1);
            builder.setSpout("random2", new RandomWordSpout2(), 1);
            builder.setSpout("random3", new RandomWordSpout3(), 1);
            builder.setBolt("", new TransferBolt(), 1)
                    .localOrShuffleGrouping("random1", "stream1")
    10                  .localOrShuffleGrouping("random2", "stream2")
    11                  .localOrShuffleGrouping("random3", "stream3");
    12   
    13          Config conf = new Config();
    14          conf.setDebug(false);
    15          conf.setNumWorkers(1);
    16          LocalCluster cluster = new LocalCluster();
    17          cluster.submitTopology("test-1", conf, builder.createTopology());
    18      }
    19  }

    数据源类RandomWordSpout1 输出字段为name

    public class RandomWordSpout1 extends BaseRichSpout {
     
        private static final long serialVersionUID = -4287209449750623371L;
     
        private SpoutOutputCollector collector;
     
        @Override
        public void open(@SuppressWarnings("rawtypes") Map conf,
                TopologyContext context, SpoutOutputCollector collector) {
    10          this.collector = collector;
    11      }
    12   
    13      @Override
    14      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    15          declarer.declareStream("stream1", new Fields("name"));
    16      }
    17   
    18      @Override
    19      public void nextTuple() {
    20          collector.emit("stream1", new Values("RandomWordSpout1"));
    21      }
    22   
    23  }

    数据源类RandomWordSpout2 输出字段为content

    public class RandomWordSpout2 extends BaseRichSpout {
     
        private SpoutOutputCollector collector;
       
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
     
    10      @Override
    11      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    12          declarer.declareStream("stream2", new Fields("content"));
    13      }
    14   
    15      @Override
    16      public void nextTuple() {
    17          collector.emit("stream2",new Values("RandomWordSpout2"));
    18      }
    19   
    20  }

    数据源类RandomWordSpout3输出key、value两个字段

    public class RandomWordSpout3 extends BaseRichSpout {
     
        private SpoutOutputCollector collector;
     
        @Override
        public void open(@SuppressWarnings("rawtypes") Map conf,
                TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
    10   
    11      @Override
    12      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    13          declarer.declareStream("stream3", new Fields("key", "value"));
    14      }
    15   
    16      @Override
    17      public void nextTuple() {
    18          collector.emit("stream3", new Values("chenx","happyday"));
    19         
    20      }
    21   
    22  }

    聚流处理类TransferBolt,输出从各流获取到的数据

    public class TransferBolt extends BaseBasicBolt {
     
        private static final long serialVersionUID = 4223708336037089125L;
        private Map<String, Fields> _fieldMap = null;
     
        @Override
        public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
                TopologyContext context) {
            _fieldMap = new HashMap<String, Fields>();
    10          Set<GlobalStreamId> sourceSet = context.getThisSources().keySet();
    11          for (GlobalStreamId source : sourceSet) {
    12              Fields fields = context.getComponentOutputFields(
    13                      source.get_componentId(), source.get_streamId());
    14              _fieldMap.put(source.get_componentId() + source.get_streamId(),
    15                      fields);
    16          }
    17   
    18      }
    19   
    20      @Override
    21      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    22      }
    23   
    24      @Override
    25      public void execute(Tuple input, BasicOutputCollector collector) {
    26          String key = input.getSourceComponent() + input.getSourceStreamId();
    27          Fields fields = _fieldMap.get(key);
    28          int size = fields.size();
    29          String content = "";
    30          for (int i = 0; i < size; i++) {
    31              content += input.getStringByField(fields.get(i));
    32          }
    33          System.out.println("SourceComponent:" + input.getSourceComponent()
    34                  + ",SourceStreamId:" + input.getSourceStreamId() + ",content:"
    35                  + content);
    36      }
    37   
    38  }
  • 相关阅读:
    简明Secure boot介绍
    密码学有什么用?
    mkimage, no such file or dir
    嵌入式系统安全简介
    希尔排序
    jQuery选择器
    css 选择器
    安装 SQL Server 2008 和管理工具 SQL Server 2008 management studio 及相关问题解决
    mac下安装安卓开发环境
    IOS开发小记-内存管理
  • 原文地址:https://www.cnblogs.com/jianyuan/p/4830839.html
Copyright © 2011-2022 走看看