zoukankan      html  css  js  c++  java
  • Storm实现数字累加Demo

      1 import java.util.Map;
      2 
      3 import backtype.storm.Config;
      4 import backtype.storm.LocalCluster;
      5 import backtype.storm.spout.SpoutOutputCollector;
      6 import backtype.storm.task.OutputCollector;
      7 import backtype.storm.task.TopologyContext;
      8 import backtype.storm.topology.OutputFieldsDeclarer;
      9 import backtype.storm.topology.TopologyBuilder;
     10 import backtype.storm.topology.base.BaseRichBolt;
     11 import backtype.storm.topology.base.BaseRichSpout;
     12 import backtype.storm.tuple.Fields;
     13 import backtype.storm.tuple.Tuple;
     14 import backtype.storm.tuple.Values;
     15 import backtype.storm.utils.Utils;
     16 
     17 /**
     18  * 数字累加求和
     19  * 先添加storm依赖
     20  * 
     21  * @author Administrator
     22  *
     23  */
     24 public class LocalTopologySum {
     25     
     26     
     27     /**
     28      * spout需要继承baserichspout,实现未实现的方法
     29      * @author Administrator
     30      *
     31      */
     32     public static class MySpout extends BaseRichSpout{
     33         private Map conf;
     34         private TopologyContext context;
     35         private SpoutOutputCollector collector;
     36         
     37         /**
     38          * 初始化方法,只会执行一次
     39          * 在这里面可以写一个初始化的代码
     40          * Map conf:其实里面保存的是topology的一些配置信息
     41          * TopologyContext context:topology的上下文,类似于servletcontext
     42          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     43          */
     44         @Override
     45         public void open(Map conf, TopologyContext context,
     46                 SpoutOutputCollector collector) {
     47             this.conf = conf;
     48             this.context = context;
     49             this.collector = collector;
     50         }
     51 
     52         int num = 1;
     53         /**
     54          * 这个方法是spout中最重要的方法,
     55          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     56          * 每调用一次,会向外发射一条数据
     57          */
     58         @Override
     59         public void nextTuple() {
     60             System.out.println("spout发射:"+num);
     61             //把数据封装到values中,称为一个tuple,发射出去
     62             this.collector.emit(new Values(num++));
     63             Utils.sleep(1000);
     64         }
     65         
     66         /**
     67          * 声明输出字段
     68          */
     69         @Override
     70         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     71             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     72             //fields中定义的参数和values中传递的数值是一一对应的
     73             declarer.declare(new Fields("num"));
     74         }
     75         
     76     }
     77     
     78     
     79     /**
     80      * 自定义bolt需要实现baserichbolt
     81      * @author Administrator
     82      *
     83      */
     84     public static class MyBolt extends BaseRichBolt{
     85         private Map stormConf; 
     86         private TopologyContext context;
     87         private OutputCollector collector;
     88         
     89         /**
     90          * 和spout中的open方法意义一样
     91          */
     92         @Override
     93         public void prepare(Map stormConf, TopologyContext context,
     94                 OutputCollector collector) {
     95             this.stormConf = stormConf;
     96             this.context = context;
     97             this.collector = collector;
     98         }
     99 
    100         int sum = 0;
    101         /**
    102          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    103          */
    104         @Override
    105         public void execute(Tuple input) {
    106             //input.getInteger(0);//也可以根据角标获取tuple中的数据
    107             Integer value = input.getIntegerByField("num");
    108             sum+=value;
    109             System.out.println("和:"+sum);
    110         }
    111         
    112         /**
    113          * 声明输出字段
    114          */
    115         @Override
    116         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    117             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    118             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    119         }
    120         
    121     }
    122     /**
    123      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    124      * @param args
    125      */
    126     public static void main(String[] args) {
    127         //组装topology
    128         TopologyBuilder topologyBuilder = new TopologyBuilder();
    129         topologyBuilder.setSpout("spout1", new MySpout());
    130         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    131         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
    132         
    133         //创建本地storm集群
    134         LocalCluster localCluster = new LocalCluster();
    135         Config config = new Config();
    136         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
    137     }
    138 
    139 }
  • 相关阅读:
    类之OCP(Open Closed Principle):开闭原则
    OO书籍 zz
    ObjectOriented Design Heuristics (zz)
    CSS3 filter(滤镜) 网站整体变灰色调
    js和jquery设置css样式的几种方法
    20条书写CSS代码
    js 调用向html追加内容
    移动端检测微信浏览器返回,关闭,进入后台操作
    防止表单重复提交的4种方法
    CSS浮动标准修复top塌陷和清除浮动及IE兼容标准格式
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/5774957.html
Copyright © 2011-2022 走看看