zoukankan      html  css  js  c++  java
  • 57.storm拓扑结构调整

    几个概念

    Topology(拓扑):Spout、Bolt组成的一个完整的流程结构;

    Stream Grouping:流分组、数据的分发方式;

    Spout:直译 水龙头,也就是 消息源 的意思;

    Bolt:螺栓、处理器。很形象,水从上面的那个“水龙头”流出来,经过第一个螺栓,经过第二个螺栓,经过第三第四个螺栓...

    Worker:工作进程

    Executor:执行器、task的线程;

    Task:具体执行的任务;

    Configuration:配置。

    实际操作

    回顾

    在上上节(55节),本地模式跑起来的,只有一个JVM(虽然分配了两个,cfg.setNumWorkers(2); 但是在本地跑的时候只可能启动一个JVM),那么拓扑的执行情况应该是下面这样的,一个worker下有几个Executor,每个Executor分别对应一个Spout或者Bolt(图片中的bolt和Spout名称和代码不一致,请对号入座):

    结构调整

    如果我们把带码稍作改动(只需要改Topology)

     1 import backtype.storm.Config;
     2 import backtype.storm.LocalCluster;
     3 import backtype.storm.StormSubmitter;
     4 import backtype.storm.topology.TopologyBuilder;
     5 import bhz.bolt.PrintBolt;
     6 import bhz.bolt.WriteBolt;
     7 import bhz.spout.PWSpout;
     8 
     9 public class PWTopology2 {
    10 
    11     public static void main(String[] args) throws Exception {
    12         
    13         Config cfg = new Config();
    14         cfg.setNumWorkers(2);//设置使用俩个工作进程
    15         cfg.setDebug(false);
    16         TopologyBuilder builder = new TopologyBuilder();
    17         //设置sqout的并行度和任务数(产生2个执行器和俩个任务)
    18         builder.setSpout("spout", new PWSpout(), 2);//.setNumTasks(2);
    19         //设置bolt的并行度和任务数:(产生2个执行器和4个任务)
    20         builder.setBolt("print-bolt", new PrintBolt(), 2).shuffleGrouping("spout").setNumTasks(4);
    21         //设置bolt的并行度和任务数:(产生6个执行器和6个任务)
    22         builder.setBolt("write-bolt", new WriteBolt(), 6).shuffleGrouping("print-bolt");
    23         
    24         
    25         //1 本地模式
    26 //        LocalCluster cluster = new LocalCluster();
    27 //        cluster.submitTopology("top2", cfg, builder.createTopology());
    28 //        Thread.sleep(10000);
    29 //        cluster.killTopology("top2");
    30 //        cluster.shutdown();
    31         
    32         //2 集群模式
    33         StormSubmitter.submitTopology("top2", cfg, builder.createTopology());
    34         
    35     }
    36 }

    1.谈谈本地为什么会生成那么多文件

    如果我们以本地模式启动,那么运行结果将会是这样的,temp文件夹里有6个文件

    那么为什么会产生6个文件呢?注意代码的第22行

    这里设置6个执行器来执行WriteBolt,默认每个执行器是一个task,也就是有6个task;再来看看WriteBolt是怎么形成文件的:

    可以看出,每个线程都会在099_test下形成一个文件。

    2.spout部分的拓扑结构调整

    假如,我们只修改这一行代码

    那么在只有一个JVM的情况下,整体拓扑结构应该类似这样的:

    3.如果完全修改成上面的代码,并且可以启动多个JVM的话,呢么拓扑结构应该是下面这样的(图片中的bolt和Spout名称和代码不一致,请对号入座):

     

     针对上面的这种拓扑结构的总结:

    遇到一点问题,storm supervisor & 启动报错java.lang.RuntimeException: java.io.EOFException:

    解决办法:删除storm.yaml中配置的storm.local.dir指向的目录中的supervisor和workers两个目录,再次启动即可。

  • 相关阅读:
    CPU和Memory压力测试方法
    WIN 系统怎么样查看EXpressCache功能
    OGG 源端与目标端 约束不一致
    oracle 查询角色具有的权限
    Mysql 主从一致校验工具------Maatkit工具包
    从库找不到对应的被删除的记录
    python安装包是出现错误解决
    mysql5.7用户密码策略问题
    Centos7上安装docker
    SQL通过身份证获取信息
  • 原文地址:https://www.cnblogs.com/sigm/p/6882952.html
Copyright © 2011-2022 走看看