zoukankan      html  css  js  c++  java
  • Storm的开发使用

    一、开发

    * 假定是用IDEA工具开发,这里实现的是上面(2)类型的2层Bolt实例,Spout -> Bolt1 -> Bolt2

    1.创建Maven项目

    项目名是StormProcessor,包名是com.clotho.storm。后面运行命令时会用到。

    2.配置Maven

    在pom.xml的<dependencies>和</dependencies>中间加入以下内容:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
        <!--<scope>${provided.scope}</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-client</artifactId>
        <version>2.3.0</version>
    </dependency>

    * 如果pom.xml有其它Hadoop组件的引用,运行时有可能会报错。例如:

    “Detected both log4j-over-slf4j.jar AND bound slf4j-log4j12.jar on the class path”

    遇到这种情况,有2个选择:

    a.独立一个项目开发Storm程序

    b.exclude掉引发异常的依赖项(在pom.xml编辑界面:右键 -> Maven -> Show Dependencies -> 点击Show Conflicts/Duplicates按钮查看)

    3.Spout类

    可以实现IRichSpout接口,也可以扩展BaseRichSpout类,BaseRichSpout类也是实现IRichSpout接口

    Spout一般是负责获取数据或生成数据,并发送给下一层Bolt处理。

    public class TextSpout extends BaseRichSpout
    {
        private SpoutOutputCollector collector;
        private static final String field = "text";
        private int count = 0;
        private String[] message = {
                "这是 第 1 条 信息 ~ 统计",
                "这是 第 2 条 信息 ~ 统计 测试",
                "这是 第 3 条 信息 ~ 测试"
        };
    
        /**
         * 在Spout初始化时被调用
         *
         * @param map
         * @param context
         * @param collector
         */
        @Override
        public void open(Map map, TopologyContext context, SpoutOutputCollector collector)
        {
            System.out.println("open:" + map.get("test"));
            this.collector = collector;
        }
    
        /**
         * Spout实现的核心方法,会被循环调用
         */
        @Override
        public void nextTuple()
        {
            if (count < message.length)
            {
                System.out.println("第" + count + "次开始发送数据..");
                collector.emit(new Values(message[count]));
            }
            count++;
        }
    
        /**
         * 声明要输出的Tuple的字段名称
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
            System.out.println("定义格式...");
            declarer.declare(new Fields(field));
        }
    }

    4.第1层的Bolt类

    可以实现IRichBolt接口,也可以扩展BaseRichBolt类,BaseRichBolt类也是实现IRichBolt接口

    这层Bolt是负责把前面传过来的文本进行空格分词,并传递给下一层Bolt

    public class SplitBolt extends BaseRichBolt
    {
        private OutputCollector collector;
        private static final String field1 = "text";
        private static final String field2 = "count";
    
        /**
         * 在Bolt启动前执行,提供Bolt启动环境配置的入口
         */
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector)
        {
            System.out.println("prepare:" + map.get("test"));
            this.collector = collector;
        }
    
        /**
         * Bolt实现的核心方法
         */
        @Override
        public void execute(Tuple tuple)
        {
            String message = tuple.getStringByField(field1);
            System.out.println("开始分割单词:" + message);
            String[] words = message.toLowerCase().split(" ");
            for (String word : words)
            {
                collector.emit(new Values(word)); //向下一个Bolt发送数据
            }
        }
    
        /**
         * 声明数据格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
            declarer.declare(new Fields(field2));
        }
    
        /**
         * Storm在终止一个Bolt之前会调用这个方法
         */
        @Override
        public void cleanup()
        {
            System.out.println("TestBolt的资源释放");
        }
    }

    5.第2层的Bolt类

    这层Bolt是负责对上一层传来的词进行统计

    public class StatBolt extends BaseRichBolt
    {
        private static final String field = "count";
        private int count = 1;
    
        /**
         * 保存单词和对应的计数
         */
        private HashMap<String, Integer> counts = null;
    
        /**
         * @param map
         * @param context
         * @param collector
         */
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector)
        {
            System.out.println("prepare:" + map.get("test"));
            counts = new HashMap<String, Integer>();
        }
    
        /**
         * @param tuple
         */
        @Override
        public void execute(Tuple tuple)
        {
            String message = tuple.getStringByField(field);
            System.out.println("第" + count + "次统计单词出现的次数");
            if (!counts.containsKey(message))
            {
                counts.put(message, 1);
            }
            else
            {
                counts.put(message, counts.get(message) + 1);
            }
            count++;
        }
    
        /**
         *
         */
        @Override
        public void cleanup()
        {
            System.out.println("===========开始显示单词数量============");
            for (Map.Entry<String, Integer> entry : counts.entrySet())
            {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
            System.out.println("===========结束============");
            System.out.println("Test2Bolt的资源释放");
        }
    
        /**
         * 声明数据格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
        }
    }

    6.Topology类

    负责绑定Spout和Bolt1、Bolt2,并提交给Storm

    public class CountBolt extends BaseRichBolt
    {
        private static final String field = "count";
        private int count = 1;
    
        /**
         * 保存单词和对应的计数
         */
        private HashMap<String, Integer> counts = null;
    
        /**
         * @param map
         * @param context
         * @param collector
         */
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector)
        {
            System.out.println("prepare:" + map.get("test"));
            counts = new HashMap<String, Integer>();
        }
    
        /**
         * @param tuple
         */
        @Override
        public void execute(Tuple tuple)
        {
            String message = tuple.getStringByField(field);
            System.out.println("第" + count + "次统计单词出现的次数");
            if (!counts.containsKey(message))
            {
                counts.put(message, 1);
            }
            else
            {
                counts.put(message, counts.get(message) + 1);
            }
            count++;
        }
    
        /**
         *
         */
        @Override
        public void cleanup()
        {
            System.out.println("===========开始显示单词数量============");
            for (Map.Entry<String, Integer> entry : counts.entrySet())
            {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
            System.out.println("===========结束============");
        }
    
        /**
         * 声明数据格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
        }
    }

    二、启动

    1.编译成jar文件并上传到服务器

    假定项目在E:\Code\StormProcess\目录

    cd e:\Code\StormProcess
    mvn package

    打包后,在E:\Code\StormProcess\target\目录下会有StormProcess-1.0.jar文件,把文件上传到服务器

    2.用命令提交到Storm执行

    (1) 有2种方式

    a.使用代码中的命名作为拓扑名

    storm jar StormProcess-1.0.jar com.clotho.storm.StatTopology

    b.运行时在命令中指定拓扑名

    storm jar StormProcess-1.0.jar com.clotho.storm.StatTopology WordCount

    (2) 验证

    storm list

    * 也可以在Storm UI查看

    (3) 删除拓扑(Topology)

    storm kill StormProcess

    * 也可以在Storm UI删除,进入具体拓扑(Topology),点击Topology actions的kill按钮

    附录:

    Storm的原理和机制

    https://www.cnblogs.com/live41/p/15560493.html

    Storm的安装与部署

    https://www.cnblogs.com/live41/p/15555719.html

  • 相关阅读:
    android 4.0 中出错 java.lang.UnsupportedOperationException
    怎么确定你的CPU是否支持64位虚拟化
    宽度百分比单位的转换公式
    Test SRM Level Three: LargestCircle, Brute Force
    802.11(wifi)的MAC层功能
    zookeeper集群的python代码测试
    mysqldump 命令的使用
    xp硬盘安装Fedora14 过程记录及心得体会(fedora14 live版本680M 和fedora14 DVD版本3.2G的选择)
    ContentProvider的使用
    基于 Java 2 运行时安全模型的线程协作--转
  • 原文地址:https://www.cnblogs.com/live41/p/15563263.html
Copyright © 2011-2022 走看看