zoukankan      html  css  js  c++  java
  • storm入门(二):关于storm中某一段时间内topN的计算入门

    刚刚接触storm 对于滑动窗口的topN复杂模型有一些不理解,通过阅读其他的博客发现有两篇关于topN的非滑动窗口的介绍。然后转载过来。

    下面是第一种:

    Storm的另一种常见模式是对流式数据进行所谓“streaming top N”的计算,它的特点是持续的在内存中按照某个统计指标(如出现次数)计算TOP N,然后每隔一定时间间隔输出实时计算后的TOP N结果。

    流式数据的TOP N计算的应用场景很多,例如计算twitter上最近一段时间内的热门话题、热门点击图片等等。

    下面结合Storm-Starter中的例子,介绍一种可以很容易进行扩展的实现方法:首先,在多台机器上并行的运行多个Bolt,每个Bolt负责一部分数据的TOP N计算,然后再有一个全局的Bolt来合并这些机器上计算出来的TOP N结果,合并后得到最终全局的TOP N结果。

    该部分示例代码的入口是RollingTopWords类,用于计算文档中出现次数最多的N个单词。首先看一下这个Topology结构:

    Topology构建的代码如下:

     
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("word", new TestWordSpout(), 5);
            builder.setBolt("count", new RollingCountObjects(60, 10), 4)
                     .fieldsGrouping("word", new Fields("word"));
            builder.setBolt("rank", new RankObjects(TOP_N), 4)
                     .fieldsGrouping("count", new Fields("obj"));
            builder.setBolt("merge", new MergeObjects(TOP_N))
                     .globalGrouping("rank");
     

    (1)首先,TestWordSpout()是Topology的数据源Spout,持续随机生成单词发出去,产生数据流“word”,输出Fields是“word”,核心代码如下:

     
        public void nextTuple() {
            Utils.sleep(100);
            final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
            final Random rand = new Random();
            final String word = words[rand.nextInt(words.length)];
            _collector.emit(new Values(word));
      }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
      }
     

    (2)接下来,“word”流入RollingCountObjects这个Bolt中进行word count计算,为了保证同一个word的数据被发送到同一个Bolt中进行处理,按照“word”字段进行field grouping;在RollingCountObjects中会计算各个word的出现次数,然后产生“count”流,输出“obj”和“count”两个Field,其中对于synchronized的线程锁我们也可以换成安全的容器,比如ConcurrentHashMap等组件。核心代码如下:

     
        public void execute(Tuple tuple) {
    
            Object obj = tuple.getValue(0);
            int bucket = currentBucket(_numBuckets);
            synchronized(_objectCounts) {
                long[] curr = _objectCounts.get(obj);
                if(curr==null) {
                    curr = new long[_numBuckets];
                    _objectCounts.put(obj, curr);
                }
                curr[bucket]++;
                _collector.emit(new Values(obj, totalObjects(obj)));
                _collector.ack(tuple);
            }
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("obj", "count"));
        }
     

    (3)然后,RankObjects这个Bolt按照“count”流的“obj”字段进行field grouping;在Bolt内维护TOP N个有序的单词,如果超过TOP N个单词,则将排在最后的单词踢掉,同时每个一定时间(2秒)产生“rank”流,输出“list”字段,输出TOP N计算结果到下一级数据流“merge”流,核心代码如下:

     
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            Object tag = tuple.getValue(0);
            Integer existingIndex = _find(tag);
            if (null != existingIndex) {
                _rankings.set(existingIndex, tuple.getValues());
            } else {
                _rankings.add(tuple.getValues());
            }
            Collections.sort(_rankings, new Comparator<List>() {
                public int compare(List o1, List o2) {
                    return _compare(o1, o2);
                }
            });
            if (_rankings.size() > _count) {
                _rankings.remove(_count);
            }
            long currentTime = System.currentTimeMillis();
            if(_lastTime==null || currentTime >= _lastTime + 2000) {
                collector.emit(new Values(new ArrayList(_rankings)));
                _lastTime = currentTime;
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("list"));
        }
     

    (4)最后,MergeObjects这个Bolt按照“rank”流的进行全局的grouping,即所有上一级Bolt产生的“rank”流都流到这个“merge”流进行;MergeObjects的计算逻辑和RankObjects类似,只是将各个RankObjects的Bolt合并后计算得到最终全局的TOP N结果,核心代码如下:

     
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            List<List> merging = (List) tuple.getValue(0);
            for(List pair : merging) {
                Integer existingIndex = _find(pair.get(0));
                if (null != existingIndex) {
                    _rankings.set(existingIndex, pair);
                } else {
                    _rankings.add(pair);
                }
    
                Collections.sort(_rankings, new Comparator<List>() {
                    public int compare(List o1, List o2) {
                        return _compare(o1, o2);
                    }
                });
    
                if (_rankings.size() > _count) {
                    _rankings.subList(_count, _rankings.size()).clear();
                }
            }
    
            long currentTime = System.currentTimeMillis();
            if(_lastTime==null || currentTime >= _lastTime + 2000) {
                collector.emit(new Values(new ArrayList(_rankings)));
                LOG.info("Rankings: " + _rankings);
                _lastTime = currentTime;
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("list"));
        }
     

    另外,还有一种很聪明的方法,只在execute中插入数据而不emit,而在prepare中进行emit,创建线程根据时间进行监听。

    1. package test.storm.topology;
    2. import test.storm.bolt.WordCounter;
    3. import test.storm.bolt.WordWriter;
    4. import test.storm.spout.WordReader;
    5. import backtype.storm.Config;
    6. import backtype.storm.StormSubmitter;
    7. import backtype.storm.generated.AlreadyAliveException;
    8. import backtype.storm.generated.InvalidTopologyException;
    9. import backtype.storm.topology.TopologyBuilder;
    10. import backtype.storm.tuple.Fields;
    11. public class WordTopN {
    12.     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    13.         if (args == null || args.length < 1) {  
    14.             System.err.println("Usage: N");
    15.             System.err.println("such as : 10");
    16.             System.exit(-1);
    17.         }
    18.         TopologyBuilder builder = new TopologyBuilder();
    19.         builder.setSpout("wordreader", new WordReader(), 2);
    20.         builder.setBolt("wordcounter", new WordCounter(), 2).fieldsGrouping("wordreader", new Fields("word"));
    21.         builder.setBolt("wordwriter", new WordWriter()).globalGrouping("wordcounter");
    22.         Config conf = new Config();
    23.         conf.put("N", args[0]);
    24.         conf.setDebug(false);
    25.         StormSubmitter.submitTopology("topN", conf, builder.createTopology());
    26.     }
    27. }

    这里需要注意的几点是,第一个bolt的分组策略是fieldsGrouping,按照字段分组,这一点很重要,它能保证相同的word被分发到同一个bolt上,
    像做wordcount、TopN之类的应用就要使用这种分组策略。
    最后一个bolt的分组策略是globalGrouping,全局分组,tuple会被分配到一个bolt用来汇总。
    为了提高并行度,spout和第一个bolt均设置并行度为2(我这里测试机器性能不是很高)。

    点击(此处)折叠或打开

    1. package test.storm.spout;
    2. import java.util.Map;
    3. import java.util.Random;
    4. import java.util.concurrent.atomic.AtomicInteger;
    5. import backtype.storm.spout.SpoutOutputCollector;
    6. import backtype.storm.task.TopologyContext;
    7. import backtype.storm.topology.OutputFieldsDeclarer;
    8. import backtype.storm.topology.base.BaseRichSpout;
    9. import backtype.storm.tuple.Fields;
    10. import backtype.storm.tuple.Values;
    11. public class WordReader extends BaseRichSpout {
    12.     private static final long serialVersionUID = 2197521792014017918L;
    13.     private SpoutOutputCollector collector;
    14.     private static AtomicInteger i = new AtomicInteger();
    15.     private static String[] words = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m",
    16.             "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
    17.     @Override
    18.     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    19.         this.collector = collector;
    20.     }
    21.     @Override
    22.     public void nextTuple() {
    23.         if (i.intValue() < 100) {
    24.             Random rand = new Random();
    25.             String word = words[rand.nextInt(words.length)];
    26.             collector.emit(new Values(word));
    27.             i.incrementAndGet();
    28.         }
    29.     }
    30.     @Override
    31.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    32.         declarer.declare(new Fields("word"));
    33.     }
    34. }

    spout的作用是随机发送word,发送100次,由于并行度是2,将产生2个spout实例,所以这里的计数器使用了static的AtomicInteger来保证线程安全。


    点击(此处)折叠或打开

    1. package test.storm.bolt;
    2. import java.util.ArrayList;
    3. import java.util.Collections;
    4. import java.util.Comparator;
    5. import java.util.HashMap;
    6. import java.util.List;
    7. import java.util.Map;
    8. import java.util.Map.Entry;
    9. import java.util.concurrent.ConcurrentHashMap;
    10. import backtype.storm.task.OutputCollector;
    11. import backtype.storm.task.TopologyContext;
    12. import backtype.storm.topology.IRichBolt;
    13. import backtype.storm.topology.OutputFieldsDeclarer;
    14. import backtype.storm.tuple.Fields;
    15. import backtype.storm.tuple.Tuple;
    16. import backtype.storm.tuple.Values;
    17. public class WordCounter implements IRichBolt {
    18.     private static final long serialVersionUID = 5683648523524179434L;
    19.     private static Map<String, Integer> counters = new ConcurrentHashMap<String, Integer>();
    20.     private volatile boolean edit = true;
    21.     @Override
    22.     public void prepare(final Map stormConf, TopologyContext context, final OutputCollector collector) {
    23.         new Thread(new Runnable() {
    24.             @Override
    25.             public void run() {
    26.                 while (true) {
    27.                     //5秒后counter不再变化,可以认为spout已经发送完毕
    28.                     if (!edit) {
    29.                         if (counters.size() > 0) {
    30.                             List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>();
    31.                             list.addAll(counters.entrySet());
    32.                             Collections.sort(list, new ValueComparator());
    33.                             //向下一个bolt发送前N个word
    34.                             for (int i = 0; i < list.size(); i++) {
    35.                                 if (i < Integer.parseInt(stormConf.get("N").toString())) {
    36.                                     collector.emit(new Values(list.get(i).getKey() + ":" + list.get(i).getValue()));
    37.                                 }
    38.                             }
    39.                         }
    40.                         //发送之后,清空counters,以防spout再次发送word过来
    41.                         counters.clear();
    42.                     }
    43.                     edit = false;
    44.                     try {
    45.                         Thread.sleep(5000);
    46.                     } catch (InterruptedException e) {
    47.                         e.printStackTrace();
    48.                     }
    49.                 }
    50.             }
    51.         }).start();
    52.     }
    53.     @Override
    54.     public void execute(Tuple tuple) {
    55.         String str = tuple.getString(0);
    56.         if (counters.containsKey(str)) {
    57.             Integer c = counters.get(str) + 1;
    58.             counters.put(str, c);
    59.         } else {
    60.             counters.put(str, 1);
    61.         }
    62.         edit = true;
    63.     }
    64.     private static class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
    65.         @Override
    66.         public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
    67.             return entry2.getValue() - entry1.getValue();
    68.         }
    69.     }
    70.     @Override
    71.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    72.         declarer.declare(new Fields("word_count"));
    73.     }
    74.     @Override
    75.     public void cleanup() {
    76.     }
    77.     @Override
    78.     public Map<String, Object> getComponentConfiguration() {
    79.         return null;
    80.     }
    81. }

    在WordCounter里面有个线程安全的容器ConcurrentHashMap,来存储word以及对应的次数。在prepare方法里启动一个线程,长期监听edit的状态,监听间隔是5秒,
    当edit为false,即execute方法不再执行、容器不再变化,可以认为spout已经发送完毕了,可以开始排序取TopN了。这里使用了一个volatile edit(回忆一下volatile的使用场景:
    对变量的修改不依赖变量当前的值,这里设置true or false,显然不相互依赖)。


    点击(此处)折叠或打开

    1. package test.storm.bolt;
    2. import java.io.FileWriter;
    3. import java.io.IOException;
    4. import java.util.Map;
    5. import backtype.storm.task.TopologyContext;
    6. import backtype.storm.topology.BasicOutputCollector;
    7. import backtype.storm.topology.OutputFieldsDeclarer;
    8. import backtype.storm.topology.base.BaseBasicBolt;
    9. import backtype.storm.tuple.Tuple;
    10. public class WordWriter extends BaseBasicBolt {
    11.     private static final long serialVersionUID = -6586283337287975719L;
    12.     private FileWriter writer = null;
    13.     public WordWriter() {
    14.     }
    15.     @Override
    16.     public void prepare(Map stormConf, TopologyContext context) {
    17.         try {
    18.             writer = new FileWriter("/data/tianzhen/output/" + this);
    19.         } catch (IOException e) {
    20.             e.printStackTrace();
    21.         }
    22.     }
    23.     @Override
    24.     public void execute(Tuple input, BasicOutputCollector collector) {
    25.         String s = input.getString(0);
    26.         try {
    27.             writer.write(s);
    28.             writer.write(" ");
    29.             writer.flush();
    30.         } catch (IOException e) {
    31.             e.printStackTrace();
    32.         } finally {
    33.             //writer不能close,因为execute需要一直运行
    34.         }
    35.     }
    36.     @Override
    37.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    38.     }
    39. }

    最后一个bolt做全局的汇总,这里我偷了懒,直接将结果写到文件了,省略截取TopN的过程,因为我这里就一个supervisor节点,所以结果是正确的。

    引用连接:http://blog.itpub.net/28912557/viewspace-1579860/

         http://www.cnblogs.com/panfeng412/archive/2012/06/16/storm-common-patterns-of-streaming-top-n.html

  • 相关阅读:
    vue学习之vuex的入门
    Vue的入门之安装
    JS之作用域链
    map去重value值
    增量部署和全量部署
    js跳转页面的方法
    Quartz定时任务时间设置
    @RequestParam和@PathVariable用法小结
    nginx负载均衡的5种策略(转载)
    使用Joda-Time优雅的处理日期时间
  • 原文地址:https://www.cnblogs.com/zguood/p/4528195.html
Copyright © 2011-2022 走看看