zoukankan      html  css  js  c++  java
  • Storm里面fieldsGrouping和Field参数和 declareOutputFields

    Fields,个人理解,类似于一张表,你取那些字段以及这些字段所对应的数据给后面的bolt用

    这个Field通常和fieldsGrouping分组机制一起使用,这个Field特别难理解,我自己也是在网上看了好多文章,感觉依旧讲的不是很清楚,是似而非,没有抓到重点。这个问题足足困扰了我3-4天时间,一直理解不了Field的概念,

    当前我觉得new Fields("word")就相当于表的表头,就是定义这个域,这个域里面放的东西,是emit进去的

    Spouts--->Bolts;
    Bolts---->Bolts;
    Field:"Word"
                the
                sporm
                is
                ...

    例子1:
    第一步:定义了一个表头
    public void declareOutputFields(OutputFieldsDeclarer declarer)
        {
            declarer.declare(new Fields("word"));
        }
    第二步:往这个Field空间里面emit进去内容(可以是Bolt和Spolt)
    public void execute(Tuple input, BasicOutputCollector collector)
        {
            String sentence = input.getString(0);
            String[] words = sentence.split(" ");
            for (String word : words)
            {
                word = word.trim();
                if (!word.isEmpty())
                {
                    word = word.toLowerCase();
                    collector.emit(new Values(word));
                }
            }
        }
    第三步:关联步骤
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word-reader",new WordReader());
    builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
    Integer number = 2;
    builder.setBolt("word-counter", new WordCounter(), 4).fieldsGrouping("word-normalizer", new Fields("word"));

    第四步:
    最终实现的结果:
    Field:Word
                the
                sporm
                is
                ...

    例子2:

    第一步:
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
          declarer.declare(new Fields("word", "count"));
    }

    第二步:
    public void execute(Tuple tuple, BasicOutputCollector collector)
     {
                String word = tuple.getString(0);
                Integer count = counts.get(word);
                if (count == null)
                    count = 0;
                count++;
                counts.put(word, count);
                collector.emit(new Values(word, count));
    }
    第三步:
    Fields("word", "count")
                “is”,1
                “sporm”,3
                “the”,2
                  .....
    例子3:
    D:.....WorkspacesMyEclipse 8.5igDataexamples-ch06-real-life-app-mastersrcmainjavastormanalytics....
    第一步:
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("read-feed", new UsersNavigationSpout(), 3);
    builder.setBolt("get-categ", new GetCategoryBolt(), 3).shuffleGrouping("read-feed");
    builder.setBolt("user-history", new UserHistoryBolt(), 5).fieldsGrouping("get-categ", new Fields("user"));

    第二步:发送者输出是三个结构体:Fields("user","product", "categ")
    GetCategoryBolt.java
    public void execute(Tuple input, BasicOutputCollector collector)
     {
            NavigationEntry entry = (NavigationEntry)input.getValue(1);
            if("PRODUCT".equals(entry.getPageType())){
                try {
                    String product = (String)entry.getOtherData().get("product");

                    // Call the items API to get item information
                    Product itm = reader.readItem(product);
                    if(itm ==null)
                        return ;

                    String categ = itm.getCategory();

                    collector.emit(new Values(entry.getUserId(), product, categ));

                } catch (Exception ex) {
                    System.err.println("Error processing PRODUCT tuple"+ ex);
                    ex.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("user","product", "categ"));
        }

    第三步:new Fields("user"))只取Fields("user","product", "categ"))中的User
    builder.setBolt("user-history", new UserHistoryBolt(), 5).fieldsGrouping("get-categ", new Fields("user"));


    declareOutputFields方法中声明了该bolt/spout输出的字段个数,供下游使用,在该bolt中的execute方法中,emit发射的字段个数必须和声明的相同


    参考:https://blog.csdn.net/vessalasd1/article/details/50472123

  • 相关阅读:
    oracle导入dmp数据库文件
    Merge into的使用详解-你Merge了没有【转】
    远程调试
    安卓Activity、service是否处于同一进程
    AIDL机制实现进程间的通讯实例
    安卓android:scaleType属性
    oracle索引
    Json-lib用法
    浅谈position: absolute和position:relative
    Tab Layout教程
  • 原文地址:https://www.cnblogs.com/51python/p/11005880.html
Copyright © 2011-2022 走看看