zoukankan      html  css  js  c++  java
  • Storm Topology及分组原理

    Storm的通信机制,需要满足如下一些条件以满足Storm的语义。

    1、建立数据传输的缓冲区。在通信连接没有建立之前把发送的数据缓存起来。数据发送方可以在连接建立之前发送消息,而不需要等连接建立起来,可是的接收方是独立运行的。

    2、在消息传输层保证消息最多只能发送一次,Storm系统有ACK机制,是的没有被发送成功的消息会被重发,若消息层面也重发,会导致消息发送多次。

    这种消息机制由两个接口来定义,backtype.storm.messaging.IContext和backtype.storm.messaging.IConnection.

    IContext负责客户端和服务器端建立的连接,主要有四个方法。

    1、prepare(Map stormConf):总从Storm定义的prepare方法,可以接收storm的配置。

    2、term():终止,方法会在worker卸载这个传输插件的时候调用,自定义实现时可以在这里释放占用的资源。

    3、bind(String topologyId,int port):建立服务器端的连接。

    4、connect(String stormId,String host,int port):建立一个客户端的连接。

    IConnect定义了在IContext上发送、接收数据的接口。

    1、recv(int flag):接收消息。

    2、send(int taskId,byte[] payload):发送消息。

    3、close():该连接关闭的时候调用,释放相关资源。

    Topology原理整理

      从运行时Topology的实际执行过程角度,作业是由多个组件的实例,也即任务,按照构造时简历的逻辑顺序呢和配置的并发度,形成的数据流图结构。

      流(stream)是Storm中对传递的数据进行的抽象,流是时间上无限的数据项Tuple序列。Spout是Stream的源,为Topology从特定数据源获取数据项,并向作业中发射(emit)形成Stream。(项目中使用了kafkaspout,接收后进行数据校验再使用emit发送给bolt),bolt可以同时接受任意多个上游送达的Stream作为输入,进行数据的处理过程,也可以在bolt做完处理后执行(emit)发射新的Stream继续给下游的Bolt进行处理。

      Stream中的Tuple可以被指定结构,由一个或多个域(field)组成。Tuple的定义不必是严格统一的,而是可以在每个spout,bolt中定义。默认情况下Tuple可以包含基本类型,如integers、longs、shorts、bytes、strings、doubles、floats、booleans和byte arrays.

    流组模式

    1、Shuffle Grouping 随机分组

    public void createTopology(TopologyBuilder builder){
        kafkaSpout kafkaspout = getKafkaSpout(topicName);
        //Topology中增加一个Spout
        builder.setSpout(...)
        //在Topology中增加一个Bolt,可设置并行度,以随机分组的方式发送,shuffleGrouping后的参数为源组建的Id
        builder.setBolet(boltName,new BlackListBolt(),3).shuffleGrouping(spoutName);
    }

    在这种流组模式下,源组件将其发送的数据项,以随机的方式向其所有目标组件发送,可以保证每个目标组件收到数量近似的Tuple。

    2、All Grouping 副本分组

    //allGrouping(java.lang.String componentId)
    //allGrouping(java.lang.String componentId,java.lang.String streamId)
    //参数streamId是声明的流的标识
    builder.setBolet(boltName,new BlackListBolt(),3).allGrouping(spoutName,"signals“);

    在这种模式下,源组件将其发送的数据项,以副本的形式向其所有目标组件发送,可以保证每个目标组件均收到同一个Tuple,就好比zookeeper的配置文件同步一样,每个bolt都会收到同一份。

    3、Global Grouping 全局分组

    这种模式下,源组件将其发送的数据项,全部发送给目标组件的某一个实例,而且该实例是这个组件中ID最小的那个任务。可以保证所有数据项只会被目标组件的一份实例(一个bolt)所处理

    builder.setBolet(boltName,new BlackListBolt(),3).globalGrouping(SpoutName);

    4.Fiellds Grouping 按域分组

    builder.setBolet(boltName,new BlackListBolt(),3).fieldsGrouping(spoutName,new Field("域名");

    源组件将其发送的数据项,按Tuple中指定域的值分组,向下游目标组件发送,可以保证拥有相同域组合的值的Tuple,被发送给同一个Bolt.

    5、Direct Grouping 直接分组

    builder.setSpout("kafkaSpout",topicSpout)
    builder.setBolt(boltname1,new boltName1(),1).shuffleGrouping("kafkaSpout");
    //以直接分组的模式接收上述bolt发送的数据项
    builder.setBolt(boltname2,new boltname2(),2).directGrouping(boltname1);

    源组件将其发送的数据项,以直接指定目标组件的方式发送,可以使指定组件接收给定的Tuple.需要注意的是,接收bolt的executle()函数中,哟啊使用emitDirect()替代emit,用于向指定的具名流中发送数据项

    构建Topology

    构建TopologyBuilder主要给出了三类方法:创建Topology、增加bolt和增加Spout的方法。setBolt和setSpout接口各有不同多种重载方法,均返回用于声明组件输入的对象。

    1、id:组件(spout、Bolt)的标识,字符串类型,若需要引用该组件,就使用这里指定的标识ID。比如使用"kafkaSpout"

    2、bolt:添加的bolt对象,再setBolt的重载方法中,存在IRichBolt和IBasicBolt两类bolt参数,项目中用到的是IRichBolt,区别在于,BasicBolt用于非聚集处理,能够自动进行(anchoring)和(acking)

    3、spout:添加的Spout对象,在setSpout方法中该参数是IRichSpout类型的Spout接口。

    4、parallelism_hint:并行度,数值型参数。设置组件运行时将要被分配的线程数量。

    参考:《Storm 大数据流式计算及应用实践》

  • 相关阅读:
    sql2slack alash3al 开源的又个轻量级工具
    pgspider fetchq 扩展docker镜像
    godns 集成coredns 的demo
    godns 简单dnsmasq 的dns 替换方案
    aviary.sh 一个基于bash的分布式配置管理工具
    使用coredns 的template plugin实现一个xip 服务
    nginx 代理 coredns dns 服务
    基于nginx proxy dns server
    几个不错的geodns server
    spring boot rest api 最好添加servlet.context-path
  • 原文地址:https://www.cnblogs.com/yangsy0915/p/5487987.html
Copyright © 2011-2022 走看看