zoukankan      html  css  js  c++  java
  • Storm 测试

    本文将学习如何使用java创建Storm拓扑

    Storm集群的组件

    Storm集群类似于Hadoop集群,只不过 Hadoop 上运行"MapReduce jobs", Storm 上运行"topologies"。
    两者最大的差别是,MapReducejobs 最终是完成的,而 topologies 是一直处理消息(或直到你杀死它)。

    集群 任务名称 任务时效性
    Storm topologies(拓扑) 一直处理消息(或直到你杀死它)
    Hadoop MapReduce jobs 最终是完成的

    Storm集群上有两种节点:master 和 worker 节点

    • master:
      运行一个名为 Nimbus 的守护进程,
      负责在集群周围分发代码,
      为机器分配任务以及监控故障。
      (类似 Hadoop 的 JobTracker)
    • worker:
      运行一个名为 Supervisor 的守护进程,
      负责监听、并根据需要启动、停止 "Nimbus" 分配给其的任务。
      每个工作进程都执行拓扑的子集。 运行拓扑由分布在许多计算机上的许多工作进程组成。

    Nimbus 和 Supervisors 之间的协调是通过 Zookeeper 实现的。
    此外,Nimbus 守护程序和 Supervisors 守护程序是 fail-fast 和 stateless;
    所有状态都保存在Zookeeper或本地磁盘上。这意味着你可以通过 kill -9 杀死 Nimbus 或者 Supervisors ,但是它们会像没事一样重新开始。
    这种设计使Storm集群非常稳定。
    Storm集群的组件.png

    Topologies

    要想在 Storm 上进行实时计算,你需要创建一个 topologies 。
    topologies 是一个计算图,topologies中的每个节点包含计算逻辑,并且通过节点之间的连接定义了数据在节点之间的流动方向。

    运行拓扑很简单。首先,将所有代码和依赖项打包到一个jar中。然后,运行如下命令:

    storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
    额 这个命令没啥好解释的....
    

    Streams

    Stream 是一个无限的元组序列,是 Storm 抽象的核心。
    Storm 提供了以分布式和可靠的方式进行 Stream 传换的原语。
    比如将微博 Stream 转换为转换热门主题 Stream。

    Storm 为进行 Stream 转换提供 spouts 和 bolt 两个基本原语。

    • spouts
      spouts 是 Stream 的来源。
      例如,spout可以读取Kestrel队列中的元组并将其作为 Stream 发出。或者 spout 可以连接到Twitter API并发出推文流。
    • bolt
      bolt会消耗任意数量的输入流,进行一些处理,并可能发出新的流。
      像从推文流计算趋势主题流之类的复杂流转换,需要多个步骤,因此需要多个 bolt 。
      bolt 可以执行任何操作,包括运行函数,过滤元组,进行流聚合,进行流连接,与数据库对话等等。

    spout 和 bolt 网络被打包成一个 topology ,这是提交给 Storm 集群执行的顶级抽象。
    拓扑是流转换的图形,其中每个节点都是一个 spout 或 bolt 。
    图中的表示哪些 bolt 订阅了哪些流。
    当一个 spout 或 bolt 向一个流发出一个元组时,它会将元组发送给订阅该流的每个 bolt 。

    Streams 抽象.png

    拓扑中节点之间的链接指示应如何传递元组。
    如上图,Spout A 和Bolt B 之间有链接,Spout A 到 Bolt C 之间有链接,以及从 Bolt B 到 Bolt C 之间有链接。
    那么每次 Spout A 发出一个元组时,它都会将元组发送给 Bolt B 和 Bolt C .所有 Bolt B 的输出元组也将送给 Bolt C.

    Storm拓扑中的每个节点并行执行。
    在拓扑中,你可以为每个节点指定所需的并行度,Storm将在集群中生成该数量的线程以执行。

    拓扑会一直执行(或直到你杀死它)。
    Storm会自动重新分配失败的任务。
    此外,Storm保证不会丢失数据,即使计算机出现故障并且消息丢失也是如此。


    Data model

    Storm使用元组作为其数据模型。
    元组是一个命名的值列表,元组中的字段可以是任何类型的对象。
    Storm支持所有原始类型,字符串和字节数组作为元组字段值。
    要使用其他类型的对象,需要为该类型实现一个序列化程序。

    拓扑中的每个节点都必须声明它发出的元组的输出字段。
    例如下面代码中的bolt 声明它发出2元组,字段为 "double" 和 "triple"

    package com.aaa.test;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * @author lillcol
     * 2019/7/18-11:46
     */
    public class DoubleAndTripleBolt extends BaseRichBolt {
        private OutputCollector _collector;
    
        @Override
        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }
    
        @Override
        public void execute(Tuple input) {
            int val = input.getInteger(0);
            _collector.emit(input, new Values(val*2, val*3));
            _collector.ack(input);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("double", "triple"));//声明["double", "triple"]组件的输出字段
        }
    }
    

    一个简单的拓扑(A simple topology)

    如何实现一个简单的拓扑?
    本地 idea测试
    sbt构建

    // libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"
    libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"
    

    定义一个Spout,此处采用随机数

    package com.test.storm;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.Map;
    import java.util.Random;
    
    /**
     * @author lillcol
     * 2019/7/18-12:03
     */
    public class TestWordSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
    
        @Override
        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void nextTuple() {
            //Spouts负责向拓扑中发送新消息
            Utils.sleep(100);
            //每隔100ms就会从列表中随机选一个单词发出
            final String[] words = new String[]{"hellow", "lillcol", "study", "storm"};
            final Random rand = new Random();
            final String word = words[rand.nextInt(words.length)];
            collector.emit(new Values(word));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
    
    

    定义Bolt,功能接收到的信息追加"levelUp!"

    package com.test.storm;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * @author lillcol
     * 2019/7/18-12:04
     */
    public class ExclamationBolt extends BaseRichBolt {
        OutputCollector collector;
    
       //prepare方法为 Bolt 提供了一个OutputCollector用于从 Bolt 中发出元组 。
       //元组可以随时的从prepare,execute,cleanup,甚至在另一个线程中异步发出。
       //当前prepare实现只是将OutputCollector作为实例变量保存,以便稍后在execute方法中使用。
        @Override
        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
            this.collector =collector;
        }
        //execute方法从一个Bolt的输入接收一个元组。
        //此execute取数组的第一个字段并发出追加字符串“levelUp!” 得字符串。
        //如果您实现了一个订阅多个输入源的bolt,您可以通过使用Tuple#getSourceComponent方法找出Tuple来自哪个组件。
        @Override
        public void execute(Tuple input) {
            String sourceComponent = input.getSourceComponent();
            //输入元组作为第一个参数传递emit
            collector.emit(input, new Values(input.getString(0) + "levelUp!"));
            System.out.println(input.getString(0));
            // 输入元组在最后一行被激活。这些是Storm的可靠性API的一部分,用于保证不会丢失数据
            collector.ack(input);
        }
    
        //declareOutputFields方法声明ExclamationBolt发出1元组,其中一个字段称为“word”。
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
    
    //如果implements IRichBol
    //还需要重写下面两个方法
    //当Bolt被关闭时调用cleanup方法,并且应该清除所有打开的资源。
    //无法保证在集群上调用此方法:例如,如果任务正在运行的计算机爆炸,则无法调用该方法。
     @Override
        public void cleanup() {
        }
    //getComponentConfiguration方法允许配置此组件运行方式的各个方面    
    @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    //但是一般情况下我们不需要这两个方法,所以我们可以通过继承BaseRichBolt来定义Bolt
    

    定义调用类

    package com.test.storm;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.shade.org.apache.jute.Utils;
    import org.apache.storm.topology.TopologyBuilder;
    
    
    /**
     * @author lillcol
     * 2019/7/18-12:03
     */
    public class SimpleTopology {
        public static void main(String[] args) throws Exception {
            SimpleTopology topology = new SimpleTopology();
            topology.runLocal(60);
    
        }
    
        public void runLocal(int waitSeconds) throws Exception {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            //第一个参数是给Spout一个id "words", 
            //第二个参数是要调用的Spout类
            //第三个参数是节点所需的并行度,是可选的。它指示应在群集中执行该组件的线程数。如果省略它,Storm将只为该节点分配一个线程。
            topologyBuilder.setSpout("words", new TestWordSpout(), 1);
            //Bolt的参数与Spout
            //只是要通过shuffleGrouping 指定数据来源"words")
            //“shuffle grouping”意味着元组应该从输入任务随机分配到bolt的任务中。
            topologyBuilder.setBolt("DoubleAndTripleBolt1", new ExclamationBolt(), 1)
                    .shuffleGrouping("words");
            //一个Bolt可以接收多个数据来源,是要多次调用shuffleGrouping即可       
            topologyBuilder.setBolt("DoubleAndTripleBolt2", new ExclamationBolt(), 1)
                    .shuffleGrouping("DoubleAndTripleBolt1")
                    .shuffleGrouping("words");
    
            //loacl 测试
            Config config = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word_count", config, topologyBuilder.createTopology());
    
            org.apache.storm.utils.Utils.sleep(1000*10);
            cluster.killTopology("word_count");
            cluster.shutdown();
        }
    }
    
    
    运行结果:
    study
    study
    studylevelUp!
    study
    study
    studylevelUp!
    hellow
    hellow
    hellowlevelUp!
    lillcol
    lillcol
    lillcollevelUp!
    hellow
    hellow
    hellowlevelUp!
    hellow
    hellow
    hellowlevelUp!
    lillcol
    lillcol
    lillcollevelUp!
    . . .
    

    异常

    可能出现异常1:

    java.lang.NoClassDefFoundError: org/apache/storm/topology/IRichSpout
    	at java.lang.Class.getDeclaredMethods0(Native Method)
    	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    	at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    	at java.lang.Class.getMethod0(Class.java:3018)
    	at java.lang.Class.getMethod(Class.java:1784)
    	at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    	at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
    Caused by: java.lang.ClassNotFoundException: org.apache.storm.topology.IRichSpout
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	... 7 more
    Error: A JNI error has occurred, please check your installation and try again
    
    这个是因为在sbt构建的时候  % "provided" 意思是已提供相关jar,但是我们idea测试的时候并没有相关jar
    libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"
    
    所以不能用上面的语句,改成下面的即可
    libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"
    
    maven 对应着改就可以了
    

    可能出现异常2:

    Exception in thread "main" java.lang.ExceptionInInitializerError
    	at org.apache.log4j.LogManager.getLogger(LogManager.java:44)
    	at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
    	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
    	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
    	at org.apache.storm.LocalCluster.<clinit>(LocalCluster.java:128)
    	at com.test.storm.SimpleTopology.runLocal(SimpleTopology.java:28)
    	at com.test.storm.SimpleTopology.main(SimpleTopology.java:16)
    Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
    	at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
    	... 7 more
    	
    错误报的很明显
    log4j-over-slf4j.jar AND slf4j-log4j12.jar  冲突了
    我的解决办法是在测试的时候随便删掉一个,但是生产的时候在可能冲突的依赖中把它去掉
    

    Storm 的 的hellow word(word count)
    //定义Spout WordReader
    package com.test.storm;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Map;
    
    /**
     * @author lillcol
     * 2019/7/19-9:17
     */
    public class WordReader extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private FileReader fileReader;
        private boolean completed = false;
    
        /**
         * open方法,接收三个参数:
         * 第一个是创建Topology的配置,
         * 第二个是所有的Topology数据
         * 第三个是用来把Spout的数据发射给bolt
         **/
        @Override
        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
            try {
                //获取创建Topology时指定的要读取的文件路径
                this.fileReader = new FileReader(conf.get("wordsFile").toString());
            } catch (FileNotFoundException e) {
                throw new RuntimeException("Error reading file ["
                        + conf.get("wordFile") + "]");
            }
            //初始化发射器
            this.collector = collector;
        }
    
        /**
         * nextTuple是Spout最主要的方法:
         * 在这里我们读取文本文件,并把它的每一行发射出去(给bolt)
         * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下
         **/
        @Override
        public void nextTuple() {
    //如果要看到tail效果,去掉这个  if (completed) 语句块,我测试的时候不去掉看不到效果只有报错
            if (completed) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return;
            }
            String str;
            BufferedReader bufferedReader = new BufferedReader(fileReader);
            try {
                while ((str = bufferedReader.readLine()) != null) {
                    //发送一行
                    collector.emit(new Values(str), str);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                completed = true;
            }
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }
    
    
    //定义Bolt WordSplit 实现切割
    package com.test.storm;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @author lillcol
     * 2019/7/19-9:38
     */
    public class WordSplit implements IRichBolt {
        private OutputCollector collector;
    
    
        @Override
        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        /**
         * execute是bolt中最重要的方法:
         * 当接收到一个tuple时,此方法被调用
         * 这个方法的作用就是把接收到的每一行切分成单个单词,并把单词发送出去(给下一个bolt处理)
         **/
        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            String[] words = line.split(",| |\|");
            for (String word : words) {
                word = word.trim();
                if (!word.isEmpty()) {
                    List a = new ArrayList();
                    a.add(input);
                    collector.emit(a, new Values(word));
                }
            }
            collector.ack(input);
        }
    
        @Override
        public void cleanup() {
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    
    
    //定义Bolt WordCounter 实现统计
    package com.test.storm;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author lillcol
     * 2019/7/19-10:01
     */
    public class WordCounter implements IRichBolt {
        Integer id;
        String name;
        Map<String, Integer> counters;
        private OutputCollector collector;
    
        @Override
        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
            this.counters = new HashMap<String, Integer>();
            this.collector = collector;
            this.name = context.getThisComponentId();
            this.id = context.getThisTaskId();
        }
    
        @Override
        public void execute(Tuple input) {
            String str = input.getString(0);
            if (!counters.containsKey(str)) {
                counters.put(str, 1);
            } else {
                counters.put(str, counters.get(str) + 1);
            }
    //如果要看到tail效果,应该在这里打印统计信息
    // System.out.println(str+":"+counters.get(str) );
            collector.ack(input);
        }
    
    //这里只是最后一次打印,要tail效果不应该在这里打印统计信息。
        @Override
        public void cleanup() {
            System.out.println("--Word Counter [" + name + "-" + id + "] --");
            for (Map.Entry<String, Integer> entry : counters.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
            counters.clear();
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    
    
    //定义主类
    package com.test.storm;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * @author lillcol
     * 2019/7/19-10:33
     */
    public class WordCountTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("wordReader",new WordReader(),1);
            topologyBuilder.setBolt("WordSplit",new WordSplit(),1)
                    .shuffleGrouping("wordReader");
            topologyBuilder.setBolt("",new WordCounter(),2)
                    .shuffleGrouping("WordSplit");
    
            //配置
            Config config = new Config();
            config.put("wordsFile","D:\stromFile");
            config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
            //创建一个本地模式cluster
            LocalCluster localCluster = new LocalCluster();
            //提交Topology
            localCluster.submitTopology("WordCountTopology",config,topologyBuilder.createTopology());
    
            Thread.sleep(2000);//这个时间要控制好,太短看不到效果
            localCluster.shutdown();
    
        }
    }
    //输出结果
    11:35:16.845 [SLOT_1024] INFO  o.a.s.e.ExecutorShutdown - Shutting down executor :[2, 2]
    11:35:16.845 [Thread-37--executor[2, 2]] INFO  o.a.s.u.Utils - Async loop interrupted!
    --Word Counter [-2] --
    Thread[SLOT_1027:73
    40673ms:1
    11:34:31.865:1
    30724ms:1
    11:34:23.065:1
    11:34:27.365:1
    . . .
    
    
    11:35:16.846 [SLOT_1024] INFO  o.a.s.e.ExecutorShutdown - Shut down executor :[2, 2]
    11:35:16.846 [SLOT_1024] INFO  o.a.s.e.ExecutorShutdown - Shutting down executor :[1, 1]
    11:35:16.846 [Thread-38--executor[1, 1]] INFO  o.a.s.u.Utils - Async loop interrupted!
    --Word Counter [-1] --
    Thread[SLOT_1027:87
    11:34:31.465:1
    29524ms:1
    26024ms:1
    . . . 
    
  • 相关阅读:
    CPU密集型、IO密集型
    打印在一行
    python多线程、线程锁
    WPS--world使用格式刷
    pycharm连接linux版python
    Pycharm连接windows上python
    一台windows主机上运行2个tomcat
    启动django应用报错 “Error: [WinError 10013] 以一种访问权限不允许的方式做了一个访问套接字的尝试。”
    ORA-03206,当表空间不够时,如何以添加数据文件的方式扩展表空间
    利用拷贝data目录文件的方式迁移mysql数据库
  • 原文地址:https://www.cnblogs.com/lillcol/p/11212383.html
Copyright © 2011-2022 走看看