zoukankan      html  css  js  c++  java
  • Storm常用操作命令及WordCount

    Storm常用操作命令

    1、任务提交命令:storm jar jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】

    storm jar /export/servers/storm/examples/storm-starter/storm-starter-topologies-1.0.3.jar org.apache.storm.starter.WordCountTopology  wordcount

    与hadoop不同的是:不需要指定输入输出路径

    hadoop jar /usr/local/wordcount.jar /data.txt /wcout

     

    2、杀死任务命令:storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)

    storm kill topology-name -w 10

    3、停用任务命令:storm deactive  【拓扑名称】

    storm deactive topology-name

    我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spoutsnextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。

    4、启用任务命令:storm activate 【拓扑名称】

    storm activate topology-name

    5、重新部署任务命令:storm rebalance  【拓扑名称】

    storm rebalance topology-name

    再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配worker,并重启拓扑。

    StormWordCount(重点掌握

    WordCount分析:

    Java版本:

    1、读取文件中的数据,一行一行的读取;

    2、将读到的数据进行切割;

    3、对切割后的数组中的单词进行计算。

    Hadoop版本:

    1、按行读取文件中的数据;

    2、在Mapper()函数中对每一行的数据进行切割,并输出切割后的数据数组;

    3、接收Mapper()中输出的数据数组,在Reducer()函数中对数组中的单词进行计算,将计算后的统计结果输出。

    Storm版本:

    1、Spout从外部数据源中读取数据,随机发送一个元组对象出去;

    2、SplitBolt接收Spout中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;

    3、WordCountBolt接收SplitBolt中输出的单词数组,对里面单词的频率进行累加,将累加后的结果输出。

    StormWordCount代码实现及分析(重点掌握

    在IDEA中创建一个Maven项目,先在pom.xml添加依赖--->import changes

    创建Maven项目步骤:

    使用IDEA编辑器创建一个Maven项目
    前提:假设您已经安装好了IDEA编辑器,由于编辑器自带Maven插件,不需要单独安装maven。当然IDEA本身是支持安装外部的maven的。
    1、打开编辑器
    笔者使用的是14.1 不是当前最新编辑器
    2、创建maven项目第一步:
    依次点击软件左上角的File->new->project
    然后选择maven,并点击next。在这一步有一个需要注意的地方,就是为你的项目选择JDK或者SDK.如果您之前没有配置过JDK,可以点击new按钮,设置您JDK的home目录。
    3、填写maven项目的groupid,和artifactid。然后点击下一步
    一般来讲,groupid写您的公司及部门或项目的名称,比如:com.ahu
    artifactid写您的子项目或者子模块的名字,比如当前项目是创建maven项目,我们可以将artifactid写成:stormwordcount
    version可以不用修改
    4、填写项目名称及指定项目所在的目录
    projectName:StormWordCount
    location:任意地址---->比如:E:StormWordCount
    至此,创建maven项目完毕。

    1     <dependencies>
    2         <dependency>
    3             <groupId>org.apache.storm</groupId>
    4             <artifactId>storm-core</artifactId>
    5             <version>0.9.5</version>
    6             <!-- <scope>provided</scope>-->
    7         </dependency>
    8     </dependencies>

    然后在写相关代码:

    项目主要流程:

     1 package com.ahu.storm;
     2 
     3 
     4 import backtype.storm.Config;
     5 import backtype.storm.LocalCluster;
     6 import backtype.storm.StormSubmitter;
     7 import backtype.storm.generated.AlreadyAliveException;
     8 import backtype.storm.generated.InvalidTopologyException;
     9 import backtype.storm.topology.TopologyBuilder;
    10 import backtype.storm.tuple.Fields;
    11 
    12 /**
    13  * Created by ahu_lichang on 2017/5/18.
    14  */
    15 public class WordCountTopologyMain {
    16     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    17         //1、准备一个TopologyBuilder
    18         //storm框架支持多语言,在Java环境下创建一个拓扑,需要使用TopologyBuilder
    19         TopologyBuilder topologyBuilder = new TopologyBuilder();
    20         //MySpout类,在已知的英文句子中,所及发送一条句子出去
    21         topologyBuilder.setSpout("mySpout", new MySpout(), 2);
    22         //MySplitBolt类,主要是将一行一行的文本内容切割成单词
    23         topologyBuilder.setBolt("mybolt1", new MySplitBolt(), 2).shuffleGrouping("mySpout");
    24         //MyCountBolt类,负责对单词的频率进行累加
    25         topologyBuilder.setBolt("mybolt2", new MyCountBolt(), 4).fieldsGrouping("mybolt1", new Fields("word"));
    26         /**
    27          * i
    28          * am
    29          * lilei
    30          * love
    31          * hanmeimei
    32          */
    33         //2、创建一个configuration,用来指定当前topology 需要的worker的数量
    34         //启动topology的配置信息
    35         Config config = new Config();
    36         //定义你希望集群分配多少个工作进程给你来执行这个topology
    37         config.setNumWorkers(2);
    38 
    39         //3、提交任务  -----两种模式 本地模式和集群模式
    40         //这里将拓扑名称写死了mywordcount,所以在集群上打包运行的时候,不用写拓扑名称了!也可用arg[0]
    41         StormSubmitter.submitTopology("mywordcount", config, topologyBuilder.createTopology());
    42         //LocalCluster localCluster = new LocalCluster();
    43         //localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology());
    44     }
    45 }

    MySpout的实现及生命周期:

     1 package com.ahu.storm;
     2 
     3 import backtype.storm.spout.SpoutOutputCollector;
     4 import backtype.storm.task.TopologyContext;
     5 import backtype.storm.topology.OutputFieldsDeclarer;
     6 import backtype.storm.topology.base.BaseRichSpout;
     7 import backtype.storm.tuple.Fields;
     8 import backtype.storm.tuple.Values;
     9 
    10 import java.util.Map;
    11 
    12 /**
    13  * Created by ahu_lichang on 2017/5/18.
    14  */
    15 public class MySpout extends BaseRichSpout {
    16     //用来收集Spout输出的Tuple
    17     SpoutOutputCollector collector;
    18 
    19     //初始化方法
    20     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    21         this.collector = collector;
    22     }
    23 
    24     //storm 框架在 while(true) 调用nextTuple方法
    25     public void nextTuple() {
    26         collector.emit(new Values("i am lilei love hanmeimei"));
    27     }
    28 
    29     //消息源可以发射多条消息流stream.多条消息流可以理解为多种类型的数据
    30     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    31         declarer.declare(new Fields("sentence"));
    32     }
    33 }

    MySplitBolt的实现及生命周期:

     1 package com.ahu.storm;
     2 
     3 import backtype.storm.task.OutputCollector;
     4 import backtype.storm.task.TopologyContext;
     5 import backtype.storm.topology.OutputFieldsDeclarer;
     6 import backtype.storm.topology.base.BaseRichBolt;
     7 import backtype.storm.tuple.Fields;
     8 import backtype.storm.tuple.Tuple;
     9 import backtype.storm.tuple.Values;
    10 
    11 import java.util.Map;
    12 
    13 /**
    14  * Created by ahu_lichang on 2017/5/18.
    15  */
    16 public class MySplitBolt extends BaseRichBolt {
    17     OutputCollector collector;
    18 
    19     //初始化方法
    20     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    21         this.collector = collector;
    22     }
    23 
    24     // 被storm框架 while(true) 循环调用  传入参数tuple
    25     //input内容是句子,execute方法将句子切割成单词发出
    26     public void execute(Tuple input) {
    27         String line = input.getString(0);
    28         String[] arrWords = line.split(" ");
    29         for (String word : arrWords) {
    30             collector.emit(new Values(word, 1));
    31         }
    32     }
    33 
    34     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    35         declarer.declare(new Fields("word", "num"));
    36     }
    37 }

    MyCountBolt的实现及生命周期:

     1 package com.ahu.storm;
     2 
     3 
     4 import backtype.storm.task.OutputCollector;
     5 import backtype.storm.task.TopologyContext;
     6 import backtype.storm.topology.OutputFieldsDeclarer;
     7 import backtype.storm.topology.base.BaseRichBolt;
     8 import backtype.storm.tuple.Tuple;
     9 
    10 import java.util.HashMap;
    11 import java.util.Map;
    12 
    13 /**
    14  * Created by ahu_lichang on 2017/5/18.
    15  */
    16 public class MyCountBolt extends BaseRichBolt {
    17     OutputCollector collector;
    18     //用来保存最后计算的结果key=单词,value=单词个数
    19     Map<String, Integer> map = new HashMap<String, Integer>();
    20 
    21     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    22         this.collector = collector;
    23     }
    24 
    25     public void execute(Tuple input) {
    26         String word = input.getString(0);
    27         Integer num = input.getInteger(1);
    28         System.out.println(Thread.currentThread().getId() + "    word:" + word);
    29         if (map.containsKey(word)) {
    30             Integer count = map.get(word);
    31             map.put(word, count + num);
    32         } else {
    33             map.put(word, num);
    34         }
    35         System.out.println("count:" + map);
    36     }
    37 
    38     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    39         //不输出
    40     }
    41 }

    两种运行模式

    1、本地模式:直接在IDEA中的WordCountTopologyMain运行即可在控制台观察到输出结果

    2、集群模式:

    要打包运行。打包方法:

    将jar包上传到storm1上,去运行storm /root/stormwordcount.XXXX.jar  com.ahu.storm.WordCountTopologyMain 

    注意:这样打包运行的时候,会出错:NoClassDefFoundError: backtype/storm/topology/IRichSpout

    这是因为打包的时候,有的jar包没有打到里面去,打包方式不对!需要在pom.xml指定一个Build,指定打包的方式,将所有的依赖都打成jar。

     1     <build>
     2         <plugins>
     3             <plugin>
     4                 <artifactId>maven-assembly-plugin</artifactId>
     5                 <configuration>
     6                     <descriptorRefs>
     7                         <descriptorRef>jar-with-dependencies</descriptorRef>
     8                     </descriptorRefs>
     9                    <!-- <archive>
    10                         <manifest>
    11                             <mainClass>com.ahu.storm.hadoop.mapreduce.wordcount.WordCount</mainClass>
    12                         </manifest>
    13                     </archive>-->
    14                 </configuration>
    15                 <executions>
    16                     <execution>
    17                         <id>make-assembly</id>
    18                         <phase>package</phase>
    19                         <goals>
    20                             <goal>single</goal>
    21                         </goals>
    22                     </execution>
    23                 </executions>
    24             </plugin>
    25         </plugins>
    26     </build>

    这样再打包运行,就不会出错了!运行成功后,可以在worker运行的机器上查看日志:/export/servers/storm/logs/下查看,tail -100f worker-6701.log.1

    Storm具体的任务执行流程图

    Stream Grouping详解

    Storm里面有7种类型的stream grouping

    l Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。

    Fields Grouping按字段分组,比如按userid来分组,具有同样useridtuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task

    l All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。

    l Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task

    l Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。

    l Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid OutputCollector.emit方法也会返回taskid)。

    Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。

  • 相关阅读:
    Power BI for Office 365(八)共享查询
    Power BI for Office 365(七) Power BI站点
    Power BI for Office 365(六)Power Map简介
    Power BI for Office 365(五)Power View第二部分
    Power BI for Office 365(四)Power View第一部分
    Power BI for Office 365(三)Power Pivot
    Power BI for Office 365(二)Power Query
    java 继承、重载、重写与多态
    Android 热修复方案Tinker(一) Application改造
    阿里最新热修复Sophix与QQ超级补丁和Tinker的实现与总结
  • 原文地址:https://www.cnblogs.com/ahu-lichang/p/6871920.html
Copyright © 2011-2022 走看看