zoukankan      html  css  js  c++  java
  • storm复习笔记

    1.storm基本介绍

      0.storm是免费、开源、分布式、跨语言、可伸缩、低延迟、容错实时流计算系统。每个节点每秒处理百万元组

      1.流计算,动态数据。mr操作的数据都是静态数据,启动mr读取文件,olap离线计算。实时监控实施舆情调查需要通过storm

    2.storm名词解释

      0.Tuple元组:storm中的主要数据结构,是一个有序元素列表,默认情况下,tuple支持所有的数据类型

      1.Stream流:是tuple元组的序列

      2.Spout:数据流的源头,一般 Spout 会从一个外部的数据源读取元组然后将他们发送到拓扑中,也可以读取kafka数据队列的消息。根据需求的不同,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源。一个可靠的 Spout能够在它发送的元组处理失败时重新发送该元组,以确保所有的元组都能得到正确的处理;相对应的,不可靠的 Spout 就不会在元组发送之后对元组进行任何其他的处理。一个 Spout可以发送多个数据流。

      3.Bolt:转接头,逻辑处理单元。spout传递数据给bolt,bolt来进行处理并且产生新的数据输出流。bolt可以执行过滤、聚合、连接和交互

      4.Topology:spout和bolt连接在一起形成了一个bolt,一个topology是一个有向图,顶点就是计算,边就是数据流

      5.Task:storm中的每个spout或者bolt都是一个task

    3.storm和hadoop对比

      storm                  hadoop

    实时流处理                  批处理

    无状态                    有状态

    使用zk协同的主从架构             无zk的主从架构

    4.storm架构

      storm是可以容错的、快速的、没有单点故障的分布式应用

      1.Nimbus:master节点,核心组件,主要工作就是运行topology,分析拓扑并收集运行task,分发可用的task给supervisor,监控topology有没有失败,依靠zk监控top的运行状况

      2.Supervisor:worker节点,每个supervisor有多个worker进程,负责代理task给worker,worker孵化执行线程,最终运行task,

        storm使用内部消息系统在nimbus和supervisor之间进行通信

        接收nimbus指令,管理worker进程完成task的派发

      3.worker:执行特定的topology,worker本身不执行任务,而是孵化executors,让executors去执行任务

      4.Executor:本质上是由worker进程孵化出来的一个线程而已。executor运行的task都属于同一个spout或者bolt

      5.task:执行实际上的任务处理工作,或者是spout或者是bolt

    5.storm的工作流程

      1.nimbus等待提交的top

      2.一旦拓扑提交,就会处理拓扑并且来收集所有任务

      3.nimbus分发任务给所有可用的suoervisor

      4.在特定时间间隔之内,所有的supervisor都会发送心跳给nimbus通知他们还活着

      5.当一个supervisor挂掉之后,他就不再发送新桃给nimbus,这个时候nimbus回分派任务到另外一个supervisor

      6.当nimbus本身挂掉之后,并不会影响supervisor继续执行自己的任务,但是不会接受新的任务

      7.task完成之后,supervisor回继续等待新的task,同时挂掉的nimbus可以通过监控工具自动重启

    6.安装和配置storm

    [s201 ~ s204]
    1.jdk
    2.tar
    3.环境变量
    4.验证安装
    $>source /etc/profile
    $>./storm version
    5.分发安装文件到其他节点。

    6.配置
    [storm/conf/storm.yaml]
    storm.local.dir: "/home/centos/storm"
    storm.zookeeper.servers:
    - "s202"
    - "s203"

    storm.zookeeper.port: 2181

    ### nimbus.* configs are for the master
    nimbus.seeds : ["s201"]

    ### ui.* configs are for the master
    ui.host: 0.0.0.0
    ui.port: 8080

    supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
    7.分发

    8.启动进程
    a)启动s201 nimbus进程
    $>storm nimbus &

    b)启动s202 ~ s204 supervisor进程
    $>storm supervisor &

    c)启动s201的ui进程
    $>storm ui &

    9.通过webui查看
    http://s201:8080/

    7.编程实现CallLog日志统计

    --------------------------

    0.pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.it18zhang</groupId>
    <artifactId>StormDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
    <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.3</version>
    </dependency>
    </dependencies>

    </project>
    1.创建Spout
    package com.it18zhang.stormdemo;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;

    /**
    * Spout类,负责产生数据流
    */
    public class CallLogSpout implements IRichSpout{

    //Spout输出收集器
    private SpoutOutputCollector collector;

    //是否完成
    private boolean completed = false;

    //上下文
    private TopologyContext context;

    //随机发生器
    private Random randomGenerator = new Random();

    //
    private Integer idx = 0;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.context = context;
    this.collector = collector;
    }

    public void close() {
    }

    public void activate() {
    }

    public void deactivate() {

    }

    /**
    * 下一个元组
    */
    public void nextTuple() {
    if (this.idx <= 1000) {
    List<String> mobileNumbers = new ArrayList<String>();
    mobileNumbers.add("1234123401");
    mobileNumbers.add("1234123402");
    mobileNumbers.add("1234123403");
    mobileNumbers.add("1234123404");

    Integer localIdx = 0;
    while (localIdx++ < 100 && this.idx++ < 1000) {
    //取出主叫
    String caller = mobileNumbers.get(randomGenerator.nextInt(4));
    //取出被叫
    String callee = mobileNumbers.get(randomGenerator.nextInt(4));
    while (caller == callee) {
    //重新取出被叫
    callee = mobileNumbers.get(randomGenerator.nextInt(4));
    }
    //模拟通话时长
    Integer duration = randomGenerator.nextInt(60);

    //输出元组
    this.collector.emit(new Values(caller, callee, duration));
    }
    }
    }

    public void ack(Object msgId) {

    }

    public void fail(Object msgId) {

    }

    /**
    * 定义输出的字段名称
    */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("from", "to", "duration"));
    }

    public Map<String, Object> getComponentConfiguration() {
    return null;
    }
    }

    2.创建CreatorBolt
    package com.it18zhang.stormdemo;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    import java.util.Map;

    /**
    * 创建CallLog日志的Bolt
    */
    public class CallLogCreatorBolt implements IRichBolt {
    //
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector ;
    }

    public void execute(Tuple tuple) {
    //处理通话记录
    String from = tuple.getString(0);
    String to = tuple.getString(1);
    Integer duration = tuple.getInteger(2);
    //产生新的tuple
    collector.emit(new Values(from + " - " + to, duration));
    }

    public void cleanup() {

    }

    /**
    * 设置输出字段的名称
    */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("call", "duration"));
    }

    public Map<String, Object> getComponentConfiguration() {
    return null;
    }
    }

    3.创建CounterBolt
    package com.it18zhang.stormdemo;

    import org.apache.storm.task.IBolt;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;

    import java.util.HashMap;
    import java.util.Map;

    /**
    * 通话记录计数器Bolt
    */
    public class CallLogCounterBolt implements IRichBolt{

    Map<String, Integer> counterMap;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.counterMap = new HashMap<String, Integer>();
    this.collector = collector;
    }

    public void execute(Tuple tuple) {
    String call = tuple.getString(0);
    Integer duration = tuple.getInteger(1);

    if (!counterMap.containsKey(call)) {
    counterMap.put(call, 1);
    } else {
    Integer c = counterMap.get(call) + 1;
    counterMap.put(call, c);
    }
    collector.ack(tuple);
    }

    public void cleanup() {
    for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
    System.out.println(entry.getKey() + " : " + entry.getValue());
    }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("call"));
    }

    public Map<String, Object> getComponentConfiguration() {
    return null;
    }
    }

    4.App
    package com.it18zhang.stormdemo;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    /**
    * App
    */
    public class App {
    public static void main(String[] args) throws InterruptedException {
    TopologyBuilder builder = new TopologyBuilder();
    //设置Spout
    builder.setSpout("spout", new CallLogSpout());
    //设置creator-Bolt
    builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
    //设置counter-Bolt
    builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));

    Config conf = new Config();
    conf.setDebug(true);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
    Thread.sleep(10000);

    //停止集群
    cluster.shutdown();
    }
    }


    5.在生产环境的集群上部署storm top
    a)修改提交方式
    [App.java]
    public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    //设置Spout
    builder.setSpout("spout", new CallLogSpout());
    //设置creator-Bolt
    builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
    //设置counter-Bolt
    builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));

    Config conf = new Config();
    conf.setDebug(true);

    /**
    * 本地模式storm
    */
    // LocalCluster cluster = new LocalCluster();
    // cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
    // Thread.sleep(10000);
    StormSubmitter.submitTopology("mytop", conf, builder.createTopology());
    }
    b)导入jar包.
    maven ...

    c)在centos上运行top
    $>storm jar xxx.jar com.it18zhang.stormdemo.App

    8.使用storm流计算实现wordCount

      1.spout

    package com.it18zhang.wc;
    
    import com.it18zhang.util.Util;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    /**
     * Created by stone on 2018/8/18.
     */
    public class WordCountSpount implements IRichSpout {
        private  TopologyContext context;
        private SpoutOutputCollector collector;
        private Random r = new Random();
        private List<String> stats;
    
        public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
           Util.sendToClient(this,"open()",7777);
            this.context = context;
            this.collector = collector;
            stats  = new ArrayList<String>();
            stats.add("hello world tom");
            stats.add("hello world tomas");
            stats.add("hello world tomasLee");
            stats.add("hello world tomson");
    
    
        }
    
        public void close() {
    
        }
    
        public void activate() {
    
        }
    
        public void deactivate() {
    
        }
    
        public void nextTuple() {
            Util.sendToClient(this,"nextTuple()",7777);
            String line =stats.get(r.nextInt(4));
            collector.emit(new Values(line));
        }
    
        public void ack(Object o) {
    
        }
    
        public void fail(Object o) {
    
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //声明字段
            declarer.declare(new Fields("line"));
        }
    
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

      2.splitBolt

     

    package com.it18zhang.wc;
    
    import com.it18zhang.util.Util;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * Created by stone on 2018/8/18.
     */
    public class SplitBolt implements IRichBolt {
        private TopologyContext context;
        private OutputCollector collector;
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            Util.sendToClient(this,"prepare()",8888);
            this.context=context;
            this.collector=collector;
        }
    
        public void execute(Tuple tuple) {
            Util.sendToClient(this,"execute()",8888);
            String line =tuple.getString(0);
            String[] arr = line.split(" ");
            for(String s:arr){
                collector.emit(new Values(s,1));
    
            }
        }
    
        public void cleanup() {
    
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","count"));
        }
    
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

      3.counterBolt

     

    package com.it18zhang.wc;
    
    import com.it18zhang.util.Util;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import static java.lang.System.out;
    
    /**
     * Created by stone on 2018/8/18.
     */
    public class CountBolt implements IRichBolt {
        private Map<String,Integer> map;
        private TopologyContext context;
        private OutputCollector collector;
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            Util.sendToClient(this,"prepare()",9999);
            this.context=context;
            this.collector=collector;
            map= new HashMap<String,Integer>();
        }
    
        public void execute(Tuple tuple) {
           Util.sendToClient(this,"execute("+tuple.toString()+")",9999);
            String word = tuple.getString(0);
            Integer count = tuple.getInteger(1);
            if(!map.containsKey(word)){
                map.put(word,1);
            }
            else{
                map.put(word,map.get(word) + count);
            }
        }
    
        public void cleanup() {
            for(Map.Entry entry:map.entrySet()){
                out.println(entry.getKey()+":"+entry.getValue());
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","count"));
        }
    
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

      4.App

    package com.it18zhang.wc;
    
    import com.it18zhang.stormDemo.CallLogBolt;
    import com.it18zhang.stormDemo.CallLogCounterBolt;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    /**
     * Created by stone on 2018/8/18.
     */
    public class App {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            //设置spout
            builder.setSpout("wcspout",new WordCountSpount()).setNumTasks(2);
            //设置bolt
            builder.setBolt("split-bolt",new SplitBolt()).shuffleGrouping("wcspout").setNumTasks(3);//通过shuffle的方式随机发送过去
            //设置counterBolt
            builder.setBolt("counter-bolt",new CountBolt()).fieldsGrouping("split-bolt",new Fields("word")).setNumTasks(4);//按照字段来进行分组
    
            Config conf = new Config();
            conf.setDebug(true);
    
            //LocalCluster cluster = new LocalCluster();
    
            //cluster.submitTopology("wordCount",conf,builder.createTopology());
            //Thread.sleep(20000);
            //cluster.shutdown();
            StormSubmitter.submitTopology("mytop1", conf, builder.createTopology());
        }
    }

    9.设置topology的并发程度和任务

    配置并发度

    1.设置worker数据:conf.setNumWorkers(1);

    2.设置并发暗示:

    TopologyBuilder builder = new TopologyBuilder();
    //设置spout的并发暗示
    builder.setSpout("wcspout",new WordCountSpount(),3).setNumTasks(2);
    //设置split-bolt的并发暗示
    builder.setBolt("split-bolt",new SplitBolt(),4).shuffleGrouping("wcspout").setNumTasks(3);//通过shuffle的方式随机发送过去
    //设置counterBolt
    builder.setBolt("counter-bolt",new CountBolt(),5).fieldsGrouping("split-bolt",new Fields("word")).setNumTasks(4);//按照字段来进行分组

    3.设置任务个数
    4.并发度等于所有任务个数的总和
  • 相关阅读:
    nsstring -->nsdictionary
    卫辉市,
    iPhone 手机内存,
    连接错误,等的好漫长,
    BZOJ 1631==USACO 2007== POJ 3268 Cow Party奶牛派对
    POJ 2837 Til the Cows Come Home
    POJ 1285 确定比赛名次
    Codevs 2833 奇怪的梦境
    Codevs 1063 合并果子
    Codevs 1231 最优布线问题
  • 原文地址:https://www.cnblogs.com/bigdata-stone/p/9502087.html
Copyright © 2011-2022 走看看