zoukankan      html  css  js  c++  java
  • 使用Storm实现累加求和操作

    package com.csylh;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.Map;
    
    /**
     * Description:使用Storm实现累加求和操作
     *
     * @author: 留歌36
     * Date:2018/9/3 16:50
     */
    public class LocalSumStormTopology {
        /**
         * Spout需要继承BaseRichSpout
         * 数据源需要产生数据并发射到Bolt
         */
        public static class DataSourceSpout extends BaseRichSpout{
            //定义一个发射器
            private SpoutOutputCollector collector;
    
            /**
             * 初始化方法 只是会被调用一次
             * @param conf     配置参数
             * @param context 上下文:相当于一个框 可以从里面获取许多东西
             * @param collector 数据发射器
             */
            @Override
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                //将传入的collector发射器 对私有变量 进行赋初值
                this.collector = collector;
            }
            int number = 0;
            /**
             *  用于产生数据
             *  生产中肯定是从消息队列中获取数据
             *  这个方法是一个死循环
             */
            @Override
            public void nextTuple() {
                //发送方式,调用上面定义的数据发射器
                this.collector.emit(new Values(number++));
    
                System.out.println("Spout==》发送的数据:" + number);
                //每隔1s中发射一次,防止数据产生太快
                Utils.sleep(1000);
    
            }
            /**
             * 声明输出字段
             * @param declarer
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("num"));
            }
        }
    
        /**
         * Bolt需要继承BaseRichBolt
         * 用于接收数据并对数据进行处理
         */
        public static class SumBolt extends BaseRichBolt{
            /**
             *  初始化方法 ,会被执行一次
             * @param stormConf
             * @param context
             * @param collector  这里的数据发射器,由于业务逻辑中没有没有必要进行放下发的操作,所以就是没有必要进行new一个
             */
            @Override
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
            }
            int sum = 0;
            /**
             * 也是一个死循环 ,职责: 获取Spout发射过来的数据
             * @param input
             */
            @Override
            public void execute(Tuple input) {
    
               //Bolt中获取值可以通过index获取
                // 也可以根据上一个环节中定义的filed的名称获取(***推荐)
              Integer value = input.getIntegerByField("num");
              sum += value;
    
              System.out.println("Bolt :Sum = ["+ sum + "]");
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
            }
        }
    
        public static void main(String[] args) {
    
            TopologyBuilder builder = new TopologyBuilder();
    
            builder.setSpout("DataSourceSpout",new DataSourceSpout());
            builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
    
            //创建一个本地的Storm集群 ,本地模式运行,不需要搭建Storm集群
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());
    
        }
    
    }
    
    
  • 相关阅读:
    驱动控制浏览器 和排程算法
    Python简单人脸识别,可调摄像头,基础入门,先简单了解一下吧
    机器学习
    “一拖六”屏幕扩展实战
    Apple iMac性能基准测试
    IDC机房KVM应用案例分析
    突破极限 解决大硬盘上安装Unix新思路
    Domino系统从UNIX平台到windows平台的迁移及备份
    走进集装箱数据中心(附动画详解)
    企业实战之部署Solarwinds Network八部众
  • 原文地址:https://www.cnblogs.com/liuge36/p/9882766.html
Copyright © 2011-2022 走看看