【Flink】DataStream API Tutorial

    Set up a Maven project

    Create a Flink project with Maven using the following command:

    $ mvn archetype:generate \
        -DarchetypeGroupId=org.apache.flink \
        -DarchetypeArtifactId=flink-quickstart-java \
        -DarchetypeVersion=1.9.0 \
        -DgroupId=wiki-edits \
        -DartifactId=wiki-edits \
        -Dversion=0.1 \
        -Dpackage=wikiedits \
        -DinteractiveMode=false
    

    You can edit the groupId, artifactId, and package as needed. The project directory structure looks like this:

    $ tree wiki-edits
    wiki-edits/
    ├── pom.xml
    └── src
        └── main
            ├── java
            │   └── wikiedits
            │       ├── BatchJob.java
            │       └── StreamingJob.java
            └── resources
                └── log4j.properties
    

    The archetype has already created some sample code under src/main/java; we can simply delete it:

    $ rm wiki-edits/src/main/java/wikiedits/*.java
    

    Finally, we need to add the dependencies our program requires to pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
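
    These dependencies reference a ${flink.version} property. The pom.xml generated by the archetype normally defines it already; if it does not, a minimal sketch (assuming the 1.9.0 archetype version used above) would be:

    <properties>
        <flink.version>1.9.0</flink.version>
    </properties>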
    

    Write a Flink program

    Open your IDE and add the file src/main/java/wikiedits/WikipediaAnalysis.java:

    package wikiedits;
    
    public class WikipediaAnalysis {
    
        public static void main(String[] args) throws Exception {
    
        }
    }
    

    This is just a basic main method for now. The first step is to create a StreamExecutionEnvironment (for a batch program you would create an ExecutionEnvironment instead). It is used to create sources that read from external systems and to execute the program, so let's add it to the main method:

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    

    Next, we create a source that reads from the Wikipedia IRC log:

    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    

    This creates a DataStream of WikipediaEditEvent that we can work with further. For our purposes, the first step is to key the stream by user name:

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
        .keyBy(new KeySelector<WikipediaEditEvent, String>() {
            @Override
            public String getKey(WikipediaEditEvent event) {
                return event.getUser();
            }
        });
    

    Next, we specify the size of the window over which we want our results, and perform an aggregation within it. In this example we compute, for each user, the number of bytes added or removed within each time window. Windows are needed because we cannot aggregate over an infinite stream as a whole; here the window size is set to 5 seconds:

    DataStream<Tuple2<String, Long>> result = keyedEdits
        .timeWindow(Time.seconds(5))
        .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> createAccumulator() {
                return new Tuple2<>("", 0L);
            }
    
            @Override
            public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
                accumulator.f0 = value.getUser();
                accumulator.f1 += value.getByteDiff();
                return accumulator;
            }
    
            @Override
            public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
                return accumulator;
            }
    
            @Override
            public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                return new Tuple2<>(a.f0, a.f1 + b.f1);
            }
        });
    

    Finally, print the result and start the execution:

    result.print();
    
    see.execute();
    

    All operations, such as creating sources, transformations, and sinks, only build up an internal graph of the dataflow.
    Only when execute() is called is this graph actually executed, either on the local machine or on a cluster.
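
    As a small aside (a minimal sketch; the job name string below is just an illustrative assumption), execute() can also be given a name for the job, which is what appears in the web UI and the logs:

    // Submits the dataflow graph for execution under an explicit job name.
    see.execute("wikipedia-analysis");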

    The complete code looks like this:

    package wikiedits;
    
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    
    public class WikipediaAnalysis {
    
      public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    
        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
          .keyBy(new KeySelector<WikipediaEditEvent, String>() {
            @Override
            public String getKey(WikipediaEditEvent event) {
              return event.getUser();
            }
          });
    
        DataStream<Tuple2<String, Long>> result = keyedEdits
          .timeWindow(Time.seconds(5))
          .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> createAccumulator() {
              return new Tuple2<>("", 0L);
            }

            @Override
            public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
              accumulator.f0 = value.getUser();
              accumulator.f1 += value.getByteDiff();
              return accumulator;
            }

            @Override
            public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
              return accumulator;
            }

            @Override
            public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
              return new Tuple2<>(a.f0, a.f1 + b.f1);
            }
          });
    
        result.print();
    
        see.execute();
      }
    }
    

    You can run the finished program with Maven on the command line:

    $ mvn clean package
    $ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    

    The output looks similar to this:

    1> (Fenix down,114)
    6> (AnomieBOT,155)
    8> (BD2412bot,-3690)
    7> (IgnorantArmies,49)
    3> (Ckh3111,69)
    5> (Slade360,0)
    7> (Narutolovehinata5,2195)
    6> (Vuyisa2001,79)
    4> (Ms Sarah Welch,269)
    4> (KasparBot,-245)
    

    The number at the beginning of each line indicates which parallel instance of the print sink produced the output.
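
    The prefix is only added when the print sink runs with a parallelism greater than 1. As a rough sketch (not part of the tutorial code; forcing a single parallel instance is shown only for illustration), you could drop the prefix by giving the print sink a parallelism of 1:

    // Run the print sink with a single parallel instance; output lines then carry no "n>" prefix.
    result.print().setParallelism(1);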

    Exercise: run on a cluster and write to Kafka

    Please first set up a local Flink cluster on your machine and install Kafka.

    We need to add the Kafka connector so we can sink the result into Kafka. First, add the dependency to pom.xml:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    

    Next, we modify the program: replace the print sink with a Kafka sink, as follows:

    result
        .map(new MapFunction<Tuple2<String,Long>, String>() {
            @Override
            public String map(Tuple2<String, Long> tuple) {
                return tuple.toString();
            }
        })
        .addSink(new FlinkKafkaProducer011<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
    

    Add the required imports:

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.functions.MapFunction;
    

    Build a jar with Maven:

    $ mvn clean package
    

    The resulting jar file can be found at target/wiki-edits-0.1.jar.

    Now we need to start a Flink cluster:

    $ cd my/flink/directory
    $ bin/start-cluster.sh
    

    We also need to create a Kafka topic so that our program has something to write to:

    $ cd my/kafka/directory
    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
        --replication-factor 1 --partitions 1 --topic wiki-result
    

    Now we are ready to run the jar on our local Flink cluster:

    $ cd my/flink/directory
    $ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wiki-edits-0.1.jar
    

    The output should look similar to this:

    03/08/2016 15:09:27 Job execution switched to status RUNNING.
    03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
    03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
    03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from CREATED to SCHEDULED
    03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from SCHEDULED to DEPLOYING
    03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from DEPLOYING to RUNNING
    03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
    

    You can open http://localhost:8081 to see your job running. You will notice that the job has two operators: for performance reasons, the operations after the window are collapsed into a single operator; this is called chaining.
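
    Chaining is configurable. As a rough sketch (not part of the tutorial code), it can be switched off for the whole job if you want every operator to show up separately in the web UI, at the cost of some performance:

    // Disable operator chaining for the entire job; call this on the environment
    // before the pipeline is built.
    see.disableOperatorChaining();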

    You can observe the data written to Kafka using the Kafka console consumer:

    bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wiki-result
    