zoukankan      html  css  js  c++  java
  • Storm(2015.08.12笔记)

    2015.08.12Storm

     

    一、Storm简介

    Storm是Twitter开源的一个类似于Hadoop的实时数据处理框架。

     

    Storm能实现高频数据和大规模数据的实时处理。

    官网资料显示storm的一个节点在1秒钟能够处理100万个100字节的消息(IntelE5645@2.4Ghz的CPU,24GB的内存)

    (storm +kafka+flume)

     

     

    二、HADOOP与STORM比较

    数据来源:HADOOP处理的是HDFS上TB级别的数据(历史数据),STORM是处理的是实时新增的某一笔数据(实时数据);

     

    处理过程:HADOOP是分MAP阶段和REDUCE阶段,STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT);

     

    是否结束:HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始;

     

    处理速度:HADOOP是以处理HDFS上大量数据为目的,处理速度慢,STORM是只要处理新增的某一笔数据即可,可以做到很快;

     

    适用场景:HADOOP是在要处理批量数据时用的,不讲究时效性,STORM是要处理某一新增数据时用的,要讲时效性;

     

     

     

     

    package storm;

     

    import java.util.Map;

     

    import backtype.storm.Config;

    import backtype.storm.LocalCluster;

    import backtype.storm.spout.SpoutOutputCollector;

    import backtype.storm.task.OutputCollector;

    import backtype.storm.task.TopologyContext;

    import backtype.storm.topology.OutputFieldsDeclarer;

    import backtype.storm.topology.TopologyBuilder;

    import backtype.storm.topology.base.BaseRichBolt;

    import backtype.storm.topology.base.BaseRichSpout;

    import backtype.storm.tuple.Fields;

    import backtype.storm.tuple.Tuple;

    import backtype.storm.tuple.Values;

     

    /**

    * 数据累加的操作,spout产生数据(在这里自己产生),bolt对数据累加求和

    * @author Administrator

    *

    */

    public class LocalStormTopology {//都在一个类里面实现,需要写一个静态name类

        public static class DataSourceSpout extends BaseRichSpout{//spout继承BaseRichSpout类,它有3个未实现的方法

            private Map conf;//第一个未实现的方法。不知道会不会用上,先保存

            private TopologyContext context;

            private SpoutOutputCollector collector;

              

            

            /**

             * 本实例运行的是被调用一次,只能执行一次。

             */

            public void open(Map conf, TopologyContext context,

                    SpoutOutputCollector collector) {//第一个未实现的方法,本实例运行时被调用一次,Map conf配置参数,可以获取topology的配置信息,TopologyContext理解为Topology的上下文,collector,发射器,将spout产生的数据发射出去

                this.conf = conf;

                this.context = context;

                this.collector = collector;

            }

            /**

             * 死循环的调用,心跳

             */

            int i=0;

            public void nextTuple() {//第二个未实现的方法,程序运行过程中不断被调用,调用此方法会不断产生数据

                System.out.println("spout:"+i);//打印每次产生的数据

                this.collector.emit(new Values(i++));//将产生的数据发射出去,发射需要emit这个方法,接受的是tuple,是list类型,tuple里面放的是列表的数据,里面是封装了列表;storm里面封装了一个values类,new Values相当于一个tuple,直接将i传进去,点进去,他继承了ArrayList,相当于创建了一个list,往list里面添加一个元素,每次都会发送数据,所以i++,每次递增加一

                try {

                    Thread.sleep(1000);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

            /**

             * declarer方法的意思是声明输出的内容

             */

            public void declareOutputFields(OutputFieldsDeclarer declarer) {//第三个未实现的方法,spout只管发射数据,不指定目标,需要指定发射数据的的名称,bolt读取有标示的数据

                declarer.declare(new Fields("num"));//declare方法接受参数fields,就new一个Fields,Fileds可接受一个可变参数(3个点)或者传一个list列表,需要指定一个字段名称,自定义为num。前面tuple封装了一个数据,这里对应也是一个字段,如果前面发射两个参数(i++,i+2),这里就指定2个字段('num''num'),bolt可通过num(num2)字段获取i++数据(i+2)。spout发射的数据与字段num做了关联

            }

        }

        

        //写一个静态name类,也要实现3个未实现的方法

        

        public static class Sumbolt extends BaseRichBolt{

            private Map stormConf;

            private TopologyContext context;

            private OutputCollector collector;

            public void prepare(Map stormConf, TopologyContext context,

                    OutputCollector collector) {//第一个未实现的方法

                this.stormConf = stormConf;

                this.context = context;

                this.collector = collector;//只会用到这个

            }

            int sum = 0;

            public void execute(Tuple input) {//这个方法,bolt也有个死循环,不断读取数据,每次获取的也是一个Tuple

                //input.getInteger(0);//通过get方法Integer传递的是参数,获取的数据是列表的脚标,获取的是list的元素,不建议使用这种方式。

                Integer value = input.getIntegerByField("num");//因为之前已经指定了num字段,所以通过num字段获取i++的值,使用declare方法指定的字段

                sum+=value;//因为bolt需要对获取的字段的值累加求和

                System.out.println("sum:"+sum);//直接将sum打印出来,打印每次累加求和的结果

                //this.collector.emit(tuple);

            }

     

            public void declareOutputFields(OutputFieldsDeclarer declarer) {

                //因为这个bolt已经是最后一个bolt,bolt不需要往外发射数据,这里不需要定义字段

            }

        }

          

          

        

        //需要topology。main函数将spout和bolt组装在一起

        public static void main(String[] args) {

            TopologyBuilder topologyBuilder = new TopologyBuilder();//先new TopologyBuilder,

            topologyBuilder.setSpout("spout_id", new DataSourceSpout());//首先设置spout,id简单自定义为spout_id,后面需要具体指定spout类

            topologyBuilder.setBolt("bolt_id", new Sumbolt()).shuffleGrouping("spout_id");//需要将spout和bolt连接起来,bolt接spout,在bolt调用一个方法shuffleGrouping(指定'spout_id')

            

            LocalCluster localCluster = new LocalCluster();//组装后需要运行,在本地运行,造一个本地的轨道

            localCluster.submitTopology("topology", new Config(), topologyBuilder.createTopology());//第一个是topology的名称,第二个是配置参数是map结构,storm提供了一个配置类,new Config(点进去,继承了HashMap),后面需要一个storm Topology,我们指定为topologyBuilder.createTopology()

        }

     

    }

     

     

     

  • 相关阅读:
    ESlint中console.log报错问题
    for、forEach、for in、for of用法
    如何覆盖elementUI样式
    什么是闭包(closure),为什么要用它?
    写一个通用的事件侦听器函数
    javascripts 浅拷贝和深拷贝
    箭头函数
    用 async/await 来处理异步
    DOM事件类
    arguments 详解
  • 原文地址:https://www.cnblogs.com/liuyifeng/p/5690906.html
Copyright © 2011-2022 走看看