zoukankan      html  css  js  c++  java
  • 3、SpringBoot 集成Storm wordcount

    WordCountBolt

    /**
     * Bolt that keeps a running count of every word it receives.
     *
     * <p>Each input tuple carries a {@code "word"} field (String) and a {@code "num"} field
     * (Integer increment). Counts are held in this task's memory only, printed after every
     * update, and never emitted downstream.
     */
    public class WordCountBolt extends BaseBasicBolt {
    
        /** Running total per word for this task. */
        private Map<String,Integer> counters = new ConcurrentHashMap<String, Integer>();
    
        /**
         * Called once when the task starts; this bolt needs no extra initialization.
         *
         * @param stormConf topology configuration
         * @param context   task context
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            super.prepare(stormConf, context);
        }
    
        /**
         * Adds the tuple's {@code num} to the stored total for its {@code word} (starting at
         * {@code num} when the word is new) and prints the whole counter map.
         *
         * @param input                tuple with fields {@code "word"} and {@code "num"}
         * @param basicOutputCollector unused: this bolt emits nothing
         */
        @Override
        public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
            String str = (String) input.getValueByField("word");
            Integer num = input.getIntegerByField("num");
            System.out.println("----------------------"+Thread.currentThread().getName() + "    "+ str);
    
            // One merge call replaces containsKey/get/put: insert num for a new word,
            // otherwise add it to the existing total.
            counters.merge(str, num, Integer::sum);
            System.out.println("WordCountBolt 统计单词:"+counters);
        }
    
        /**
         * Declares no output fields: this is a terminal bolt that only prints its counts.
         *
         * @param outputFieldsDeclarer unused
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }
    
    

    SplitSentenceBolt

    /**
     * Bolt that splits each incoming sentence into lower-cased words.
     *
     * <p>Reads the {@code "sentence"} field and emits one {@code (word, 1)} tuple per non-empty
     * token, using the output fields {@code "word"} and {@code "num"}.
     */
    public class SplitSentenceBolt extends BaseBasicBolt {
    
        /**
         * Called once when the task starts; this bolt needs no extra initialization.
         *
         * @param stormConf topology configuration
         * @param context   task context
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            super.prepare(stormConf, context);
        }
    
        /**
         * Splits the sentence received from RandomSentenceSpout on single spaces and emits each
         * word.
         *
         * <p>Tokens are trimmed and lower-cased. Empty tokens, which {@code split(" ")} produces
         * for consecutive or leading spaces, are skipped. A {@code null} sentence emits nothing.
         *
         * @param input     tuple carrying the {@code "sentence"} field
         * @param collector collector used to emit {@code (word, 1)} tuples
         */
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String sentence = (String) input.getValueByField("sentence");
            if (sentence == null) {
                return;
            }
    
            for (String word : sentence.split(" ")) {
                word = word.trim();
                // Skip empty tokens so "" is never emitted and counted as a word.
                if (word.isEmpty()) {
                    continue;
                }
                word = word.toLowerCase();
                System.out.println("SplitSentenceBolt 切割单词:"+ word);
    
                collector.emit(new Values(word,1));
            }
        }
    
        /**
         * Declares the output schema: {@code "word"} (the token) and {@code "num"} (always 1).
         *
         * @param outputFieldsDeclarer declarer receiving the field names
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word","num"));
        }
    }
    
    

    RandomSentenceSpout

    /**
     * Data source: repeatedly emits a sentence picked at random from a fixed, in-memory list.
     * Every tuple has a single field, {@code "sentence"}.
     */
    public class RandomSentenceSpout extends BaseRichSpout {
    
        /** Collector handed over by Storm in {@code open}; used to emit tuples. */
        private SpoutOutputCollector outputCollector;
        /** Source of the random sentence index. */
        private Random rng;
        /** Candidate sentences; populated in {@code open}. */
        String[] sentences;
    
        /**
         * Called once by Storm before the first {@code nextTuple()}. Stores the collector and
         * prepares the sentence list; a real spout would open its external source here instead
         * (e.g. Kafka, MySQL, or a local file).
         */
        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            outputCollector = spoutOutputCollector;
            rng = new Random();
            sentences = new String[] {
                "the cow jumped over the moon",
                "the dog jumped over the moon",
                "the pig jumped over the gun",
                "the fish jumped over the moon",
                "the duck jumped over the moon",
                "the man jumped over the sun",
                "the girl jumped over the sun",
                "the boy jumped over the sun"
            };
        }
    
        /**
         * Invoked repeatedly by Storm (conceptually {@code while (true) spout.nextTuple()});
         * each call emits one randomly chosen sentence and logs it.
         */
        @Override
        public void nextTuple() {
            final int pick = rng.nextInt(sentences.length);
            final String chosen = sentences[pick];
            outputCollector.emit(new Values(chosen));
            System.out.println("RandomSentenceSpout 发送数据:" + chosen);
        }
    
        /**
         * Declares the single output field {@code "sentence"}.
         *
         * @param outputFieldsDeclarer declarer receiving the field name
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("sentence"));
        }
    }
    
    

    WordCountTopology

    /**
     * Entry point that wires the word-count topology and submits it to a Storm cluster.
     *
     * <p>Data flow: RandomSentenceSpout (3 executors) --shuffle--> SplitSentenceBolt (9)
     * --fields("word")--> WordCountBolt (3). Fields grouping on "word" routes every occurrence
     * of a given word to the same WordCountBolt task.
     */
    @Component
    public class WordCountTopology {
        /**
         * Builds the topology and submits it in distributed mode.
         *
         * @param args {@code args[0]} is the topology name used for cluster submission; when no
         *             argument is given, a usage message is printed and nothing is submitted
         * @throws IllegalStateException if Storm rejects the submission (cause preserved)
         */
        public static void main(String[] args) {
            // In Java, Storm topologies are assembled with TopologyBuilder.
            TopologyBuilder builder = new TopologyBuilder();
    
            // Spout: emits a random sentence from a fixed list.
            builder.setSpout("RandomSentenceSpout",new RandomSentenceSpout(),3);
    
            // Bolt: splits each sentence into words.
            builder.setBolt("SplitSentenceBolt",new SplitSentenceBolt(),9).shuffleGrouping("RandomSentenceSpout");
    
            // Bolt: counts occurrences of each word.
            builder.setBolt("WordCountBolt",new WordCountBolt(),3).fieldsGrouping("SplitSentenceBolt",new Fields("word"));
    
            Config conf = new Config();
            // TOPOLOGY_DEBUG logs every emitted message: handy for local debugging,
            // but costly in production, so it stays off.
            conf.setDebug(false);
    
            // Must be &&: with || a null args throws NPE and an empty args fails on args[0].
            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                try {
                    StormSubmitter.submitTopologyWithProgressBar(args[0],conf,builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    throw new IllegalStateException("Failed to submit topology " + args[0], e);
                }
            } else {
                System.err.println("Usage: storm jar <jar> " + WordCountTopology.class.getName() + " <topology-name>");
            }
        }
    }
    
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<!-- spring-boot-starter-parent already supplies the spring-boot-dependencies BOM as
    	     dependencyManagement, so that BOM is not imported a second time. -->
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.1.4.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
    	<groupId>com.test</groupId>
    	<artifactId>stormstack</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<name>stormstack</name>
    	<description>Springboot integrated with storm</description>
    
    	<properties>
    		<java.version>1.8</java.version>
    		<slf4j.version>1.7.25</slf4j.version>
    		<logback.version>1.2.3</logback.version>
    		<storm.version>1.2.2</storm.version>
    	</properties>
    
    	<dependencies>
    		<!-- Boot's default Logback starter is excluded; presumably the Storm worker classpath
    		     supplies the SLF4J binding at runtime (TODO confirm against the cluster's lib/). -->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter</artifactId>
    			<exclusions>
    				<exclusion>
    					<groupId>org.springframework.boot</groupId>
    					<artifactId>spring-boot-starter-logging</artifactId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>${slf4j.version}</version>
    		</dependency>
    
    		<!-- Storm itself. "provided" because the storm jar launcher puts the cluster's own Storm
    		     jars on the classpath; logging bindings are excluded to avoid duplicate bindings. -->
    		<dependency>
    			<groupId>org.apache.storm</groupId>
    			<artifactId>storm-core</artifactId>
    			<version>${storm.version}</version>
    			<exclusions>
    				<exclusion>
    					<groupId>org.apache.logging.log4j</groupId>
    					<artifactId>log4j-slf4j-impl</artifactId>
    				</exclusion>
    				<exclusion>
    					<groupId>org.apache.logging.log4j</groupId>
    					<artifactId>log4j-1.2-api</artifactId>
    				</exclusion>
    				<exclusion>
    					<groupId>org.apache.logging.log4j</groupId>
    					<artifactId>log4j-web</artifactId>
    				</exclusion>
    				<exclusion>
    					<groupId>org.slf4j</groupId>
    					<artifactId>slf4j-log4j12</artifactId>
    				</exclusion>
    				<exclusion>
    					<artifactId>ring-cors</artifactId>
    					<groupId>ring-cors</groupId>
    				</exclusion>
    			</exclusions>
    			<scope>provided</scope>
    		</dependency>
    
    		<!-- https://mvnrepository.com/artifact/com.codahale.metrics/metrics-core -->
    		<dependency>
    			<groupId>com.codahale.metrics</groupId>
    			<artifactId>metrics-core</artifactId>
    			<version>3.0.2</version>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<!-- Manifest of the plain jar; mainClass points at the topology entry point. -->
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-jar-plugin</artifactId>
    				<version>3.1.1</version>
    				<configuration>
    					<archive>
    						<manifest>
    							<useUniqueVersions>false</useUniqueVersions>
    							<addClasspath>true</addClasspath>
    							<mainClass>com.test.stormstack.storm.topology.WordCountTopology</mainClass>
    						</manifest>
    					</archive>
    				</configuration>
    			</plugin>
    
    			<!-- Builds the fat jar submitted with storm jar. Storm and SLF4J are left out because
    			     the cluster supplies them. The transformers merge Spring metadata files
    			     (spring.handlers, spring.factories, spring.schemas) and service files from all
    			     shaded jars instead of letting one jar's copy overwrite the others. -->
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-shade-plugin</artifactId>
    				<version>3.2.1</version>
    				<dependencies>
    					<!-- Provides PropertiesMergingResourceTransformer used below. -->
    					<dependency>
    						<groupId>org.springframework.boot</groupId>
    						<artifactId>spring-boot-maven-plugin</artifactId>
    						<version>2.1.4.RELEASE</version>
    					</dependency>
    				</dependencies>
    				<configuration>
    					<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
    					<createDependencyReducedPom>true</createDependencyReducedPom>
    					<filters>
    						<filter>
    							<artifact>*:*</artifact>
    							<!-- Drop signature files; they would be invalid inside the merged jar. -->
    							<excludes>
    								<exclude>META-INF/*.SF</exclude>
    								<exclude>META-INF/*.DSA</exclude>
    								<exclude>META-INF/*.RSA</exclude>
    							</excludes>
    						</filter>
    					</filters>
    					<artifactSet>
    						<excludes>
    							<exclude>org.slf4j:slf4j-api</exclude>
    							<exclude>javax.mail:javax.mail-api</exclude>
    							<exclude>org.apache.storm:storm-core</exclude>
    							<exclude>org.apache.storm:storm-kafka</exclude>
    							<exclude>org.apache.logging.log4j:log4j-slf4j-impl</exclude>
    						</excludes>
    					</artifactSet>
    				</configuration>
    				<executions>
    					<execution>
    						<phase>package</phase>
    						<goals>
    							<goal>shade</goal>
    						</goals>
    						<configuration>
    							<transformers>
    								<transformer
    										implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    									<resource>META-INF/spring.handlers</resource>
    								</transformer>
    								<transformer
    										implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
    									<resource>META-INF/spring.factories</resource>
    								</transformer>
    								<transformer
    										implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    									<resource>META-INF/spring.schemas</resource>
    								</transformer>
    								<transformer
    										implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
    								<transformer
    										implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    									<mainClass>com.test.stormstack.storm.topology.WordCountTopology</mainClass>
    								</transformer>
    							</transformers>
    						</configuration>
    					</execution>
    				</executions>
    			</plugin>
    		</plugins>
    	</build>
    
    </project>
    
    

    启动Topology

    [root@ip101 app]# storm jar stormstack-0.0.1-SNAPSHOT.jar com.test.stormstack.StormstackApplication wordcount
    Running: /opt/app/jdk1.8.0_181/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/app/apache-storm-1.2.2 -Dstorm.log.dir=/opt/app/apache-storm-1.2.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/app/apache-storm-1.2.2/*:/opt/app/apache-storm-1.2.2/lib/*:/opt/app/apache-storm-1.2.2/extlib/*:stormstack-0.0.1-SNAPSHOT.jar:/opt/app/apache-storm-1.2.2/conf:/opt/app/apache-storm-1.2.2/bin -Dstorm.jar=stormstack-0.0.1-SNAPSHOT.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} com.test.stormstack.StormstackApplication worldcount
    
      .   ____          _            __ _ _
     /\ / ___'_ __ _ _(_)_ __  __ _    
    ( ( )\___ | '_ | '_| | '_ / _` |    
     \/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::       (v0.0.1-SNAPSHOT)
    
    1312 [main] INFO  c.t.s.StormstackApplication - Starting StormstackApplication v0.0.1-SNAPSHOT on ip101 with PID 71384 (/opt/app/stormstack-0.0.1-SNAPSHOT.jar started by root in /opt/app)
    1325 [main] INFO  c.t.s.StormstackApplication - No active profile set, falling back to default profiles: default
    2208 [main] INFO  c.t.s.StormstackApplication - Started StormstackApplication in 1.366 seconds (JVM running for 2.252)
    2454 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old null
    2510 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -5776098762847491014:-8580003487138287892
    2642 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
    2703 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : ip101:6627
    2746 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
    2747 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
    2754 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : ip101:6627
    2812 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - jars...
    2813 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - artifacts...
    2813 [main] INFO  o.a.s.StormSubmitter - Dependency Blob keys - jars : [] / artifacts : []
    2859 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar stormstack-0.0.1-SNAPSHOT.jar to assigned location: /opt/app/apache-storm-1.2.2/status/nimbus/inbox/stormjar-de0f08e3-da44-45b9-9d06-63b4a4763eea.jar
    Start uploading file 'stormstack-0.0.1-SNAPSHOT.jar' to '/opt/app/apache-storm-1.2.2/status/nimbus/inbox/stormjar-de0f08e3-da44-45b9-9d06-63b4a4763eea.jar' (6365343 bytes)
    [==================================================] 6365343 / 6365343
    File 'stormstack-0.0.1-SNAPSHOT.jar' uploaded to '/opt/app/apache-storm-1.2.2/status/nimbus/inbox/stormjar-de0f08e3-da44-45b9-9d06-63b4a4763eea.jar' (6365343 bytes)
    3032 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/app/apache-storm-1.2.2/status/nimbus/inbox/stormjar-de0f08e3-da44-45b9-9d06-63b4a4763eea.jar
    3032 [main] INFO  o.a.s.StormSubmitter - Submitting topology worldcount in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-5776098762847491014:-8580003487138287892","topology.workers":3,"topology.debug":false}
    3032 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old 1.2.2
    3232 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: wordcount
    

    StormUI Summary


    wordcount Topology
    Acker任务默认为每个worker进程启动一个executor线程来执行。可以在topology中取消acker任务(例如 conf.setNumAckers(0)),这样就不会多出这些acker的executor和任务了。




    关闭Topology

    [root@ip101 app]# storm kill wordcount
    Running: /opt/app/jdk1.8.0_181/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/app/apache-storm-1.2.2 -Dstorm.log.dir=/opt/app/apache-storm-1.2.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/app/apache-storm-1.2.2/*:/opt/app/apache-storm-1.2.2/lib/*:/opt/app/apache-storm-1.2.2/extlib/*:/opt/app/apache-storm-1.2.2/extlib-daemon/*:/opt/app/apache-storm-1.2.2/conf:/opt/app/apache-storm-1.2.2/bin org.apache.storm.command.kill_topology worldcount
    5052 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
    5155 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : ip101:6627
    5233 [main] INFO  o.a.s.c.kill-topology - Killed topology: wordcount
    

    Storm list

    [root@ip101 app]# storm list
    Running: /opt/app/jdk1.8.0_181/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/app/apache-storm-1.2.2 -Dstorm.log.dir=/opt/app/apache-storm-1.2.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/app/apache-storm-1.2.2/*:/opt/app/apache-storm-1.2.2/lib/*:/opt/app/apache-storm-1.2.2/extlib/*:/opt/app/apache-storm-1.2.2/extlib-daemon/*:/opt/app/apache-storm-1.2.2/conf:/opt/app/apache-storm-1.2.2/bin org.apache.storm.command.list
    10972 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
    11151 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : ip101:6627
    Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
    -------------------------------------------------------------------
    worldcount           ACTIVE     18         3            137   
    
  • 相关阅读:
    看书笔记《python基础》__1
    MQTT
    杂记
    类型转化
    soc
    时钟同步
    设置地址
    清理日志
    pandas_matplot_seaborn
    Qt_Quick开发实战精解_4
  • 原文地址:https://www.cnblogs.com/xidianzxm/p/10764546.html
Copyright © 2011-2022 走看看