zoukankan      html  css  js  c++  java
  • Storm初探

    Storm是一种分布式流式处理技术,这一点和Spark与Hadoop的批处理特性有明显的区别。

    在数据连续产生,响应时延要求较低的场景中,Storm具有Spark不可比拟的优势。

    网络性能监控系统中,Storm可以在秒级Dashboard监控,分钟级告警监控中大显生手。

    学习任何技术,首先从Hello Wold开始,Storm也不例外,下面代码实现了这样一个例子:

    (1) NamesReader Spout读取一行名字字符串,发送给NameSpliter;

    (2) NameSpliter Bolt按照空格分割名字字符串,每个名字发送给HelloWorld;

    (3) HelloWorld Bolt打印Hello world + 名字。

    注:Spout是Storm有向网络的起始节点,Bolt是Storm有向网络的其他节点。数据在Storm有向网络中流动,节点则可以对流经的数据进行处理。

    1、 名字字符串读取Spout

    package com.coshaho.learn.storm;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    
    /**
     * 
     * NamesReaderSpout.java Create on 2017年6月4日 下午10:57:32    
     *    
     * 类功能说明:   读取名字列表并派发
     *
     * Copyright: Copyright(c) 2013 
     * Company: COSHAHO
     * @Version 1.0
     * @Author coshaho
     */
    public class NamesReaderSpout implements IRichSpout
    {
        private static final long serialVersionUID = 1L;
        private SpoutOutputCollector collector;
    
        @SuppressWarnings("rawtypes")
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) 
        {
            this.collector = collector;
        }
        
        public void nextTuple() 
        {
            List<Object> list = new ArrayList<Object>();
            try 
            {
                Thread.sleep(5 * 1000);
            } 
            catch (InterruptedException e) 
            {
                e.printStackTrace();
            }
            System.out.println();
            list.add("刘备 关羽 张飞");
            // 第一个参数是传递的业务数据,第二个参数是消息标识,用于追踪消息是否正确处理
            this.collector.emit(list, "stream");
            list.clear();
            list.add("曹操 郭嘉 荀彧");
            this.collector.emit(list, "stream");
            
        }
        
        public void declareOutputFields(OutputFieldsDeclarer declarer) 
        {
            // 必须设置,否则topo启动失败,names对应传递消息第一个元素,即list(0)
            declarer.declare(new Fields("names"));
        }
    
        public void close() {
            // TODO Auto-generated method stub
        }
    
        public void activate() {
            // TODO Auto-generated method stub
        }
    
        public void deactivate() {
            // TODO Auto-generated method stub
        }
    
        public void ack(Object msgId) {
            // TODO Auto-generated method stub
        }
    
        public void fail(Object msgId) {
            // TODO Auto-generated method stub
        }
    
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }

    2、 名字字符串分割Bolt

    package com.coshaho.learn.storm;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    
    /**
     * 
     * NamesSpliterBolt.java Create on 2017年6月4日 下午10:58:08    
     *    
     * 类功能说明:   名字列表按空格分割
     *
     * Copyright: Copyright(c) 2013 
     * Company: COSHAHO
     * @Version 1.0
     * @Author coshaho
     */
    public class NamesSpliterBolt implements IRichBolt
    {
        private static final long serialVersionUID = 1L;
        private OutputCollector collector;
        
        @SuppressWarnings("rawtypes")
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) 
        {
            this.collector = collector;
        }
    
        public void execute(Tuple input) 
        {
            // 打印线程号用于追踪Storm的分配策略
            Thread current = Thread.currentThread();
            String names = input.getString(0);
            System.out.println("准备拆分" + names + "。当前线程号是" + current.getId() + "。");
            List<Tuple> inputList = new ArrayList<Tuple>();
            inputList.add(input);
            String[] nameArray = names.split(" ");
            for(String name : nameArray)
            {
                List<Object> splitList = new ArrayList<Object>();
                splitList.add(name);
                collector.emit(inputList, splitList);
            }
            collector.ack(input);
        }
        
        public void declareOutputFields(OutputFieldsDeclarer declarer) 
        {
            // 必须设置,否则topo启动失败
            declarer.declare(new Fields("name"));
        }
    
        public void cleanup() {
            // TODO Auto-generated method stub
        }
    
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }

    3、 HelloWorld Bolt

    package com.coshaho.learn.storm;
    
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;
    
    /**
     * 
     * HelloWorldBolt.java Create on 2017年6月4日 下午10:58:26    
     *    
     * 类功能说明:   Storm Hello World
     *
     * Copyright: Copyright(c) 2013 
     * Company: COSHAHO
     * @Version 1.0
     * @Author coshaho
     */
    public class HelloWorldBolt implements IRichBolt
    {
        private static final long serialVersionUID = 1L;
        private OutputCollector collector;
    
        @SuppressWarnings("rawtypes")
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) 
        {
            this.collector = collector;
        }
    
        public void execute(Tuple input) 
        {
            // 打印线程号用于追踪Storm的分配策略
            Thread current = Thread.currentThread();
            String name = input.getString(0);
            System.out.println("你好," + name + "。欢迎来到Storm世界。当前线程号是" + current.getId() + "。");
            collector.ack(input);
        }
    
        public void cleanup() {
            // TODO Auto-generated method stub
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // TODO Auto-generated method stub
        }
    
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }

    4、 Storm TOPO网络任务启动

    package com.coshaho.learn.storm;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    
    public class StormTest 
    {
        public static void main(String[] args) throws InterruptedException
        {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("names-reader", new NamesReaderSpout());
            // 启动两个名字分割Task,名字列表随机分配给一个Task
            builder.setBolt("names-spliter", new NamesSpliterBolt(), 2)
                .shuffleGrouping("names-reader");
            // 启动两个Hello World Task,相同名字发送到同一个Task
            builder.setBolt("hello-world", new HelloWorldBolt(), 2)
                .fieldsGrouping("names-spliter", new Fields("name"));
            
            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("storm-test", conf, builder.createTopology());
        }
    }

    5、 执行结果

    准备拆分刘备 关羽 张飞。当前线程号是85。
    你好,刘备。欢迎来到Storm世界。当前线程号是79。
    你好,关羽。欢迎来到Storm世界。当前线程号是81。
    你好,张飞。欢迎来到Storm世界。当前线程号是81。
    准备拆分曹操 郭嘉 荀彧。当前线程号是87。
    你好,荀彧。欢迎来到Storm世界。当前线程号是79。
    你好,曹操。欢迎来到Storm世界。当前线程号是81。
    你好,郭嘉。欢迎来到Storm世界。当前线程号是81。
    
    准备拆分刘备 关羽 张飞。当前线程号是87。
    准备拆分曹操 郭嘉 荀彧。当前线程号是85。
    你好,荀彧。欢迎来到Storm世界。当前线程号是79。
    你好,曹操。欢迎来到Storm世界。当前线程号是81。
    你好,郭嘉。欢迎来到Storm世界。当前线程号是81。
    你好,刘备。欢迎来到Storm世界。当前线程号是79。
    你好,关羽。欢迎来到Storm世界。当前线程号是81。
    你好,张飞。欢迎来到Storm世界。当前线程号是81。

    6、 maven依赖

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
    </dependency>
  • 相关阅读:
    读后感
    mysql分库分表的基本方法
    pdo接口用法
    视频技术基础
    【原创】shell易错语法汇总
    php底层的运行机制
    mysql统计函数
    etc/shadow 登陆口令破解
    JAVA学习(方法重载)
    JAVA学习(方法的定义及调用)
  • 原文地址:https://www.cnblogs.com/coshaho/p/6942517.html
Copyright © 2011-2022 走看看