zoukankan      html  css  js  c++  java
  • Storm- 使用Storm实现累积求和的操作

    需求:1+2+3+... = ???

    实现方案

      Spout发出数字作为input

      使用Bolt来处理业务逻辑:求和

      将结果输出到控制台

    拓扑设计:DataSourceSpout -->SumBolt→输出

    package com.imooc.bigdata;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.Map;
    
    /**
     * 使用Storm实现累计求和的操作
     */
    public class LocalSumStormTopology {
        /**
         * spout 需要继承BaseRichSpout
         * 数据源需要产生数据并发射
         */
        public static class DataSourceSpout extends BaseRichSpout{
    
            private SpoutOutputCollector collector;
    
            /**
             * 初始化方法,只会被调用一次
             * @param conf 配置参数
             * @param context 上下文
             * @param collector 数据发射器
             */
            @Override
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                this.collector = collector;
            }
    
            int number = 0;
            /**
             * 会产生数据,在生产上肯定是从消息队列中获取数据
             *
             * 这个方法是一个死循环,会一直不停的执行
             */
            @Override
            public void nextTuple() {
                this.collector.emit(new Values(++number));
    
                System.out.println("Spout:"+number);
    
                //防止数据产生太快
                Utils.sleep(1000);
            }
    
            /**
             * 声明输出字段
             * @param declarer
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("num"));
            }
        }
    
    
        /**
         * 数据的累计求和Bolt:接收数据并处理
         */
        public static class SumBolt extends BaseRichBolt{
    
            /**
             * 初始化方法,会被执行一次
             * @param stormConf
             * @param context
             * @param collector
             */
            @Override
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
            }
    
            int sum= 0;
            /**
             * 其实也是一个死循环,职责:获取Spout发送过来的数据
             * @param input
             */
            @Override
            public void execute(Tuple input) {
    
                // Bolt中获取值可以根据index获取,也可以根据上一个环节中定义的field的名称获取(建议使用该方式)
                Integer value = input.getIntegerByField("num");
                sum += value;
    
                System.out.println("Bolt:sum = ["+sum +"]");
            }
    
            /**
             *
             * @param declarer
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }
        }
    
        public static void main(String[] args) {
    
            // TopologyBuilder根据Spout和Bolt来构建Topology
            // Storm中任何一个作业都是通过Topology的方式进行提交的
            // Topology中需要指定Spout和Bolt的执行顺序
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("DataSourceSpout", new DataSourceSpout());
            builder.setBolt("SumBolt", new SumBolt()).shuffleGrouping("DataSourceSpout");
    
            // 创建一个本地的Storm集群:本地模式运行,不需要搭建Storm集群
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalSumStormTopology", new Config(), builder.createTopology());
        }
    
    
    }
  • 相关阅读:
    Asp.Net下的DataGrid的多层表头(转自csdn)
    C#中使用DirectX编程 (转)
    Factory Method来实现数据库操作的类 (转) 原文:冷风.NET
    (转)关于定时器,介绍得很好!
    (原创)如何让web页面产生服务器数据返回后仍然能够保留到用户输入的位置!
    最近加入了控件开发团队,发现一些基础的东西,转发上来方便大家学习(转)
    中国共享软件走向国际指南(转,有感而发!)
    水晶报表官方实例大全 (转)
    用VS.NET2003制作WEB应用程序的安装包 (转)
    开发 Windows Mobile 应用程序: FAQ(最近买了ppc,正好打算学习开发使用。)
  • 原文地址:https://www.cnblogs.com/RzCong/p/9382940.html
Copyright © 2011-2022 走看看