  • Introduction and usage of Storm stream groupings

    I. Overview of stream groupings

     A stream grouping determines how a stream's tuples are partitioned among the tasks of a subscribing bolt. Storm offers shuffle grouping, fields grouping, all grouping, global grouping, none grouping, direct grouping, local-or-shuffle grouping, and custom grouping.

    II. The grouping types

      1. Shuffle grouping: tuples are distributed randomly across the bolt's tasks, with the guarantee that each task receives an approximately equal number of tuples. The routing has no relation to the tuple's contents.

      2. Fields grouping: the stream is partitioned by the specified fields. For example, when grouping on a "user-id" field, tuples with the same "user-id" are always sent to the same task, while tuples with different "user-id"s may go to different tasks.

      3. All grouping: every tuple is replicated to all tasks of the bolt. Use this grouping with care.

      4. Global grouping: when the last bolt in a chain must aggregate the data processed by the preceding bolts, global grouping routes the entire stream to a single task. To be precise, it goes to the task with the lowest task ID.

      5. Direct grouping: in a sense the opposite of global grouping, this is a special kind of grouping: the producer of a tuple decides which task of the consumer receives it.

      6. Local-or-shuffle grouping: normally a topology runs in several worker processes (JVMs), and a tuple that crosses workers must be serialized and sent over the network. If the target bolt has one or more tasks in the same worker as the producing task, this grouping shuffles tuples only among those in-process tasks, skipping the network entirely; otherwise it behaves like an ordinary shuffle grouping. This cuts network overhead and improves Storm's throughput.

      7. None grouping: you don't care how the stream is grouped. Currently, none grouping is equivalent to shuffle grouping; eventually, where possible, Storm will execute none-grouped bolts in the same thread as the bolts or spouts they subscribe to.

      8. Custom grouping: implement your own routing logic; in most cases this is not needed.
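The routing idea behind fields grouping (item 2) can be sketched without Storm. This is a hypothetical illustration, not Storm's actual implementation: the grouping-field values are hashed, and the hash modulo the task count picks the target task, so equal values always land on the same task.

```java
import java.util.List;

// Hypothetical sketch of fields-grouping routing (not Storm's source code).
public class FieldsGroupingSketch {

    // Hash the grouping-field values and map the hash onto the task range.
    static int chooseTask(List<Object> groupingFieldValues, int numTasks) {
        // Math.abs guards against negative hash codes; the Integer.MIN_VALUE
        // edge case is ignored in this sketch.
        return Math.abs(groupingFieldValues.hashCode()) % numTasks;
    }

    public static void main(String[] args) {
        int t1 = chooseTask(List.of("user-1"), 4);
        int t2 = chooseTask(List.of("user-1"), 4);
        System.out.println(t1 == t2); // prints true: same key, same task
    }
}
```

The important property is only the one the text states: identical field values map to an identical task index.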

    III. Implementations

    1. Shuffle grouping

    =========================================  Topology  ===============================================
    public class ShuffleGroupingTopology {
        
        private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingTopology.class);
        
        public static void main(String[] args) {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            builder.setSpout("ShuffleGroupingSpout", new ShuffleGroupingSpout(),1);
            builder.setBolt("ShuffleGroupingBolt",new ShuffleGroupingBolt(),2).shuffleGrouping("ShuffleGroupingSpout");
            builder.setBolt("ShuffleGroupingBolt2",new ShuffleGroupingBolt2(),2).shuffleGrouping("ShuffleGroupingBolt");
            
            Config config = new Config();
            config.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology("ShuffleGroupingTopology", config, builder.createTopology());
                log.warn("==================================================");
                log.warn("the topology {} is submitted.","ShuffleGroupingTopology");
                log.warn("==================================================");
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }
    =====================================   Spout  =========================================
    public class ShuffleGroupingSpout extends BaseRichSpout{

        private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingSpout.class);

        private SpoutOutputCollector collector;
        private TopologyContext context;
        private AtomicInteger ai;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.context = context;
            this.ai = new AtomicInteger();
            log.warn("ShuffleGroupingSpout ------> open:hashcode{} ---->taskId:{}",this.hashCode(),context.getThisTaskId());
        }

        @Override
        public void nextTuple() {
            int i = this.ai.getAndIncrement();
            if(i < 10){
                log.warn("ShuffleGroupingSpout ------> nextTuple:hashcode:{} ---->taskId:{} ----->value:{}",this.hashCode(),context.getThisTaskId(),i);
                collector.emit(new Values(i));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("i"));
        }

        @Override
        public void close() {
            log.warn("ShuffleGroupingSpout ------> close:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId());
        }
    }
    ======================================  Bolt  ============================================
    public class ShuffleGroupingBolt extends BaseBasicBolt{

        private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt.class);

        private TopologyContext context;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.context = context;
            log.warn("ShuffleGroupingBolt ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Integer i = input.getIntegerByField("i");
            collector.emit(new Values(i*10));
            log.warn("ShuffleGroupingBolt ------> execute:hashcode:{} ---->taskId:{} ---->value:{}",this.hashCode(),context.getThisTaskId(),i);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result"));
        }

        @Override
        public void cleanup() {
            log.warn("ShuffleGroupingBolt ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId());
        }
    }
    ======================================  Bolt2  ============================================
    public class ShuffleGroupingBolt2 extends BaseBasicBolt{

        private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt2.class);

        private TopologyContext context;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.context = context;
            log.warn("ShuffleGroupingBolt2 ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Integer i = input.getIntegerByField("result");
            log.warn("ShuffleGroupingBolt2 ------> execute:hashcode:{} ---->taskId:{} ---->result:{}",this.hashCode(),context.getThisTaskId(),i);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //nothing to do
        }

        @Override
        public void cleanup() {
            log.warn("ShuffleGroupingBolt2 ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId());
        }
    }
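Conceptually, shuffle grouping just spreads tuples evenly over the target tasks. The standalone sketch below (plain Java, not Storm's code) simplifies it to a deterministic round-robin; Storm's real dispatch is randomized, but the even split it guarantees is the same.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Conceptual model of shuffle grouping: even distribution over target tasks,
// simplified here to plain round-robin (Storm's actual dispatch is randomized).
public class ShuffleGroupingSketch {

    private final AtomicInteger counter = new AtomicInteger();
    private final int numTasks;

    ShuffleGroupingSketch(int numTasks) { this.numTasks = numTasks; }

    // Each call advances to the next task, wrapping around, so every task
    // receives roughly the same number of tuples.
    int nextTask() {
        return counter.getAndIncrement() % numTasks;
    }

    public static void main(String[] args) {
        ShuffleGroupingSketch g = new ShuffleGroupingSketch(2);
        int[] counts = new int[2];
        for (int i = 0; i < 10; i++) counts[g.nextTask()]++;
        System.out.println(counts[0] + " " + counts[1]); // prints 5 5
    }
}
```

With the topology above (1 spout task, 2 bolt tasks), this is why each ShuffleGroupingBolt task ends up logging about half of the 10 emitted values.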

    2. Fields grouping

    //======================================  Topology ============================================
    public class FieldsShuffleTopology {
      private static final Logger log = LoggerFactory.getLogger(FieldsShuffleTopology.class);
        
        public static void main(String[] args) {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            builder.setSpout("FieldsShuffleSpout", new FieldsShuffleSpout(),1);
            builder.setBolt("FieldsShuffleUpperBolt",new FieldsShuffleUpperBolt(),2).shuffleGrouping("FieldsShuffleSpout");
            builder.setBolt("FieldsShuffleFinalBolt",new FieldsShuffleFinalBolt(),2).fieldsGrouping("FieldsShuffleUpperBolt", new Fields("upperName"));
            
            Config config = new Config();
            config.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology("FieldsShuffleTopology", config, builder.createTopology());
                log.warn("==================================================");
                log.warn("the topology {} is submitted.","FieldsShuffleTopology");
                log.warn("==================================================");
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }
    
    ======================================  Spout  ============================================
    
    public class FieldsShuffleSpout extends BaseRichSpout {
        private static final Logger log = LoggerFactory.getLogger(FieldsShuffleSpout.class);
    
        private SpoutOutputCollector collector;
        private TopologyContext context;
        private List<String> list;
        private int index;
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.context = context;
            this.index = 0;
            this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "Word", "Word");
            log.warn("FieldsShuffleSpout open:hashcode{} taskId:{}", this.hashCode(), context.getThisTaskId());
        }
    
        @Override
        public void nextTuple() {
            if (index < list.size()) {
                String s = list.get(index++);
                log.warn("FieldsShuffleSpout nextTuple:hashcode:{} taskId:{} value:{}", this.hashCode(),
                        context.getThisTaskId(), s);
                collector.emit(new Values(s));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("i"));
        }
    
        @Override
        public void close() {
            log.warn("FieldsShuffleSpout close:hashcode:{} taskId:{} ", this.hashCode(), context.getThisTaskId());
        }
    }
    
    ======================================  Bolt ==========================================
    public class FieldsShuffleUpperBolt extends BaseBasicBolt{
    
    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleUpperBolt.class);
        
        private  TopologyContext context;
        
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.context = context;
            log.warn("FieldsShuffleUpperBolt ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
            
        }
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String s = input.getStringByField("i");
            collector.emit(new Values(s.toUpperCase()));
            log.warn("FieldsShuffleUpperBolt ------> execute:hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("upperName"));
        }
        
        @Override
        public void cleanup() {
            log.warn("FieldsShuffleUpperBolt ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId());
        }
    }
    
    ======================================  Bolt2 ==========================================
    public class FieldsShuffleFinalBolt extends BaseBasicBolt{
        
        private static final Logger log = LoggerFactory.getLogger(FieldsShuffleFinalBolt.class);
        
        private  TopologyContext context;
        
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.context = context;
            log.warn("FieldsShuffleFinalBolt ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
            
        }
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String s = input.getStringByField("upperName");
            log.warn("FieldsShuffleFinalBolt ------> execute:hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //nothing to do
        }
        
        @Override
        public void cleanup() {
            log.warn("FieldsShuffleFinalBolt ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId());
        }
    }

    3. All grouping

      Every tuple is replicated to all tasks of the downstream bolts. Use this grouping with care.

    public class AllGroupTopology {
        
        private static final Logger log = LoggerFactory.getLogger(AllGroupTopology.class);
        
        public static void main(String[] args) {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            builder.setSpout("AllGrouppingSpout", new AllGrouppingSpout(),1);
        builder.setBolt("AllBolt1",new AllBolt1(),2).allGrouping("AllGrouppingSpout");
        builder.setBolt("AllBolt2",new AllBolt2(),2).allGrouping("AllGrouppingSpout");
             
            Config config = new Config();
            config.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology("AllGroupTopology", config, builder.createTopology());
                log.warn("==================================================");
                log.warn("the topology {} is submitted.","AllGroupTopology");
                log.warn("==================================================");
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }
    ======================================  AllGrouppingSpout ==========================================
    public class AllGrouppingSpout extends BaseRichSpout {
        
        private static final Logger log = LoggerFactory.getLogger(AllGrouppingSpout.class);
    
        private SpoutOutputCollector collector;
        private TopologyContext context;
        private List<String> list;
        private int index;
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.context = context;
            this.index = 0;
            this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "Word", "Word");
            log.warn("AllGrouppingSpout open:hashcode{} taskId:{}", this.hashCode(), context.getThisTaskId());
        }
    
        @Override
        public void nextTuple() {
            if (index < list.size()) {
                String s = list.get(index++);
                log.warn("AllGrouppingSpout nextTuple:hashcode:{} taskId:{} value:{}", this.hashCode(),
                        context.getThisTaskId(), s);
                collector.emit(new Values(s));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("name"));
        }
    }
    ======================================  AllBolt1  ==========================================
    public class AllBolt1 extends BaseBasicBolt{
    
        private static final Logger log = LoggerFactory.getLogger(AllBolt1.class);
        
        private  TopologyContext context;
        
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.context = context;
            log.warn("AllBolt1 ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
        }
        
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String s = input.getStringByField("name");
            collector.emit(new Values(s.toUpperCase()));
            log.warn("AllBolt1 execute:hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //nothing to do 
        }
    }
    ======================================  AllBolt2  ==========================================
    
    public class AllBolt2 extends BaseBasicBolt{
    
        private static final Logger log = LoggerFactory.getLogger(AllBolt2.class);
        
        private  TopologyContext context;
        
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.context = context;
            log.warn("AllBolt2 ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId());
        }
        
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String s = input.getStringByField("name");
            collector.emit(new Values(s.toUpperCase()));
            log.warn("AllBolt2 execute :hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //nothing to do 
        }
    }

    //The logs show that both bolts received all 8 tuples emitted by the spout. Note that each bolt component subscribes to the stream independently, so every component always sees the full stream; a grouping only decides how those tuples are divided among the tasks of one component. All grouping goes further and replicates every tuple to every task of the subscribing bolt.

     

    4. Global grouping

      When the last bolt in a chain must aggregate the data processed by the preceding bolts, global grouping routes the entire stream to a single task for that aggregation.

      To be precise, the stream is assigned to the task with the lowest task ID.

      Goal: shuffle the numbers across the NumberDoubleBolt tasks, then aggregate them in a final bolt.

    public class GlobalGroupingTopology {
        
        private final static Logger log = LoggerFactory.getLogger(GlobalGroupingTopology.class);
        
        public static void main(String[] args) {
    
            TopologyBuilder builder = new TopologyBuilder();
    
            builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(), 1);
            builder.setBolt("NumberDoubleBolt", new NumberDoubleBolt(), 2).shuffleGrouping("NumberGenerateSpout");
            builder.setBolt("NumberPrintBolt", new NumberPrintBolt(), 2).globalGrouping("NumberDoubleBolt");
    
            Config config = new Config();
            config.setNumWorkers(4);
            try {
                StormSubmitter.submitTopology("GlobalGroupingTopology", config, builder.createTopology());
                log.warn("==================================================");
                log.warn("the topology {} is submitted.", "GlobalGroupingTopology");
                log.warn("==================================================");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    // ==============================  spout =======================================
    public class NumberGenerateSpout extends BaseRichSpout{
        
        private SpoutOutputCollector collector;
        
        private AtomicInteger counter;
        
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.counter = new AtomicInteger(0);
            
        }
    
        @Override
        public void nextTuple() {
            while(counter.get()< 10){
                collector.emit(new Values(counter.getAndIncrement()));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("i"));
        }
    }
    
    // =========================== bolt ====================================
    public class NumberDoubleBolt extends BaseBasicBolt{

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Integer value = input.getIntegerByField("i");
            collector.emit(new Values(value*2,10));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("i","constant"));
        }
    }
    // ========================== bolt ====================================
    public class NumberPrintBolt extends BaseBasicBolt{

        private final static Logger logger = LoggerFactory.getLogger(NumberPrintBolt.class);

        private TopologyContext context;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.context = context;
            logger.warn("============== prepare TaskID:{}",context.getThisTaskId());
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Integer i = input.getIntegerByField("i");
            Integer constant = input.getIntegerByField("constant");
            logger.warn("taskID:{},instanceID:{},i:{},constant:{}",context.getThisTaskId(),this,i,constant);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // nothing to do
        }
    }
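The "lowest task ID" rule stated above can be expressed independently of Storm. `chooseTask` here is a hypothetical helper for illustration, not a Storm API:

```java
import java.util.Collections;
import java.util.List;

// Sketch of the documented global-grouping rule (not Storm's source): the whole
// stream goes to exactly one task of the subscribing bolt -- the one with the
// lowest task ID -- regardless of the bolt's declared parallelism.
public class GlobalGroupingSketch {

    static int chooseTask(List<Integer> targetTaskIds) {
        return Collections.min(targetTaskIds);
    }

    public static void main(String[] args) {
        System.out.println(chooseTask(List.of(7, 3, 5))); // prints 3
    }
}
```

This is why, in the topology above, only one of the two NumberPrintBolt tasks ever logs tuples even though its parallelism is 2.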

    5. Direct grouping

      In a sense the opposite of global grouping, this is a special grouping type: the producer of a tuple decides which task of the consumer receives it.

    public class DirectGroupingTopology {
        
        private static final Logger log = LoggerFactory.getLogger(DirectGroupingTopology.class);
        
        public static void main(String[] args) {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(),1);
            builder.setBolt("NumberDoubleBolt",new NumberDoubleBolt(),2).directGrouping("NumberGenerateSpout");
            
            Config config = new Config();
            config.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology("DirectGroupingTopology", config, builder.createTopology());
                log.warn("==================================================");
                log.warn("the topology {} is submitted.","DirectGroupingTopology");
                log.warn("==================================================");
                
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }
    
    // ============================= spout ===============================
    public class NumberGenerateSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;
        private AtomicInteger counter;
        private int destTaskID;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            counter = new AtomicInteger(0);
            // pick the highest task ID of the downstream bolt as the direct target
            List<Integer> tasks = context.getComponentTasks("NumberDoubleBolt");
            destTaskID = tasks.stream().mapToInt(Integer::intValue).max().getAsInt();
        }

        @Override
        public void nextTuple() {
            while(counter.get() < 10){
                collector.emitDirect(destTaskID,new Values(counter.getAndIncrement()));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // the first argument marks this as a direct stream
            declarer.declare(true,new Fields("i"));
        }
    }
    // =============================== bolt ==============================================
    public class NumberDoubleBolt extends BaseBasicBolt{

        private static final Logger log = LoggerFactory.getLogger(NumberDoubleBolt.class);

        private TopologyContext context;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.context = context;
            log.warn(" =================== taskId:{}",context.getThisTaskId());
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Integer value = input.getIntegerByField("i");
            log.warn("taskID:{},instanceID:{},value:{}",context.getThisTaskId(),this,value);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // nothing to do
        }
    }

    6. Local-or-shuffle grouping

    public class LocalLogTopology {
        
        private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class);
        
        public static void main(String[] args) {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            builder.setSpout("LogSpout", new LogSpout(),1);
            builder.setBolt("LogParseBolt",new LogParseBolt(),1).localOrShuffleGrouping("LogSpout");
            builder.setBolt("LogPrintBolt",new LogPrintBolt(),2).localOrShuffleGrouping("LogParseBolt");
            
            Config config = new Config();
            config.setDebug(false);
            config.setNumWorkers(4);
            try {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("LocalLogTopology", config, builder.createTopology());
                
                log.warn("==================================================");
                log.warn("the topology {} is submitted.","LocalLogTopology");
                log.warn("==================================================");
                
                TimeUnit.SECONDS.sleep(120);
                
                cluster.killTopology("LocalLogTopology");
                cluster.shutdown();
                
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }
    // =============================== spout ==============================
    public class LogSpout extends BaseRichSpout{
        
        private SpoutOutputCollector collector;
        
        private List<String> list;
        
        private int index;
        
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.index = 0;
            this.list = Arrays.asList("JAVA,COLLECTION","JAVA,IO","JAVA,THREAD","JAVA,LAMBDA","BIG_DATA,STORM",
                    "BIG_DATA,KAFKA","BIG_DATA,HADDOP","BIG_DATA,FLUME","BIG_DATA,KAFKA","C,c");
            
        }
        
        @Override
        public void nextTuple() {
            while(index < list.size()){
                collector.emit(new Values(list.get(index++)));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("entity"));
        }
    }
    //============================== bolt =====================================
    public class LogParseBolt extends BaseBasicBolt{
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String entity = input.getStringByField("entity");
            List<String> list = Splitter.on(",").splitToList(entity);
            
            collector.emit(new Values(list.get(0),list.get(1))); 
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("category","item"));
        }
    
    }
    
    //============================= bolt ===========================================
    public class LogPrintBolt extends BaseBasicBolt{

        private static final Logger LOG = LoggerFactory.getLogger(LogPrintBolt.class);

        private TopologyContext context;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.context = context;
            LOG.info("==========================================");
            LOG.info("prepare taskID:{}",context.getThisTaskId());
            LOG.info("==========================================");
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String category = input.getStringByField("category");
            String item = input.getStringByField("item");
            LOG.info("==========================================");
            LOG.info("execute:category:{},item:{},taskID:{}",category,item,context.getThisTaskId());
            LOG.info("==========================================");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // nothing to do
        }
    }
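The local-or-shuffle decision described in section II can be modeled without a cluster. `chooseTask`, `localTasks`, and `allTasks` below are illustrative names, not Storm APIs: prefer tasks in the same worker process when any exist, otherwise fall back to shuffling over all tasks.

```java
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the local-or-shuffle decision (not Storm's source).
public class LocalOrShuffleSketch {

    static int chooseTask(List<Integer> localTasks, List<Integer> allTasks, Random rnd) {
        // If any target task is in the same worker (JVM), shuffle only among
        // those and skip the network; otherwise shuffle over all tasks.
        List<Integer> candidates = localTasks.isEmpty() ? allTasks : localTasks;
        return candidates.get(rnd.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        // With a single local task available, it is always chosen.
        System.out.println(chooseTask(List.of(4), List.of(4, 5, 6), rnd)); // prints 4
    }
}
```

This is the whole point of the grouping: in-process delivery avoids serialization and TCP transfer, so traffic stays local whenever the scheduler happens to co-locate producer and consumer tasks.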

    7. None grouping

      You don't need to care how the stream is grouped. Currently, none grouping is equivalent to shuffle grouping; eventually, where possible, Storm will execute none-grouped bolts in the same thread as the bolts or spouts they subscribe to.

    8. Custom grouping

      Implement CustomStreamGrouping to route tuples with your own logic. The example below always sends tuples to the task with the highest task ID.

    public class LocalLogTopology {
        
        private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class);
        
        public static void main(String[] args) {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            builder.setSpout("LogSpout", new LogSpout(),1);
            builder.setBolt("LogParseBolt",new LogParseBolt(),1).localOrShuffleGrouping("LogSpout");
            builder.setBolt("LogPrintBolt",new LogPrintBolt(),2).customGrouping("LogParseBolt", new HighTaskIDGrouping());
            
            Config config = new Config();
            config.setDebug(false);
            config.setNumWorkers(4);
            try {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("LocalLogTopology", config, builder.createTopology());
                
                log.warn("==================================================");
                log.warn("the topology {} is submitted.","LocalLogTopology");
                log.warn("==================================================");
                
                TimeUnit.SECONDS.sleep(120);
                
                cluster.killTopology("LocalLogTopology");
                cluster.shutdown();
                
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }
    // =============================== spout / bolt ==============================
    // LogSpout and LogParseBolt are identical to the classes shown in section 6.
    
    //============================= custom grouping ===========================================
    
    public class HighTaskIDGrouping implements CustomStreamGrouping{
        
        private int taskID;
        
        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            //List<Integer> targetTasks: all tasks of the downstream component
            ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
            Collections.sort(tasks);        //ascending order
            this.taskID = tasks.get(tasks.size() -1);
        }
    
        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
             
            return Arrays.asList(taskID);
        }
    
    }
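Since HighTaskIDGrouping depends only on the target task list, its selection logic can be replicated and unit-tested without a cluster. `HighTaskIdChooser` below is a Storm-free stand-in written for illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Storm-free replica of the selection logic in HighTaskIDGrouping above.
public class HighTaskIdChooser {

    private final int taskId;

    HighTaskIdChooser(List<Integer> targetTasks) {
        ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
        Collections.sort(tasks);                   // ascending order
        this.taskId = tasks.get(tasks.size() - 1); // highest task ID
    }

    // Every tuple is routed to the single highest-ID task.
    List<Integer> chooseTasks() {
        return List.of(taskId);
    }

    public static void main(String[] args) {
        System.out.println(new HighTaskIdChooser(List.of(2, 9, 4)).chooseTasks()); // prints [9]
    }
}
```

Testing the routing decision in isolation like this is generally easier than inspecting worker logs after submitting a topology.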
  • Original post: https://www.cnblogs.com/MrRightZhao/p/11066077.html