  • Flink Java Demo (Windows)

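    I won't go over Flink's concepts here. They are covered all over the web, and the official site documents them in detail. This post just records a simple example of using Flink from Java.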

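    First, download the Flink zip package from the official site (I won't provide the link; you're a grown-up programmer and should be able to search for it yourself), then extract it wherever you like.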

    The top-level directory looks like this:


     
    [Screenshot: the Flink root directory]

    Take a quick look at the directory structure, then go back to your favorite IDE and create a project.

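    Create a Maven project in IDEA and add the corresponding dependencies. Alternatively, generate a Maven project the way the Flink website describes and import it into your IDE. Below are the Maven dependencies from the official quickstart; ${flink.version} and ${scala.binary.version} are Maven properties defined in the <properties> section of the quickstart POM.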

        <dependencies>
            <!-- Apache Flink dependencies -->
            <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    
            <!-- Example:
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            -->
    
            <!-- Add logging framework, to produce console output when running in the IDE. -->
            <!-- These dependencies are excluded from the application JAR by default. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
                <scope>runtime</scope>
            </dependency>
        </dependencies>
    

    With the project created, we can write code. The example below is close to the one on the official site, so here it is:

    package org.myorg.quickstart;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    /**
     * Skeleton for a Flink Streaming Job.
     *
     * <p>For a tutorial on how to write a Flink streaming application, check the
     * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
     *
     * <p>To package your application into a JAR file for execution, run
     * 'mvn clean package' on the command line.
     *
     * <p>If you change the name of the main class (with the public static void main(String[] args))
     * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
     */
    public class StreamingJob {
    
        public static void main(String[] args) throws Exception {
            // set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            /*
             * Here, you can start creating your execution plan for Flink.
             *
             * Start with getting some data from the environment, like
             *  env.readTextFile(textPath);
             *
             * then, transform the resulting DataStream<String> using operations
             * like
             *  .filter()
             *  .flatMap()
             *  .join()
             *  .coGroup()
             *
             * and many more.
             * Have a look at the programming guide for the Java API:
             *
             * http://flink.apache.org/docs/latest/apis/streaming/index.html
             *
             */
    
            DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);
            DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Split on runs of non-word characters; the backslash must be escaped in a Java string literal.
                String[] tokens = s.toLowerCase().split("\\W+");
    
                    for (String token : tokens) {
                        if (token.length() > 0) {
                            collector.collect(new Tuple2<String, Integer>(token, 1));
                        }
                    }
                }
            }).keyBy(0).timeWindow(Time.seconds(5)).sum(1);
    
            dataStream.print();
            // execute program
            env.execute("Java WordCount from SocketTextStream Example");
        }
    }
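    To make the windowing concrete, here is a plain-Java sketch (no Flink dependency) of what flatMap(...).keyBy(0).timeWindow(Time.seconds(5)).sum(1) computes: tokenize each line exactly as the flatMap does, assign it to a 5-second tumbling window, and sum the 1s per word within each window. It assumes each line comes with an explicit millisecond timestamp, whereas the real job uses processing time, i.e. the machine clock when the record is handled. WindowedWordCountSketch, tokenize, windowStart and count are illustrative names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink) of flatMap + keyBy(0) + timeWindow(Time.seconds(5)) + sum(1).
// Assumption: every line carries an explicit millisecond timestamp.
class WindowedWordCountSketch {

    static final long WINDOW_MS = 5_000; // corresponds to Time.seconds(5)

    // Same tokenization as the flatMap: lowercase, split on non-word characters, drop empty tokens.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    // Start of the tumbling window containing ts; windows are aligned to multiples of WINDOW_MS (zero offset).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Buckets each line into its window, then sums one count per word occurrence (the keyBy(0).sum(1) step).
    static Map<Long, Map<String, Integer>> count(long[] timestamps, String[] lines) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            Map<String, Integer> counts =
                    result.computeIfAbsent(windowStart(timestamps[i]), k -> new TreeMap<>());
            for (String word : tokenize(lines[i])) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 3_500, 6_200};
        String[] lines = {"Hello Flink", "hello, world!", "flink"};
        // Print each non-empty window as [start, end) followed by its word counts.
        count(ts, lines).forEach((start, counts) ->
                System.out.println("[" + start + ", " + (start + WINDOW_MS) + ") " + counts));
    }
}
```

    Running main prints [0, 5000) {flink=1, hello=2, world=1} followed by [5000, 10000) {flink=1}. Flink's print() would instead emit one (word,count) tuple per line for each window that fires.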
    
    

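    We're all educated people here and the comments are already detailed, so I won't go through them. The one thing to note is that I kept the anonymous FlatMapFunction class instead of converting it to a lambda, because the lambda version didn't seem to work in IDEA. Strictly speaking, the limitation is in Flink's type extraction rather than in IDEA: because of Java type erasure, Flink cannot infer the Tuple2<String, Integer> output type of a lambda passed to flatMap unless you give it a type hint.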
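    Why would a lambda be a problem? Flink's type extraction relies largely on the generic type arguments that Java records in the function's class, and a lambda's generated class does not carry them. The plain-Java sketch below illustrates this with a hypothetical MyFlatMap<I, O> interface standing in for FlatMapFunction (no Flink needed): via reflection, the anonymous class still reports MyFlatMap<String, Integer>, while the equivalent lambda only reports the raw interface. For a lambda-based flatMap, Flink's documentation on Java lambdas suggests declaring the output type explicitly, for example .returns(Types.TUPLE(Types.STRING, Types.INT)).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Illustrates Java type erasure: anonymous classes keep generic type arguments in their
// class metadata, lambdas do not. MyFlatMap is a hypothetical stand-in for FlatMapFunction.
class ErasureDemo {

    // The List plays the role of Flink's Collector<OUT>.
    interface MyFlatMap<I, O> {
        void flatMap(I value, List<O> out);
    }

    static final MyFlatMap<String, Integer> ANONYMOUS = new MyFlatMap<String, Integer>() {
        @Override
        public void flatMap(String value, List<Integer> out) {
            out.add(value.length());
        }
    };

    static final MyFlatMap<String, Integer> LAMBDA = (value, out) -> out.add(value.length());

    // True if the function's class records concrete type arguments for an implemented interface.
    static boolean keepsTypeArguments(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(ANONYMOUS));
        System.out.println("lambda keeps type arguments: " + keepsTypeArguments(LAMBDA));
    }
}
```

    Running main prints true for the anonymous class and false for the lambda.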

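    Next comes the exciting part: running it. On Windows you need to install the "Swiss Army knife" netcat to get the nc command (download the zip from its official site, extract it, and add it to your PATH environment variable). In a command prompt, run nc -l -p 9000, then run the program above. (If you start the program first, it fails with an error because it cannot connect to the socket.)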

     
    [Screenshot: the nc session in the command prompt]
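    If installing netcat is inconvenient, a few lines of Java can play the same role for this demo. The sketch below is a hypothetical stand-in for nc -l -p 9000, not part of Flink: it listens on port 9000 (or the port given as the first argument), accepts one connection, which is the socketTextStream client in the job, and forwards each line typed on stdin followed by '\n', the default delimiter of socketTextStream. As with nc, start it before the Flink program, then type into its console.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical stand-in for `nc -l -p 9000`: listen on a port, accept one client
// (the Flink job), and forward every line read from stdin to it.
class LineServer {

    // Accepts a single client and sends it each line from `input`, terminated by '\n'.
    // Returns (closing the connection) once `input` is exhausted.
    static void forward(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream())) {
            String line;
            while ((line = input.readLine()) != null) {
                out.write(line);
                out.write('\n');
                out.flush(); // push each line immediately so the job receives it right away
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Listening on port " + port + "; start the Flink job, then type lines.");
            forward(server, new BufferedReader(new InputStreamReader(System.in)));
        }
    }
}
```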

    Type anything, and the IDEA console shows results like the following:


     
    [Screenshot: word counts printed in the IDEA console]

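    Since no Flink service was started, you don't need to open the web UI at localhost:8081 to monitor the job, as some other tutorials do. When run from the IDE, StreamExecutionEnvironment.getExecutionEnvironment() creates a local environment (a LocalStreamEnvironment) and runs the job inside the current JVM.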

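    Starting Flink in standalone mode on Windows is straightforward: go to the bin directory and double-click start-cluster.bat, which starts Flink's two services, the JobManager and the TaskManager. To submit the program above to this Flink instance, package it into a jar with Maven (mvn clean package), then open a command prompt in the bin directory and run flink.bat run xxxx/xxx/xxx.jar. The output appears in the TaskManager's window.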



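    Author: 瓜尔佳_半阙
    Link: https://www.jianshu.com/p/f8b66afd32bf
    Source: Jianshu (简书)
    Copyright on Jianshu belongs to the author. For any form of reprint, contact the author for permission and credit the source.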
  • Original URL: https://www.cnblogs.com/duwamish/p/10376605.html