zoukankan      html  css  js  c++  java
  • storm(3)-本机模式-helloworld

    pom.xml

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.2</version>
        <!-- Local mode requires storm-core on the classpath; cluster mode does not,
             which is why the provided scope below is commented out for local runs. -->
        <!--<scope>provided</scope>-->
    </dependency>
    <!-- Lombok supplies the @Slf4j annotation used by the spout and bolt classes. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    
    ...
    <!-- Builds a single jar-with-dependencies during the package phase. -->
    <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
                <manifest>
                    <!-- Entry point written to the jar manifest: the topology's main class. -->
                    <mainClass>com.ebc.HelloWorldTopology</mainClass>
                </manifest>
            </archive>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    HelloWorldSpout.java
    package com.ebc.spout;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.Map;
    
    /**
     * Spout that, on every {@link #nextTuple()} call, sleeps 100 ms and then emits
     * a one-field tuple carrying the sentence "Hello World".
     *
     * @author yaoyuan2
     * @date 2019/4/11
     */
    @Slf4j
    public class HelloWorldSpout extends BaseRichSpout {
        /** Pause, in milliseconds, taken at the start of every {@link #nextTuple()} call. */
        private static final long EMIT_PAUSE_MS = 100;

        /** Name of the single field carried by each emitted tuple. */
        private static final String OUTPUT_FIELD = "sentence";

        private SpoutOutputCollector collector;

        /**
         * Keeps a reference to the collector so that {@link #nextTuple()} can emit through it.
         *
         * @param conf      configuration map (unused here)
         * @param context   topology context (unused here)
         * @param collector collector used to emit tuples
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Called repeatedly; contains the spout's entire logic: pause, then emit one sentence.
         */
        @Override
        public void nextTuple() {
            Utils.sleep(EMIT_PAUSE_MS);
            String sendStr = "Hello World";
            Values outgoing = new Values(sendStr);
            collector.emit(outgoing);
            //log.info(sendStr);
        }

        /**
         * Tells the Storm cluster which fields this spout emits.
         *
         * @param declarer receives the output field declaration
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            Fields outputFields = new Fields(OUTPUT_FIELD);
            declarer.declare(outputFields);
        }
    }
    HelloWorldBolt.java
    package com.ebc.blot;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.Map;
    
    /**
     * Reads the tuples produced upstream and counts how many carry the sentence "Hello World".
     *
     * <p>The counter is an instance field, so every bolt instance keeps its own count.
     *
     * @author yaoyuan2
     * @date 2019/4/11
     */
    @Slf4j
    public class HelloWorldBolt extends BaseRichBolt {
        /** Sentence that increments the counter when received. */
        private static final String EXPECTED_SENTENCE = "Hello World";

        private int myCount = 0;

        /**
         * No setup is needed: this bolt never emits, so the collector is not retained.
         *
         * @param stormConf configuration map (unused here)
         * @param context   topology context (unused here)
         * @param collector output collector (unused here)
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        }

        /**
         * Increments and logs the counter when the tuple's "sentence" field equals "Hello World".
         *
         * @param input tuple that must contain a string field named "sentence"
         */
        @Override
        public void execute(Tuple input) {
            String test = input.getStringByField("sentence");
            // Compare by value, not with ==: reference equality only holds when the tuple
            // value is the very same String instance as the literal. Calling equals on the
            // constant also keeps a null field value from throwing.
            if (EXPECTED_SENTENCE.equals(test)) {
                myCount++;
                log.info("发现了1个Hello World!现在的计数值="+Integer.toString(myCount));
            }
        }

        /**
         * Declares a single output field named "myCount".
         * NOTE(review): execute() never emits a tuple, so nothing is ever sent on this field.
         *
         * @param declarer receives the output field declaration
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("myCount"));
        }
    }
    HelloWorldTopology.java
    package com.ebc;
    
    import com.ebc.blot.HelloWorldBolt;
    import com.ebc.spout.HelloWorldSpout;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    /**
     * Wires {@link HelloWorldSpout} to {@link HelloWorldBolt} and runs the topology.
     *
     * <p>With at least one argument the topology is submitted through {@link StormSubmitter}
     * under the name given in {@code args[0]}; with no arguments it runs in a
     * {@link LocalCluster} for 10 seconds and is then killed.
     *
     * @author yaoyuan2
     * @date 2019/4/11
     */
    public class HelloWorldTopology {
        /**
         * Builds and launches the topology.
         *
         * @param args optional; {@code args[0]} is the topology name for cluster submission
         * @throws Exception if building, submitting or running the topology fails
         */
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // Spout with parallelism hint 2, bolt with parallelism hint 4, shuffle-grouped.
            builder.setSpout("helloWorld", new HelloWorldSpout(), 2);
            builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 4).shuffleGrouping("helloWorld");

            Config conf = new Config();
            conf.setDebug(true);

            if (args != null && args.length > 0) {
                conf.setNumWorkers(20);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                try {
                    cluster.submitTopology("test", conf, builder.createTopology());
                    Utils.sleep(10000);
                    cluster.killTopology("test");
                } finally {
                    // Always stop the in-process cluster, even when submit or kill throws.
                    cluster.shutdown();
                }
            }
        }
    }

    log4j2.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Console-only logging configuration for the hello-world topology. -->
    <Configuration monitorInterval="1" status="ERROR" strict="true" name="LogConfig">
        <Properties>
            <!-- Shared layout: time, thread, level, abbreviated logger name, line number, message. -->
            <Property name="log.layout">%date{HH:mm:ss.SSS} [%thread] %-5level %logger{20}:%line - %msg%n</Property>
        </Properties>
    
        <Appenders>
            <!-- Writes every event to standard output using the shared layout. -->
            <Appender type="Console" name="STDOUT">
                <Target>SYSTEM_OUT</Target>
                <Layout type="PatternLayout" pattern="${log.layout}"/>
            </Appender>
        </Appenders>
    
        <Loggers>
            <!-- Default: info and above go to STDOUT. -->
            <Root level="info">
                <AppenderRef ref="STDOUT"/>
            </Root>
            <!-- Loggers under org.apache.storm are limited to error level. -->
            <Logger name="org.apache.storm" level="error" />
        </Loggers>
    
    </Configuration>

    输出:

    13:21:53.792 [Thread-22-HelloWorldBolt-executor[3 3]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=36
    13:21:53.855 [Thread-32-HelloWorldBolt-executor[4 4]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=41
    13:21:53.892 [Thread-24-HelloWorldBolt-executor[1 1]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=35
    13:21:53.955 [Thread-32-HelloWorldBolt-executor[4 4]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=42
    13:21:53.992 [Thread-32-HelloWorldBolt-executor[4 4]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=43
    13:21:54.055 [Thread-18-HelloWorldBolt-executor[2 2]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=45
    13:21:57.532 [Thread-14] ERROR org.apache.storm.daemon.supervisor.ReadClusterState:182 - Failed to Sync Supervisor
    java.lang.RuntimeException: java.lang.InterruptedException

    bolt的并行度设置为4,因此有4个executor线程执行计算(对应日志中的executor[1 1]到executor[4 4])。

    将HelloWorldSpout.nextTuple方法中的//log.info(sendStr);放开,将HelloWorldBolt.execute方法中的log.info("发现了1个Hello World!现在的计数值="+Integer.toString(myCount));注释,输出:

    13:26:48.969 [Thread-20-helloWorld-executor[7 7]] INFO  com.ebc.spout.HelloWorldSpout:35 - Hello World
    13:26:49.056 [Thread-26-helloWorld-executor[6 6]] INFO  com.ebc.spout.HelloWorldSpout:35 - Hello World
    13:26:52.512 [Thread-14] ERROR org.apache.storm.daemon.supervisor.ReadClusterState:182 - Failed to Sync Supervisor
    java.lang.RuntimeException: java.lang.InterruptedException

    spout的并行度设置为2,因此有2个线程同时向bolt发送消息。

    抛出的异常是因为:

    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();

    休眠10秒后,程序主动杀掉拓扑并关闭了本地storm集群。

    
    
  • 相关阅读:
    看《长安十二时辰》可以了解哪些算法知识
    面试官,我会写二分查找法!对,没有 bug 的那种!
    毕业十年后,我忍不住出了一份程序员的高考试卷
    扫雷与算法:如何随机化的布雷(一)
    降维打击!为什么我认为数据结构与算法对前端开发很重要
    盖尔-沙普利算法告诉你,你的对象在哪里?
    这道算法题太太太太太简单啦
    有点难度,几道和「滑动窗口」有关的算法面试题
    几道和「黑洞照片」那种海量数据有关的算法问题
    LeetCode 上最难的链表算法题,没有之一!
  • 原文地址:https://www.cnblogs.com/yaoyuan2/p/10689079.html
Copyright © 2011-2022 走看看