zoukankan      html  css  js  c++  java
  • Flink入门

    final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); 
    
    /* 
    * Filter 
    */ 
    DataStream<Long> input = streamExecutionEnvironment.generateSequence(-5, 5); 
    
    input.filter(new FilterFunction<Long>() { 
    
    @Override 
    public boolean filter(Long value) throws Exception { 
    // TODO Auto-generated method stub 
    return value >= 0; 
    } 
    }).print(); 
    
    streamExecutionEnvironment.execute(); 
    

    /* 
    * Connect 
    */ 
    
    DataStream<Long> someStream = streamExecutionEnvironment.generateSequence(0, 10); 
    
    DataStream<String> otherStream = streamExecutionEnvironment.fromElements(WordCountData.WORDS); 
    
    ConnectedStreams<Long, String> connectedStreams = someStream.connect(otherStream); 
    
    DataStream<String> result = connectedStreams.flatMap(new CoFlatMapFunction<Long, String, String>() { 
    
    @Override 
    public void flatMap1(Long value, Collector<String> out) throws Exception { 
    // TODO Auto-generated method stub 
    out.collect(value.toString()); 
    } 
    
    @Override 
    public void flatMap2(String value, Collector<String> out) throws Exception { 
    // TODO Auto-generated method stub 
    Arrays.asList(value.split("\W+")).stream().forEachOrdered(str -> out.collect(str)); 
    } 
    }); 
    
    result.print(); 
    
    streamExecutionEnvironment.execute(); 
    

    /* 
    * KeyBy 
    */ 
    
    DataStream<Tuple4<String, String, String, Integer>> input = streamExecutionEnvironment.fromElements(TRANSCRIPT); 
    
    KeyedStream<Tuple4<String, String, String, Integer>, Tuple> keyedStream = input.keyBy("f0"); 
    
    keyedStream.print(); 
    
    keyedStream.maxBy("f3").print(); 
    
    streamExecutionEnvironment.execute(); 
    
    public static final Tuple4[] TRANSCRIPT = new Tuple4[] { 
    
    Tuple4.of("class1","张三","语文",100), 
    
    Tuple4.of("class1","李四","语文",78), 
    
    Tuple4.of("class1","王五","语文",99), 
    
    Tuple4.of("class2","赵六","语文",81), 
    
    Tuple4.of("class2","钱七","语文",59), 
    
    Tuple4.of("class2","马二","语文",97) 
    
    };       
    

    /* 
    * Map 
    */ 
    DataStream<Long> input = streamExecutionEnvironment.generateSequence(0, 10); 
    
    DataStream<Long> plusOne = input.map(new MapFunction<Long, Long>() { 
    
    @Override 
    public Long map(Long value) throws Exception { 
    // TODO Auto-generated method stub 
    return value + 1; 
    } 
    }); 
    
    plusOne.print(); 
    
    streamExecutionEnvironment.execute(); 
    

    /* 
    * Fold 
    */ 
    DataStream<Tuple4<String, String, String, Integer>> input = streamExecutionEnvironment.fromElements(TRANSCRIPT); 
    
    DataStream<String> result = input.keyBy(0).fold("Start", new FoldFunction<Tuple4<String, String, String, Integer>, String>() { 
    
    @Override 
    public String fold(String str, Tuple4<String, String, String, Integer> value) throws Exception { 
    // TODO Auto-generated method stub 
    return str + " = " + value.f1 + " "; 
    } 
    }); 
    
    result.print(); 
    
    streamExecutionEnvironment.execute(); 
    
    public static final Tuple4[] TRANSCRIPT = new Tuple4[] { 
    
    Tuple4.of("class1","张三","语文",100), 
    
    Tuple4.of("class1","李四","语文",78), 
    
    Tuple4.of("class1","王五","语文",99), 
    
    Tuple4.of("class2","赵六","语文",81), 
    
    Tuple4.of("class2","钱七","语文",59), 
    
    Tuple4.of("class2","马二","语文",97) 
    
    }; 
    
    /** 
    1> Start = 赵六 
    1> Start = 赵六 = 钱七 
    1> Start = 赵六 = 钱七 = 马二 
    
    2> Start = 张三 
    2> Start = 张三 = 李四 
    2> Start = 张三 = 李四 = 王五 
    */ 
    

    /* 
    * Reduce 
    */ 
    DataStream<Tuple4<String, String, String, Integer>> input = streamExecutionEnvironment.fromElements(TRANSCRIPT); 
    
    KeyedStream<Tuple4<String, String, String, Integer>, Tuple> keyedStream = input.keyBy(0); 
    
    keyedStream.reduce(new ReduceFunction<Tuple4<String, String, String, Integer>>() { 
    
    @Override 
    public Tuple4<String, String, String, Integer> reduce(Tuple4<String, String, String, Integer> value1, 
    Tuple4<String, String, String, Integer> value2) throws Exception { 
    // TODO Auto-generated method stub 
    value1.f3 += value2.f3; 
    return value1; 
    } 
    }).print(); 
    
    streamExecutionEnvironment.execute(); 
    
    /** 
    2> (class1,张三,语文,100) 
    2> (class1,张三,语文,178) 
    2> (class1,张三,语文,277) 
    1> (class2,赵六,语文,81) 
    1> (class2,赵六,语文,140) 
    1> (class2,赵六,语文,237) 
    */ 
    

    /* 
    * Project 
    */ 
    DataStream<Tuple4<String, String, String, Integer>> input = streamExecutionEnvironment.fromElements(TRANSCRIPT); 
    
    DataStream<Tuple2<String, Integer>> output = input.project(1, 3); 
    
    output.print(); 
    
    streamExecutionEnvironment.execute(); 
    
    /** 
    4> (张三,100) 
    4> (钱七,59) 
    2> (王五,99) 
    3> (赵六,81) 
    1> (李四,78) 
    1> (马二,97) 
    */ 
    

    /* 
    * SplitAndSelect 
    */ 
    DataStream<Long> input = streamExecutionEnvironment.generateSequence(0, 10); 
    
    SplitStream<Long> splitStream = input.split(new OutputSelector<Long>() { 
    
    @Override 
    public Iterable<String> select(Long value) { 
    // TODO Auto-generated method stub 
    List<String> output = new ArrayList<>(); 
    if (value % 2 == 0) { 
    output.add(EVEN); 
    } else { 
    output.add(ODD); 
    } 
    return output; 
    } 
    }); 
    
    //      splitStream.print(); 
    
    DataStream<Long> even = splitStream.select(EVEN); 
    
    DataStream<Long> odd = splitStream.select(ODD); 
    
    DataStream<Long> all = splitStream.select(EVEN, ODD); 
    
    odd.print(); 
    
    streamExecutionEnvironment.execute(); 
    

    /* 
    * FlatMap 
    */ 
    DataStream<String> input = streamExecutionEnvironment.fromElements(WordCountData.WORDS); 
    
    DataStream<String> wordStream = input.flatMap(new FlatMapFunction<String, String>() { 
    
    @Override 
    public void flatMap(String value, Collector<String> out) throws Exception { 
    // TODO Auto-generated method stub 
    Arrays.asList(value.toLowerCase().split("\W+")).stream().filter(str -> str.length() > 0).forEach(str -> out.collect(str)); 
    } 
    }); 
    
    wordStream.print(); 
    
    streamExecutionEnvironment.execute();
    
  • 相关阅读:
    Mysql字符串字段判断是否包含某个字符串的方法
    linux下安装jenkins实现自动化部署
    Python:webshell 跳板机审计服务器
    Yum:更换aliyun的yum源
    Python:安装3.6
    Mysql:零散记录
    Kvm:通过 libvirt 远程管理虚拟机
    Python:socket实现ftp程序
    Python:用户自定义异常
    Kvm:启动报错:error: internal error: process exited while connecting to monitor: 2018-11-12T01:47:14.993371Z qemu-system-x86_64: cannot set up guest memory 'pc.ram': Cannot allocate memory
  • 原文地址:https://www.cnblogs.com/fangpengchengbupter/p/11887688.html
Copyright © 2011-2022 走看看