zoukankan      html  css  js  c++  java
  • storm定时器

    package com.example.mail;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    
    public class Main {
      public static void main(String[] args) {
        // 组装topology
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout1", new MySpout());
        // .shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
        topologyBuilder.setBolt("bolt1", new EmailBolt()).shuffleGrouping("spout1");
    
        // 创建本地storm集群
        LocalCluster localCluster = new LocalCluster();
        Config config = new Config();
    
        localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
      }
    }
    package com.example.mail;
    
    import java.util.Map;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    public class MySpout extends BaseRichSpout {
      private Map conf;
      private TopologyContext context;
      private SpoutOutputCollector collector;
      
      /**
       * 初始化方法,只会执行一次
       * 在这里面可以写一个初始化的代码
       * Map conf:其实里面保存的是topology的一些配置信息
       * TopologyContext context:topology的上下文,类似于servletcontext
       * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
       */
      @Override
      public void open(Map conf, TopologyContext context,
              SpoutOutputCollector collector) {
          this.conf = conf;
          this.context = context;
          this.collector = collector;
      }
    
      int num = 1;
      /**
       * 这个方法是spout中最重要的方法,
       * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
       * 每调用一次,会向外发射一条数据
       */
      @Override
      public void nextTuple() {
          System.out.println("spout发射:"+num);
          //把数据封装到values中,称为一个tuple,发射出去
          this.collector.emit(new Values(num++));
          Utils.sleep(1000);
      }
      
      /**
       * 声明输出字段
       */
      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
          //给values中的数据起个名字,方便后面的bolt从这个values中取数据
          //fields中定义的参数和values中传递的数值是一一对应的
          declarer.declare(new Fields("num"));
      }
    }
    package com.example.mail;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.Constants;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class EmailBolt extends BaseRichBolt {
    
    
      private transient OutputCollector collector;
    
    
      @Override
      public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
      }
    
      @Override
      public void execute(Tuple input) {
        System.out.println(input.getSourceComponent());
        System.out.println(input.getSourceStreamId());
        if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
          System.out.println("定时任务执行了。");
    
        }
      }
    
      @Override
      public Map<String, Object> getComponentConfiguration() {
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return hashMap;
      }
    
      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // last bolt in flow, no further processing
      }
    
    }

    23616 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Preparing bolt __system:(-1)
    23629 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Prepared bolt __system:(-1)
    23633 [Thread-20-spout1-executor[3 3]] INFO o.a.s.d.executor - Opening spout spout1:(3)
    23637 [Thread-20-spout1-executor[3 3]] INFO o.a.s.d.executor - Opened spout spout1:(3)
    23642 [Thread-20-spout1-executor[3 3]] INFO o.a.s.d.executor - Activating spout spout1:(3)
    spout发射:1
    23670 [Thread-22-__acker-executor[1 1]] INFO o.a.s.d.executor - Preparing bolt __acker:(1)
    23673 [Thread-22-__acker-executor[1 1]] INFO o.a.s.d.executor - Prepared bolt __acker:(1)
    23694 [Thread-18-bolt1-executor[2 2]] INFO o.a.s.d.executor - Preparing bolt bolt1:(2)
    23696 [Thread-18-bolt1-executor[2 2]] INFO o.a.s.d.executor - Prepared bolt bolt1:(2)
    spout1
    default
    spout发射:2
    spout1
    default
    spout发射:3
    spout1
    default
    spout发射:4
    spout1
    default
    spout发射:5
    spout1
    default
    spout发射:6
    spout1
    default
    spout发射:7
    spout1
    default
    spout发射:8
    spout1
    default
    spout发射:9
    spout1
    default
    spout发射:10
    spout1
    default
    spout发射:11
    spout1
    default
    __system
    __tick
    定时任务执行了。
    spout发射:12
    spout1
    default
    spout发射:13
    spout1
    default
    spout发射:14
    spout1
    default
    spout发射:15
    spout1
    default
    spout发射:16
    spout1
    default
    spout发射:17
    spout1
    default
    spout发射:18
    spout1
    default
    spout发射:19
    spout1
    default
    spout发射:20
    spout1
    default
    __system
    __tick
    定时任务执行了。

  • 相关阅读:
    python之面向对象函数与方法,反射,双下方法
    python之面向对象的成员,方法,属性,异常处理
    python之面向对象性封装,多态,以及鸭子类型
    python之面向对象三大特性: 继承(单继承)
    AMAP-TECH算法大赛开赛!基于车载视频图像的动态路况分析
    深度学习在高德ETA应用的探索与实践
    高德SD地图数据生产自动化技术的路线与实践(道路篇)
    高德前端这五年:动态化技术的研发历程和全面落地实践
    深度学习在高德POI鲜活度提升中的演进
    高德技术评测建设之路
  • 原文地址:https://www.cnblogs.com/tonggc1668/p/9036866.html
Copyright © 2011-2022 走看看