zoukankan      html  css  js  c++  java
  • 大数据入门第十六天——流式计算之storm详解(二)常用命令与wc实例

    一、常用命令

      1.提交命令

    提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
    torm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount

       2.杀死任务

    storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
    storm kill topology-name -w 10

      3.停用任务

    storm deactivte  【拓扑名称】
    storm deactivte topology-name
    #我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。
    销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。

      4.启用任务

    storm activate【拓扑名称】
    storm activate topology-name

      5.重新部署任务

    storm rebalance  【拓扑名称】
    storm rebalance topology-name
    再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。
    再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑。

      更多命令,参考:http://blog.csdn.net/u010003835/article/details/52123807

      完整命令,参考官网文档:http://storm.apache.org/releases/1.0.6/Command-line-client.html

    二、wordCount示例程序

     ###以下内容可以替换为Jstrom的依赖!包是backtype的topologyBuilder!

       1.引入依赖

    <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.6</version>
        <scope>provided</scope>
    </dependency>

       JStorm是阿里巴巴基于storm的二次开发,完全兼容storm!

    <!-- https://mvnrepository.com/artifact/com.alibaba.jstorm/jstorm-core -->
            <dependency>
                <groupId>com.alibaba.jstorm</groupId>
                <artifactId>jstorm-core</artifactId>
                <version>2.1.1</version>
                <!--<scope>provided</scope>-->
            </dependency>

      // 本地提交时请注释掉作用域(provided不参与打包)

      2。编写相关程序

    参考:http://blog.csdn.net/u010454030/article/details/52576346

        http://m635674608.iteye.com/blog/2221179

    package com.jiangbei;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    /**
     * wordcount类
     *
     * @author zcc ON 2018/3/6
     **/
    public class WordCount {
        public static void main(String[] args) throws Exception{
            // 创建TopologyBuilder用来创建topology
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("mySpout", new MySpout(), 1);
            builder.setBolt("split", new MyBolt1(), 10).shuffleGrouping("mySpout");
            builder.setBolt("count", new MyBolt2(), 2).fieldsGrouping("split", new Fields("word"));
            // 创建configuration
            Config config = new Config();
            config.setNumWorkers(2);
            // 本地模式很有用
            // config.setDebug(true);
    
            // 向集群提交
            // StormSubmitter.submitTopologyWithProgressBar("wordcount", config,builder.createTopology());
    
            // 本地模式
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", config, builder.createTopology());
        }
    }
    WordCount
    package com.jiangbei;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * 自定义spout
     *
     * @author zcc ON 2018/3/6
     **/
    public class MySpout extends BaseRichSpout{
        private SpoutOutputCollector collector;
        /**
         * 初始化方法
         * @param map
         * @param topologyContext
         * @param spoutOutputCollector
         */
        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.collector = spoutOutputCollector;
        }
    
        /**
         * storm框架的操作(类似于while true中的循环体)
         */
        @Override
        public void nextTuple() {
            // 这里的Values是arrayList的一个子类
            collector.emit(new Values("i love china"));
        }
    
        /**
         * 声明tuple发送流
         * @param outputFieldsDeclarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("love"));
        }
    }
    MySpout
    package com.jiangbei;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * 自定义bolt
     *
     * @author zcc ON 2018/3/6
     **/
    public class MyBolt1 extends BaseRichBolt{
        private OutputCollector collector;
        /**
         * 初始化方法
         * @param map
         * @param topologyContext
         * @param outputCollector
         */
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
        }
    
        /**
         * 循环调用的循环体
         * @param tuple
         */
        @Override
        public void execute(Tuple tuple) {
            // 从上一步中的tuple取出value,由于知道是String,而values是list的子类,故通过角标即可
            String line = tuple.getString(0);
            String[] words = line.split(" ");
            for (String word : words) {
                collector.emit(new Values(word, 1));
            }
        }
    
        /**
         * 声明方法
         * @param outputFieldsDeclarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word","num"));
        }
    }
    MyBolt1
    package com.jiangbei;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 自定义bolt
     *
     * @author zcc ON 2018/3/6
     **/
    public class MyBolt2 extends BaseRichBolt{
        private OutputCollector collector;
        private Map<String, Integer> map = new HashMap<>();
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
        }
    
        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getString(0);
            Integer num = tuple.getInteger(1);
            if (map.containsKey(word)) {
                Integer count = map.get(word);
                map.put(word, count + num);
            } else {
                map.put(word, 1);
            }
            System.out.println("count==========>" + map);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
        }
    }
    MyBolt2

      本地运行直接运行即可!

      以上的spout、bolt等都是采用的自定义的,主要通过继承BaseRichSpout等来实现!;这里解释一下 其中的一些概念与相关类:

        最后一个bolt必须按照field进行分组,这样才能进行计数!,这里的new Field()里的值必须是上游里面declare的值。才能对应上。

        SpoutOutputCollector——对象提供了发射tuple的方法

      整个过程原理图如下所示:

      

      3.Stream Grouping详解

    Storm里面有7种类型的stream grouping

    Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。(类似MR的hash算法)

    Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。

    All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。

    Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

    Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。

    Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。

    Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。

       相关的图解,参考https://www.cnblogs.com/kqdongnanf/p/4634607.html

  • 相关阅读:
    跳出iframe
    leetcode 225. Implement Stack using Queues
    leetcode 206. Reverse Linked List
    leetcode 205. Isomorphic Strings
    leetcode 203. Remove Linked List Elements
    leetcode 198. House Robber
    leetcode 190. Reverse Bits
    leetcode leetcode 783. Minimum Distance Between BST Nodes
    leetcode 202. Happy Number
    leetcode 389. Find the Difference
  • 原文地址:https://www.cnblogs.com/jiangbei/p/8513989.html
Copyright © 2011-2022 走看看