zoukankan      html  css  js  c++  java
  • Trident学习笔记(一)

    1. Trident入门

    Trident

    -------------------

     三叉戟

     storm高级抽象,支持有状态流处理;

     好处是确保消费被处理一次;

     以小批次方式处理输入流,得到精准一次性处理 ;

     不再使用bolt,使用functions、aggreates、filters以及states。

     Trident Tuple: trident top的数据模型,trident处理数据的单元;

            每个tuple有预定义的字段列表构成,字段类型可以是byte;

            character,integer,long,float,double,Boolean or byte array。

     Trident functions: 包含修改tuple的业务逻辑,输入的是tuple的字段,输出多个tuple。

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    /**
     * 求和函数
     */
    public class SumFunction extends BaseFunction {
    
        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
            Integer num1 = input.getInteger(0);
            Integer num2 = input.getInteger(1);
            int sum = num1 + num2;
            collector.emit(new Values(sum));
        }
    
    }

    如果tuple有a, b, c, d四个field,只有a和b作为输入传给function,functions会生成新的sum字段,

    sum字段和输入的元祖进行合并,生成一个完成tuple,因此,新的tuple的总和字段个数是a, b, c, d, sum。

    Trident Filter

    --------------------

      1. 描述

      获取字段集合作为输入,输出boolean,如果反悔true,tuple在流中保留,否则删除,

      a, b, c, d, sum是元祖的字段,sum作为输入传递给filter,判断sum是否为偶数,

      如果是偶数,tuple(a, b, c, d, sum)保留,否则tuple删除。

      2. 代码

    import org.apache.storm.trident.operation.BaseFilter;
    import org.apache.storm.trident.tuple.TridentTuple;
    
    /**
     * 校验是否是偶数的过滤器
     */
    public class CheckEvenFilter extends BaseFilter {
    
        @Override
        public boolean isKeep(TridentTuple input) {
            Integer sum = input.getInteger(0);
            if (sum % 2 == 0) {
                return true;
            }
            return false;
        }
    
    }

    Trident projections

    --------------------

      1. 描述

       投影操作中,trident值保留在投影中制定的字段,

       x, y, z --> projection(x) --> x

      2. 调用投影的方式

       mystream.project(new fields("x"));

     

    写一个topology

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    
    public class PrintFunction extends BaseFunction {
    
        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
            Integer sum = input.getInteger(0);
            System.out.println(this.getCLass.getSimpleName + ": " + sum);
        }
        
    }
    import com.google.common.collect.ImmutableList;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.testing.FeederBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class TridentTopologyApp {
    
        public static void main(String[] args) {
            // 创建topology
            TridentTopology topology = new TridentTopology();
    
            // 创建spout
            FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("a", "b", "c", "d"));
    
            // 创建流
            Stream stream = topology.newStream("spout", testSpout);
            stream.shuffle().each(new Fields("a", "b"), new SumFunction(), new Fields("sum")).parallelismHint(1)
                    .shuffle().each(new Fields("sum"), new CheckEvenFilter()).parallelismHint(1)
                    .shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("xxx")).parallelismHint(1);
    
            // 本地提交
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("TridentDemo", new Config(), topology.build());
    
            // 测试数据
            testSpout.feed(ImmutableList.of(new Values(1, 2, 3, 4)));
            testSpout.feed(ImmutableList.of(new Values(2, 3, 4, 5)));
            testSpout.feed(ImmutableList.of(new Values(3, 4, 5, 6)));
            testSpout.feed(ImmutableList.of(new Values(4, 5, 6, 7)));
        }
    
    }

    输出结果

    SumFunction:1, 2
    CheckEvenFilter:3
    PrintFunction: 3
    SumFunction:2, 3
    CheckEvenFilter:5
    PrintFunction: 5
    SumFunction:3, 4
    CheckEvenFilter:7
    PrintFunction: 7
    SumFunction:4, 5
    CheckEvenFilter:9
    PrintFunction: 9

    加入一个求平均数的函数

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    
    /**
     * 求平均值方法
     */
    public class AverageFunction extends BaseFunction {
    
        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
            int a = input.getIntegerByField("a");
            int b = input.getIntegerByField("b");
            int c = input.getIntegerByField("c");
            int d = input.getIntegerByField("d");
            int sum = input.getIntegerByField("sum");
            float avg = (float) ((a+b+c+d+sum) / 5.0);
            System.out.println(this.getClass().getSimpleName() + ": avg = " + avg);
        }
    
    }
    import com.google.common.collect.ImmutableList;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.testing.FeederBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class TridentTopologyApp {
    
        public static void main(String[] args) {
            // 创建topology
            TridentTopology topology = new TridentTopology();
    
            // 创建spout
            FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("a", "b", "c", "d"));
    
            // 创建流
            Stream stream = topology.newStream("spout", testSpout);
            stream.shuffle().each(new Fields("a", "b"), new SumFunction(), new Fields("sum")).parallelismHint(1)
                    .shuffle().each(new Fields("sum"), new CheckEvenFilter()).parallelismHint(1)
                    .shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("res")).parallelismHint(1)
                    .shuffle().each(new Fields("a", "b", "c", "d", "sum"), new AverageFunction(), new Fields("avg")).parallelismHint(1);
    
            // 本地提交
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("TridentDemo", new Config(), topology.build());
    
            // 测试数据
            testSpout.feed(ImmutableList.of(new Values(1, 2, 3, 4)));
            testSpout.feed(ImmutableList.of(new Values(2, 3, 4, 5)));
            testSpout.feed(ImmutableList.of(new Values(3, 4, 5, 6)));
            testSpout.feed(ImmutableList.of(new Values(4, 5, 6, 7)));
        }
    
    }

     

    2. Trident聚合函数

     分区聚合

    import com.google.common.collect.ImmutableList;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.testing.FeederBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class TridentTopologyApp2 {
    
        public static void main(String[] args) {
            // 创建topology
            TridentTopology topology = new TridentTopology();
    
            // 创建spout
            FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("a", "b"));
    
            // 创建流
            Stream stream = topology.newStream("testSpout", testSpout);
            stream.shuffle().each(new Fields("a", "b"), new MyFilter1()).parallelismHint(1)
                    .global().each(new Fields("a", "b"), new MyFilter2()).parallelismHint(1)
                    .partitionBy(new Fields("a"))
                    //.each(new Fields("a", "b"), new MyFunction1(), new Fields("none")).parallelismHint(1)
                    .partitionAggregate(new Fields("a"), new MyCount(), new Fields("count"))
                    .each(new Fields("count"), new MyPrintFunction1(), new Fields("xxx")).parallelismHint(1);
    
            // 本地提交
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("TridentDemo2", new Config(), topology.build());
    
            // 测试数据
            testSpout.feed(ImmutableList.of(new Values(1, 2)));
            testSpout.feed(ImmutableList.of(new Values(2, 3)));
            testSpout.feed(ImmutableList.of(new Values(2, 4)));
            testSpout.feed(ImmutableList.of(new Values(3, 5)));
        }
    
    }

    批次聚合

     

    3. 自定义聚合函数-Sum-SumAsAggregator

  • 相关阅读:
    switch语句
    switch语句
    if语句三种格式
    dowhile语句
    if语句三种格式
    if语句配对
    ansible
    linux系统中网站服务程序(web服务/httpd服务)
    ansible
    ansible
  • 原文地址:https://www.cnblogs.com/tangzhe/p/9599009.html
Copyright © 2011-2022 走看看