zoukankan      html  css  js  c++  java
  • Storm的开发使用

    一、开发

    * 假定是用IDEA工具开发,这里实现的是上面(2)类型的2层Bolt实例,Spout -> Bolt1 -> Bolt2

    1.创建Maven项目

    项目名是StormProcessor,包名是com.clotho.storm。后面运行命令时会用到。

    2.配置Maven

    在pom.xml的<dependencies>和</dependencies>中间加入以下内容:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
        <!--<scope>${provided.scope}</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-client</artifactId>
        <version>2.3.0</version>
    </dependency>

    * 如果pom.xml有其它Hadoop组件的引用,运行时有可能会报错。例如:

    “Detected both log4j-over-slf4j.jar AND bound slf4j-log4j12.jar on the class path”

    遇到这种情况,有2个选择:

    a.独立一个项目开发Storm程序

    b.exclude掉引发异常的依赖项(在pom.xml编辑界面:右键 -> Maven -> Show Dependencies -> 点击Show Conflicts/Duplicates按钮查看)

    3.Spout类

    可以实现IRichSpout接口,也可以扩展BaseRichSpout类,BaseRichSpout类也是实现IRichSpout接口

    Spout一般是负责获取数据或生成数据,并发送给下一层Bolt处理。

    public class TextSpout extends BaseRichSpout
    {
        private SpoutOutputCollector collector;
        private static final String field = "text";
        private int count = 0;
        private String[] message = {
                "这是 第 1 条 信息 ~ 统计",
                "这是 第 2 条 信息 ~ 统计 测试",
                "这是 第 3 条 信息 ~ 测试"
        };
    
        /**
         * 在Spout初始化时被调用
         *
         * @param map
         * @param context
         * @param collector
         */
        @Override
        public void open(Map map, TopologyContext context, SpoutOutputCollector collector)
        {
            System.out.println("open:" + map.get("test"));
            this.collector = collector;
        }
    
        /**
         * Spout实现的核心方法,会被循环调用
         */
        @Override
        public void nextTuple()
        {
            if (count < message.length)
            {
                System.out.println("第" + count + "次开始发送数据..");
                collector.emit(new Values(message[count]));
            }
            count++;
        }
    
        /**
         * 声明要输出的Tuple的字段名称
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
            System.out.println("定义格式...");
            declarer.declare(new Fields(field));
        }
    }

    4.第1层的Bolt类

    可以实现IRichBolt接口,也可以扩展BaseRichBolt类,BaseRichBolt类也是实现IRichBolt接口

    这层Bolt是负责把前面传过来的文本进行空格分词,并传递给下一层Bolt

    public class SplitBolt extends BaseRichBolt
    {
        private OutputCollector collector;
        private static final String field1 = "text";
        private static final String field2 = "count";
    
        /**
         * 在Bolt启动前执行,提供Bolt启动环境配置的入口
         */
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector)
        {
            System.out.println("prepare:" + map.get("test"));
            this.collector = collector;
        }
    
        /**
         * Bolt实现的核心方法
         */
        @Override
        public void execute(Tuple tuple)
        {
            String message = tuple.getStringByField(field1);
            System.out.println("开始分割单词:" + message);
            String[] words = message.toLowerCase().split(" ");
            for (String word : words)
            {
                collector.emit(new Values(word)); //向下一个Bolt发送数据
            }
        }
    
        /**
         * 声明数据格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
            declarer.declare(new Fields(field2));
        }
    
        /**
         * Storm在终止一个Bolt之前会调用这个方法
         */
        @Override
        public void cleanup()
        {
            System.out.println("TestBolt的资源释放");
        }
    }

    5.第2层的Bolt类

    这层Bolt是负责对上一层传来的词进行统计

    public class StatBolt extends BaseRichBolt
    {
        private static final String field = "count";
        private int count = 1;
    
        /**
         * 保存单词和对应的计数
         */
        private HashMap<String, Integer> counts = null;
    
        /**
         * @param map
         * @param context
         * @param collector
         */
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector)
        {
            System.out.println("prepare:" + map.get("test"));
            counts = new HashMap<String, Integer>();
        }
    
        /**
         * @param tuple
         */
        @Override
        public void execute(Tuple tuple)
        {
            String message = tuple.getStringByField(field);
            System.out.println("第" + count + "次统计单词出现的次数");
            if (!counts.containsKey(message))
            {
                counts.put(message, 1);
            }
            else
            {
                counts.put(message, counts.get(message) + 1);
            }
            count++;
        }
    
        /**
         *
         */
        @Override
        public void cleanup()
        {
            System.out.println("===========开始显示单词数量============");
            for (Map.Entry<String, Integer> entry : counts.entrySet())
            {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
            System.out.println("===========结束============");
            System.out.println("Test2Bolt的资源释放");
        }
    
        /**
         * 声明数据格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
        }
    }

    6.Topology类

    负责绑定Spout和Bolt1、Bolt2,并提交给Storm

    public class CountBolt extends BaseRichBolt
    {
        private static final String field = "count";
        private int count = 1;
    
        /**
         * 保存单词和对应的计数
         */
        private HashMap<String, Integer> counts = null;
    
        /**
         * @param map
         * @param context
         * @param collector
         */
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector)
        {
            System.out.println("prepare:" + map.get("test"));
            counts = new HashMap<String, Integer>();
        }
    
        /**
         * @param tuple
         */
        @Override
        public void execute(Tuple tuple)
        {
            String message = tuple.getStringByField(field);
            System.out.println("第" + count + "次统计单词出现的次数");
            if (!counts.containsKey(message))
            {
                counts.put(message, 1);
            }
            else
            {
                counts.put(message, counts.get(message) + 1);
            }
            count++;
        }
    
        /**
         *
         */
        @Override
        public void cleanup()
        {
            System.out.println("===========开始显示单词数量============");
            for (Map.Entry<String, Integer> entry : counts.entrySet())
            {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
            System.out.println("===========结束============");
        }
    
        /**
         * 声明数据格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
        }
    }

    二、启动

    1.编译成jar文件并上传到服务器

    假定项目在E:\Code\StormProcess\目录

    cd e:\Code\StormProcess
    mvn package

    打包后,在E:\Code\StormProcess\target\目录下会有StormProcess-1.0.jar文件,把文件上传到服务器

    2.用命令提交到Storm执行

    (1) 有2种方式

    a.使用代码中的命名作为拓扑名

    storm jar StormProcess-1.0.jar com.clotho.storm.StatTopology

    b.运行时在命令中指定拓扑名

    storm jar StormProcess-1.0.jar com.clotho.storm.StatTopology WordCount

    (2) 验证

    storm list

    * 也可以在Storm UI查看

    (3) 删除拓扑(Topology)

    storm kill StormProcess

    * 也可以在Storm UI删除,进入具体拓扑(Topology),点击Topology actions的kill按钮

    附录:

    Storm的原理和机制

    https://www.cnblogs.com/live41/p/15560493.html

    Storm的安装与部署

    https://www.cnblogs.com/live41/p/15555719.html

  • 相关阅读:
    LeetCode 345. Reverse Vowels of a String 题解
    LeetCode 344. Reverse String 题解
    LeetCode 27. Remove Element 题解
    LeetCode 61. Rotate List 题解
    LeetCode 19.Remove Nth Node From End of List 题解
    Android耗电量
    Android 使用adb查看和修改电池信息
    Android AOP AspectJ 插桩
    Flask相关用法
    Monkey日志信息的11种Event percentage
  • 原文地址:https://www.cnblogs.com/live41/p/15563263.html
Copyright © 2011-2022 走看看