zoukankan      html  css  js  c++  java
  • Storm里面fieldsGrouping和Field的概念详解

    这个Field通常和fieldsGrouping分组机制一起使用,这个Field特别难理解,我自己也是在网上看了好多文章,感觉依旧讲的不是很清楚,是似而非,没有抓到重点。这个问题足足困扰了我3-4天时间,一直理解不了Field的概念,

    当前我觉得new Fields("word")就相当于表的表头,就是定义这个域,这个域里面放的东西,是emit进去的

    如果在declareOutputFields方法中new Fields("word1","word2")有2个及以上的fields,则在emit数据时new Value要与其对应(相当于key与value的关系),然后在topology组装时,fieldsGrouping中的new Fields()可以为new Fields("word1")或new Fields("word2")或new Fields("word1",”word2")来指定接受上游spout或bolt的哪些fields

    官方文档里有这么一句话:“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”

    一个task就是一个处理逻辑的实例,所以fields能根据tuple stream的id,也就是下面定义的xxx
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("xxx"));
    }
    xxx所代表的具体内容会由某一个task来处理,并且同一个xxx对应的内容,处理这个内容的task实例是同一个。

    比如说:

    bolt第一次emit三个流,即xxx有luonq pangyang qinnl三个值,假设分别建立三个task实例来处理:

    luonq -> instance1
    pangyang -> instance2
    qinnl -> instance3

    然后第二次emit四个流,即xxx有luonq qinnanluo py pangyang四个值,假设还是由刚才的三个task实例来处理:
    luonq -> instance1
    qinnanluo -> instance2
    py -> instance3
    pangyang -> instance2

    然后第三次emit两个流,即xxx有py qinnl两个值,假设还是由刚才的三个task实例来处理:
    py -> instance3
    qinnl -> instance3

    最后我们看看三个task实例都处理了哪些值,分别处理了多少次:

    instance1: luonq(处理2次)
    instance2: pangyang(处理2次) qinnanluo(处理1次)
    instance3: qinnl(处理2次) py(处理2次)

    结论:
    1. emit发出的值第一次由哪个task实例处理是随机的,此后再次出现这个值,就固定由最初处理他的那个task实例再次处理,直到topology结束

    2. 一个task实例可以处理多个emit发出的值

    3. 和shuffle Grouping的区别就在于,shuffle Grouping当emit发出同样的值时,处理他的task是随机的


    例子1:
    第一步:定义了一个表头
    public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
            declarer.declare(new Fields("word"));
        }
    第二步:往这个Field空间里面emit进去内容(可以是Bolt和Spolt)
    public void execute(Tuple input, BasicOutputCollector collector)
        {
            String sentence = input.getString(0);
            String[] words = sentence.split(" ");
            for (String word : words)
            {
                word = word.trim();
                if (!word.isEmpty())
                {
                    word = word.toLowerCase();
                    collector.emit(new Values(word));
                }
            }
        }
    第三步:关联步骤
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word-reader",new WordReader());
    builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
    Integer number = 2;
    builder.setBolt("word-counter", new WordCounter(), 4).fieldsGrouping("word-normalizer", new Fields("word"));

    第四步:
    最终实现的结果:
    Field:Word
                the
                sporm
                is
                ...

    例子2:

    第一步:
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
          declarer.declare(new Fields("word", "count"));
    }

    第二步:
    public void execute(Tuple tuple, BasicOutputCollector collector)
     {
                String word = tuple.getString(0);
                Integer count = counts.get(word);
                if (count == null)
                    count = 0;
                count++;
                counts.put(word, count);
                collector.emit(new Values(word, count));
    }
    第三步:
    Fields("word", "count")
                “is”,1
                “sporm”,3
                “the”,2
                  .....
    例子3:
    D:.....WorkspacesMyEclipse 8.5igDataexamples-ch06-real-life-app-mastersrcmainjavastormanalytics....
    第一步:
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("read-feed", new UsersNavigationSpout(), 3);
    builder.setBolt("get-categ", new GetCategoryBolt(), 3).shuffleGrouping("read-feed");
    builder.setBolt("user-history", new UserHistoryBolt(), 5).fieldsGrouping("get-categ", new Fields("user"));

    第二步:发送者输出是三个结构体:Fields("user","product", "categ")
    GetCategoryBolt.java
    public void execute(Tuple input, BasicOutputCollector collector)
     {
            NavigationEntry entry = (NavigationEntry)input.getValue(1);
            if("PRODUCT".equals(entry.getPageType())){
                try {
                    String product = (String)entry.getOtherData().get("product");

                    // Call the items API to get item information
                    Product itm = reader.readItem(product);
                    if(itm ==null)
                        return ;

                    String categ = itm.getCategory();

                    collector.emit(new Values(entry.getUserId(), product, categ));

                } catch (Exception ex) {
                    System.err.println("Error processing PRODUCT tuple"+ ex);
                    ex.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("user","product", "categ"));
        }

    第三步:new Fields("user"))只取Fields("user","product", "categ"))中的User
    builder.setBolt("user-history", new UserHistoryBolt(), 5).fieldsGrouping("get-categ", new Fields("user"));
    ---------------------
    作者:VessalasdXZ
    来源:CSDN
    原文:https://blog.csdn.net/vessalasd1/article/details/50472123
    版权声明:本文为博主原创文章,转载请附上博文链接!

  • 相关阅读:
    PHP数据类型
    Windows定时备份Mysql数据库
    Linux定时删除n天前日志
    使用file_get_contents() 发送GET、POST请求
    使用Git工具批量拉取代码
    Git常用命令
    点击开关按钮,通过改变类名切换按钮
    两个行内元素的间隙问题
    vue和angular双向数据绑定原理
    原生js实现 双向数据绑定
  • 原文地址:https://www.cnblogs.com/wangjing666/p/10025458.html
Copyright © 2011-2022 走看看