zoukankan      html  css  js  c++  java
  • 学习storm实现求和操作

    1 storm求和简单操作

     主要逻辑,就是spout发送数据源,blot进行处理数据,主要注意的点就是 spout这有个nextTuple自旋,和使用父类的declare..方法声明要发送到下游的名称,然后blot execute接受到进行执行

    1.1代码实现

    package com.xiaodao.big;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.Map;
    
    /**
     * 累积求和
     */
    public class LocalSumStormTopology {
    
        /**
         * spout 需要继承baserichspout
         * 数据源需要产生并发送数据
         */
        public static class DataSourceSpout extends BaseRichSpout{
    
    
            private SpoutOutputCollector collector;
            /**
             * 初始化方法只会被调用一次
             *
             * @param conf 配置参数
             * @param context   上下文
             * @param collector 数据发射器
             */
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                this.collector = collector;
            }
    
                int num = 0;
            /**
             * 会产生数据,在生产上肯定是从消息队列中获取数据
             * 这个方法是一个死循环,是因为storm一直运行,会一直不行的执行
             */
            public void nextTuple() {
                collector.emit( new Values(num++));
                System.out.println("Spout:发送 "+ num);
                    Utils.sleep(2000);
    
            }
    
            /**
             * 声明下一个blot接受的名称,不然blot不知道接受到了什么
             * @param declarer
             */
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("no"));
            }
        }
    
        /**
         * 数据的累积求和 blot,接受数据,并处理
         */
        public static class SumBlot extends BaseRichBolt{
    
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
            }
    
            int sum =0;
            /**
             * 也是一个自旋锁.(死循环)
             * @param input
             */
            public void execute(Tuple input) {
                //这里获取方式有很多
                Integer no =  input.getIntegerByField("no");
                sum +=no;
                System.out.println("Blot: sum = ["+ sum+"]");
    
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
            }
        }
    
        public static void main(String[] args) {
            //任何一个作业都需要topology
            //需要控制好blot spout 顺序
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("DataSourceSpout",new DataSourceSpout(),1);
            builder.setBolt("SumBlot",new SumBlot(),1).shuffleGrouping("DataSourceSpout");
            Config conf = new Config();
            conf.setNumWorkers(2);
            //如果到200个消息就不发送了
            conf.setMaxSpoutPending(200);
            //创建一个本地的模式,不需要搭建
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalSumStormTopology",conf,builder.createTopology());
        }
    }
    View Code

     执行运行就可以

    一起交流进步.扫描下方QQ二维码即可

  • 相关阅读:
    Shell流程控制语句if
    Linux命令之read
    Docker容器(六)——创建docker私有化仓库
    Docker容器(五)——Docker静态化IP
    Docker容器(四)——常用命令
    python笔记
    iOS应用性能调优建议
    QQ音乐项目(OC版)
    iOS : 静态库制作
    iOS : 静态库(.framework)合并
  • 原文地址:https://www.cnblogs.com/bj-xiaodao/p/10502756.html
Copyright © 2011-2022 走看看