zoukankan      html  css  js  c++  java
  • Flink入门

    /* 
    * ProcessWinFunOnWindow 
    */ 
    
    // Pipeline: transcript source -> key by tuple field 0 -> count window of 2 -> MyProcessWindowFunction.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH_TRANSCRIPT);

    // One Double is emitted per window evaluation of MyProcessWindowFunction.
    DataStream<Double> avgEnglishScore = input
            .keyBy(0)
            .countWindow(2)
            .process(new MyProcessWindowFunction());

    avgEnglishScore.print();

    // Nothing runs until the job is explicitly executed.
    env.execute();
    
    /**
     * Sample input records. Judging by the values, each tuple is
     * (class name, student name, English score); field 0 is used as the key
     * and field 2 is the value aggregated by the examples.
     */
    public static final Tuple3[] ENGLISH_TRANSCRIPT = {
        Tuple3.of("class1","张三",100L),
        Tuple3.of("class1","李四",78L),
        Tuple3.of("class1","王五",99L),
        Tuple3.of("class2","赵六",81L),
        Tuple3.of("class2","钱七",59L),
        Tuple3.of("class2","马二",97L)
    };
        
    /**
     * Window function that emits the arithmetic mean of field {@code f2} over
     * all elements passed to a single {@code process} invocation.
     */
    private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, String, Long>, Double, Tuple, GlobalWindow> {

        /**
         * Sums {@code f2} of every element, counts the elements, and emits
         * {@code sum / count} as a {@code double}.
         *
         * @param tuple    key argument supplied by the framework (not used here)
         * @param context  window context (not used here)
         * @param elements elements to average
         * @param out      collector that receives the single average value
         * @throws Exception declared by the overridden method; nothing is thrown explicitly here
         */
        @Override
        public void process(Tuple tuple,
                ProcessWindowFunction<Tuple3<String, String, Long>, Double, Tuple, GlobalWindow>.Context context,
                Iterable<Tuple3<String, String, Long>> elements, Collector<Double> out) throws Exception {
            // Primitive accumulators: the boxed Long originals were unboxed and
            // re-boxed on every iteration for no benefit.
            long sum = 0L;
            long count = 0L;
            for (Tuple3<String, String, Long> element : elements) {
                sum += element.f2;
                count++;
            }
            // Cast before dividing so the division is floating-point, matching
            // the original doubleValue() / doubleValue() result exactly.
            out.collect((double) sum / count);
        }
    }
    
    // 运行结果 
    2> 89.0 
    1> 70.0 
    
    // 如果是input.keyBy(0).countWindow(3) 
    1> 79.0 
    2> 92.33333333333333 
    

    /** 
    *AggFunctionOnWindow 
    */ 
    // Pipeline: transcript source -> key by tuple field 0 -> count window of 3 -> AverageAggregate.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH_TRANSCRIPT);

    // AverageAggregate folds elements into a (sum, count) accumulator and emits a Double.
    DataStream<Double> avgEnglishScore = input
            .keyBy(0)
            .countWindow(3)
            .aggregate(new AverageAggregate());

    avgEnglishScore.print();

    // Nothing runs until the job is explicitly executed.
    env.execute();
    
    
    /**
     * Incremental average of field {@code f2}. The accumulator is a
     * {@code Tuple2} holding (running sum, element count).
     */
    private static class AverageAggregate implements AggregateFunction<Tuple3<String, String, Long>, Tuple2<Long, Long>, Double> {

        /**
         * Creates the accumulator that holds the intermediate state,
         * starting at sum 0 and count 0.
         */
        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return new Tuple2<>(0L, 0L);
        }

        /**
         * Folds one element into the accumulator: adds its {@code f2} to the
         * sum and increments the count. A new tuple is returned; the incoming
         * accumulator is not modified.
         */
        @Override
        public Tuple2<Long, Long> add(Tuple3<String, String, Long> value, Tuple2<Long, Long> accumulator) {
            long sum = accumulator.f0 + value.f2;
            long count = accumulator.f1 + 1;
            return new Tuple2<>(sum, count);
        }

        /**
         * Extracts the result from the accumulator: sum divided by count,
         * computed in floating point.
         */
        @Override
        public Double getResult(Tuple2<Long, Long> accumulator) {
            double sum = accumulator.f0.doubleValue();
            double count = accumulator.f1.doubleValue();
            return sum / count;
        }

        /**
         * Combines two accumulators by adding their sums and their counts
         * component-wise.
         */
        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) {
            long sum = value1.f0 + value2.f0;
            long count = value1.f1 + value2.f1;
            return new Tuple2<>(sum, count);
        }

    }
    
    // 运行结果 
    1> 79.0 
    2> 92.33333333333333 
    
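    /** 
    *ReduceFunctionOnWindowAll 
    * (Note: despite the name, the example below uses keyBy(0).countWindow(3), i.e. a keyed window, not windowAll.) 
    */ 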
    
    // Pipeline: transcript source -> key by tuple field 0 -> count window of 3 -> reduce -> map -> print.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH_TRANSCRIPT);

    DataStream<Tuple3<String, String, Long>> totalEnglishScore = input
            .keyBy(0)
            .countWindow(3)
            .reduce(new ReduceFunction<Tuple3<String, String, Long>>() {

                /**
                 * Combines two records: keeps f0 and f1 of the first
                 * argument and adds the two f2 values.
                 */
                @Override
                public Tuple3<String, String, Long> reduce(Tuple3<String, String, Long> first,
                        Tuple3<String, String, Long> second) throws Exception {
                    long total = first.f2 + second.f2;
                    return new Tuple3<>(first.f0, first.f1, total);
                }
            });

    // Drop the middle field so only (f0, summed f2) is printed.
    totalEnglishScore
            .map(new MapFunction<Tuple3<String, String, Long>, Tuple2<String, Long>>() {

                /** Projects a record onto its first and third fields. */
                @Override
                public Tuple2<String, Long> map(Tuple3<String, String, Long> record) throws Exception {
                    return new Tuple2<>(record.f0, record.f2);
                }
            })
            .print();

    // Nothing runs until the job is explicitly executed.
    env.execute();
    
    // 运行结果 
    2> (class1,277) 
    1> (class2,237) 
    
  • 相关阅读:
    java中&和&&是怎么运算的
    struts中ActionForward 使用mapping.findForward如何传递get参数
    EL表达式_详解
    JSTL标签_详解
    inner join, left join, right join, full join 的区别
    CentOS7部署FastDFS+nginx模块
    一个实例明白AutoResetEvent和 ManulResetEvent的用法
    C#防止在画面上闪烁的Button
    C#中给Label控件设置BackgroundImage属性
    浅析C#异步操作
  • 原文地址:https://www.cnblogs.com/fangpengchengbupter/p/11890738.html
Copyright © 2011-2022 走看看