zoukankan      html  css  js  c++  java
  • 大数据入门第十六天——流式计算之storm详解(三)集群相关进阶

    一、集群提交任务流程分析

      1.集群提交操作

        参考:https://www.jianshu.com/p/6783f1ec2da0

      2.任务分配与启动流程

        参考:https://www.cnblogs.com/heitaok/p/5531535.html

     二、相关目录树

      1.组件本地目录树

      

      2.storm zk目录树

      

     三、集群通信

      Worker间的通信经常需要通过网络跨节点进行,Storm使用ZeroMQ或Netty(0.9以后默认使用)作为进程间通信的消息框架。

      Worker进程内部通信:不同worker的thread通信使用LMAX Disruptor来完成。

        不同topologey之间的通信,Storm不负责,需要自己想办法实现,例如使用kafka等;

       通信图解:

        

      相关博文参考:http://blog.csdn.net/bbaiggey/article/details/55510010?locationNum=10&fps=1

    四、消息容错机制ack-fail

      1.概述

      l 在storm中,可靠的信息处理机制是从spout开始的。

      l 一个提供了可靠的处理机制的spout需要记录他发射出去的tuple,当下游bolt处理tuple或者子tuple失败时spout能够重新发射。

      l Storm通过调用Spout的nextTuple()发送一个tuple。为实现可靠的消息处理,首先要给每个发出的tuple带上唯一的ID,并且将ID作为参数传递给  SoputOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), msgId); messageid就是用来标示唯一的tuple的,而rootid是随机生成的

    给每个tuple指定ID告诉Storm系统,无论处理成功还是失败,spout都要接收tuple树上所有节点返回的通知。

      如果处理成功,spout的ack()方法将会对编号是 msgId的消息应答确认;如果处理失败或者超时,会调用fail()方法。

      2.基本实现

      Storm 系统中有一组叫做"acker"的特殊的任务,它们负责跟踪DAG(有向无环图)中的每个消息。

      acker任务保存了spout id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker就知道消息处理完成时该通知哪个spout任务。第二个值是一个64bit的数字,我们称之为"ack val", 它是树中所有消息的随机id的异或计算结果。

      ack val表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的消息id发送过来做异或。 每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被完全处理了

    要实现ack机制:
    1,spout发射tuple的时候指定messageId
    2,spout要重写BaseRichSpout的fail和ack方法
    3,spout对发射的tuple进行缓存(否则spout的fail方法收到acker发来的messsageId,
      spout也无法获取到发送失败的数据进行重发),看看系统提供的接口,
      只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,
      用户如何取得原来的msg貌似需要自己cache,然后用这个msgId去查询,太坑爹了
    3,spout根据messageId对于ack的tuple则从缓存队列中删除,对于fail的tuple可以选择重发。 4,设置acker数至少大于0;Config.setNumAckers(conf, ackerParal);

      3.代码示例

    package ackfail;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    
    /**
     * Created by maoxiangyi on 2016/4/25.
     */
    public class MyAckFailTopology {
    
        public static void main(String[] args) throws Exception {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("mySpout", new MySpout(), 1);
            topologyBuilder.setBolt("mybolt1", new MyBolt1(), 1).shuffleGrouping("mySpout");
    
            Config conf = new Config();
            String name = MyAckFailTopology.class.getSimpleName();
            if (args != null && args.length > 0) {
                String nimbus = args[0];
                conf.put(Config.NIMBUS_HOST, nimbus);
                conf.setNumWorkers(1);
                StormSubmitter.submitTopologyWithProgressBar(name, conf, topologyBuilder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(name, conf, topologyBuilder.createTopology());
                Thread.sleep(60 * 60 * 1000);
                cluster.shutdown();
            }
        }
    }
    MyAckFailTopology
    package ackfail;
    
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IControlSpout;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    import java.util.UUID;
    
    /**
     * Created by maoxiangyi on 2016/4/25.
     */
    public class MySpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private Random rand;
        private Map<String,Values> buffer = new HashMap<>();
    
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
            rand = new Random();
        }
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void nextTuple() {
            String[] sentences = new String[]{"the cow jumped over the moon",
                    "the cow jumped over the moon",
                    "the cow jumped over the moon",
                    "the cow jumped over the moon", "the cow jumped over the moon"};
            String sentence = sentences[rand.nextInt(sentences.length)];
            String messageId = UUID.randomUUID().toString().replace("-", "");
            Values tuple = new Values(sentence);
            collector.emit(tuple, messageId);
            buffer.put(messageId,tuple);
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void ack(Object msgId) {
            System.out.println("消息处理成功,id= " + msgId);
            buffer.remove(msgId);
        }
    
        @Override
        public void fail(Object msgId) {
            System.out.println("消息处理失败,id= " + msgId);
            Values tuple = buffer.get(msgId);
            collector.emit(tuple,msgId);
        }
    }
    MySpout
    package ackfail;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.*;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * Created by maoxiangyi on 2016/4/25.
     */
    public class MyBolt1 extends BaseRichBolt {
        private OutputCollector collector;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void execute(Tuple input) {
            String sentence = input.getString(0);
            String[] words = sentence.split(" ");
            for (String word : words) {
                word = word.trim();
                if (!word.isEmpty()) {
                    word = word.toLowerCase();
                    collector.emit(input, new Values(word));
                }
            }
            collector.ack(input);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    
    }
    MyBolt1

      4.参考阅读

        https://www.cnblogs.com/intsmaze/p/5918087.html

  • 相关阅读:
    [转载]我的PMP复习备考经验谈(下篇)——一本关于PMP备考的小指南
    安装MongoDB遇到问题
    安装MongoDB遇到问题
    (热死你)Resin https ssl Linux 配置,实战可用
    高性能web服务器(热死你)Resin Linux的安装、配置、部署,性能远超Nginx支持Java、PHP等
    我最近用Python写了一个算法,不需要写任何规则就能自动识别一个网页的内容
    20161230实时量化监控,成效显著,实在忍不住要给大家秀一把
    16年收官之战,堪称完美,祝愿大家2017一举成名天下闻,虎啸龙吟展宏图
    我3年前开发的IM即时通讯一直没勇气推出,现在智能时代了,有什么可以结合的地方吗?
    忙活了一周时间,开发了一个年会抽奖系统,免费开放给大家(含操作视频及下载地址)
  • 原文地址:https://www.cnblogs.com/jiangbei/p/8521070.html
Copyright © 2011-2022 走看看