zoukankan      html  css  js  c++  java
  • DataStream API介绍和示例

    DataStream API介绍和示例

    Flink程序运行流程

    1. 获取执行环境

    getExecutionEnvironment()
    createLocalEnvironment()
    createRemoteEnvironment(String host, int port, String... jarFiles)

    2. 加载创建初始化数据

    readTextFile()
    addSource
    ..

    3. 对数据在transformation operator

    map
    flatMap
    filter
    ..

    4. 指定计算结果的输出位置 sink

    print()
    writeAdText(String path)
    addSink
    ..

    5. 触发程序执行 execute

    env.execute()
    在sink是print时,不需要显示execute,否则会报错。因为在print方法里已经默认调用了execute。

    StreamExecutionEnvironment

    StreamExecutionEnvironment 作为程序入口context,有两类:LocalStreamEnvironment(本地环境) 和RemoteStreamEnvironment(远程环境)。
    ExecutionConfig、CheckpointConfig等配置均在这里初始化。另外,这里也能设置线程数,检查点周期,以及检查点模式。还有状态后端序列化类型以及注册Type等。

    • 如果集群是standalone模式,则StreamExecutionEnvironment.getExecutionEnvironment() 相当于StreamExecutionEnvironment.createLocalEnvironment()

    DataStream Source

    基于文件的
    • readTextFile(String path) charsetName 默认用 UTF-8
    • readTextFile(String path, String charsetName):文本文件,格式为 TextInputFormat,返回 BasicTypeInfo.STRING_TYPE_INFO ,TextInputFormat对象调用 setCharsetName(charsetName) 设置字符 ,然后底层再调用 readFile 方法。
    • readFile(FileInputFormat inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation typeInformation):根据给定格式和路径读取文件,根据watchType(FileProcessingMode.PROCESS_ONCE:处理一次路径文件后退出,FileProcessingMode.PROCESS_CONTINUOUSLY:检测给定路径的新数据,此时若旧文件发生修改也会重读,不符合exactly-once),interval 扫描路径的周期。调用 createFileInput ,createFileInput 调用 addSource 。
    Socket流
    • socketTextStream(hostname , port) // 主机,端口号,字段分隔符 delimiter 默认为
    • socketTextStream(hostname , port , delimiter) // maxRetry 默认为零
    • socketTextStream(hostname, port, delimiter, maxRetry )
      maxRetry: 当socket端挂掉是,程序等待的最大重试时间。每秒都会重试连接,为0即停止程序...。利用 SocketTextStreamFunction 生成 sourceFunction对象,调用 addSource 生成DataStreamSource
    基于数据集的

    都是通过本身的SourceFunction对象调用addSource

    • fromCollection(Iterator, class)
    • fromCollection(Iterator, TypeInformation)
    • fromElements(T...)
    • fromParallelCollection
    Customer Source 自定义source
    • addSource(sourceFunction ) // sourceName 为默认值
    • addSource(sourceFunction , sourceName) // typeInfo 为 null
    • addSource(sourceFunction , typeInformation) // SourceName 有默认值
    • addSource(sourceFunction , sourceName , typeInformation)

    自定义source代码示例

    package source;
    
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    /**
     *
     * RichSourceFunction 实现了SourceFunction接口,只做了序列化
     * 实现接口SourceFunction或者继承 RichSourceFunction 需要申明返回的数据类型,不然会报错:
     * Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
     *      The types of the interface org.apache.flink.streaming.api.functions.source.SourceFunction could not be inferred.
     *      Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
     */
    
    public class MyDataSource extends RichSourceFunction<Integer> {
    
        private static final Logger LOG = LoggerFactory.getLogger(MyDataSource.class);
    
        private volatile boolean isRunning = true;
    
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception{
            while (isRunning){
                Thread.sleep(300);
                int rnd = (int) (Math.random() * 10);
    
                LOG.info("emit data:"+rnd);
                ctx.collect( rnd );
            }
        }
    
        @Override
        public void cancel() {
    
            isRunning = false;
        }
    }
    

    DataStream Transformations

    Map [DataStream -> DataStream]

    对数据集内每条数据都进行相同的规则处理,常用来做清洗和转换数据格式等

    DataStream<Tuple2<String, Integer>> windowCount
            .map(new MapFunction<Tuple2<String,Integer>, String>() {
                 @Override
                public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    return stringIntegerTuple2.f0;
                }
            })
    
    FlatMap [DataStream -> DataStream]

    将数据集进行打平,即按照逻辑合并在一个或多个数据集里面

    DataStream<Tuple2<String, Integer>> windowCount = text
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    for (String word : value.split("\s")) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            })
    
    Filter [DataStream -> DataStream]

    过滤数据,符合要求的数据返回 true,不符合要求的返回 false

    text
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String s) throws Exception {
                    return s.contains("h");
                }
            })
    
    KeyBy [DataStream -> KeyedStream]

    在数据集中进行 Partition操作,将相同的key值的数据放到相同的分区中,返回 keyedStream。
    指定key时可以按位置指定,也可以按名称指定(此时需要pojo类、case class等明确了字段位置的)
    注意以下类型不能成为key:

    • POJO类型但是不覆盖 hashCode() 方法并依赖于Object.hashCode() 实现
    • 任何类型的数组
    Reduce [KeyedStream -> DataStream]

    定义聚合逻辑,对数据进行聚合处理,其聚合逻辑要求满足运算结合律和交换律。当前元素的值和最后一个Reduce的值进行组合并返回出新的Reduce的值。

            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception {
                    return Tuple2.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1+t1.f1);
                }
            })
    
    Aggregate [keyedStream -> DataStream]

    聚合算子,将Reduce算子中的函数进行了封装。封装的操作包括 sum、min、minBy、max、maxBy等

    Fold [keyedStream -> DataStream]

    将数据进行滚动折叠,可指定开始值。未来将取消,全部用Aggregate替代

    Union

    合并操作,要求两个 DataStream 数据格式一样

    Connect

    不要求格式一样,类似拼接格式操作,返回 ConnectedStreams。
    比如 (String,Int) connect (Int) 结果: ((String, Int), Int)
    ConnectedStreams不能直接print,需要使用CoMapFunction 或CoFlatMapFunction分别处理DataStrea,处理后返回的数据类型必须保持一致。

    Split [DataStream -> SplitStream]

    Union算子的逆向实现

    Select [SplitStream -> DataStream]

    Select是splitStream的方法,split 只是进行标记,并未进行切分。select切分数据集。

        SplitStream<String> split = text.split(new OutputSelector<String>() {
        // 切分数据的时候给每部分数据打上标记
            @Override
            public Iterable<String> select(String value) {
                ArrayList<String> strings = new ArrayList<>();
                if (value.contains("h"))
                    strings.add("hadoop");
                else
                    strings.add("noHadoop");
                return strings;
            }
        });
    
        // 打印有 hadoop 标签的数据
        split.select("hadoop").print();
        // 打印有 noHadoop 标签的数据
        split.select("noHadoop")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String s) throws Exception {
                    return s.toUpperCase();
                }
            })
            .print();
    
    Partition类 transformation
    • shuffle: 随机分配,分区相对均衡,容易失去原有数据分区结构
    • rebalance: 尽可能保证每个分区的数据平衡,多用于数据倾斜
    • rescale: 待定
  • 相关阅读:
    Android入门第六篇之ListView
    谷歌Volley网络框架讲解——第一篇
    Android网络通信库Volley简介
    Android网络通信框架Volley的学习笔记
    SharedPreferences介绍
    Android之Adapter用法总结
    ANDROID SQLITEOPENHELPER详解
    RxJava、RxBus学习
    【原创】【Andriod】自定义多行多列视图
    [转]android笔记--Intent和IntentFilter详解
  • 原文地址:https://www.cnblogs.com/mlxx9527/p/11217731.html
Copyright © 2011-2022 走看看