Storm starter Overview

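    The Storm starter examples are written with real care: they are not just toy examples, and many of them can be used directly in real-world scenarios.
    They also provide some very useful tools, such as SlidingWindowCounter and TimeCacheMap.
    So storm-starter effectively provides a framework for programming on Storm, and it is worth studying carefully.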

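    ExclamationTopology: a basic topology

    Nothing special here; it is the standard example. A TestWordSpout emits words, and two chained ExclamationBolt stages (exclaim1, then exclaim2) each append "!!!" to the word.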

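    // Storm 0.8/0.9 package names; Storm 1.0+ moved these classes to org.apache.storm.*
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.testing.TestWordSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;
    import java.util.Map;

    /**
     * This is a basic example of a Storm topology.
     */
    public class ExclamationTopology {

        // Appends "!!!" to the incoming word and emits the result as a new tuple.
        public static class ExclamationBolt extends BaseRichBolt {
            OutputCollector _collector;

            @Override
            public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
                _collector = collector;
            }

            @Override
            public void execute(Tuple tuple) {
                // Anchor the output to the input tuple so downstream failures propagate back, then ack the input.
                _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
                _collector.ack(tuple);
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
        }

        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();

            // spout "word" -> bolt "exclaim1" -> bolt "exclaim2", tuples distributed randomly (shuffle)
            builder.setSpout("word", new TestWordSpout(), 10);
            builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                    .shuffleGrouping("word");
            builder.setBolt("exclaim2", new ExclamationBolt(), 2)
                    .shuffleGrouping("exclaim1");

            Config conf = new Config();
            conf.setDebug(true);

            if (args != null && args.length > 0) {
                // Cluster mode: args[0] is the topology name.
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // Local mode: run in-process for 10 seconds, then shut down.
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", conf, builder.createTopology());
                Utils.sleep(10000);
                cluster.killTopology("test");
                cluster.shutdown();
            }
        }
    }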
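To see what the two chained bolts do to each word without starting a LocalCluster, the data flow can be traced in plain Java. This is a minimal sketch that does not use the Storm API at all; `ExclamationPipeline`, `exclaim` and `run` are hypothetical names, and the input words are assumed sample values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java trace of ExclamationTopology's data flow; no Storm classes involved.
class ExclamationPipeline {
    // Same transformation as ExclamationBolt.execute: append "!!!" to the word.
    static String exclaim(String word) {
        return word + "!!!";
    }

    // spout -> exclaim1 -> exclaim2: each bolt stage appends "!!!" once.
    static List<String> run(List<String> words) {
        List<String> out = new ArrayList<>();
        for (String word : words) {
            String afterExclaim1 = exclaim(word);
            out.add(exclaim(afterExclaim1));
        }
        return out;
    }

    public static void main(String[] args) {
        // Sample input words, assumed for illustration.
        System.out.println(run(List.of("nathan", "mike"))); // prints [nathan!!!!!!, mike!!!!!!]
    }
}
```

Every word leaving exclaim2 therefore carries six exclamation marks, which is the pattern to look for in the debug log of the local run.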

    RollingTopWords

    Implements top-N ranking over a sliding window.
    The bolt implementations in this example are very instructive; see Storm starter - RollingTopWords.
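The core idea, keeping one count per object for each time slot in a ring of slots and ranking the window totals, can be sketched in plain Java. This is a simplified illustration, not storm-starter's SlidingWindowCounter or its ranking bolts; `SlotWindowCounter`, `getCountsThenAdvance` and `RollingTopN.topN` are hypothetical names, and the caller is assumed to advance the window on a timer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified sliding-window counter: each object has one count per time slot.
class SlotWindowCounter<T> {
    private final int numSlots;
    private final Map<T, long[]> counts = new HashMap<>();
    private int head = 0; // slot that currently receives increments

    SlotWindowCounter(int numSlots) {
        this.numSlots = numSlots;
    }

    void increment(T obj) {
        counts.computeIfAbsent(obj, k -> new long[numSlots])[head]++;
    }

    // Sums every object's counts over all slots, then advances the window:
    // the next slot becomes the head and is wiped, dropping its oldest counts.
    Map<T, Long> getCountsThenAdvance() {
        Map<T, Long> totals = new HashMap<>();
        for (Map.Entry<T, long[]> e : counts.entrySet()) {
            totals.put(e.getKey(), Arrays.stream(e.getValue()).sum());
        }
        head = (head + 1) % numSlots;
        for (long[] slots : counts.values()) {
            slots[head] = 0;
        }
        // Forget objects whose counts have all expired.
        counts.values().removeIf(slots -> Arrays.stream(slots).allMatch(c -> c == 0));
        return totals;
    }
}

class RollingTopN {
    // Returns the n entries with the highest totals, largest first.
    static <T> List<Map.Entry<T, Long>> topN(Map<T, Long> totals, int n) {
        List<Map.Entry<T, Long>> entries = new ArrayList<>(totals.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return entries.subList(0, Math.min(n, entries.size()));
    }

    public static void main(String[] args) {
        SlotWindowCounter<String> counter = new SlotWindowCounter<>(2);
        for (String w : List.of("storm", "storm", "spout")) {
            counter.increment(w);
        }
        System.out.println(topN(counter.getCountsThenAdvance(), 1)); // prints [storm=2]
    }
}
```

With 2 slots, counts recorded before the first advance are still included in the totals of the first two calls and are dropped by the third, which is what makes the window "slide".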

    SingleJoinExample

    Uses TimeCacheMap to implement an in-memory join; see Storm starter - SingleJoinExample.
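The idea of an in-memory join can be sketched as: buffer whichever side of a key arrives first, emit the joined row when the other side arrives, and expire half-joined entries after a timeout. This plain-Java sketch is an assumption-laden illustration, not SingleJoinBolt or TimeCacheMap; the "gender"/"age" streams, `InMemoryJoin` and its methods are hypothetical, and expiry here is a simple timestamp check rather than TimeCacheMap's rotating buckets. In a real bolt, tuples dropped on expiry would typically be failed so the spout can replay them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory join of a "gender" stream and an "age" stream on "id",
// with a simple timeout instead of TimeCacheMap's rotating buckets.
class InMemoryJoin {
    private static final class Pending {
        final long firstSeenMillis;
        String gender;
        Integer age;

        Pending(long firstSeenMillis) {
            this.firstSeenMillis = firstSeenMillis;
        }
    }

    private final long timeoutMillis;
    private final Map<Integer, Pending> pending = new HashMap<>();

    InMemoryJoin(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // A tuple arrived on the gender stream; returns the joined row once both sides are present.
    Optional<String> onGender(int id, String gender, long nowMillis) {
        Pending p = pending.computeIfAbsent(id, k -> new Pending(nowMillis));
        p.gender = gender;
        return tryJoin(id, p);
    }

    // A tuple arrived on the age stream.
    Optional<String> onAge(int id, int age, long nowMillis) {
        Pending p = pending.computeIfAbsent(id, k -> new Pending(nowMillis));
        p.age = age;
        return tryJoin(id, p);
    }

    private Optional<String> tryJoin(int id, Pending p) {
        if (p.gender == null || p.age == null) {
            return Optional.empty(); // still waiting for the other stream
        }
        pending.remove(id);
        return Optional.of(id + "," + p.gender + "," + p.age);
    }

    // Drops half-joined entries older than the timeout; returns how many expired.
    int expire(long nowMillis) {
        int before = pending.size();
        pending.values().removeIf(p -> nowMillis - p.firstSeenMillis > timeoutMillis);
        return before - pending.size();
    }

    public static void main(String[] args) {
        InMemoryJoin join = new InMemoryJoin(1000);
        join.onGender(1, "male", 0);
        System.out.println(join.onAge(1, 25, 10).orElse("no match")); // prints 1,male,25
        join.onGender(2, "female", 0);
        System.out.println(join.expire(2000)); // prints 1
    }
}
```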

    BasicDRPCTopology, ReachTopology

    Examples of DRPC; see Twitter Storm – DRPC.

    TransactionalGlobalCount, TransactionalWords

    Examples of transactional topologies; see Storm - Transactional-topologies.

    TransactionalGlobalCount is fairly simple, so let's look at TransactionalWords.
    On top of counting words, it also maintains statistics on the distribution of word counts.

    public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
    public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
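    COUNT_DATABASE records the count of each word.
    BUCKET_DATABASE records the distribution of word counts, e.g. how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.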
    public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter

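    KeyedCountUpdater is not much different from the simple examples above: it counts words in execute, and in finishBatch it commits the counts directly to COUNT_DATABASE.
    Its output is new Fields("id", "key", "count", "prev-count"). The other fields are self-explanatory, but why is prev-count needed? When BUCKET_DATABASE is updated, we need to know whether the word has moved to a different bucket, so the word's previous count must be known.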
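The count-then-commit step, and where prev-count comes from, can be sketched in plain Java: count the batch, read the old committed value, write the new one, and emit both. This is a hypothetical illustration, not the Storm transactional API; `CountCommitSketch`, `Update` and `commitBatch` are made-up names, a plain map stands in for COUNT_DATABASE, and the "id" (transaction attempt) field is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of KeyedCountUpdater's batch commit (no Storm transactional API).
class CountCommitSketch {
    // One emitted record: mirrors ("key", "count", "prev-count"); prevCount is null for a new word.
    static final class Update {
        final String key;
        final int count;
        final Integer prevCount;

        Update(String key, int count, Integer prevCount) {
            this.key = key;
            this.count = count;
            this.prevCount = prevCount;
        }

        @Override
        public String toString() {
            return key + ": " + prevCount + " -> " + count;
        }
    }

    // Stands in for COUNT_DATABASE: word -> committed count.
    final Map<String, Integer> countDb = new HashMap<>();

    // Counts one batch (like execute), then commits the totals (like finishBatch),
    // reading the old value first so it can be emitted as prev-count.
    List<Update> commitBatch(List<String> batch) {
        Map<String, Integer> batchCounts = new HashMap<>();
        for (String word : batch) {
            batchCounts.merge(word, 1, Integer::sum);
        }
        List<Update> updates = new ArrayList<>();
        for (Map.Entry<String, Integer> e : batchCounts.entrySet()) {
            Integer prev = countDb.get(e.getKey());
            int curr = (prev == null ? 0 : prev) + e.getValue();
            countDb.put(e.getKey(), curr);
            updates.add(new Update(e.getKey(), curr, prev));
        }
        return updates;
    }

    public static void main(String[] args) {
        CountCommitSketch updater = new CountCommitSketch();
        System.out.println(updater.commitBatch(List.of("storm", "storm"))); // prints [storm: null -> 2]
        System.out.println(updater.commitBatch(List.of("storm")));          // prints [storm: 2 -> 3]
    }
}
```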

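    Bucketize computes the bucket a word belongs to as count / BUCKET_SIZE:
    If the word is new, it emits +1 for that bucket.
    If the word's bucket has changed, it emits +1 for the new bucket and -1 for the old bucket.
    If the bucket has not changed, nothing needs to be emitted.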

        public static class Bucketize extends BaseBasicBolt {
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
                int curr = tuple.getInteger(2);
                Integer prev = tuple.getInteger(3);
    
                int currBucket = curr / BUCKET_SIZE;
                Integer prevBucket = null;
                if(prev!=null) {
                    prevBucket = prev / BUCKET_SIZE;
                }
                
                if(prevBucket==null) {
                    collector.emit(new Values(attempt, currBucket, 1));                
                } else if(currBucket != prevBucket) {
                    collector.emit(new Values(attempt, currBucket, 1));
                    collector.emit(new Values(attempt, prevBucket, -1));
                }
            }
            
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("attempt", "bucket", "delta"));
            }
        }

    BucketCountUpdater then applies these bucket updates to BUCKET_DATABASE.
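The combined effect of Bucketize and BucketCountUpdater can be checked in plain Java: turn (count, prev-count) into bucket deltas with the same rules, then add the deltas to a map standing in for BUCKET_DATABASE. This is a hypothetical sketch; `BucketSketch`, `bucketize` and `apply` are made-up names, and BUCKET_SIZE = 10 is an assumption matching the 0-9 / 10-19 example above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch: Bucketize deltas applied to a BUCKET_DATABASE-like map.
class BucketSketch {
    static final int BUCKET_SIZE = 10; // assumed; matches the 0-9 / 10-19 example

    // Stands in for BUCKET_DATABASE: bucket index -> number of words in that bucket.
    final Map<Integer, Integer> bucketDb = new HashMap<>();

    // Same rules as Bucketize: returns {bucket, delta} pairs (empty if the bucket is unchanged).
    static List<int[]> bucketize(int curr, Integer prev) {
        int currBucket = curr / BUCKET_SIZE;
        List<int[]> deltas = new ArrayList<>();
        if (prev == null) {
            deltas.add(new int[] {currBucket, 1});
        } else {
            int prevBucket = prev / BUCKET_SIZE;
            if (currBucket != prevBucket) {
                deltas.add(new int[] {currBucket, 1});
                deltas.add(new int[] {prevBucket, -1});
            }
        }
        return deltas;
    }

    // Like BucketCountUpdater: add each delta to the stored bucket count.
    void apply(int curr, Integer prev) {
        for (int[] d : bucketize(curr, prev)) {
            bucketDb.merge(d[0], d[1], Integer::sum);
        }
    }

    public static void main(String[] args) {
        BucketSketch sketch = new BucketSketch();
        sketch.apply(3, null); // word x first seen with count 3 -> bucket 0
        sketch.apply(5, null); // word y first seen with count 5 -> bucket 0
        sketch.apply(12, 3);   // x grows from 3 to 12: moves from bucket 0 to bucket 1
        sketch.apply(8, 5);    // y grows from 5 to 8: stays in bucket 0, no delta
        System.out.println(sketch.bucketDb); // prints {0=1, 1=1}
    }
}
```

Without prev-count, the third call could not tell that x had to be subtracted from bucket 0.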

    The topology is defined as follows:

    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
    builder.setBolt("count", new KeyedCountUpdater(), 5)
            .fieldsGrouping("spout", new Fields("word"));
    builder.setBolt("bucketize", new Bucketize())
            .noneGrouping("count");
    builder.setBolt("buckets", new BucketCountUpdater(), 5)
            .fieldsGrouping("bucketize", new Fields("bucket"));
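Both "count" and "buckets" use fieldsGrouping, so every tuple with the same word (or the same bucket) goes to the same task, and each task can safely keep state for its own keys; Storm's documentation describes noneGrouping as currently equivalent to shuffleGrouping. Storm picks the target task from a hash of the grouping fields' values, but the exact hash is an internal detail, so this sketch simply assumes `List.hashCode()` modulo the task count. `FieldsGroupingSketch` and `chooseTask` are hypothetical names.

```java
import java.util.List;

// Hypothetical sketch of fields grouping: same grouping values -> same target task.
// The hash function is an assumption; Storm's internal choice may differ.
class FieldsGroupingSketch {
    static int chooseTask(List<?> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 5; // parallelism hint of the "count" bolt above
        for (String word : List.of("apple", "banana", "apple")) {
            // Both "apple" lines print the same task index.
            System.out.println(word + " -> task " + chooseTask(List.of(word), numTasks));
        }
    }
}
```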

    WordCountTopology: multi-language support

    See: Storm multi-language support

    ShellBolt and BaseBasicBolt are used to declare bolts implemented in Python and Java, respectively.

        public static class SplitSentence extends ShellBolt implements IRichBolt {
            
            public SplitSentence() {
                super("python", "splitsentence.py");
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
    
            @Override
            public Map<String, Object> getComponentConfiguration() {
                return null;
            }
        }  
        
        public static class WordCount extends BaseBasicBolt {
            Map<String, Integer> counts = new HashMap<String, Integer>();
    
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String word = tuple.getString(0);
                Integer count = counts.get(word);
                if(count==null) count = 0;
                count++;
                counts.put(word, count);
                collector.emit(new Values(word, count));
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
        }

    When defining the topology, ShellBolt-based and BaseBasicBolt-based bolts can be mixed freely, which is very convenient:

            TopologyBuilder builder = new TopologyBuilder();
            
            builder.setSpout("spout", new RandomSentenceSpout(), 5);
            
            builder.setBolt("split", new SplitSentence(), 8)
                     .shuffleGrouping("spout");
            builder.setBolt("count", new WordCount(), 12)
                     .fieldsGrouping("split", new Fields("word"));
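Ignoring parallelism and the Python subprocess, the data flow of this topology can be traced in a single Java process. This is a minimal sketch: `WordCountSketch` is a hypothetical name, `split` assumes splitsentence.py splits on single spaces, and the sentences are made-up input. In the real topology there are 12 WordCount tasks, and the fieldsGrouping on "word" is what keeps each word's running count in exactly one of them; here a single map holds all counts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process trace of WordCountTopology (no Storm, no Python).
class WordCountSketch {
    // Running counts, as kept by the WordCount bolt.
    final Map<String, Integer> counts = new HashMap<>();

    // Stand-in for splitsentence.py; splitting on single spaces is an assumption.
    static List<String> split(String sentence) {
        return Arrays.asList(sentence.split(" "));
    }

    // Same logic as WordCount.execute: increment, then emit (word, count), here as "word=count".
    String count(String word) {
        int c = counts.merge(word, 1, Integer::sum);
        return word + "=" + c;
    }

    public static void main(String[] args) {
        WordCountSketch bolt = new WordCountSketch();
        List<String> emitted = new ArrayList<>();
        for (String sentence : List.of("the cow jumped", "the moon")) {
            for (String word : split(sentence)) {
                emitted.add(bolt.count(word));
            }
        }
        System.out.println(emitted); // prints [the=1, cow=1, jumped=1, the=2, moon=1]
    }
}
```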
    Original post: https://www.cnblogs.com/fxjwind/p/3097417.html