  • Storm Trident: Introduction and Usage

    I. Introduction to Trident

      "Trident" means three-pronged spear. Processing data with the topology/spout/bolt model we studied earlier works perfectly well, but Trident is a higher-level abstraction over spouts and bolts: it implements the same functionality while adding more optimization and encapsulation. If you have demanding performance requirements, it is advisable to use spouts and bolts directly; otherwise Trident is a good choice.

      One way to picture Trident: a topology naturally fans out, and a single spout feeding two bolts looks rather like a trident. (My own interpretation.)

      Because Trident is a higher-level abstraction over Storm, it handles streams differently from the spout/bolt model: Trident processes data in units of batches (groups of tuples).

    II. Trident API Operations

      Trident processes data in batches, and its API expresses each processing step as a function. Typical operations include filter, sum, and aggregator.

      Function operations act on the tuples flowing through the stream.

      The commonly used Trident APIs are introduced below.

      1. each(Fields inputFields, Filter filter)
          Purpose: operates on the content of every tuple in a batch; normally used together with a Filter or a Function.
    
      2. peek(Consumer action)
        Purpose: performs no transformation; it takes a Consumer and is typically used for side effects such as logging, much like System.out.println.
    
      3. partitionBy(Fields fields)
        Purpose: redirects tuples to the next processing stage by the configured fields; tuples with the same values for those fields are guaranteed to be processed by the same thread. A combined sketch of these three operations follows below.
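      To see how these operations chain together, here is a minimal fragment (the stream name "apiSketch" and the log text are illustrative; CheckEvenSumFilter is the filter defined in the FilterFunction example in section IV, and topology, spout and LOG are assumed to be set up as in that example):

    topology.newStream("apiSketch", spout)
            // peek: side effect only - log each tuple as it passes through
            .peek(t -> LOG.info("saw tuple: {}", t))
            // each: run the filter over the "a" and "b" fields of every tuple
            .each(new Fields("a","b"), new CheckEvenSumFilter())
            // partitionBy: tuples with equal "a" values go to the same partition
            .partitionBy(new Fields("a"))
            .peek(t -> LOG.info("after partitionBy: {}", t));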
    

     III. Commonly Used Trident Functions

      1. FilterFunction (filtering)
        Purpose: filters the tuples within a batch.
        How: define a class that extends BaseFilter, override isKeep(), and pass an instance to each().

      2. SumFunction (arithmetic)
        Purpose: performs arithmetic (such as addition or subtraction) on values in the stream.
        How: define a class that extends BaseFunction, override execute(), and pass an instance to each().

      3. MapFunction (one-to-one)
        Purpose: applies a custom transformation to each tuple, producing exactly one output tuple.
        How: define a class that implements the MapFunction interface, override execute(), and apply it with map().

      4. ProjectionFunction (projection)
        Purpose: keeps only the specified fields of the stream.
        How: list the desired fields in project().
        Example: given a stream with fields ["x","y","z"], project(new Fields("y","z")) yields a stream containing only ["y","z"].

      5. repartition (redirection)
        Purpose: decides how tuples are routed to the next stage. The options are:
        shuffle:     balances tuples across partitions with a random allocation algorithm
        broadcast:   every tuple is replicated to all partitions; very useful with DRPC, for example running a stateQuery on every partition
        partitionBy: partitions by the given field list, taking the hash of those fields modulo the partition count, so tuples with the same field values land in the same partition
        global:      all tuples are sent to a single partition, which processes the tuples of the entire stream on its own dedicated thread
        batchGlobal: all tuples of one batch go to the same partition, while different batches may go to different partitions
        partition:   partitions with a custom function that implements backtype.storm.grouping.CustomStreamGrouping
        (Runnable examples of each of these functions appear in section IV.)

      6. Aggregation
        In Trident data is processed in batches, so aggregation likewise operates on the data within a batch. Tuples that pass through an aggregation have their original content replaced by the aggregated result. The Aggregator interface has three methods to implement:
          init()     : called when the batch starts receiving data; initializes the aggregation state
          aggregate(): called for each tuple received in the batch; note that the aggregate operation itself repartitions the stream, so the actual aggregation may run on a separate thread
          complete() : called when the batch ends
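        As a minimal sketch of these three methods, the custom aggregator below counts the tuples of one batch (it assumes Trident's BaseAggregator base class; the state holder CountState and the class name MyCountAggregator are illustrative):

    static class CountState {
        long count = 0;    // running count for the current batch
    }

    static class MyCountAggregator extends BaseAggregator<CountState> {

        @Override
        public CountState init(Object batchId, TridentCollector collector) {
            // called when the batch starts: create fresh state for this batch
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
            // called once for every tuple in the batch
            state.count++;
        }

        @Override
        public void complete(CountState state, TridentCollector collector) {
            // called when the batch ends: emit the aggregated result
            collector.emit(new Values(state.count));
        }
    }

        It would be wired in like the aggregators in the section IV examples, for instance aggregate(new Fields("name"), new MyCountAggregator(), new Fields("count")).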

        6.1 partitionAggregate

          Runs the aggregation separately on each partition's share of the batch; it is not a repartitioning operation. A typical use is counting the tuples on each partition.
       6.2 aggregator

          Aggregates all the tuples of a batch.
         
       
    6.3 ReducerAggregator
          Folds over the tuples of a batch one at a time, combining a chosen field of each tuple with a running value that starts from init().

        6.4 CombinerAggregator

          Aggregates the tuples of a batch; it is a repartitioning operation.

        6.5 persistentAggregate

          Persistent aggregator: state is first stored in some backing location, and incoming data is then aggregated against that stored state.

        6.6 Aggregate chain

         Chains several aggregations together, applying multiple aggregation conditions to the tuples of one batch.

       7. GroupBy

          GroupBy splits the stream into grouped streams by the given fields; an aggregation on a grouped stream then runs per group rather than over the whole batch. If groupBy is followed by an aggregator, the result is a grouped aggregation; if it is followed by partitionAggregate, it is not a grouped aggregation (the aggregation still runs per partition).

         

    IV. Examples of Common Trident Functions

      1.FilterFunction

        Requirement: from a batch of tuples, keep those whose first and second values sum to an even number.

    public class FilterTrident {
        private static final Logger LOG = LoggerFactory.getLogger(FilterTrident.class);
    
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws InterruptedException {
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("a","b","c","d"), 3, 
                                    new Values(1,4,7,10),
                                    new Values(1,1,3,11),
                                    new Values(2,2,7,1), 
                                    new Values(2,5,7,2));
            spout.setCycle(false);
            Config conf = new Config();
            conf.setNumWorkers(4);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
        // peek: performs no transformation; its Consumer argument is used here for logging
        // each: applies the filter to the "a" and "b" fields of every tuple from the spout
            topology.newStream("filter", spout).parallelismHint(1)
                    .localOrShuffle()
                    .peek(input -> LOG.info("peek1 ================{},{},{},{}",input.get(0),input.get(1),input.get(2),input.get(3)))
                    .parallelismHint(2)
                    .localOrShuffle()
                    .each(new Fields("a","b"),new CheckEvenSumFilter())
                    .parallelismHint(2)
                    .localOrShuffle()
                    .peek(input -> LOG.info("peek2 +++++++++++++++++++{},{},{},{}",
                        input.getIntegerByField("a"),input.getIntegerByField("b"),
                        input.getIntegerByField("c"),input.getIntegerByField("d"))
                    ).parallelismHint(1);
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("FilterTrident", conf, topology.build());
            
            LOG.warn("==================================================");
            LOG.warn("the LocalCluster topology {} is submitted.","FilterTrident");
            LOG.warn("==================================================");
            
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("FilterTrident");
            cluster.shutdown();
        }
        
        private static class CheckEvenSumFilter extends BaseFilter{
    
            @Override
            public boolean isKeep(TridentTuple tuple) {
                Integer a = tuple.getIntegerByField("a");
                Integer b = tuple.getIntegerByField("b");
                
                return (a + b) % 2 == 0;
            }
        }
    }

       2.SumFunction

        Requirement: sum the first two values of each tuple.

    public class SumFunctionTrident {
    
        private static final Logger LOG = LoggerFactory.getLogger(SumFunctionTrident.class);
    
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws InterruptedException {
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("a","b","c","d"), 3, 
                                    new Values(1,4,7,10),
                                    new Values(1,1,3,11),
                                    new Values(2,2,7,1), 
                                    new Values(2,5,7,2));
            spout.setCycle(false);
            Config conf = new Config();
            conf.setNumWorkers(4);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
        // peek: performs no transformation; its Consumer argument is used here for logging
        // each: applies SumFunction to the "a" and "b" fields of every tuple from the spout
            topology.newStream("function", spout).parallelismHint(1)
                    .localOrShuffle()
                    .peek(input -> LOG.info("peek1 ================{},{},{},{}",input.get(0),input.get(1),input.get(2),input.get(3)))
                    .parallelismHint(2)
                    .localOrShuffle()
                    .each(new Fields("a","b"),new SumFunction(),new Fields("sum"))
                    .parallelismHint(2)
                    .localOrShuffle()
                    .peek(input -> LOG.info("peek2 ================{},{},{},{},{}",
                                input.getIntegerByField("a"),input.getIntegerByField("b"),input.getIntegerByField("c"),input.getIntegerByField("d"),input.getIntegerByField("sum")))
                    .parallelismHint(1);
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("SumFunctionTrident", conf, topology.build());
            
            LOG.warn("==================================================");
            LOG.warn("the LocalCluster topology {} is submitted.","SumFunctionTrident");
            LOG.warn("==================================================");
            
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("HelloTridentTopology");
            cluster.shutdown();
        }
        
        private static class SumFunction extends BaseFunction{
    
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                Integer a = tuple.getIntegerByField("a");
                Integer b = tuple.getIntegerByField("b");
                collector.emit(new Values(a+b));
            }
        }
    }

    3.MapFunction

      Requirement: convert the strings in a batch of tuples to upper case.

    public class MapFunctionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(MapFunctionTrident.class);
            
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException, AuthorizationException {
            
            boolean isRemoteMode = false;
            if(args.length > 0){
                isRemoteMode = true;
            }
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("line"),3, 
                                        new Values("hello stream"),
                                        new Values("hello kafka"),
                                        new Values("hello hadoop"),
                                        new Values("hello scala"),
                                        new Values("hello java")
                                        );
            spout.setCycle(true);
            TridentTopology topology = new TridentTopology();
            Config conf = new Config();
            conf.setNumWorkers(4);
            conf.setDebug(false);
            
            topology.newStream("hello", spout).parallelismHint(1)
                    .localOrShuffle()
                    .map(new MyMapFunction(),new Fields("upper"))
                    .parallelismHint(2)
                    .partition(Grouping.fields(ImmutableList.of("upper")))
                    .peek(input ->LOG.warn("================>> peek process value:{}",input.getStringByField("upper")))
                    .parallelismHint(3);
            if(isRemoteMode){
                StormSubmitter.submitTopology("HelloTridentTopology", conf, topology.build());
                LOG.warn("==================================================");
                LOG.warn("the remote topology {} is submitted.","HelloTridentTopology");
                LOG.warn("==================================================");
            }else{
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("HelloTridentTopology", conf, topology.build());
                
                LOG.warn("==================================================");
                LOG.warn("the LocalCluster topology {} is submitted.","HelloTridentTopology");
                LOG.warn("==================================================");
                
                TimeUnit.SECONDS.sleep(5);
                cluster.killTopology("HelloTridentTopology");
                cluster.shutdown();
            }
            
        }
        
        private static class MyMapFunction implements MapFunction{
            private static final Logger LOG = LoggerFactory.getLogger(MyMapFunction.class);
            
            @Override
            public Values execute(TridentTuple input) {
                String line = input.getStringByField("line");
                LOG.warn("================>> myMapFunction process execute:value :{}",line);
                return new Values(line.toUpperCase());
            }
        }
    }

    4.ProjectionFunctionTrident

      Requirement: keep only part of each tuple's fields.

    public class ProjectionFunctionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(ProjectionFunctionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("x","y","z"), 3, 
                    new Values(1,2,3),
                    new Values(4,5,6),
                    new Values(7,8,9), 
                    new Values(10,11,12)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("ProjectionTrident", spout).parallelismHint(1)
                    .localOrShuffle().peek(tridentTuple ->LOG.info("================ {}",tridentTuple)).parallelismHint(2)
                    .shuffle()
                    .project(new Fields("y","z")).parallelismHint(2)
                    .localOrShuffle().peek(tridentTuple ->LOG.info(">>>>>>>>>>>>>>>> {}",tridentTuple)).parallelismHint(2);
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("ProjectionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("ProjectionTrident");
            cluster.shutdown();
        }
    }
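    5.1 Shuffle

      Requirement: spread the tuples of a batch randomly and evenly across partitions. (A minimal sketch modeled on the Broadcast example that follows; the class name ShuffleRepartitionTrident is illustrative.)

    public class ShuffleRepartitionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(ShuffleRepartitionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("haddop",3), 
                    new Values("java",4),
                    new Values("haddop",5)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            // shuffle() balances the tuples randomly across the two downstream partitions
            topology.newStream("ShuffleRepartitionTrident", spout).parallelismHint(1)
                    .shuffle().peek(tridentTuple -> LOG.info("================ {}", tridentTuple))
                    .parallelismHint(2);
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("ShuffleRepartitionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("ShuffleRepartitionTrident");
            cluster.shutdown();
        }
    }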

    5.2 Broadcast

      Requirement: send every tuple of a batch to all partitions.

    public class BroadcastRepartitionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(BroadcastRepartitionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("haddop",3), 
                    new Values("java",4),
                    new Values("haddop",5)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("BroadcastRepartitionTrident", spout).parallelismHint(1)
                    .broadcast().peek(tridentTuple ->LOG.info("================ {}",tridentTuple))
                    .parallelismHint(2);    
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("BroadcastRepartitionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("BroadcastRepartitionTrident");
            cluster.shutdown();
        }
    }

    5.3 PartitionBy

      Requirement: route tuples to tasks by the configured fields, so tuples with the same field value run in the same task.

    public class PartitionByRepartitionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(PartitionByRepartitionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            //FixedBatchSpout() parameters:
            //    1. the names of the spout's output fields
            //    2. how many tuples form one batch
            //    3. the tuple values
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",23),
                    new Values("scala",3),
                    new Values("haddop",10), 
                    new Values("java",23),
                    new Values("haddop",10)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("PartitionByRepartitionTrident", spout).parallelismHint(1)
                    .partitionBy(new Fields("language")).peek(tridentTuple ->LOG.info("++++++++++++++++ {}",tridentTuple))
                    .parallelismHint(3);
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("PartitionByRepartitionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("PartitionByRepartitionTrident");
            cluster.shutdown();
        }
    }

    5.4 Global

       Requirement: send all tuples of the stream to a single partition (global repartitioning).

    public class GlobalRepatitionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(GlobalRepatitionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            //FixedBatchSpout() parameters:
            //    1. the names of the spout's output fields
            //    2. how many tuples form one batch
            //    3. the tuple values
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",23),
                    new Values("scala",3),
                    new Values("haddop",10), 
                    new Values("java",23),
                    new Values("haddop",10)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("PartitionByRepartitionTrident", spout).parallelismHint(1)
                        .partitionBy(new Fields("language"))
                        .parallelismHint(3)    //不管配多少个并行度,都没有影响
                        .peek(tridentTuple ->LOG.info(" ================= {}",tridentTuple))
                        .global()
                        .peek(tridentTuple ->LOG.info(" >>>>>>>>>>>>>>>>> {}",tridentTuple));
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("GlobalRepatitionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("GlobalRepatitionTrident");
            cluster.shutdown();
        }
    }

      5.5 batchGlobal 

        Requirement: tuples from different batches go to different tasks.

    public class BatchGlobalRepatitionTrident2 {
        
        private static final Logger LOG = LoggerFactory.getLogger(BatchGlobalRepatitionTrident2.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("BatchGlobalRepatitionTrident2", spout).parallelismHint(1)
                    .batchGlobal().peek(tridentTuple ->LOG.info("++++++++++++++++ {}",tridentTuple))
                    .parallelismHint(3);    
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("BatchGlobalRepatitionTrident2", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("BatchGlobalRepatitionTrident2");
            cluster.shutdown();
        }
    }

      5.6 partition 

        Requirement: custom partitioning.

    public class CustomRepartitionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(CustomRepartitionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("haddop",3), 
                    new Values("java",4),
                    new Values("haddop",5)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("CustomRepartitionTrident", spout).parallelismHint(1)
                    .partition(new HighTaskIDGrouping()).peek(tridentTuple ->LOG.info("++++++++++++++++ {}",tridentTuple))
                    .parallelismHint(2);    
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("CustomRepartitionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("CustomRepartitionTrident");
            cluster.shutdown();
        }
    }
    
    /**
     * Custom grouping:
     *         route every tuple to the downstream task with the highest task ID
     * @author pengbo.zhao
     *
     */
    public class HighTaskIDGrouping implements CustomStreamGrouping{
        
        private int taskID;
        
        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            //List<Integer> targetTasks: 下游所有的tasks的集合
            ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
            Collections.sort(tasks);        //从小到大排列
            this.taskID = tasks.get(tasks.size() -1);
        }
    
        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
             
            return Arrays.asList(taskID);
        }
    }

      6.1 partitionAggregate

        Requirement: count the tuples of a batch on each partition.

    public class PartitionAggregateTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(PartitionAggregateTrident.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testPartitionAggregtor(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("PartitionAggregateTrident", spout).parallelismHint(2)//内部的优先级参数是1,所以我们写2是无效的
                    .shuffle()
                    .partitionAggregate(new Fields("name","age"), new Count(),new Fields("count"))
                    .parallelismHint(2)
    //                .each(new Fields("count"),new Debug());
                    .peek(input ->LOG.info(" >>>>>>>>>>>>>>>>> {}",input.getLongByField("count")));
            
            this.submitTopology("PartitionAggregateTrident", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
    }

      6.2 aggregator

        Requirement: count the tuples in a batch.

    public class AggregateTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(AggregateTrident.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testPartitionAggregtor(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("AggregateTrident", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    .aggregate(new Fields("name","age"), new Count(),new Fields("count"))
    //                .aggregate(new Fields("name","age"), new CountAsAggregator(),new Fields("count"))
                    .parallelismHint(2)
                    .each(new Fields("count"),new Debug())
                    .peek(input -> LOG.info("============> count:{}",input.getLongByField("count")));
            
            this.submitTopology("AggregateTrident", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
    }

      6.3 ReducerAggregator

         Requirement: sum field 0 of the tuples in a batch, i.e. across however many tuples the batch contains, add up the specified field.

    public class ReduceAggregatorTrident  {
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testReduceAggregator(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("ReduceAggregator", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    .aggregate(new Fields("age","name"), new MyReduce(),new Fields("sum"))
                    .parallelismHint(5)
                    .each(new Fields("sum"),new Debug()); 
            
            this.submitTopology("ReduceAggregator", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
        
        static class MyReduce implements ReducerAggregator<Integer>{
            
            @Override
            public Integer init() {
                return 0;    // the initial value is 0
            }
    
            @Override
            public Integer reduce(Integer curr, TridentTuple tuple) {
                
                return curr + tuple.getInteger(0);
            }
        }
    }

      6.4 CombinerAggregator

        Requirement: sum a field across the tuples of a batch.

    public class CombinerAggregate {
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testCombinerAggregate(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("CombinerAggregate", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    .aggregate(new Fields("age"), new MyCount(),new Fields("count"))
                    .parallelismHint(5)
                    .each(new Fields("count"),new Debug());
            this.submitTopology("CombinerAggregate", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
        
        static class MyCount implements CombinerAggregator<Integer>{
    
            @Override
            public Integer init(TridentTuple tuple) {
                return tuple.getInteger(0);
            }
    
            @Override
            public Integer combine(Integer val1, Integer val2) {
                return val1 + val2;
            }
    
            @Override
            public Integer zero() {
                return 0;
            }
        }
    }

      6.5 persistentAggregate

         Requirement: count the tuples of a batch, persisting the aggregation state.

    public class PersistenceAggregator {
        
        private static final Logger LOG = LoggerFactory.getLogger(PersistenceAggregator.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testPersistenceAggregator(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("testPersistenceAggregator", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    .persistentAggregate(new MemoryMapState.Factory(), new Fields("name"), new Count(),new Fields("count"))
                    .parallelismHint(4)
                    .newValuesStream()
                    .peek(input ->LOG.info("count:{}",input.getLongByField("count")));
            this.submitTopology("testPersistenceAggregator", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
        
    }

      6.6 Aggregate chain (AggregateChina)

        Requirement: run a count, a sum, and another count over the tuples of a batch in one chained pass.

    public class AggregateChina {
        private static final Logger LOG = LoggerFactory.getLogger(AggregateChina.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"),3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testAggregateChina(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("AggregateChina", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    .chainedAgg()
                    .aggregate(new Fields("name"),new Count(), new Fields("count"))
                    .aggregate(new Fields("age"),new Sum(), new Fields("sum"))
                    .aggregate(new Fields("age"),new Count(), new Fields("count2"))
                    .chainEnd()
                    .peek(tuple->LOG.info("{}",tuple));
            this.submitTopology("AggregateChina", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
    }

      7.GroupBy

        Requirement: group the tuples of a batch by name, then count the tuples in each group.

    public class GroupBy {
        
        private static final Logger LOG = LoggerFactory.getLogger(GroupBy.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testGroupBy(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("GroupBy", spout).parallelismHint(1)
    //                .partitionBy(new Fields("name"))
                    .groupBy(new Fields("name"))
                    .aggregate(new Count(), new Fields("count"))
                    .peek(tuple -> LOG.info("{},{}",tuple.getFields(),tuple));
                    
            this.submitTopology("GroupBy", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
    }

      
