zoukankan      html  css  js  c++  java
  • Trident学习笔记(二)

    aggregator

    ------------------

    聚合动作;聚合操作可以是基于batch、stream、partition

    [聚合方式-分区聚合]

    partitionAggregate

      分区聚合;基于分区进行聚合运算;作用于分区而不是batch。

      // Partition aggregation: the aggregator is applied per partition rather than per batch.
      mystream.partitionAggregate(new Fields("x"), new Count(), new Fields("count"));

    [聚合方式-batch聚合]

    aggregate

      批次聚合;先将同一batch的所有分区的tuple进行global再分区,将其汇集到一个分区中,再进行聚合运算。

      .aggregate(new Fields("a"), new Count(), new Fields("count"));   // batch aggregation

      聚合函数

        [ReducerAggregator]

          init();

          reduce();

        Aggregator

        CombinerAggregator

    import org.apache.storm.trident.operation.ReducerAggregator;
    import org.apache.storm.trident.tuple.TridentTuple;
    
    /**
     * Custom sum aggregation function: folds the first two Integer fields of every
     * tuple into a single running total.
     */
    public class SumReducerAggregator implements ReducerAggregator<Integer> {

        private static final long serialVersionUID = 1L;

        /**
         * Supplies the starting value of the running total.
         *
         * @return zero
         */
        @Override
        public Integer init() {
            return 0;
        }

        /**
         * Adds fields 0 and 1 of the given tuple to the total accumulated so far.
         *
         * @param curr  the total accumulated before this tuple
         * @param tuple the tuple whose first two Integer fields are added
         * @return the updated total
         */
        @Override
        public Integer reduce(Integer curr, TridentTuple tuple) {
            int total = curr;
            total += tuple.getInteger(0);
            total += tuple.getInteger(1);
            return total;
        }

    }

    分区聚合

    import net.baiqu.shop.report.trident.demo01.PrintFunction;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class TridentTopologyApp4 {

        /**
         * Demo entry point: wires a fixed test spout into a partition aggregation that
         * uses SumReducerAggregator, passes the "sum" field to PrintFunction, and
         * submits the topology to a LocalCluster.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Topology builder
            TridentTopology tridentTopology = new TridentTopology();

            // Test spout declaring fields (a, b) and fed with four fixed tuples
            Fields spoutFields = new Fields("a", "b");
            FixedBatchSpout fixedSpout = new FixedBatchSpout(spoutFields, 4,
                    new Values(1, 2),
                    new Values(2, 3),
                    new Values(3, 4),
                    new Values(4, 5));

            // Stream: aggregate (a, b) per partition into "sum", then shuffle and print
            Stream source = tridentTopology.newStream("testSpout", fixedSpout);
            Stream sums = source.partitionAggregate(
                    new Fields("a", "b"), new SumReducerAggregator(), new Fields("sum"));
            sums.shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("result"));

            // Submit to a local cluster
            LocalCluster localCluster = new LocalCluster();
            Config conf = new Config();
            localCluster.submitTopology("TridentDemo4", conf, tridentTopology.build());
        }

    }

    [ReducerAggregator]

      init();

      reduce();

      /**
       * Aggregator built from an initial value plus a per-tuple reduce step.
       *
       * @param <T> type of the accumulated value
       */
      public interface ReducerAggregator<T> extends Serializable {

        /** Returns the initial accumulated value. */
        T init();

        /** Returns the new accumulated value given the current value and one tuple. */
        T reduce(T curr, TridentTuple tuple);

      }

    [Aggregator]

      描述同ReducerAggregator.

      /**
       * General-purpose aggregator driven by three callbacks: init, aggregate, complete.
       *
       * @param <T> type of the state object shared between the callbacks
       */
      public interface Aggregator<T> extends Operation {

        // Called before aggregation starts; creates the state that is shared with
        // the two methods below.
        T init(Object batchId, TridentCollector collector);

        // Called for every tuple of the batch; updates the state after processing the tuple.
        void aggregate(T val, TridentTuple tuple, TridentCollector collector);

        // Called after all tuples have been processed; emits a single tuple for the batch.
        void complete(T val, TridentCollector collector);

      }

    Aggregator示例(继承BaseAggregator)

    import org.apache.storm.trident.operation.BaseAggregator;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    import java.io.Serializable;
    
    /**
     * Batch sum function: accumulates fields 0 and 1 of every tuple into a State
     * object and emits the total when aggregation completes.
     */
    public class SumAggregator extends BaseAggregator<SumAggregator.State> {

        private static final long serialVersionUID = 1L;

        /** Mutable accumulator holding the running sum. */
        static class State implements Serializable {
            private static final long serialVersionUID = 1L;
            int sum = 0;
        }

        /**
         * Creates a fresh accumulator.
         *
         * @param batchId   not used by this implementation
         * @param collector not used by this implementation
         * @return a new State whose sum is zero
         */
        @Override
        public State init(Object batchId, TridentCollector collector) {
            State fresh = new State();
            return fresh;
        }

        /**
         * Adds the tuple's first two Integer fields to the running sum.
         *
         * @param state     accumulator to update
         * @param tuple     tuple supplying the two values
         * @param collector not used by this implementation
         */
        @Override
        public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
            state.sum += tuple.getInteger(0) + tuple.getInteger(1);
        }

        /**
         * Emits the accumulated sum as a single-value tuple.
         *
         * @param state     accumulator holding the final sum
         * @param collector collector the result is emitted to
         */
        @Override
        public void complete(State state, TridentCollector collector) {
            Values result = new Values(state.sum);
            collector.emit(result);
        }

    }

    批次聚合

    package net.baiqu.shop.report.trident.demo04;
    
    import net.baiqu.shop.report.trident.demo01.PrintFunction;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class TridentTopologyApp4 {

        /**
         * Demo entry point: wires a fixed test spout into a batch aggregation that uses
         * SumAggregator, passes the "sum" field to PrintFunction, and submits the
         * topology to a LocalCluster.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Topology builder
            TridentTopology tridentTopology = new TridentTopology();

            // Test spout declaring fields (a, b) and fed with four fixed tuples
            Fields spoutFields = new Fields("a", "b");
            FixedBatchSpout fixedSpout = new FixedBatchSpout(spoutFields, 4,
                    new Values(1, 2),
                    new Values(2, 3),
                    new Values(3, 4),
                    new Values(4, 5));

            // Stream: aggregate (a, b) over the batch into "sum", then shuffle and print
            Stream source = tridentTopology.newStream("testSpout", fixedSpout);
            Stream sums = source.aggregate(
                    new Fields("a", "b"), new SumAggregator(), new Fields("sum"));
            sums.shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("result"));

            // Submit to a local cluster
            LocalCluster localCluster = new LocalCluster();
            Config conf = new Config();
            localCluster.submitTopology("TridentDemo4", conf, tridentTopology.build());
        }

    }

    运行结果

    PrintFunction: 24
    PrintFunction: 0
    PrintFunction: 0
    ......

    平均值批次聚合函数

    import org.apache.storm.trident.operation.BaseAggregator;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    import java.io.Serializable;
    
    /**
     * Batch average function: accumulates the first two Integer fields of every tuple
     * and emits their mean when aggregation completes.
     */
    public class AvgAggregator extends BaseAggregator<AvgAggregator.State> {

        private static final long serialVersionUID = 1L;

        /** Mutable accumulator for the running sum and the number of values seen. */
        static class State implements Serializable {
            private static final long serialVersionUID = 1L;
            // Running total of all field values
            float sum = 0;
            // Number of values added so far (incremented by two per tuple)
            int count = 0;
        }

        /**
         * Creates the initial, empty state.
         *
         * @param batchId   not used by this implementation
         * @param collector not used by this implementation
         * @return a new State with sum and count both zero
         */
        @Override
        public State init(Object batchId, TridentCollector collector) {
            State fresh = new State();
            return fresh;
        }

        /**
         * Folds one tuple into the state: counts its two values and adds them to the sum.
         *
         * @param state     accumulator to update
         * @param tuple     tuple supplying the two values
         * @param collector not used by this implementation
         */
        @Override
        public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
            state.count += 2;
            // Keep the left-to-right float additions in the same order as a single expression.
            float withFirst = state.sum + tuple.getInteger(0);
            state.sum = withFirst + tuple.getInteger(1);
        }

        /**
         * Emits sum / count as a single-value tuple once every tuple has been processed.
         *
         * @param state     accumulator holding the final sum and count
         * @param collector collector the result is emitted to
         */
        @Override
        public void complete(State state, TridentCollector collector) {
            float average = state.sum / state.count;
            collector.emit(new Values(average));
        }

    }

    批次聚合求平均值

    import net.baiqu.shop.report.trident.demo01.PrintFunction;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class TridentTopologyApp4 {

        /**
         * Demo entry point: wires a fixed test spout into a batch aggregation that uses
         * AvgAggregator, passes the "avg" field to PrintFunction, and submits the
         * topology to a LocalCluster.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Topology builder
            TridentTopology tridentTopology = new TridentTopology();

            // Test spout declaring fields (a, b) and fed with four fixed tuples
            Fields spoutFields = new Fields("a", "b");
            FixedBatchSpout fixedSpout = new FixedBatchSpout(spoutFields, 4,
                    new Values(1, 2),
                    new Values(2, 3),
                    new Values(3, 4),
                    new Values(4, 5));

            // Stream: aggregate (a, b) over the batch into "avg", then shuffle and print
            Stream source = tridentTopology.newStream("testSpout", fixedSpout);
            Stream averages = source.aggregate(
                    new Fields("a", "b"), new AvgAggregator(), new Fields("avg"));
            averages.shuffle().each(new Fields("avg"), new PrintFunction(), new Fields("result"));

            // Submit to a local cluster
            LocalCluster localCluster = new LocalCluster();
            Config conf = new Config();
            localCluster.submitTopology("TridentDemo4", conf, tridentTopology.build());
        }

    }

    [CombinerAggregator]

      在每个partition运行分区聚合,然后再进行global再分区将同一batch的所有tuple分到一个partition中,最后在这一个partition中进行聚合运算,并产生结果进行输出。

      该种方式的网络流量占用少于前两种方式。

      public interface CombinerAggregator<T> extents Serializable {

        // 在每个tuple上运行,并接受字段值

        T init(TridentTuple tuple);

        // 合成tuple的值,并输出一个值的tuple

        T combine(T val1, T vak2);

        // 如果分区不含有tuple,调用该方法.

        T zero();

      }

    合成聚合函数

    import clojure.lang.Numbers;
    import org.apache.storm.trident.operation.CombinerAggregator;
    import org.apache.storm.trident.tuple.TridentTuple;
    
    /**
     * Combiner-style sum function: takes field 0 of each tuple as a Number and
     * adds values pairwise with clojure.lang.Numbers.
     */
    public class SumCombinerAggregator implements CombinerAggregator<Number> {

        private static final long serialVersionUID = 1L;

        /**
         * Extracts the value this tuple contributes.
         *
         * @param tuple the incoming tuple
         * @return field 0 of the tuple, cast to Number
         */
        @Override
        public Number init(TridentTuple tuple) {
            Object firstField = tuple.getValue(0);
            return (Number) firstField;
        }

        /**
         * Adds two partial results.
         *
         * @param val1 left operand
         * @param val2 right operand
         * @return the result of Numbers.add(val1, val2)
         */
        @Override
        public Number combine(Number val1, Number val2) {
            Number total = Numbers.add(val1, val2);
            return total;
        }

        /**
         * Supplies the identity value.
         *
         * @return zero
         */
        @Override
        public Number zero() {
            return Integer.valueOf(0);
        }

    }

    topology

    import net.baiqu.shop.report.trident.demo01.PrintFunction;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class TridentTopologyApp4 {

        /**
         * Demo entry point: wires a fixed test spout into an aggregation that uses
         * SumCombinerAggregator, passes the "sum" field to PrintFunction, and submits
         * the topology to a LocalCluster.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Topology builder
            TridentTopology tridentTopology = new TridentTopology();

            // Test spout declaring fields (a, b) and fed with four fixed tuples
            Fields spoutFields = new Fields("a", "b");
            FixedBatchSpout fixedSpout = new FixedBatchSpout(spoutFields, 4,
                    new Values(1, 2),
                    new Values(2, 3),
                    new Values(3, 4),
                    new Values(4, 5));

            // Stream: combine over input fields (a, b) into "sum", then shuffle and print
            Stream source = tridentTopology.newStream("testSpout", fixedSpout);
            Stream sums = source.aggregate(
                    new Fields("a", "b"), new SumCombinerAggregator(), new Fields("sum"));
            sums.shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("result"));

            // Submit to a local cluster
            LocalCluster localCluster = new LocalCluster();
            Config conf = new Config();
            localCluster.submitTopology("TridentDemo4", conf, tridentTopology.build());
        }

    }

    输出结果

    PrintFunction: 10
    PrintFunction: 0
    PrintFunction: 0
    ......

     

     

     

  • 相关阅读:
    减少.NET应用程序内存占用的一则实践
    ASP.NET中检测图片真实否防范病毒上传
    PHP、Python 相关正则函数实例
    利用脚本将java回归到面向函数式编程
    ASP.Net 实现伪静态方法及意义
    ExecuteNonQuery()方法
    在.net中使用split方法!
    str.split()如何用?谢谢
    输出货币型
    (DataRowView)e.Item.DataItem只有在ItemDataBound这个事件中起效
  • 原文地址:https://www.cnblogs.com/tangzhe/p/9609534.html
Copyright © 2011-2022 走看看