zoukankan      html  css  js  c++  java
  • 学习storm实现求和操作

    1 storm求和简单操作

     主要逻辑,就是spout发送数据源,blot进行处理数据,主要注意的点就是 spout这有个nextTuple自旋,和使用父类的declare..方法声明要发送到下游的名称,然后blot execute接受到进行执行

    1.1代码实现

    package com.xiaodao.big;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.Map;
    
    /**
     * 累积求和
     */
    public class LocalSumStormTopology {
    
        /**
         * spout 需要继承baserichspout
         * 数据源需要产生并发送数据
         */
        public static class DataSourceSpout extends BaseRichSpout{
    
    
            private SpoutOutputCollector collector;
            /**
             * 初始化方法只会被调用一次
             *
             * @param conf 配置参数
             * @param context   上下文
             * @param collector 数据发射器
             */
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                this.collector = collector;
            }
    
                int num = 0;
            /**
             * 会产生数据,在生产上肯定是从消息队列中获取数据
             * 这个方法是一个死循环,是因为storm一直运行,会一直不行的执行
             */
            public void nextTuple() {
                collector.emit( new Values(num++));
                System.out.println("Spout:发送 "+ num);
                    Utils.sleep(2000);
    
            }
    
            /**
             * 声明下一个blot接受的名称,不然blot不知道接受到了什么
             * @param declarer
             */
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("no"));
            }
        }
    
        /**
         * 数据的累积求和 blot,接受数据,并处理
         */
        public static class SumBlot extends BaseRichBolt{
    
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
            }
    
            int sum =0;
            /**
             * 也是一个自旋锁.(死循环)
             * @param input
             */
            public void execute(Tuple input) {
                //这里获取方式有很多
                Integer no =  input.getIntegerByField("no");
                sum +=no;
                System.out.println("Blot: sum = ["+ sum+"]");
    
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
            }
        }
    
        public static void main(String[] args) {
            //任何一个作业都需要topology
            //需要控制好blot spout 顺序
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("DataSourceSpout",new DataSourceSpout(),1);
            builder.setBolt("SumBlot",new SumBlot(),1).shuffleGrouping("DataSourceSpout");
            Config conf = new Config();
            conf.setNumWorkers(2);
            //如果到200个消息就不发送了
            conf.setMaxSpoutPending(200);
            //创建一个本地的模式,不需要搭建
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalSumStormTopology",conf,builder.createTopology());
        }
    }
    View Code

     执行运行就可以

    一起交流进步.扫描下方QQ二维码即可

  • 相关阅读:
    HAProxy、Keepalived 在 Ocatvia 的应用实现与分析
    Octavia 的 HTTPS 与自建、签发 CA 证书
    Octavia 创建 loadbalancer 的实现与分析
    OpenStack Rally 质量评估与自动化测试利器
    自建 CA 中心并签发 CA 证书
    Failed building wheel for netifaces
    通过 vSphere WS API 获取 vCenter Datastore Provisioned Space 置备空间
    OpenStack Placement Project
    我们建了一个 Golang 硬核技术交流群(内含视频福利)
    没有图形界面的软件有什么用?
  • 原文地址:https://www.cnblogs.com/bj-xiaodao/p/10502756.html
Copyright © 2011-2022 走看看