zoukankan      html  css  js  c++  java
  • 5: win10 使用IDEA调试编译flink 1.14

    1: 下载   

      https://dlcdn.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz

    2: JAVA, Maven, IDEA都安装好。

    3:搞一个flink的官网的程序

          创建一个maven 工程

    $ mvn archetype:generate \
        -DarchetypeGroupId=org.apache.flink \
        -DarchetypeArtifactId=flink-walkthrough-datastream-java \
        -DarchetypeVersion=1.14.2 \
        -DgroupId=frauddetection \
        -DartifactId=frauddetection \
        -Dversion=0.1 \
        -Dpackage=spendreport \
        -DinteractiveMode=false

           

    pom.xml

    <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
    
      http://www.apache.org/licenses/LICENSE-2.0
    
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>frauddetection</groupId>
        <artifactId>frauddetection</artifactId>
        <version>0.1</version>
        <packaging>jar</packaging>
    
        <name>Flink Walkthrough DataStream Java</name>
        <url>https://flink.apache.org</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.14.2</flink.version>
            <target.java.version>1.8</target.java.version>
            <scala.binary.version>2.11</scala.binary.version>
            <maven.compiler.source>${target.java.version}</maven.compiler.source>
            <maven.compiler.target>${target.java.version}</maven.compiler.target>
            <log4j.version>2.16.0</log4j.version>
        </properties>
    
        <repositories>
            <repository>
                <id>apache.snapshots</id>
                <name>Apache Development Snapshot Repository</name>
                <url>https://repository.apache.org/content/repositories/snapshots/</url>
                <releases>
                    <enabled>false</enabled>
                </releases>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </repository>
        </repositories>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    
            <!-- Example:
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            -->
    
            <!-- Add logging framework, to produce console output when running in the IDE. -->
            <!-- These dependencies are excluded from the application JAR by default. -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${log4j.version}</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>runtime</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
    
                <!-- Java Compiler -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>${target.java.version}</source>
                        <target>${target.java.version}</target>
                    </configuration>
                </plugin>
    
                <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
                <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.0.0</version>
                    <executions>
                        <!-- Run shade goal on package phase -->
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes>
                                        <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>org.apache.logging.log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>spendreport.FraudDetectionJob</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
    
            <pluginManagement>
                <plugins>
    
                    <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                    <plugin>
                        <groupId>org.eclipse.m2e</groupId>
                        <artifactId>lifecycle-mapping</artifactId>
                        <version>1.0.0</version>
                        <configuration>
                            <lifecycleMappingMetadata>
                                <pluginExecutions>
                                    <pluginExecution>
                                        <pluginExecutionFilter>
                                            <groupId>org.apache.maven.plugins</groupId>
                                            <artifactId>maven-shade-plugin</artifactId>
                                            <versionRange>[3.0.0,)</versionRange>
                                            <goals>
                                                <goal>shade</goal>
                                            </goals>
                                        </pluginExecutionFilter>
                                        <action>
                                            <ignore/>
                                        </action>
                                    </pluginExecution>
                                    <pluginExecution>
                                        <pluginExecutionFilter>
                                            <groupId>org.apache.maven.plugins</groupId>
                                            <artifactId>maven-compiler-plugin</artifactId>
                                            <versionRange>[3.1,)</versionRange>
                                            <goals>
                                                <goal>testCompile</goal>
                                                <goal>compile</goal>
                                            </goals>
                                        </pluginExecutionFilter>
                                        <action>
                                            <ignore/>
                                        </action>
                                    </pluginExecution>
                                </pluginExecutions>
                            </lifecycleMappingMetadata>
                        </configuration>
                    </plugin>
                </plugins>
            </pluginManagement>
        </build>
    </project>
    FraudDetectionJob.java
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package spendreport;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.walkthrough.common.sink.AlertSink;
    import org.apache.flink.walkthrough.common.entity.Alert;
    import org.apache.flink.walkthrough.common.entity.Transaction;
    import org.apache.flink.walkthrough.common.source.TransactionSource;
    
    /**
     * Skeleton code for the datastream walkthrough
     */
    public class FraudDetectionJob {
        public static void main(String[] args) {
    
            StreamExecutionEnvironment env = null;
            try {
                env = StreamExecutionEnvironment.getExecutionEnvironment();
                DataStream<Transaction> transactions = env
                        .addSource(new TransactionSource())
                        .name("transactions");
    
                DataStream<Alert> alerts = transactions
                        .keyBy(Transaction::getAccountId)
                        .process(new FraudDetector())
                        .name("fraud-detector");
    
                alerts
                        .addSink(new AlertSink())
                        .name("send-alerts");
    
                env.execute("Fraud Detection");
            } catch (java.lang.Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    FraudDetector.java
    package spendreport;
    
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.walkthrough.common.entity.Alert;
    import org.apache.flink.walkthrough.common.entity.Transaction;
    
    public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
    
        private static final long serialVersionUID = 1L;
    
        private static final double SMALL_AMOUNT = 1.00;
        private static final double LARGE_AMOUNT = 500.00;
        private static final long ONE_MINUTE = 60 * 1000;
    
        private transient ValueState<Boolean> flagState;
        private transient ValueState<Long> timerState;
    
        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                    "flag",
                    Types.BOOLEAN);
            flagState = getRuntimeContext().getState(flagDescriptor);
    
            ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                    "timer-state",
                    Types.LONG);
            timerState = getRuntimeContext().getState(timerDescriptor);
        }
    
        @Override
        public void processElement(
                Transaction transaction,
                Context context,
                Collector<Alert> collector) throws Exception {
    
            // Get the current state for the current key
            Boolean lastTransactionWasSmall = flagState.value();
    
            // Check if the flag is set
            if (lastTransactionWasSmall != null) {
                if (transaction.getAmount() > LARGE_AMOUNT) {
                    //Output an alert downstream
                    Alert alert = new Alert();
                    alert.setId(transaction.getAccountId());
    
                    collector.collect(alert);
                }
                // Clean up our state
                cleanUp(context);
            }
    
            if (transaction.getAmount() < SMALL_AMOUNT) {
                // set the flag to true
                flagState.update(true);
    
                long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
                context.timerService().registerProcessingTimeTimer(timer);
    
                timerState.update(timer);
            }
        }
    
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
            // remove flag after 1 minute
            timerState.clear();
            flagState.clear();
        }
    
        private void cleanUp(Context ctx) throws Exception {
            // delete timer
            Long timer = timerState.value();
            ctx.timerService().deleteProcessingTimeTimer(timer);
    
            // clean up all state
            timerState.clear();
            flagState.clear();
        }
    }

    4: IDEA里导入flink libarary

     

     

     导入flink的lib和op连个包。

     然后就可以Idea 运行和调试

  • 相关阅读:
    Ubuntu 16 编译装python2.7
    配置ubunto 流量使用限制 python 实现简单 http server
    vnstat 流量统计 并附带一个小 php 查看流量的页面
    ubunto 免输入密码 登录 putty ssh-keygen
    nes 红白机模拟器 第5篇 全屏显示
    arm 添加 samb 文件共享
    arm 添加 ftp server 之 bftpd
    Tga图片格式分析以及程序实现
    领导力:刘邦的管理之道
    AS3:辨析ROLL_OVER与MOUSE_OVER,ROLL_OUT与MOUSE_OUT
  • 原文地址:https://www.cnblogs.com/liufei1983/p/15708680.html
Copyright © 2011-2022 走看看