  • Introduction to Storm Trident and how to use it

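    I. Introduction to Trident

      "Trident" literally means a three-pronged spear. As covered earlier, a topology built directly from spouts and bolts handles data processing perfectly well; Trident is a higher-level abstraction over spouts and bolts. It delivers the same functionality, but adds more optimization and encapsulation. If the performance requirements are demanding, plain spouts and bolts are the recommended choice; otherwise Trident is a good fit.

      One way to picture it: a topology is already a fan-out structure, and a spout feeding two bolts looks a bit like a trident. (This is my personal interpretation.)

      Because Trident is a higher-level abstraction over Storm, it handles the data stream differently from plain spouts and bolts: Trident processes data in units of a batch (a group of tuples).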
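
      As a rough illustration of what "a batch is a group of tuples" means, the sketch below splits a list of tuples into fixed-size batches in plain Java. It has no Storm dependency; BatchSketch and toBatches are hypothetical names used only for this illustration, and a real Trident spout decides its batch boundaries itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of batching; hypothetical names, not Storm API.
final class BatchSketch {

    // Splits tuples into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> toBatches(List<T> tuples, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < tuples.size(); start += batchSize) {
            int end = Math.min(start + batchSize, tuples.size());
            batches.add(new ArrayList<>(tuples.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Four tuples with a batch size of 3 give one batch of 3 and one batch of 1.
        System.out.println(toBatches(Arrays.asList("t1", "t2", "t3", "t4"), 3));
    }
}
```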

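    II. Trident API operations

      Trident processes data in batches, and its API expresses the processing steps as functions applied to the stream. The available operations include filter, sum, aggregator, and so on.

      These functions all operate on the tuples in the stream.

      The commonly used Trident APIs are described below.

      1.each(Fields inputFields, Filter filter)
          Purpose: operates on every tuple in a batch; normally used together with a Filter or a Function.
    
      2.peek(Consumer action)
        Purpose: leaves the stream unchanged; it takes a Consumer and is typically used like System.out.println to inspect tuples as they pass by.
    
      3.partitionBy(Fields fields)
        Purpose: routes tuples to the next processing step according to the given fields; tuples with the same values for those fields are always sent to the same partition (and are therefore handled by the same task).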
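
      The partitionBy guarantee can be pictured with a hash-modulo rule. The sketch below is a plain-Java model under that assumption, not Storm's actual routing code; PartitionBySketch and partitionFor are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of field-based partitioning; hypothetical names, not Storm API.
final class PartitionBySketch {

    // Maps the values of the partitioning fields to a partition index in [0, numPartitions).
    static int partitionFor(List<?> keyValues, int numPartitions) {
        // floorMod keeps the result non-negative even when the hash code is negative.
        return Math.floorMod(keyValues.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Equal field values always print the same partition index.
        for (String language : Arrays.asList("java", "scala", "java")) {
            System.out.println(language + " -> partition " + partitionFor(Arrays.asList(language), 3));
        }
    }
}
```

      Because the index depends only on the field values, two tuples with the same values can never end up in different partitions.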
    

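     III. Common Trident functions

      1.FilterFunction (filtering)
        Purpose: filters the tuples in a batch.
        How: write a class that extends BaseFilter, override isKeep(), and pass an instance to each().

      2.SumFunction (summing)
        Purpose: performs arithmetic on values in the stream.
        How: write a class that extends BaseFunction, override execute(), and pass an instance to each().

      3.MapFunction (one-to-one function)
        Purpose: applies a custom transformation to a single tuple.
        How: write a class that implements the MapFunction interface, override execute(), and apply it with map().

      4.ProjectionFunction (projection)
        Purpose: keeps only the specified fields of the stream.
        How: list the required fields in project().
        Example: given a Stream with fields ["x","y","z"], project(new Fields("y", "z")) produces a stream that contains only ["y","z"].

      5.repartition (repartitioning)
        Purpose: repartitioning determines how tuples are routed to the next stage, using one of the following:
          shuffle: distributes tuples evenly across the partitions using a random algorithm
          broadcast: replicates every tuple to all partitions; this is very useful with DRPC, for example to run a stateQuery on every partition
          partitionBy: partitions by a list of fields, taking the hash of those field values modulo the number of partitions, so that tuples with the same field values land in the same partition
          global: sends all tuples to one and the same partition, which therefore processes the tuples of the entire Stream
          batchGlobal: sends all tuples of one batch to the same partition; different batches may go to different partitions
          partition: partitions with a custom function that implements org.apache.storm.grouping.CustomStreamGrouping (backtype.storm.grouping.CustomStreamGrouping before Storm 1.0)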

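      6.Aggregation
        Trident processes data in batches, so aggregation is also performed over the tuples of a batch. Aggregation changes the shape of the data: the output stream carries the aggregation results instead of the original tuples.
        The Aggregator interface has three methods to implement:
          init(): called at the start of a batch, before any of its tuples are processed; it creates the initial aggregation state
          aggregate(): called once for every tuple in the batch; it updates the state. Note that the aggregate() operation on a Stream is a repartitioning operation: it brings the tuples of a batch together before the aggregator runs
          complete(): called at the end of a batch, once all of its tuples have been processed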

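        6.1 partitionAggregate

          Runs the aggregation on each partition of a batch separately. It is not a repartitioning operation: it only aggregates the tuples that are already in the current partition.

        6.2 aggregate

          Aggregates the tuples of each batch as a whole.

        6.3 ReducerAggregator

          Folds the tuples of a batch into a single value: init() supplies the starting value and reduce() combines the current value with each tuple in turn.

        6.4 CombinerAggregator

          Aggregates the tuples of a batch; used with aggregate() it is a repartitioning operation.

        6.5 persistentAggregate

          Persistent aggregation: aggregates the tuples and stores the result in a state (for example an in-memory map), so the aggregated value is kept and updated from batch to batch.

        6.6 Aggregate chain (chainedAgg)

         Applies several aggregators to the tuples of the same batch.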

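       7.GroupBy

          groupBy splits the stream into grouped streams according to the specified fields. An aggregation applied to a grouped stream runs within each group rather than over the whole batch.
          If groupBy is followed by aggregate, it is a repartitioning operation; if it is followed by partitionAggregate, it is not.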
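
      To make "aggregation per group instead of per batch" concrete, the sketch below counts the tuples of one batch per distinct value of a grouping field. It is a plain-Java model with hypothetical names (GroupBySketch, tuple, countPerGroup); it does not use the Trident API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java model of groupBy followed by a count; hypothetical names, not Storm API.
final class GroupBySketch {

    // Builds a two-field tuple [name, age] as a plain list.
    static List<Object> tuple(String name, int age) {
        return Arrays.<Object>asList(name, age);
    }

    // Counts the tuples of one batch per distinct value of field 0 (the grouping field).
    static Map<String, Long> countPerGroup(List<List<Object>> batch) {
        return batch.stream().collect(
                Collectors.groupingBy(t -> (String) t.get(0), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<List<Object>> batch = Arrays.asList(tuple("java", 1), tuple("scala", 2), tuple("scala", 3));
        System.out.println(countPerGroup(batch)); // {java=1, scala=2}
    }
}
```

      Without the grouping step, the same batch would yield a single count of 3.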

         

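    IV. Examples of the common Trident functions

      1.FilterFunction

        Requirement: from a set of tuples, keep only those where the sum of the first and second values is even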

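    // Imports assume Storm 1.x or later, where the packages live under org.apache.storm.
    import java.util.concurrent.TimeUnit;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.BaseFilter;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class FilterTrident {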
        private static final Logger LOG = LoggerFactory.getLogger(FilterTrident.class);
    
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws InterruptedException {
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("a","b","c","d"), 3, 
                                    new Values(1,4,7,10),
                                    new Values(1,1,3,11),
                                    new Values(2,2,7,1), 
                                    new Values(2,5,7,2));
            spout.setCycle(false);
            Config conf = new Config();
            conf.setNumWorkers(4);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
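            // peek: does not change the stream; it only passes each tuple to the given Consumer
            // each: applies the filter to the selected fields of every tuple from the spout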
            topology.newStream("filter", spout).parallelismHint(1)
                    .localOrShuffle()
                    .peek(input -> LOG.info("peek1 ================{},{},{},{}",input.get(0),input.get(1),input.get(2),input.get(3)))
                    .parallelismHint(2)
                    .localOrShuffle()
                    .each(new Fields("a","b"),new CheckEvenSumFilter())
                    .parallelismHint(2)
                    .localOrShuffle()
                    .peek(input -> LOG.info("peek2 +++++++++++++++++++{},{},{},{}",
                        input.getIntegerByField("a"),input.getIntegerByField("b"),
                        input.getIntegerByField("c"),input.getIntegerByField("d"))
                    ).parallelismHint(1);
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("FilterTrident", conf, topology.build());
            
            LOG.warn("==================================================");
            LOG.warn("the LocalCluster topology {} is submitted.","FilterTrident");
            LOG.warn("==================================================");
            
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("FilterTrident");
            cluster.shutdown();
        }
        
        private static class CheckEvenSumFilter extends BaseFilter{
    
            @Override
            public boolean isKeep(TridentTuple tuple) {
                Integer a = tuple.getIntegerByField("a");
                Integer b = tuple.getIntegerByField("b");
                
                return (a + b) % 2 == 0;
            }
        }
    }
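
      The effect of the filter on the four sample tuples can be checked by hand. The sketch below applies the same even-sum rule in plain Java, without Storm; EvenSumFilterSketch is a hypothetical name, and tuples are modeled as plain lists.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model of the even-sum filter; hypothetical names, not Storm API.
final class EvenSumFilterSketch {

    // Same rule as CheckEvenSumFilter.isKeep: keep the tuple when a + b is even.
    static boolean isKeep(List<Integer> tuple) {
        return (tuple.get(0) + tuple.get(1)) % 2 == 0;
    }

    // Returns only the tuples that pass the rule, in their original order.
    static List<List<Integer>> filter(List<List<Integer>> tuples) {
        return tuples.stream().filter(EvenSumFilterSketch::isKeep).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(1, 4, 7, 10),   // 1 + 4 = 5, odd, dropped
                Arrays.asList(1, 1, 3, 11),   // 1 + 1 = 2, even, kept
                Arrays.asList(2, 2, 7, 1),    // 2 + 2 = 4, even, kept
                Arrays.asList(2, 5, 7, 2));   // 2 + 5 = 7, odd, dropped
        System.out.println(filter(input));    // [[1, 1, 3, 11], [2, 2, 7, 1]]
    }
}
```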

       2.SumFunction

        Requirement: compute the sum of the first two values of each tuple

    public class SumFunctionTrident {
    
        private static final Logger LOG = LoggerFactory.getLogger(SumFunctionTrident.class);
    
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws InterruptedException {
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("a","b","c","d"), 3, 
                                    new Values(1,4,7,10),
                                    new Values(1,1,3,11),
                                    new Values(2,2,7,1), 
                                    new Values(2,5,7,2));
            spout.setCycle(false);
            Config conf = new Config();
            conf.setNumWorkers(4);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
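            // peek: does not change the stream; it only passes each tuple to the given Consumer
            // each: applies the function to the selected fields of every tuple from the spout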
            topology.newStream("function", spout).parallelismHint(1)
                    .localOrShuffle()
                    .peek(input -> LOG.info("peek1 ================{},{},{},{}",input.get(0),input.get(1),input.get(2),input.get(3)))
                    .parallelismHint(2)
                    .localOrShuffle()
                    .each(new Fields("a","b"),new SumFunction(),new Fields("sum"))
                    .parallelismHint(2)
                    .localOrShuffle()
                    .peek(input -> LOG.info("peek2 ================{},{},{},{},{}",
                                input.getIntegerByField("a"),input.getIntegerByField("b"),input.getIntegerByField("c"),input.getIntegerByField("d"),input.getIntegerByField("sum")))
                    .parallelismHint(1);
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("SumFunctionTrident", conf, topology.build());
            
            LOG.warn("==================================================");
            LOG.warn("the LocalCluster topology {} is submitted.","SumFunctionTrident");
            LOG.warn("==================================================");
            
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("SumFunctionTrident");
            cluster.shutdown();
        }
        
        private static class SumFunction extends BaseFunction{
    
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                Integer a = tuple.getIntegerByField("a");
                Integer b = tuple.getIntegerByField("b");
                collector.emit(new Values(a+b));
            }
        }
    }
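
      A point that is easy to miss: with each() and a Function, the emitted values are appended to the input tuple as new fields, which is why the second peek above can still read a, b, c and d next to sum. The plain-Java sketch below models that behavior; SumFunctionSketch and appendSum are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of each(...) with a Function; hypothetical names, not Storm API.
final class SumFunctionSketch {

    // Returns a copy of the tuple with a + b appended as an extra field.
    static List<Integer> appendSum(List<Integer> tuple) {
        List<Integer> out = new ArrayList<>(tuple);
        out.add(tuple.get(0) + tuple.get(1));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(appendSum(Arrays.asList(1, 4, 7, 10))); // [1, 4, 7, 10, 5]
    }
}
```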

    3.MapFunction

      Requirement: convert the text of each tuple in a batch to upper case

    public class MapFunctionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(MapFunctionTrident.class);
            
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException, AuthorizationException {
            
            boolean isRemoteMode = false;
            if(args.length > 0){
                isRemoteMode = true;
            }
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("line"),3, 
                                        new Values("hello stream"),
                                        new Values("hello kafka"),
                                        new Values("hello hadoop"),
                                        new Values("hello scala"),
                                        new Values("hello java")
                                        );
            spout.setCycle(true);
            TridentTopology topology = new TridentTopology();
            Config conf = new Config();
            conf.setNumWorkers(4);
            conf.setDebug(false);
            
            topology.newStream("hello", spout).parallelismHint(1)
                    .localOrShuffle()
                    .map(new MyMapFunction(),new Fields("upper"))
                    .parallelismHint(2)
                    .partition(Grouping.fields(ImmutableList.of("upper")))
                    .peek(input ->LOG.warn("================>> peek process value:{}",input.getStringByField("upper")))
                    .parallelismHint(3);
            if(isRemoteMode){
                StormSubmitter.submitTopology("HelloTridentTopology", conf, topology.build());
                LOG.warn("==================================================");
                LOG.warn("the remote topology {} is submitted.","HelloTridentTopology");
                LOG.warn("==================================================");
            }else{
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("HelloTridentTopology", conf, topology.build());
                
                LOG.warn("==================================================");
                LOG.warn("the LocalCluster topology {} is submitted.","HelloTridentTopology");
                LOG.warn("==================================================");
                
                TimeUnit.SECONDS.sleep(5);
                cluster.killTopology("HelloTridentTopology");
                cluster.shutdown();
            }
            
        }
        
        private static class MyMapFunction implements MapFunction{
            private static final Logger LOG = LoggerFactory.getLogger(MyMapFunction.class);
            
            @Override
            public Values execute(TridentTuple input) {
                String line = input.getStringByField("line");
                LOG.warn("================>> myMapFunction process execute:value :{}",line);
                return new Values(line.toUpperCase());
            }
        }
    }

    4.ProjectionFunctionTrident

      Requirement: keep only some of the fields of each tuple

    public class ProjectionFunctionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(ProjectionFunctionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("x","y","z"), 3, 
                    new Values(1,2,3),
                    new Values(4,5,6),
                    new Values(7,8,9), 
                    new Values(10,11,12)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("ProjectionTrident", spout).parallelismHint(1)
                    .localOrShuffle().peek(tridentTuple ->LOG.info("================ {}",tridentTuple)).parallelismHint(2)
                    .shuffle()
                    .project(new Fields("y","z")).parallelismHint(2)
                    .localOrShuffle().peek(tridentTuple ->LOG.info(">>>>>>>>>>>>>>>> {}",tridentTuple)).parallelismHint(2);
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("ProjectionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("ProjectionTrident");
            cluster.shutdown();
        }
    }
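
      What project() does to a single tuple can be modeled as a lookup of the kept field names. This is a plain-Java sketch with hypothetical names (ProjectSketch, project), not the Trident implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a projection; hypothetical names, not Storm API.
final class ProjectSketch {

    // Returns the values of the fields listed in keep, in the order given by keep.
    static List<Object> project(List<String> fieldNames, List<?> values, List<String> keep) {
        List<Object> out = new ArrayList<>();
        for (String field : keep) {
            int index = fieldNames.indexOf(field);
            if (index < 0) {
                throw new IllegalArgumentException("unknown field: " + field);
            }
            out.add(values.get(index));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("x", "y", "z");
        // Keeping y and z of the tuple (1, 2, 3) leaves (2, 3).
        System.out.println(project(fields, Arrays.asList(1, 2, 3), Arrays.asList("y", "z"))); // [2, 3]
    }
}
```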

    5.2 Broadcast

      Requirement: send the tuples of a batch to every partition

    public class BroadcastRepartitionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(BroadcastRepartitionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("haddop",3), 
                    new Values("java",4),
                    new Values("haddop",5)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("BroadcastRepartitionTrident", spout).parallelismHint(1)
                    .broadcast().peek(tridentTuple ->LOG.info("================ {}",tridentTuple))
                    .parallelismHint(2);    
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("BroadcastRepartitionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("BroadcastRepartitionTrident");
            cluster.shutdown();
        }
    }

    5.3 PartitionBy

      Requirement: route the tuples of a batch by the configured field, so that tuples with the same value are handled by the same task

    public class PartitionByRepartitionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(PartitionByRepartitionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            // FixedBatchSpout constructor arguments:
            //    1. the field names emitted by the spout
            //    2. the maximum number of tuples per batch
            //    3. the tuple values
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",23),
                    new Values("scala",3),
                    new Values("haddop",10), 
                    new Values("java",23),
                    new Values("haddop",10)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("PartitionByRepartitionTrident", spout).parallelismHint(1)
                    .partitionBy(new Fields("language")).peek(tridentTuple ->LOG.info("++++++++++++++++ {}",tridentTuple))
                    .parallelismHint(3);
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("PartitionByRepartitionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("PartitionByRepartitionTrident");
            cluster.shutdown();
        }
    }

    5.4 Global

       Requirement: route all tuples of a batch to a single partition

    public class GlobalRepatitionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(GlobalRepatitionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            // FixedBatchSpout constructor arguments:
            //    1. the field names emitted by the spout
            //    2. the maximum number of tuples per batch
            //    3. the tuple values
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",23),
                    new Values("scala",3),
                    new Values("haddop",10), 
                    new Values("java",23),
                    new Values("haddop",10)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("PartitionByRepartitionTrident", spout).parallelismHint(1)
                        .partitionBy(new Fields("language"))
                        .parallelismHint(3)    // whatever parallelism is set here, global() below still sends every tuple to one partition
                        .peek(tridentTuple ->LOG.info(" ================= {}",tridentTuple))
                        .global()
                        .peek(tridentTuple ->LOG.info(" >>>>>>>>>>>>>>>>> {}",tridentTuple));
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("GlobalRepatitionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("GlobalRepatitionTrident");
            cluster.shutdown();
        }
    }

      5.5 batchGlobal 

        Requirement: send the tuples of different batches to different tasks

    public class BatchGlobalRepatitionTrident2 {
        
        private static final Logger LOG = LoggerFactory.getLogger(BatchGlobalRepatitionTrident2.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("BatchGlobalRepatitionTrident2", spout).parallelismHint(1)
                    .batchGlobal().peek(tridentTuple ->LOG.info("++++++++++++++++ {}",tridentTuple))
                    .parallelismHint(3);    
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("BatchGlobalRepatitionTrident2", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("BatchGlobalRepatitionTrident2");
            cluster.shutdown();
        }
    }
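
      The difference between broadcast, global and batchGlobal is easiest to see side by side. The sketch below models the three routing rules in plain Java. RepartitionSketch and its methods are hypothetical names; using partition 0 for global and "batch index modulo partition count" for batchGlobal are assumptions of this model, not a description of Storm's internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of three repartitioning rules; hypothetical names, not Storm API.
final class RepartitionSketch {

    // Creates numPartitions empty partitions.
    private static <T> List<List<T>> emptyPartitions(int numPartitions) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        return partitions;
    }

    // broadcast: every partition receives a copy of every tuple in the batch.
    static <T> List<List<T>> broadcast(List<T> batch, int numPartitions) {
        List<List<T>> partitions = emptyPartitions(numPartitions);
        for (List<T> partition : partitions) {
            partition.addAll(batch);
        }
        return partitions;
    }

    // global: every tuple of every batch goes to one fixed partition (index 0 in this model).
    static <T> List<List<T>> global(List<T> batch, int numPartitions) {
        return batchGlobal(batch, 0, numPartitions);
    }

    // batchGlobal: a whole batch goes to one partition, chosen per batch
    // (assumption of this model: batch index modulo partition count).
    static <T> List<List<T>> batchGlobal(List<T> batch, int batchIndex, int numPartitions) {
        List<List<T>> partitions = emptyPartitions(numPartitions);
        partitions.get(Math.floorMod(batchIndex, numPartitions)).addAll(batch);
        return partitions;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("java", "scala");
        System.out.println(broadcast(batch, 2));      // [[java, scala], [java, scala]]
        System.out.println(global(batch, 3));         // [[java, scala], [], []]
        System.out.println(batchGlobal(batch, 4, 3)); // [[], [java, scala], []]
    }
}
```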

      5.6 partition 

        Requirement: partition with a custom grouping

    public class CustomRepartitionTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(CustomRepartitionTrident.class);
        
        public static void main(String [] args) throws InterruptedException{
            
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("language","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("haddop",3), 
                    new Values("java",4),
                    new Values("haddop",5)
            );
            spout.setCycle(false);
            
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("CustomRepartitionTrident", spout).parallelismHint(1)
                    .partition(new HighTaskIDGrouping()).peek(tridentTuple ->LOG.info("++++++++++++++++ {}",tridentTuple))
                    .parallelismHint(2);    
                    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("CustomRepartitionTrident", conf, topology.build());
            TimeUnit.SECONDS.sleep(30);
            cluster.killTopology("CustomRepartitionTrident");
            cluster.shutdown();
        }
    }
    
    /**
     * Custom grouping: 
     *         always routes tuples to the downstream task with the highest task id
     * @author pengbo.zhao
     *
     */
    public class HighTaskIDGrouping implements CustomStreamGrouping{
        
        private int taskID;
        
        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            // targetTasks: the ids of all downstream tasks
            ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
            Collections.sort(tasks);        // ascending order
            this.taskID = tasks.get(tasks.size() -1);
        }
    
        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
             
            return Arrays.asList(taskID);
        }
    }
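
      The selection logic of HighTaskIDGrouping can be tried out without a cluster. The sketch below repeats it in plain Java; HighestTaskSketch and its methods are hypothetical names, and the task ids in main are made up for the illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the custom grouping's choice; hypothetical names, not Storm API.
final class HighestTaskSketch {

    // Mirrors prepare(): find the largest id among the downstream tasks.
    static int highestTask(List<Integer> targetTasks) {
        return Collections.max(targetTasks);
    }

    // Mirrors chooseTasks(): every tuple goes to that single task, whatever its values are.
    static List<Integer> chooseTasks(List<Integer> targetTasks, List<?> values) {
        return Collections.singletonList(highestTask(targetTasks));
    }

    public static void main(String[] args) {
        List<Integer> targetTasks = Arrays.asList(7, 3, 9);
        System.out.println(chooseTasks(targetTasks, Arrays.asList("java", 1)));  // [9]
        System.out.println(chooseTasks(targetTasks, Arrays.asList("scala", 2))); // [9]
    }
}
```

      Since the choice ignores the tuple values, the other downstream tasks receive nothing, so this grouping behaves much like global with a different target.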

      6.1 partitionAggregate

        Requirement: count the tuples in each partition of a batch

    public class PartitionAggregateTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(PartitionAggregateTrident.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testPartitionAggregtor(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("PartitionAggregateTrident", spout).parallelismHint(2)// the spout's internal parallelism setting is 1, so the 2 written here has no effect
                    .shuffle()
                    .partitionAggregate(new Fields("name","age"), new Count(),new Fields("count"))
                    .parallelismHint(2)
    //                .each(new Fields("count"),new Debug());
                    .peek(input ->LOG.info(" >>>>>>>>>>>>>>>>> {}",input.getLongByField("count")));
            
            this.submitTopology("PartitionAggregateTrident", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
    }
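
      The practical difference between partitionAggregate and aggregate is how many results one batch produces. The sketch below contrasts the two for a count, in plain Java with hypothetical names (PartitionCountSketch and its methods); how a batch is actually spread over partitions depends on the topology.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of per-partition versus per-batch counting; hypothetical names, not Storm API.
final class PartitionCountSketch {

    // Models partitionAggregate with a count: one result per partition of the batch.
    static List<Long> countPerPartition(List<? extends List<?>> partitions) {
        List<Long> counts = new ArrayList<>();
        for (List<?> partition : partitions) {
            counts.add((long) partition.size());
        }
        return counts;
    }

    // Models aggregate with a count: one result for the whole batch.
    static long countWholeBatch(List<? extends List<?>> partitions) {
        long total = 0;
        for (List<?> partition : partitions) {
            total += partition.size();
        }
        return total;
    }

    public static void main(String[] args) {
        // One batch of three tuples that happens to be spread over two partitions.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("java", "scala"),
                Arrays.asList("scala"));
        System.out.println(countPerPartition(partitions)); // [2, 1]
        System.out.println(countWholeBatch(partitions));   // 3
    }
}
```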

      6.2 aggregate

        Requirement: count the tuples of each batch

    public class AggregateTrident {
        
        private static final Logger LOG = LoggerFactory.getLogger(AggregateTrident.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testPartitionAggregtor(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("AggregateTrident", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    .aggregate(new Fields("name","age"), new Count(),new Fields("count"))
    //                .aggregate(new Fields("name","age"), new CountAsAggregator(),new Fields("count"))
                    .parallelismHint(2)
                    .each(new Fields("count"),new Debug())
                    .peek(input -> LOG.info("============> count:{}",input.getLongByField("count")));
            
            this.submitTopology("AggregateTrident", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
    }
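
      The init / aggregate / complete lifecycle described for the Aggregator interface can be modeled with a small driver loop. This is a simplified plain-Java sketch: SimpleAggregator is a hypothetical stand-in that leaves out the batch id and the collector of the real Trident interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the aggregator lifecycle; hypothetical names, not Storm API.
final class AggregatorLifecycleSketch {

    // Simplified stand-in for an aggregator: no batch id and no collector.
    interface SimpleAggregator<S, T, R> {
        S init();                          // called once before the tuples of a batch
        void aggregate(S state, T tuple);  // called once per tuple
        R complete(S state);               // called once after the last tuple
    }

    // Counting aggregator; the state is a one-element array used as a mutable counter.
    static final class CountTuples<T> implements SimpleAggregator<long[], T, Long> {
        @Override public long[] init() { return new long[] {0L}; }
        @Override public void aggregate(long[] state, T tuple) { state[0]++; }
        @Override public Long complete(long[] state) { return state[0]; }
    }

    // Drives the lifecycle over several batches and returns one result per batch.
    static <S, T, R> List<R> run(SimpleAggregator<S, T, R> agg, List<List<T>> batches) {
        List<R> results = new ArrayList<>();
        for (List<T> batch : batches) {
            S state = agg.init();
            for (T tuple : batch) {
                agg.aggregate(state, tuple);
            }
            results.add(agg.complete(state));
        }
        return results;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("java", "scala", "scala"),
                Arrays.asList("haddop"));
        System.out.println(run(new CountTuples<String>(), batches)); // [3, 1]
    }
}
```

      The state is created fresh for every batch, which is why each batch gets its own independent count.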

      6.3 ReducerAggregator

         Requirement: sum field 0 of the tuples in a batch; that is, however many tuples the batch holds, add up the specified field across all of them

    public class ReduceAggregatorTrident  {
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testReduceAggregator(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("ReduceAggregator", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    .aggregate(new Fields("age","name"), new MyReduce(),new Fields("sum"))
                    .parallelismHint(5)
                    .each(new Fields("sum"),new Debug()); 
            
            this.submitTopology("ReduceAggregator", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
        
        static class MyReduce implements ReducerAggregator<Integer>{
            
            @Override
            public Integer init() {
                return 0;    // initial value is 0
            }
    
            @Override
            public Integer reduce(Integer curr, TridentTuple tuple) {
                
                return curr + tuple.getInteger(0);
            }
        }
    }
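
      MyReduce is a left fold: start from init() and apply reduce() to each tuple in turn. The sketch below shows the same fold in plain Java, assuming the six sample tuples arrive as two batches of three; ReducerSketch and reduceBatch are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of a reducer-style fold; hypothetical names, not Storm API.
final class ReducerSketch {

    // Folds one batch: start from init and apply reduce(current, tuple) for each tuple in order.
    static <S, T> S reduceBatch(S init, List<T> batch, BiFunction<S, T, S> reduce) {
        S current = init;
        for (T tuple : batch) {
            current = reduce.apply(current, tuple);
        }
        return current;
    }

    public static void main(String[] args) {
        // Only the age field of each tuple is modeled here.
        Integer first = reduceBatch(0, Arrays.asList(1, 2, 3), (curr, age) -> curr + age);
        Integer second = reduceBatch(0, Arrays.asList(4, 5, 6), (curr, age) -> curr + age);
        System.out.println(first);  // 6
        System.out.println(second); // 15
    }
}
```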

      6.4 CombinerAggregator

        Requirement: sum a field across the tuples

    public class CombinerAggregate {
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testCombinerAggregate(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("CombinerAggregate", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    .aggregate(new Fields("age"), new MyCount(),new Fields("count"))
                    .parallelismHint(5)
                    .each(new Fields("count"),new Debug());
            this.submitTopology("CombinerAggregate", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
        
        static class MyCount implements CombinerAggregator<Integer>{
    
            @Override
            public Integer init(TridentTuple tuple) {
                return tuple.getInteger(0);
            }
    
            @Override
            public Integer combine(Integer val1, Integer val2) {
                return val1 + val2;
            }
    
            @Override
            public Integer zero() {
                return 0;
            }
        }
    }
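
      What sets a combiner apart is that partial results can be computed per partition and merged afterwards, with zero() covering the empty case. The sketch below models that two-step evaluation in plain Java; Combiner here is a hypothetical stand-in with the same three methods as MyCount, not the Trident interface itself.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of combiner-style aggregation; hypothetical names, not Storm API.
final class CombinerSketch {

    // Stand-in with the same three methods as the example's MyCount.
    interface Combiner<T, R> {
        R init(T tuple);      // value contributed by a single tuple
        R combine(R a, R b);  // merges two partial results
        R zero();             // result when there is nothing to combine
    }

    // Sums integer values, like MyCount sums the age field.
    static final class SumAges implements Combiner<Integer, Integer> {
        @Override public Integer init(Integer age) { return age; }
        @Override public Integer combine(Integer a, Integer b) { return a + b; }
        @Override public Integer zero() { return 0; }
    }

    // Step 1: partial aggregation inside one partition.
    static <T, R> R combinePartition(Combiner<T, R> combiner, List<T> partition) {
        R acc = combiner.zero();
        for (T tuple : partition) {
            acc = combiner.combine(acc, combiner.init(tuple));
        }
        return acc;
    }

    // Step 2: merge the partial results of all partitions of the batch.
    static <T, R> R combineBatch(Combiner<T, R> combiner, List<List<T>> partitions) {
        R acc = combiner.zero();
        for (List<T> partition : partitions) {
            acc = combiner.combine(acc, combinePartition(combiner, partition));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
        System.out.println(combineBatch(new SumAges(), partitions)); // 6
    }
}
```

      Merging partial results only works because combine() is associative, which is also what lets the partial sums be computed before any data is moved between partitions.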

      6.5  persistentAggregate

         Requirement: count the tuples and keep the result in an in-memory state

    public class PersistenceAggregator {
        
        private static final Logger LOG = LoggerFactory.getLogger(PersistenceAggregator.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("haddop",4), 
                    new Values("java",5),
                    new Values("haddop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testPersistenceAggregator(){
            
            TridentTopology topoloty = new TridentTopology();
            topoloty.newStream("testPersistenceAggregator", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    .persistentAggregate(new MemoryMapState.Factory(), new Fields("name"), new Count(),new Fields("count"))
                    .parallelismHint(4)
                    .newValuesStream()
                    .peek(input ->LOG.info("count:{}",input.getLongByField("count")));
            this.submitTopology("testPersistenceAggregator", topoloty.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
        
    }
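
What sets a persistent aggregation apart from a per-batch one is that the result is stored and carried into the next batch. The sketch below illustrates that idea in plain Java, under the assumption that the aggregation is a simple count. `RunningCount` is a hypothetical stand-in for the state and is not part of Storm.

```java
import java.util.List;

// Plain-Java illustration only; RunningCount is a hypothetical stand-in for Trident state.
final class RunningCount {

    private long count = 0; // survives from one batch to the next

    // Counts the tuples of one batch, merges that into the stored value, returns the new value.
    long update(List<?> batch) {
        count += batch.size();
        return count;
    }

    public static void main(String[] args) {
        RunningCount state = new RunningCount();
        System.out.println(state.update(List.of("java", "scala", "scala"))); // prints 3
        System.out.println(state.update(List.of("x", "y", "z")));            // prints 6
    }
}
```

A per-batch aggregate would report 3 for both batches in this sketch; the stored value is what makes the second result 6.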

      6.6  Aggregate chain (chainedAgg)

        Requirement: run count, sum, and count aggregations over the tuples in a batch

    public class AggregateChina {
        private static final Logger LOG = LoggerFactory.getLogger(AggregateChina.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"),3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("hadoop",4), 
                    new Values("java",5),
                    new Values("hadoop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testAggregateChina(){
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("AggregateChina", spout).parallelismHint(2)
                    .partitionBy(new Fields("name"))
                    // Run several aggregators over the same batch; their outputs share one tuple
                    .chainedAgg()
                    .aggregate(new Fields("name"), new Count(), new Fields("count"))
                    .aggregate(new Fields("age"), new Sum(), new Fields("sum"))
                    .aggregate(new Fields("age"), new Count(), new Fields("count2"))
                    .chainEnd()
                    .peek(tuple -> LOG.info("{}", tuple));
            this.submitTopology("AggregateChina", topology.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
    }
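
A chained aggregation runs several aggregators over the same input and places their results side by side in one output tuple. As a rough plain-Java sketch of that shape (not Storm code; `ChainSketch` and `countSumCount` are hypothetical names), the three aggregators above applied to the ages of one batch would produce a triple like this:

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; mirrors the output fields [count, sum, count2].
final class ChainSketch {

    // Applies count, sum, and count to the same ages and returns the results together.
    static long[] countSumCount(List<Integer> ages) {
        long count = ages.size();
        long sum = 0;
        for (int age : ages) {
            sum += age;
        }
        return new long[] {count, sum, count};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countSumCount(List.of(1, 2, 3)))); // prints [3, 6, 3]
    }
}
```

In the topology itself the stream is repartitioned by name first, so each partition would produce its own tuple for the tuples it received; the sketch ignores partitioning.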

      7.GroupBy

        Requirement: group the tuples in a batch by name, then count the tuples in each group

    public class GroupBy {
        
        private static final Logger LOG = LoggerFactory.getLogger(GroupBy.class);
        
        private FixedBatchSpout spout;
        
        @SuppressWarnings("unchecked")
        @Before
        public void setSpout(){
            this.spout = new FixedBatchSpout(new Fields("name","age"), 3, 
                    new Values("java",1),
                    new Values("scala",2),
                    new Values("scala",3),
                    new Values("hadoop",4), 
                    new Values("java",5),
                    new Values("hadoop",6)    
            );
            this.spout.setCycle(false);
        }
        
        @Test
        public void testGroupBy(){
            
            TridentTopology topology = new TridentTopology();
            topology.newStream("GroupBy", spout).parallelismHint(1)
    //                .partitionBy(new Fields("name"))
                    // Group by name, then count the tuples of each group within the batch
                    .groupBy(new Fields("name"))
                    .aggregate(new Count(), new Fields("count"))
                    .peek(tuple -> LOG.info("{},{}", tuple.getFields(), tuple));
                    
            this.submitTopology("GroupBy", topology.build());
        }
            
        public void submitTopology(String name,StormTopology topology) {
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, createConf(), topology);
            
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology(name);
            cluster.shutdown();
        }
        
        public Config createConf(){
            Config conf = new Config();
            conf.setNumWorkers(3);
            conf.setDebug(false);
            return conf;
        }
    }
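
Grouping followed by a count produces one result per distinct key in the batch. The following plain-Java sketch shows that result for the names of the first three tuples, assuming they form one batch (the spout above is created with a batch size of 3). `GroupCountSketch` and `countByName` are hypothetical names and not part of Storm.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; shows the per-group counts for a single batch.
final class GroupCountSketch {

    // Groups the names and counts how many times each one occurs.
    static Map<String, Long> countByName(List<String> names) {
        Map<String, Long> counts = new TreeMap<>(); // sorted keys for stable printing
        for (String name : names) {
            counts.merge(name, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countByName(List.of("java", "scala", "scala"))); // prints {java=1, scala=2}
    }
}
```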

      

  • Original article: https://www.cnblogs.com/MrRightZhao/p/11078570.html