zoukankan      html  css  js  c++  java
  • Storm之详解spout、blot

    1、Topology的构造
    backtype.storm.topology.TopologyBuilder

    2、Spout组件的编写
    实现接口 backtype.storm.topology.IRichSpout;
    或者继承backtype.storm.topology.base.BaseRichSpout;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    // TODO Auto-generated method stub

    }
    open 方法,是spout的组件初始化方法,而且Spout实例创建后首先被调用,只调用一次


    @Override
    public void close() {
    // 对于资源的释放关闭,可以在该方法中实现
    }

    @Override
    public void nextTuple() {
    // 实现如何从数据源上获取数据的逻辑
    // 以及向后面的组件bolt发射数据
    }

    nextTuple 循环调用


    @Override
    public void ack(Object msgId) {

    }

    Topology启用了消息可靠性保障机制,当某个Tuple在Topology上处理成功后,调用ack方法执行一些消息处理成功后该干的事情

    @Override
    public void fail(Object msgId) {
    // Topology启用了消息可靠性保障机制,某个Tuple在后面处理失败,该干什么

    // 比如重试,重试达到最大可重试(比如三次)就丢弃
    }


    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // 声明向后面组件发射的Tuple keys依次是什么

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
    // 设置该组件Spout一些专用的参数
    return null;
    }

    kafkaSpout 向后发射的Tuple {"str":"msg"}

    注意点:
    Topology中使用的一些类,最好都要实现序列化接口 java.io.Serializable

    3、Bolt组件
    实现backtype.storm.topology.IRichBolt
    或者继承backtype.storm.topology.base.BaseRichBolt

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    //类似于spout中open方法

    }

    SpoutOutputCollector spout组件中tuple的发射器

    OutputCollector bolt组件中tuple发射器

    @Override
    public void execute(Tuple input) {
    // TODO Auto-generated method stub

    }
    execute 类似于Spout的nextTuple方法


    @Override
    public void cleanup() {
    // TODO Auto-generated method stub

    }
    类似于spout中close方法

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // 声明向后面组件发射的Tuple keys依次是什么

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
    // 设置该组件Spout一些专用的参数
    return null;
    }

    4、数据流分组 方式
    shuffleGrouping 随机分配
    fieldsGrouping 根据key分组进行分配
    globalGrouping 全局分组 只会将tuple往后面组件中固定一个上发送

    5、消息可靠性保障机制

    启用消息可靠性保障机制:ack、fail

    Spout端:

    1)发射器发射tuple时,需要指定一个msgID
    collector.emit(new Values(sentence),mssageId );

    2)使用缓存所发射的tuple,Map key=msgID,value = Values

    private Map<Object,Values> tuples;

    3)ack方法
    // 确认发射成功,将tuple从缓存中移除
    tuples.remove(msgId);


    4)fail方法
    重试
    // 重试
    Values values = tuples.get(msgId);

    // 重新发射
    collector.emit(values,msgId );


    Bolt端:
    1)如果bolt端继续往后面组件发射,需要锚定前面的tuple
    // 启用消息可靠性保障机制,需要锚定接收到tuple
    collector.emit(input,new Values(word));

    2)处理完tuple后
    // 确认处理结束
    collector.ack(input);

    try{
    }catch{
    // 处理失败
    collector.fail(input);
    }

  • 相关阅读:
    使用 Istio 进行 JWT 身份验证(充当 API 网关)
    DNS 私有域的选择:internal.xxx.com/lan.xxx.com 还是 xxx.local/xxx.srv?
    「Bug」K8s 节点的 IP 地址泄漏,导致 IP 被耗尽
    Linux网络学习笔记(二):域名解析(DNS)——以 CoreDNS 为例
    Linux 发行版的选用(服务器和个人桌面)
    「Bug」VMware 虚拟机的关机测试中,Ubuntu 明显比 CentOS 慢
    VMware vSphere :服务器虚拟化
    「Bug」ubuntu 使用国内 apt 源构建 docker 时提示 hash 不匹配
    留言板
    Idea 自定义快捷代码输入 如syso => System.out.println()
  • 原文地址:https://www.cnblogs.com/WardSea/p/7366163.html
Copyright © 2011-2022 走看看