zoukankan      html  css  js  c++  java
  • flink入门

    wordCount

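    POM文件需要导入的依赖（注意：所有 Flink 依赖的版本应保持一致，下面 flink-scala 和 flink-streaming-scala 中写死的 1.7.1 应与 ${flink.version} 相同，否则可能出现版本冲突）: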

    <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.12</artifactId>
                <version>1.7.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.12</artifactId>
                <version>1.7.1</version>
            </dependency>
    

      

    离线代码:

    java版本:

    package flink;
    
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    
    /**
     * Batch word count over a small, in-memory data set.
     *
     * <p>Every line is turned into {@code (word, 1)} pairs by {@link LineSplitter}; the pairs are
     * grouped by the word (tuple field 0), their counts (tuple field 1) are summed, and the
     * result is printed.
     */
    public class WordExample {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // the input lines
            final DataSet<String> lines = env.fromElements(
                    "flink test",
                    "I think I hear them. Stand, ho! Who's there?");

            // one (word, 1) pair for every word of every line
            final DataSet<Tuple2<String, Integer>> pairs = lines.flatMap(new LineSplitter());

            // group by the word and add up the 1s
            final DataSet<Tuple2<String, Integer>> wordCount = pairs.groupBy(0).sum(1);

            wordCount.print();
        }
    }
    
    package flink;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    /**
     * Splits a line of text into words and emits a {@code (word, 1)} pair for each word.
     *
     * <p>Words are separated by runs of whitespace. Empty tokens are skipped so they are never
     * counted as a word. Such tokens come from leading whitespace, consecutive separators or an
     * empty line.
     */
    public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // "\\s+" = one or more whitespace characters (java.util.regex syntax)
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
    

    scala版本:

    package flink
    
    import org.apache.flink.api.scala._
    
    /**
     * Batch word count over a small in-memory data set (Scala DataSet API).
     *
     * Each line is lower-cased and split into words; `(word, 1)` pairs are grouped by the word
     * (tuple field 0) and their counts (tuple field 1) are summed, then printed.
     */
    object WordCountExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        val text = env.fromElements(
          "Who's there?",
          "I think I hear them. Stand, ho! Who's there?")

        // Split on runs of non-word characters and drop the empty tokens that a leading
        // separator produces. The regex must be written "\\W+": a single backslash before W
        // is not a valid escape in a Scala string literal.
        val counts = text
          .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
          .map((_, 1))
          .groupBy(0)
          .sum(1)

        counts.print()
      }
    }

    流式:

     java版本:

    package flink;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    public class WordCount {
        public static void main(String[] args) throws Exception {
            final int port;
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
                port = params.getInt("port");
            } catch (Exception e) {
                System.out.println("No port specified.Please run 'SocketWindowWordCount--port <port>'");
                return;
            }
            //get the execution enviroment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //get input data by connecting to the socket
            DataStream<String> text = env.socketTextStream("localhost", port, '
    ');
            //parse the data,group it.window it,and aggregeate the counts
            DataStream<WordWithCount> windowCounts = text
                    .flatMap(new FlatMapFunction<String, WordWithCount>() {
                        @Override
                        public void flatMap(String s, Collector<WordWithCount> collector) {
                            for (String word : s.split("\s")) {
                                collector.collect(new WordWithCount(word, 1L));
                            }
                        }
                    }).keyBy("word").timeWindow(Time.seconds(10), Time.seconds(5))
                    .reduce(new ReduceFunction<WordWithCount>() {
                        @Override
                        public WordWithCount reduce(WordWithCount wordWithCount, WordWithCount t1) throws Exception {
                            return new WordWithCount(wordWithCount.word, wordWithCount.count + t1.count);
                        }
                    });
    
            //print the result with a single thread,rather than in parallel
            windowCounts.print().setParallelism(1);
    
            env.execute("Socket Window WordCount");
        }
    }
    
    package flink;
    
    public class WordWithCount {
        public String word;
        public long count;
    
        public WordWithCount() {
        }
    
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
    
        @Override
        public String toString() {
            return word + ":" + count;
        }
    }
    

      scala版本:

    package flink
    
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /**
     * Streaming word count over a socket text stream (Scala DataStream API).
     *
     * Connects to `localhost` on the port given by `--port`. Each line is split on whitespace,
     * and words are counted in windows of 10 seconds that slide every 5 seconds. A server must
     * already be listening on that port (e.g. `nc -lk 9999`).
     */
    object SokcetWindowWordCount {

      import scala.util.{Failure, Success, Try}

      /** A word and how many times it was seen within the current window. */
      case class WordWithCount(word: String, count: Long)

      /** Entry point: reads `--port` and runs the job, or prints usage if it is missing/invalid. */
      def main(args: Array[String]): Unit =
        Try(ParameterTool.fromArgs(args).getInt("port")) match {
          case Success(port) => run(port)
          case Failure(_) =>
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        }

      /** Builds and executes the windowed word-count job reading from `localhost:port`. */
      private def run(port: Int): Unit = {
        // get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // get input data by connecting to the socket; records are newline-delimited
        val text = env.socketTextStream("localhost", port, '\n')

        // parse the data, group it, window it, and aggregate the counts;
        // "\\s+" is the regex whitespace class, and empty tokens (leading whitespace,
        // empty lines) are dropped so "" is never counted as a word
        val windowCount = text
          .flatMap(_.split("\\s+"))
          .filter(_.nonEmpty)
          .map(w => WordWithCount(w, 1L))
          .keyBy("word")
          .timeWindow(Time.seconds(10), Time.seconds(5))
          .sum("count")

        // print the results with a single thread, rather than in parallel
        windowCount.print().setParallelism(1)

        env.execute("Socket Window WordCount")
      }
    }
    

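      运行,传参: 在程序参数中传入 --port 9999

    程序是作为客户端去连接 localhost:9999 的，所以要先在终端用 nc 命令监听 9999 端口（例如 nc -lk 9999），再启动程序，然后在 nc 终端中输入文本，模拟向 9999 端口发送数据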

      运行结果:

      注意事项:

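        千万不要把包导错：Java 程序要导入 Java API 的包（如 org.apache.flink.api.java.*、org.apache.flink.streaming.api.environment.StreamExecutionEnvironment），Scala 程序要导入 Scala API 的包（如 org.apache.flink.api.scala._、org.apache.flink.streaming.api.scala.StreamExecutionEnvironment）；如果导错，程序将无法运行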

  • 相关阅读:
    关于产品那些事
    关于“编程的本质”的探讨
    分享一款在线贝塞尔曲线调试器
    HTML、CSS、JS对unicode字符的不同处理
    HTTP Content-Disposition Explanation [ from MDN ]
    认证 (authentication) 和授权 (authorization) 的区别
    事件驱动引擎会取代多线程编程吗
    你所不知道的JSON
    都有哪些特殊而实用的的搜索引擎?
    巨头们的GitHub仓库整理
  • 原文地址:https://www.cnblogs.com/Gxiaobai/p/10290990.html
Copyright © 2011-2022 走看看