zoukankan      html  css  js  c++  java
  • Flink笔记

    1.Flink简介
    Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算
    应用行业:市场营销报表,电商,业务流程
         物联网,电信业,金融业
    
    Flink的主要特点:事件驱动(Event-driven)
    Flink的世界观中一切都是流组成的,离线数据是有界的流,实时数据是没有界限的流
    
    分层API
       High-level Analytics API  : SQL/Table API(dynamic tables)
       Stream- & Batch Data Porcessing  :DataStream API(streams,windows)
       Stateful Event-Driven Application : ProcessFuntion(events,state,time) 
    其他特点:
       支持事件时间(event-time)和处理时间(processing-time)语义
       精准一次(exactly-once)的状态一致性保证
       低延迟,每秒处理数百万个事件,毫秒级延迟
       与众多常用存储系统的连接
       高可用,动态扩展,实现7*24全天候运行
    Flink vs Spark Streaming
       流(Stream)和微批(micro-batching)
    
    2.快速上手  
    WorkCount案例:博客中查看
    
    DataStreamSource<String> source.flatMap()
                    .keyBy("word")//key分组统计
                    .filter()
                    .map()
                    .timeWindow(Time.seconds(2),Time.seconds(2))//设置一个窗口函数,模拟数据流动
                    .sum("count")//计算时间窗口内的词语个数
    
    
    3.Flink部署
    1.Standalone模式单机
        安装版本:Flink-1.10.1  Flink-1.10.1-bin-scala_2.12.tgz
        
        控制台提交job
        Overview
        jobs
        Task managers
        job manager
        submit new job
        
        命令行提交job
    
    2.Yarn模式
        Flink on Yarn
        Session Cluster
        Per Job Cluster
        
    3.Kubernetes部署
    
    
    
    4.Flink运行架构
        1、Flink运行时的组件
            JobManger(作业管理器):全局管理,接收提交的.jar分析流程,生成执行计划图 执行的task,分发给taskManager
                控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
                JobManager会先接收到要执行的应用程序,这个应用程序包括:作业图(JobGraph)逻辑数据流图和打包了所有的类库和其他资源的Jar包
                JobManager会把JobGraph转换成一个物理层面的数据流图,这个图叫做执行图,包含了所有可以并发执行的任务
                JobManager会向资源管理器(ResourceManager)
                请求执行任务必要的资源,也就是TaskManager上的插槽Slot,
                一旦它获取到了足够的资源,就会将执行图分发到真正运行他们的TaskManager上,
                而在运行过程中,JobManager会负责所有需要中央协调的操作,比如检查点(checkpoints)的协调
            TaskManager(任务管理器):干活的
                Flink中的工作进程,通常在Flink中会有多个TaskManager运行,每一个TaksManger都包含了一定数量的插槽(slots:cpu资源),插槽的数量限制了TaskManager能够执行的任务数量
                启动后,TaskManger会向资源管理器注册他的插槽,收到资源管理器的指令后,TaskManager就会将插槽给JobManager调用
                JobManager可以向插槽分配任务(tasks)来执行
                在执行过程中,一个taskManger可以跟其他运行同一应用程序的taskManager交换数据
            ResourceManger(资源管理器):为Job分配taks计算资源,
                管理TaskManager的插槽slot,
                为不同环境提供了不同资源管理器,如YARN/Mesos/K8s/stndalone部署
                当JobManager申请插槽资源时,
                ResourceManager会将有空闲插槽的TaskManager分配给JobManager,
                如果ResourceManager没有足够的插槽来满足JobManager的请求,
                它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器
            Dispacher(分发器):提供resultf接口,方便应用的提交
                可以跨作用运行,它为应用程序提供了Rest接口
                当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager.
                Dispatcher也会启动一个WebUI,用来方便地展示和监控作业执行的信息。
                Dispatcher在架构中可能并不是必需的,这取决于应用提交的运行方式
        
        2.任务提交流程
            app提交应用->Dispatcher启动并提交应用->JobManager请求Slots
                                 ->ResoureceManager(查看TaskManager可用slots,注册Slots)
                                     ->TaskManager(JobManager提交slots中执行的任务)
            (YARN)提交
            FlinkClient->
            1.上传flink的jar包和配置
            2.提交Job->ResourceManager(YARN)->NodeManager/JobManager去请求资源
                                ->ResourceManager(YARN)
                            ->启动NodeManager/TaskManager
                
        3.任务调度原理
            FlinkClien提交应用
            JobManager生成可执行图、发送任务、取消任务、检查点保存存盘
            ↑↓
            TaskManager给状态信息、心跳信息、统计信息
        
            怎么实现并行计算?
                利用分布式进行并且计算,
                每一个设置并行度-分配到不同的slot上多线程
    
            并行的任务,需要占用多少?
                
            一个流处理程序,到底包含多少个任务?
                代码中算子调用对应的几个任务呢,什么时候能合并,什么时候不能合并
        并行度(Parallelism)?
            代码里设置并行度
                env.setParallelism(4);//
            提交job设置并行度 -p
            集群配置中设置默认并行度
        TaskManager和Slots
            一个TaskManager都是一个JVM进程,会在独立的线程上执行一个或多个子任务
            TaskManager通过task slot(任务)来控制一个TaskManager能接收多少个task(子任务)
            默认:flink允许子任务共享slot,哪怕不是同一个任务的子任务,一个slot可以保存作业的整个管道piplin
            流程:数据->TaskManager->task slot->Source、map->keyBy、window开窗计算、apply->Sink print输出
    
            启动 nc *lk -7777socket文本流连接    ``                    
            先分组,每个组的最大并行度,各组的最大并行度叠加
            env.socketTexStream占用一个solt,flatMap占用一个,sum和pring可以共享占2个 因为代码中分组了slot如:.slotSharingGroup("red");
        并行子任务的分配
        
        程序与数据流(DataFlow)
            DataStream<String> lines = env.addSource(new FlinkKafkaConsumer<>());            Source
            
            DataStream<Event> events = lines.map((line) -> parse(line));
    
            DataStream<Statistics> stats = events.keyBy("id")                    Transformation转换运算
                                 .timeWindow()
                                 .apply(new MyWindowAggregationFunction());
    
            stats.addSink(new RollingSink(path));                            Sink    
            
            所有Flink程序由三部分组成:Source、Transformation、Sink
                Source负责读取数据源、
                Transformation利用各种算子进行处理加工
                Sink负责输出
            每一个dataFlow以一个或多个sources开始或一个或多个finks结束
            StreamGraph(代码) ->JobGraph(Client) ->ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构(jobManager生成)  ->物理执行图(task上生成)
            
        数据传输和任务链
            算子之间传输数据的形式可以是:
                one-to-one(forwarding)的模式                                
                redistributing的模式,具体是哪一种形式,取决于算子的种类
            One-to-one:Stream维护者分区以及元素的顺序(比如Source和map之间)。
            这意味着map算子的子任务看到的元素个数以及顺序跟source算子的子任务生产的元素的个数、顺序相同。
            map、fliter、flatMap等算子都是one-to-one的对应关系。
            
            Redistributing:stream的分区会发生变化。每一个算子的子任务依据所选择的tramsformation发送数据到不同的目标任务。
            例如,keyBy基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程。
            而redistribute过程就类似与Spark中的shuffle过程
            
            FORWARD(forward):one-to-one
            HASH(hash):hashCode重分区
            REBALANCE(rebalance):轮询的方式,轮询选择下一个分区
            shuffle:完全随机分机
            
        任务链(Operator Chain)
            Flink采用了一种称为任务链的优化技术,可以在特点条件下减少本地通信的开销。
            任务链的要求: 必须将两个或多个算子设为相同的并行度,并通过本地转发(loacl forward)的方式连接
            
            ”相同并行度“的one-to-one操作,Flink这样相连的算子连接在一起形成一个task
            原来的算子成为里面的subtask
            并行度相同、并且是one-to-one操作,两个条件缺一不可
            .disableChaining()不管数据传输方式,不参与任务链合并
            env.disableOperatorChaining()所有任务不合并 任务链
    
    5.Flink 流处理 DataStream API
            SingleOutputSteamOperator继承了DataStream
            StreamExecutionEnvironment.createLocalEncironment(1) 本地执行环境
            StreamExecutionEnvironment.createRomoteEnvironment("ip",6123,"YOURPATH/WordCount.jar")远程生产执行环境
            创建执行环境 封装了以上↑        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            //socket文本流
            DataStream<String> inputStream = env.socketTextStream("localhost",7777);
            
            1.从集合读取数据                     fromElements(...)//直接模拟数据
            DataStream<String> dataStreams = env.fromCollection(Array.asList(new ddInfo("测试数据","测试","测试")));
            2.从文件读取数据
            DataStream<String> dataStreams = env.readTextFile("C:\RuanJian\jeecg-boot\resource\sensor.txt");
            3.kafka中读取数据
                引入依赖:flink-conector-kafka-0.11_2.12 
                Properties properties =new Properties();
                properties.setProperty("bootstrap.servers","localhost:9092");
                DataStream<String> dataStreams = env.addSource(new FlinkKafkaConsumer011(String)("sensor",new info(),properties));
            创建kafka生产者主题:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
            4.自定义数据源:如生产随机数据
        
        Transform
            map简单算子
            flatMap简单算子
            Filter 简单算子
                  //特点 相同的key都在一个分区中
            KeyBy 分组数据流,流拆分成不同的分区-》KeyedStream
                滚动聚合算子:sum()min()max()minBy()包含最小时间戳 maxBy()
            Reduce:一个分组数据流的聚合操作,KeyedStream.Reduce
            
            //多条流转换算子
            Split和Select
                DataStream -> SplitStream:根据某些特征把一个DataStream拆分成两个或多个DataStream
                Split()分流
                Select()
                SplitStream<T> splitStream =   dataStream.split(new OutputSelector<T>(){
                    @Override
                    public Iterable<String> select(T value){
                        return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
                    }
                })
                DataStream<SensorReading> highTempStream = splitStream.select("high");
                DataStream<SensorReading> lowTempStream = splitStream.select("low");
                highTempStream.pring("high");
                highTempStream.pring("low");
                
            //多条流合流    
            Connect和CoMap CoFlatMap
                DataStream -> ConnectedDStreams 连接两个类型一至的数据流
                Connect()合流
                CoMap()
                DataStream<Tuple2<String,Double>> waringStream = highTempStream.map(new MapFunction<SensorReading,Tuple2<String,Double>>(){
                    @Override
                    public Tuple2<String,Double> map<SensorReading value> throws Exception{
                        return new Tuple2<>(value.getId(),value.getTemperature());
                    }
                });
                ConnectedStreams<Tuple2<String,Double>,SensorReading> connectedStreams = warningStream.connect(lowTempStream);
                DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String,Double>,SensorReading,Object>(){
                    @Override
                    public Object map1(Tuple2<String,Double> value) throws Exception{
                        return new Tuple3<>(value.f0,value.f1,"high temp warning");
                    }
                    
                    @Override
                    public Object map2(SensorReading value) throws Exception{
                        return new Tuple2<>(value.getId(),"normal");
                    }
                })
                env.execute();
            //union联合:联合多个DataStream
            Union
                highTempStream.union(lowTempStream,allTempStream);
        支持的数据类型
            基础数据类型
                Flink流应用程序处理的是以数据对象表示的事件流,
                所以在Flink的内部,我们需要能够处理这些对象,他们需要被序列化和反序列化以便通过网络传送他们。
                或者从状态后端、检查点和保存点读取他们,Flink需要明确知道所处理的数据类型。
                Flink使用类型信息的概念表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
                
                Flink还具有类型提取系统,该系统分析函数的输入和返回类型,
                并启动获取类型信息从而获取序列化器和反序列化器。
                DataStream<Integer/String/Double/flat...> numberStream = env.fromFlements(1,2,3,4);
                numberStream.map(data -> data * 2);
            Java和Scala元组(Tuples:Tuple/Tuple0/Tuple1...)
                DataStream<Tuple2<String,Integer>> personStream = env.fromElements(
                    new Tuple2("adam",27);
                    new Tuple2("Sarah",23);
                ); 
                personStream.filter(p -> p.f1 > 18);
            Scala样例类(case classes)
                case calss Person(name:String,age:Int)
                
                val persons:DataStream[Person] = env.fromElements(Person("Adam",27),Person("Sarah",23));
                persons.filter(p -> p.age > 18)
            Java简单对象(POJOs)
                public class Person{
                    public String name;
                    public int age;
                    public Person() {}
                    public Person(String name,int age){
                        this.name = name;
                        this.age = age;
                    }                
                }
                DataStream<Person> persons = env.fromElements(
                    new Person("Alex",21
                    new Person("Wendy",23);
                );
            其他(Arrays,Lists,Map,Enum)
                Flink对Java和Scala中的一些特性目的的类型也都支持,
                比如Java的:ArrayList,HashMap,Enum等
        
        实现UDF函数-更细粒度的控制流
            函数类(Function Class)
                Flink可以调用所有udf函数的接口(实现方式为接口或抽象类)
                如:
                MapFunction,(接口)
                FilterFunction,
                ProcessFunction(抽象类,最底层)
                
                DataStream<String> flinkTweets = tweetStreams.filter(new FlinkFilter("flink"));
                public static class FlinkFilter implements FilterFunction(String){
                    private String KeyWord;
                    
                    FlinkFilter(String KeyWord){
                        this.KeyWord = KeyWord;
                    }
                    
                    @Override
                    public boolean filter(String value) throws Exception{
                        return value.contains(this.KeyWord);
                    }
                }
                //匿名实现
                DataStream<String> flinkTweets = tweetsStream.filter(new FlinkFunction<String>(){
                    @Override
                    public boolean filter(String value) throws Exception{
                        return value.contains("flink");
                    }
                });
            
            匿名函数(Lamdba Functions)
                DataSteam<String> tweetStream = env.readTextFile(".../File.txt");
                DataStream<String> flinkStream = tweetStream.fliter(tweet -> tweet.contains("flink"));
            
            富函数-功能加强版(Rich Functions)
                与常规函数不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,可以实现更复杂的功能。
                RichMapFunction, 
                RichFlatMapFunction, 
                RichFilterFunction
                Rich Function有一个生命周期的方法有:
                    open()方法是rich function初始化方法,当算子map或filter被调用前open被调用。
                    close()方式是生命周期最后调用的方法。
                    getRuntimeContext()方法提供了函数的RuntimeContext的信息上下文,
                                       例如函数执行的并行度,任务的名字,以及state状态。
                //创建执行环境
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                env.setParallelism(4);设置并行度
                //从文件读取数据
                DataStream<String> dataStreams = env.readTextFile("C:\RuanJian\jeecg-boot\resource\sensor.txt");
                
                //装换成SensorReading类型
                DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(",");
                    return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
                });
                DataStream<Tuple2<String,Integer>> resultlStream = dataStream.map(new MyMapper());
                resultStream.print();
                env.execute();
                
                //实现自定义普通函数类
                public static class myMapper implements MapFunction<SensorReading,Tuple2<String,Integer>>{
                    @Override
                    public Tuple2<String,Integer> map(SensorReading value) throws Exception{
                        return new Tuple2<>(value.getId,value.getId.length));
                    }
                }
        
                //实现自定义富函数类
                public static class myMapper extend RichMapFunction<SensorReading,Tuple2<String,Integer>>{
                    @Override
                    public Tuple2<String,Integer> map(SensorReading value) throws Exception{
                        return new Tuple2<>(value.getId,getRuntimeContext().getIndexOfThisSubtask());
                    }
                    
                    @Override
                    public void open(Configuration parameters) throws Exception{
                        //初始化工作,一般是定义状态,或建立数据库连接
                    }
                }
            数据重分区操作//作用:数据在任务之间传输的方式定义
                其他分区方式:dataStream.broadcast()广播分区,与Keyby()相似
                              rebalarce()均匀轮训分区,
                              rescal() 给rebalarce分组
                              global() 全部数据给第一个分区
                              partitionCustom用户自定义重分区器
        Sink
            Flink-kafka实现ETL:
            1.取kfaka某主题的数据,2.计算转换数据,3.(Sink输出)存KafKa某主题
                引入依赖:flink-conector-kafka-0.11_2.12 
                Properties properties =new Properties();
                properties.setProperty("bootstrap.servers","localhost:9092");
                DataStream<String> dataStreams = env.addSource(new FlinkKafkaConsumer011(String)("sensor",new info(),properties));
                创建kafka producer topic:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
        
                //装换成SensorReading类型
                DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(",");
                    return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
                });
            
                dataStream.addSink(new FlinkKafKaProducer011<String>("localhost:9092","sinktest",new SimpleStringScheam()));
                env.execute();
                ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest   查看kafka consumer topic
        
            Flink没有类似于Spark中的foreach方法,让用户进行迭代操作,
            对外的输出操作都用Slink完成。
            官方提供了一部分框架的sink,以外需要自定义sink
                stream.addSink(new MySink(xxx));
                
            kafka(source/sink)
            rabitmq(source/sink)
            nifi    (source/sink)
            es     (sink)
            redis(sink)->需引入依赖支持:Apache Bahir
            ......    
            
            Flink Sink Redis
                org.apache.bahir
                flink-connector-redis2.11
                
                //装换成SensorReading类型
                DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(",");
                    return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
                });
    
                
                FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                                                                .setHost("localhost")
                                                                .setPost(6379)
                                                                .build()
                
                dataStream.addSink(new RedisSink<>(config,new MyRedisMapper()));;
        
                env.execute();
    
                public static class MyRedisMapper implements RedisMapper<SensorReading>{
                    //定义保存数据到redis命令,存成Hash表 hset sensor_temp id temp
                    @Override
                    public  RedisCommandDescription getCommandDescription(){
                        return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");
                    }
                    
                    @Override 
                    public String getKeyFromData(SensorReading data){
                        data.getId();
                    }
                    
                    @Override 
                    public String getValueFromData(SensorReading data){
                        data.getTemp().toString();
                    }
                    
                }
                
                
            Flink Sink ES
                flink-connector-elastiesearch6_2.12
                
                //装换成SensorReading类型
                DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(",");
                    return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
                });
    
                
                List<HttpHost> httpHosts = new ArrayList<HttpHost>();
                httpHosts.add(new HttpHost("localhost",9200));
                
                dataStream.addSink(new ElasticSearchSink.Builder<SensorReading>(httpHosts,new MyEsSinkFunction()).build());
                
                env.execute();
                
                public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading>{
                    
                    public void process(SensorReading element,RuntimeContext cxt,RequestIndexer indexer){
                        //定义写入数据
                         HashMap<String,String> dataSource = new HashMap<>();
                         dataSource.put("id",element.getId());
                         dataSource.put("id",element.getTemperature().toString());
                         dataSource.put("ts",element.getTimestamp().toString());
                         
                         //创建请求,作为向es发起的写入命令
                         IndexRequest indexRequest = Requests.indexRequest()
                                                            .index("sensor")
                                                            .type("readingdata")
                                                            .source(dataSource);
                         
                         //用indexer发送请求
                         indexer.add(indexRequest);
                    }
                }
                
                curl "localhost:9200/_cat/indices?v"
                
                curl "localhost:9200/sensor/_search?pretty" 查看数据
                
            Flink Sink Mysql
                mysql-connector-java
                
                //装换成SensorReading类型
                DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(",");
                    return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
                });
    
                dataStream.addSink(new MyJdbcSink());
                
                env.execute();
            
                public static class MyJdbcSink extends RichSinkFunction<SensorReading>{
                    Connection conn = null;
                    PreparedStatement insertStmt = null;
                    PreparedStatement updateStmt = null;
                    
                    //创建连接
                    @Override
                    public void open(Configuration parameters){
                        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456");
                        insertStmt = conn.propareStatement("insert into sensor_temp (id,temp) values (?,?)");
                        updateStmt = conn.propareStatement("update sensor_temp set temp = ?  where id = ?");
                    }
                    
                    //每来一条数据,调用连接执行sql
                    @Override
                    public void invoke(SensorReading value,Context cxt){
                        //直接执行更新语句,如果没有更新那么就插入
                        updateStmt.setDouble(1,value.getTemperature());
                        updateStmt.setString(2,value.getId());
                        updateStmt.execute();
                        if(updateStmt.getUpdateCount() == 0){
                            insertStmt.setString(1,value.getId());
                            insertStms.setDouble(2,value,getTemperature());
                            updateStmt.execute();
                        }
                    }
                    
                    @Override
                    public void close() throws Exception{
                        insertStmt.close();
                        updateStmt.close();
                        conn.close();
                    }
                }
    6.Flink中的Window
    
        window概念
            一般真实的流都是无界的,怎样处理无界的数据?
            可以把无限的数据流进行切分,得到有限的数据集进行处理---也就是得到有界流
            窗口(window)就是把无界流切割(某个时间段的)为有界流,
            它会将流数据分发到有限大小的桶(bucket)中进行分析。
        
        window类型
            时间窗口(Time Window)
                滚动时间窗口(Tumling Windows)window size
                    将数据根据固定的窗口长度对数据切分
                    时间对齐,窗口长度固定,没有重叠
                    dataStream.keyBy("id")//必须先Keyby
                    .timeWindow(Time.seconds(15)//十五秒);
                
                滑动时间窗口(Sliding Windows)window size/window slide
                    滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
                    窗口长度固定,可以有重叠
                    .timeWindow(Time.seconds(15),//参数二);
                    
                会话窗口(Session Windows)session gap
                    由一系列事件组合一个指定时间长度的timeout间隙组成,
                    一段时间没有收到新数据就会生成新的窗口。
                    特点:时间无对齐
                    .window(EventTimeSessionWindows.withGap(Time.minutes(1)//间隔1分钟));
                    
            计数窗口(Count Window)
                滚动计数窗口
                    .countWindow(parms1,)
                滑动计数窗口
                    .countWindow(parms1,parms2)
        窗口分配器4类
            window()方法接收的输入参数是一个WindowAssigner
            WindowAssigner负责将每条输入的数据分发到正确的window中
            GlobalWindows
            Tumling Windows
            Sliding Windows
            Session Windows        
            
        窗口函数(window Funciton)(无界变有界流分桶后的聚合)
            window function定义了要对窗口中收集的数据做的计算操作
            增量聚合函数-统计(incrementa aggregation function)
                每条数据到来就进行计算,保持一个简单状态
                ReduceFunction,
                AggregateFunction
            全窗口函数(full window function)
                先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
                ProcessWindowFunction,
                WindowFunction
            代码:
                //开窗测试
                //socket文本流
                DataStream<String> inputStream = env.socketTextStream("localhost",7777);
    
                //增量聚合函数
                DataStream<Integer> resultStream= dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading,Integer,Integer>(){
                    @Override
                    创建累加器
                    @Override
                    累加
                    @Override
                    结果
                    @Override
                    merge session窗口一般操作
                })
                resultStream.pring();
                
                env.execute();
                
                //全窗口聚合函数
                DataStream<Tuple3<String,Long,Integer>> resultStream2= dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading,Tuple3<String,Long,Integer>,Tuple,TimeWindow>(){
                    @Override
                    public void apply(Tuple tuple,TimeWindow window,Iterable<SensorReading> input,Collector<Tuple3<String,Long,Integer>> out){
                        String id = tuple.getField(0);
                        Long windowEnd = window.getEnd();
                        Integer count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple<>(id,windowEnd,count));
                    }
                })
                或
                .pracess(new ProcessWindowFunction<SendorReading,Object,Tuple,TimeWindow>(){
                })
                resultStream2.pring();
                
                env.execute();
                
            其他API
                .trigger()--触发器
                .evictor()--移除器
                .allowedLateness(Time.minutes(1))--允许处理迟到的数据
                .sideOutputLateData()--将迟到的数据放入侧输出流
                .getSideOutput()--获取侧输出流
                分为:keyby后开窗,和不keyby开窗如有:windowAll()...
                
    7.Flink的时间语义与Wartermark
        时间语义概念
            Flin中的时间语义
                EventTime事件创建时间->KafKaQueue->IngestionTime数据进入Flink的时间->ProcessingTime开窗后处理算子的本地系统时间如:.timeWindow(Time.seconds(5)),与机器相关
            设置Event Time
                事件时间
                //对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性            
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                //具体时间需要从数据中提取时间戳,不设置默认是IngestionTime数据进入Flink的时间
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            水位线(Watermark)
                EventTime处理由网络原因发生的乱序数据(水位标记)
                Flink以EventTime模式处理数据流时,会根据数据里的时间戳处理基于时间的算子
                
                把EventTime进展变慢了,比如把表时间调慢了2分钟时间还是10点发车,
                如设置5秒开窗的时间窗口.timeWindow(Time.seconds(5)//五秒),处理乱序时间数据
                并表时间调慢3秒:1 4 5/秒(不关闭) 2 3 6 7 8窗口关闭
                
                遇到一个时间戳达到啦窗口关闭时间,不应该立即出发窗口计算,而是等待一段时间,等迟到的数据来了在关闭窗口。
                重点:watermark可以设置延迟触发时间窗口关闭时间
                     数据流中的watermark表示data timestamp小于watermark timestamp的数据都已经到达了因此Watermark触发window的执行
                      
                    1  2(watemark)  3  5  5(watemark)  6 7 8
            watermark是一条特殊的数据记录,必须单调递增,与数据的时间戳相关
            
            watermark的传递、引入和设定
                watermark在任务间的传递:
                    每一个任务都可能有上游多个并行任务再给他发watermark,并同时有并行向下游任务广播watermark,
                    从上游向下游传递是把watermark广播出去,通过watermark传递可以一层一层的推进eventTime,每一个任务eventTime不一样是对的因为流处理又先后顺序:有的数据是souce任务/Trform/sink任务当然eventTime不一样
                    以最小的watermark作为(当前任务)事件时钟向下游广播
            watermark在代码中的设置
                //AssignerWithPunctuatedWatermarks//断电性指定数据和时间戳生成watermark
                //AssignerWithPeriodicWatermarks  //周期性生成watermark
                //new AscendingTimestampExtractor<SendorReading>(){}//有序数据设置事件时间和watermark
                //new BoundedOutOfOrderness有界乱序数据情况下的时间戳提取器,1.提取时间戳2.生成设置watermark
                assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(TIme.second(2)//延迟时间,最大的乱序程度){
                    @Override
                    public long  extractTimestamp(SensorReading element){
                        return element.getTimestamp() * 1000L;
                    }
                });
                            
                OutputTag<SensorReaqding> outputTag = new OutputTag<SensorReading>("late"//控制台输出时的标识){};
                //迟到数据处理
                DataStream minTempStream = minTempStream.keyBy("id")
                                        .timeWindow(Time.minutes(15));
                                        .allowedLateness(1)    //设置迟到1分钟关窗
                                        .sideOutputLateData(outputTag);//放入侧输出流
                                        .minBy("temperature");//获取字段最小值
                                        
                minTempStream.pring()打印流//.pring("minTemp")表示minTemp是打印输出时的标识            
                minTempStream.getSideOutput(outputTag).pring("late");
    
                env.execute();
                总结:watermark延时时间 对所有时间无论是time window,还是allowedLateeness(1)迟到时间,都影响生效                    
                                        
                
        时间语义的应用
        
        事件时间语义的设置
        
        Wartermark概念和原理
    
    Flink状态管理
        状态的作用为了保证:扩容并行度调整,机器挂了,内存不够,机器挂了
        1.Flink会进行状态管理,状态一致性的保证,2.故障后的恢复以及高效存储和访问,以便开发人员专注于应用程序的逻辑。
        2.可以认为状态就是一个本地变量(在内存中),可以被当前任务的所有业务逻辑访问到,不会跨任务访问状态。
        
        无状态的(任务/算子):map/flamap/比如:timewindow().状态()
        有状态的(任务/算子)     :window/redcue/...
        状态和有状态的算子相关联,为了运行时了解算子状态需要先注册其状态
        
        算子状态Operatior state
            作用范围:当前的(算子)访问,当前任务输入来的数据都能访问到当前状态
            同一个分区访问同一个状态
            
            状态对于同一子任务是共享的
            
            算子状态的数据结构
                List state、列表状态:一直数据的列表
                Union List state、联合列表状态:发生故障时,或者从保存点(savepoint)启动应用程序如何恢复
                Broadcast state    广播状态:如果一个算子有多个任务,每个任务状态又相同,这种情况适合广播状态
            算子状态代码使用
                public static class MyCountMapper implements MapFunction<SensorReading,Integer>,ListCheckpointed<Integer>{
                    //作为算子状态
                    private Integer count = 0;
    
                    @Override
                    public Integer map(SensorReading value) throws Exception{
                        count++;
                        return count;
                    }
                    
                    @Override //操作状态到List
                    public List<Integer> snapshotState(long checkpointId,long timestamp) throws Exception{
                        
                        return Collections.singletonList(count);
                    }
                    
                    @Override //读取状态
                    public void restoreState(List<Integer> state) throws Exception{
                        for(Integer num : state)
                            count += num; 
        
                    }
                }
        键控状态Keyed state//分组/分区KeyBy的状态KeyBy的状态
            1.根据输入数据流中的定义的键(key),来访问和维护
            2.Flink为每个key维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,
            这个任务会维护和处理这个key对应的状态。
            3.当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。
            
            键控状态数据结构
                Value state值状态:将状态表示为单个值
                List state:状态表示为一组数据列表
                Map state:状态表示为一组Key-value
                Reducing state & Aggregating state聚合状态:表示为一个用于操作的列表
    
            键控状态代码使用
                //声明
                Private ValueState<Integer> keyCountState= getRuntimeContext().getState(new ValueStateDescriptor<Integer>("my-value",Integer.class));
                                       getRuntimeContext().getListState(...);
                                       ......
                //读取状态
                Integer myValue = myValueState.value();
                //对状态赋值
                myValueState.update(10);
        状态后端state Backends
            每传入一条数据,(有状态的)算子任务都会读取和更新状态
            所以每个并行任务都会在本地维护其状态,以确保快速的状态访问
            状态的存储,访问以及维护,由一个可插入的组件决定,这个组件就是状态后端(state backends)
            状态后端主要负责两件事:本地状态管理,以及检查点(checkpoint)状态写入远程存储
            
            MemoryStateBackend
                内存级的状态后端,会将键控状态作为内存中的对象管理,将它们存储在TaskManager的JVM堆上,
                而将checkpoint存储在JobManager的内存中,特点:快速,低延迟,但不稳定
            
            FsStateBackend
                将checkpoint存到远程的持久化文件系统(flieSystem)上,而对于本地状态,跟MemoryStateBackend一样,
                也会存在TaskManager的JVM堆上,特点:有内存及本地访问速度,容错保证。
            
            RocksDBStateBackend
                将所有状态序列后,存入本地RocksDB中,特点:读写速度慢,
            
            flink-conf.yml中配置状态后端 
            env.setStateBackend(new MemoryStateBackend());
            env.setStateBackend(new FsStateBackend("远程IP"));
            env.setStateBackend(new RocksDBStateBackend("远程IP"));需引入maven依赖
        
    
    8.ProcessFunction API
        3层API分别为:顶层SQL/TableApi->DataStream/DataSetAPI->Stateful Stream Processing
        我们之前学习的转换算子是无法访问事件的时间戳信息和Watermark信息,如:MapFunction这样的map转换算子无法访问
        这在一些场景下极为重要,
        DataStream API提供了Low-Level转换算算子,可以访问时间戳,watermark以及注册定时事件,还可以特定事件如:超时事件
        ProcessFunction用来构建事件驱动的应用 及 自定义业务逻辑,Flink SQL就是使用ProcessFunction实现的
        
        Flink提供了8个Process Function
            ProcessFunction
            KeyedProcessFunction 分组后调用KeyedProcessFunction   :常见
            CoProcessFunction     链接流后调用
            ProcessJoinFunction     两条流join后
            BroadcastProcessFunction 广播流后调
            KeyedBroadcastProcessFunction 分组广播后
            ProcessWindowFunction     全窗窗口
            ProcessAllWindowFunction 不keyby直接基于DataStream 开窗时时调
            
            dataStream.keyBy("id");
                      .process(new MyProcess());
                      .pring();
    
            //KeyedProcessFunction 测试
            public static class MyProcess extends KeyedProcessFunction<Tuple,SensorReading,Integer>{
                ValueState<Long> tsTimerState;
                
                @Override
                public void processElement(SensorReading value,Context ctx,Collector<Integer> out )throws Exception{
                    out.collect(value.getId().length());
                
                    ctx.timestamp();//获取时间戳
                    ctx.getCurrentKey()//当前key
                    ctx.output();
                    ctx.timerService().currentProcessingTime();处理时间
                    ctx.timerService().currentWatermark();//事件时间
                    ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000L);//处理时间的定时器
                    tsTimerState.update(ctx.timerService().currentProcessingTime() + 1000L);//状态保存时间
                    ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);//事件时间的定时器
                    ctx.timerService().deleteProcessingTimeTimer(10000L)//删除定时器
                }
                
                @Override
                public void onTimer(long timestamp,OnTimerContext ctx,Collector<Integer> out)throws Exception{
                    system.out.print(timestamp + "定时器触发");
                }
            }
            
             //需求:监控温度传感器的温度值,如果温度值在10秒内连续上升,则报警
                dataStream.keyBy(SendorReading::getId);
                  .process(new TempIncreaseWarning(10));
                  .pring();
                public static class TempIncreaseWarning extends KeyedProcessFunction<String,SendorReading,String>{
                    //当前统计的时间间隔
                    private Integer interval;
                    
                    public TempIncreaseWarning(Integer interval){
                        this.interval = interval;
                    }
                    
                    //定义状态,保存上一次温度值,定时器时间戳
                    private ValueState<Double> lastTempState;
                    private ValueState<Long> timerTsState;
                    
                    @Override
                    public void open(Configuration parameters)throws Exception{
                        lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp",Double.class,Double.Min_value));
                        timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp",Long.class));
                    }
                    
                    @Override
                    public void processElement(SensorReading value,Context ctx,Collector<String> out )throws Exception{
                        
                        //取出状态
                        Double lastTemp = lastTempState.value();
                        Long timerTs = timerTsState.value();
                
                        //如果温度上升并且没有定时器,注册10秒的定时器
                        if(value.getTemperature() > lastTemp && timerTs == null){
                            //计算出定时器时间戳
                            Long ts = ctx.timerService.currentProcessingTime() + interval * 1000L;
                            ctx.timerService.registerProcessingTimeTimer(ts);
                            timerTsState.update(ts);
                        }else if{
                        //如果温度下降,并有定时器,删除定时器
                            ctx.timerService.deleteProcessingTimeTimer(timeTs);
                            timeTsState.clear();
                        }
                        
                        //更新温度状态
                        lastTempState.update(value.getTemperature());    
                    }
                    
                    @Override
                    public void onTimer(long timestamp,OnTimerContext ctx,Collector<String> out)throws Exception{
                         //定时器触发,输出报警信息
                         out.collect("传感器"+ctx.getCurrentKey().getField(0)+"温度值连续"+interval+"s上升");
                         lastTempStat.clear();
                    }
                    
                    @Override
                    public void close()throws Exception{
                        lastTempStat.clear();
                    }
                }        
            
            侧输出流(SideOutput)
                除了split算子可以将一条流分成多条流,这些流的数据类型也都相同。
                processFunction的side output功能也能产生多条流,这些流的数据类型可以不一样
                一个 side output可以定义为OutputTag[X]对象,X是输出流的数据类型,
                processfuntion通过Context对象发射一个事件到一个或多个side output
            
    9.状态编程和容错机制(暂缓,实操时学习)
        1.有状态的算子和应用程序
            算子状态(operator state)
            键控状态(keyed state)
        2.状态一致性
            一致性级别
            端到端(end-to-end)状态一致性
        3.检查点(checkpoint)
            flink的检查点算法
            flink+kafka如何实现端到端的exa...
        4.选择一个状态后端(state backend)
        
    
    10.Table API和SQL
        Flinke对批处理和流处理,提供了统一的上层API
        TableAPI是一套内嵌在Java和Scala语言中的查询API,
        代码案例:
            引入依赖
            flink-table-planner_2.12
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            env.setParallelism(4);设置并行度
            //从文件读取数据
            DataStream<String> dataStreams = env.readTextFile("C:\RuanJian\jeecg-boot\resource\sensor.txt");
                
            //装换成SensorReading类型
            DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
            });
            
            //创建表环境
            StreamTableEnvironment tableEvn = StreamTableEnvironment.create(env);
            //基于流创建一张表
            Table dataTable = tableEvn.fromDateStream(dataStream);
            
            //调用table Api进行转换    方式一
            Table resultTable = dataTable.select("id,temperature")
                                        .where("id = ‘sendor_1’");
            //执行SQL    方式二
            tableEnv.createTemporarView("sensorView",dataTable);//dataTable注册为sensor,自定义的
            String sql = "select id,temperature from sensorView where id = 'sensor_1'";
            Table resultSqlTable = tableEvn.sqlQuery(sql);
            
            //输出数据
            tableEnv.toAppendStream(resultTable,Row.class).pring("result");
            tableEnv.toAppendStream(resultSqlTable,Row.class).pring("sql");
            
            env.execute();
            
        基本程序结构    
            TableAPI和SQL的程序结构,与流式处理的程序结构十分类似。
            StreamTableEnvironment tableEnv = ...
            
            //创建一张表,用于读取数据,表名为:inputTable
            tableEnv.connect().createTemporarTable("inputTable");
            
            //创建一张表,用于把计算结果输出
            tableEnv.connect().createTemporaryTable("outputTable");
            
            //通过Table Api查询算子,得到一张结果表
            Table result = tableEnv.from("inputTable").select();
            //通过SQL查询语句,得到一张结果表
            String Sql = "select * from input Table ..."
            Table sqlResult = tableEnv.sqlQuery(sql);
            
            //输出结果表到输出表中
            result.insertInto("outputTable");
        表环境配置StreamTableEnvironment->继承TableEnvironment
            //1.1老版本planner的流处理
            EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
                                                            .useOldPlanner()  //设置处理的版本
                                                            .inStreamingMode()//设置流处理或批处理模式
                                                            .build()
            StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env,oldStreamSettings);
    
            //1.2老版本Flink planner的批处理
            ExecutionEvnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment oldBatchTableEnv = StreamTableEnvironment.create(batchEnv);
            
            
            //2.1基于Blink的流处理
            EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                                                            .useBlinkPlanner()//设置处理的版本
                                                            .inStreamingMode()//设置流处理或批处理模式
                                                            .build()
            StreamTableEnvironment blinkStreanTableEnv = StreamTableEnvironment.create(env,blinkStreamSettings);
            //2.2基于Blink的批处理
            EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
                                                    .useBlinkPlanner()//设置处理的版本
                                                    .inBatchingMode()//设置流处理或批处理模式
                                                    .build()
            TableEnvironment blinkBatchTableEnv = TableEnvironment.create(env,blinkBatchSettings);
        创建表-从文件读取数据    
            TableEnvironment可以注册目录Catalog,并可以基于Cataog注册表 
            表(Table)是标识符(identifier)来指定的,3部分组成:Catalog名、数据库database名,对象名
            表可以是常规也可以是视图View
            常规表(Table)一般用来描述外部数据,如文件,数据库或从消息队列的数据,或从DataStream转换而来的数据
            视图(View)可以从现有表中创建、通常是table API或SQL查询的一个结果集。
            
            TableEnvironment可以调用.connect()方法,连接外部系统
            并调用.createTemporaryTable()方法,在Catalog中注册表
                tableEnv.connect()//定义表的数据来源,和外部系统建立连接
                        .withFormat()//定义数据格式化方法
                        .withSchema()//定义表结构
                        .createTemporaryTable("MyTable")//创建临时表
            读取文件数据
                String filePath="D:\RuanJian\java03\jeecg-boot\src\main\resources\sensor.txt";
                tableEnv.connect(new FileSystem().path(filePath))//外部文件系统连接
                        .withFormat(new Csv())//以csv格式进行数据格式化
                        .withSchema(new Schema()
                            .field("id",DataTypes.STRING())
                            .field("timestamp",DataTypes.BIGINT())
                            .field("temp",DataTypes.DOUBLE())
                        )
                        .createTemporaryTable("inputTable");
                Table inputTable = tableEnv.from("inputTable");基于临时表(外部数据)创建一张表
                
                inputTabel.printSchema()//打印表结构
                tableEnv.toAppendStream(inputTable,Row.class).pring();//表转换为流打印输出
                
                env.execute();
        表的查询
            Table API是基于’表‘的Table类,提供一套操作方法,这些方法会返回一个新的Table对象。
            //有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。
            Table sensorTable = tableEnv.from("inputTable");
            1.0简单转换
            Table resultTable = sensorTable.select("id,temperature").filter("id === 'sensor_1'");
            
            1.1查询装换Table API:聚合装换
            Table aggTable = sensorTable.groupBy().select("id,id.count as count,temperature.avg as avgTemp");
            
            1.2SQL
            tableEnv.sqlQuery("select id,temperature from inputTable where id = 'sensor_6'");
            Table sqlAggTable = tableEnv.sqlQuery("select id,count(id)as cnt,avn(temperature) as avgTemp from inputTable group by id");
            
            tableEnv.toAppendStream(resultTable.Row.class).pring("result");
            tableEnv.toRetractStream(aggTable.Row.class).pring("agg");
            tableEnv.toRetractStream(sqlAggTable.Row.class).pring("sqlagg");
            
            env.execute();
        表的输出-输出到文件
            String outputPath="D:\RuanJian\java03\jeecg-boot\src\main\resources\out.txt";
                tableEnv.connect(new FileSystem().path(outputPath))//外部文件系统连接
                        .withFormat(new Csv())//以csv格式进行数据格式化
                        .withSchema(new Schema()
                            .field("id",DataTypes.STRING())
                            .field("temperature",DataTypes.DOUBLE())
                        )
                        .createTemporaryTable("outputTable");
                        
                resultTable.insertInto("outputTable");//将转化后的数据写到outputTable对应的文件中
                
                env.execute();
        KafKa数据连接,读写kafka
            tableEnv.connect(new KafKa()
                            .version("0.11")
                            .topic("sensor")
                            .property("zookeeper.connect","localhost:2181")
                            .property("bootstrap.servers","localhost:9092")
            )
                    .withFormat(new Csv())//序列化和反序列化Json或Csn
                    .withScheam(new Schema()
                            .field("id",DataTypes.STRING())
                            .field("timestamp",DataTypes.BIGINT())
                            .field("temp",DataTypes.DOUBLE())
                    )
                    .createTemporaryTable("inputTable");
            Table sensorTable = tableEnv.from("inputTable");
            1.0简单转换
            Table resultTable = sensorTable.select("id,temperature").filter("id === 'sensor_1'");
            
            1.1查询装换Table API:聚合装换
            Table aggTable = sensorTable.groupBy().select("id,id.count as count,temperature.avg as avgTemp");
    
            tableEnv.connect(new KafKa()
                            .version("0.11")
                            .topic("sinkTest")
                            .property("zookeeper.connect","localhost:2181")
                            .property("bootstrap.servers","localhost:9092")
            )
                    .withFormat(new Csv())//序列化和反序列化Json或Csn
                    .withScheam(new Schema()
                            .field("id",DataTypes.STRING())
                            //.field("timestamp",DataTypes.BIGINT())
                            .field("temp",DataTypes.DOUBLE())
                    )
                    .createTemporaryTable("outputTable");
            resultTable.insertInto("outputTable");
            
            env.execute();
        更新模式
            对于流式查询,需要声明如何在表和外部连接器之间转换。
            与外部系统交换的消息类型,由更新模式(Update Mode)指定
            Append追加模式
                表和外部连接器只交换插入(insert)消息
            
            Retract撤回模式
                表和外部链接器交换添加add 和撤回retract 消息
                插入操作 编码为add消息,
                删除操作 编码为retract消息
                更新操作 编码为上一条的retract和下一条的add消息
            
            Upsert更新插入模式
                更新和插入都被编码为Upsert消息;
                删除编码为delete消息
        
        输出到外部系统
            输出到ES
                tableEnv.connect(new Elasticsearch()
                        .version("6")
                        .host("localhost",9200,"http")
                        .index("sensor")
                        .documentType("temp")
                    )
                    .inUpsertMode()//默认是Append模式,设置为Upsert模式
                    .withFormat(new Json())
                    .withSchema(new Schema()
                        .field("id",DataTypes.STRING())
                        .field("count",DataTypes.BIGGINT)
                    )
                    .createTemporarTable("esoutputTable");
                    
                aggresultTable.insertInto("esoutputTable");
                
            输出到Mysql
                flink-jdbc_2.12
                String sinkDDL=
                    "create table jdbcOutputTable("+
                    " id varchar(20) not null, "+
                    " cnt bigint not null"+
                    ") with ("+
                    " 'connector.type' = 'jdbc', "+
                    " 'connector.url' = 'jdbc:mysql://localhost:3306/test', "+
                    " 'connector.table' = 'sensor_count' "+
                    " 'connector.driver' = 'com.mysql.jdbc.Driver' "+
                    " 'connector.usernaem' = 'root' "+
                    " 'connector.password' = '123456' )";
                tableEnv.sqlUpdate(sinkDDL)    //执行DDL创建表
                aggResultSqlTable.insertInto("jdbcOutputTable");
                
        表和流的装换
            Table装换为DataStream
                1.表可以转换为DataStream或DataSet,
                  这样自定义流处理或批处理程序就可以继续在Table API或SQL查询的结果上运行了。
                2.将Table 转换为 DataStream或DataSet时,需要指定生成的数据类型
                  即要将表的每一行转换成的数据类型
                3.表作为流式查询的结果,是动态更新的
                4.有两种转换模式:追加append模式 和 撤回retract模式
                        /*流转表*/
                        
                        默认转换后的Table Schema和DataStream中的字段定义一一对应,
                        也可以单独制定出来。
                            Table dataTable = tableEvn.fromDateStream(dataStream,"id,timestamp as ts, temperature");
                        
                        //基于流创建一张表
                        Table dataTable = tableEvn.fromDateStream(dataStream);
                        //基于临时表(外部数据)创建一张表
                        Table inputTable = tableEnv.from("inputTable");
                        //基于SQL创建表    方式二
                        tableEnv.createTemporarView("sensorView",dataTable);//dataTable注册为sensor,自定义的视图名
                        String sql = "select id,temperature from sensorView where id = 'sensor_1'";
                        Table resultSqlTable = tableEvn.sqlQuery(sql);
                        
                        /*表转流*/
                        tableEnv.toAppendStream(resultTable.Row.class).pring("result");
                        tableEnv.toRetractStream(aggTable.Row.class).pring("agg");
                        
                        append Mode模式:
                            用于表只会被插入操作更改的场景
                            DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable.Row.class).pring("result");
                        retract Mode模式:
                            用于任何场景
                            DataStream<Tuple2<Boolean,Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable,Row.class);
                        基于DataStream 创建临时视图            
                            tableEnv.createTemporarView("sensorView",dataStream,"id,temperature,timestamp as ts");//dataTable注册为sensor,自定义的视图名
                        基于Table 创建临时视图    
                            tableEnv.createTemporarView("sensorView",dataTable);//dataTable注册为sensor,自定义的视图名
            查看执行计划:通过TableEnvironment.explain(Table)方法或。explain()完成,返回字符串
                String explaination = tableEnv.explain(resultTable);
                System.out.print(explaination);
        
        动态表和持续查询    
            动态表(Dynamic Tables)
            动态表是Flink对流数据的Table API和SQL支持的核心概念
            与表示批处理数据的静态表不同,动态表是随时间变化的。
            持续(连续)查询(Continuous Query)
            动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)
            连续查询永远不会终止,并会生成另一个动态表
            查询会不断更新其动态结果表,以反映其动态输入表上的更改
            
            
            过程:Stream ->Dynamic Table->Continuous Query ->Dynamic Table ->Stream
            1.流被转换为动态表
            2.对动态表计算连续查询,生成新的动态表
            3.生成的动态表被转换回流
            
            将流转换成动态表
                为了处理带有关系查询的流,必须先转换为表
                从概念上讲,流的每个数据记录,都被解释为对结果表的插入insert修改update操作
                
            持续查询会在动态表上做计算处理,并作为结果生成最新的动态表    
                user     url            select uers,count(url) as cnt       user    cnt        
                mary    ./home        from clicks                          mary       2        //Upsert by KEY /- delete by KEY
                Bob        ./cart        group by user                      Bob       1
                myary    ./parod        
            将动态表转换成DataStream
                与常规的数据库表一样,动态表可以insert update  delete更改,进行持续的修改。
                将动态表装换为流或将其写入外部系统时,需要对这些更改进行编码
                Append-only仅追加流
                    仅通过insert更改来修改的动态表,可以直接转换为仅追加流
                retract撤回流
                    撤回流是包含两类信息的流:添加信息和撤回信息
                Upsert(更新插入)流
                    Upsert流也包含两类信息的流:Upsert信息和删除信息
            
        处理时间特性(Time Attributes)
            1.基于时间的操作如:TableAPI和SQL中的窗口操作,需要定义相关的时间语义和时间数据来源的信息
            2.Table可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。
            3.时间属性,可以是每个表schema的一部分,一旦定义了时间属性,
            他就可以作为一个字段引用,并且可以在基于时间的操作中使用。
            4.时间属性的行为类似于常规时间戳,可以访问,并且进行计算
            
            定义处理时间(Processing Time)
             1.处理时间语义下,允许表处理程序根据机器的本地时间生成结果,他是时间的最简单概念,
               即不需要提取时间戳,也不需要生成watermark
            由DataStream转换成表时指定
                在定义Schema期间,可以使用.proctime,指定字段名定义处理时间字段
                这个proctime属性只能通过附加逻辑字段来扩展物理schema,因此只能在schema定义的末尾定义它
                Table sensorTable = tableEnv.fromDataStream(dataStream,"id,temp,timestamp,pt.proctime");pt.proctime系统时间
            定义Table Schema时指定
                .withScheam(new Schema()
                            .field("id",DataTypes.STRING())
                            .field("timestamp",DataTypes.BIGINT())
                            .field("temp",DataTypes.DOUBLE())
                            .field("pt",DataTypes.TIMESTAMP(3)).proctiom()
                    )
            在创建表的DDL中定义
                String sinkDDL=
                    "create table jdbcOutputTable("+
                    " id varchar(20) not null, "+
                    " ts bigint, "+
                    " temperature double, "+
                    " pt as PROCTIME(), "+
                    ") with ("+
                    " 'connector.type' = 'filesystem' "+
                    " 'connector.path' = '/sensor.txt' "+
                    " 'format.type' = 'csv' )";
                tableEnv.sqlUpdate(sinkDDL)    //执行DDL创建表
    
        事件时间特性
            和上述事件时间作用一样↑
            事件时间定义有三种方法:
                由DataStream转换成表时指定
                定义Table Schems时指定
                在创建表的DDL中定义
                
                由DataStream转换成表时指定:使用.rowtime可以定义事件时间属性
                    Table sensorTable = tableEnv.fromDataStream(dataStream,"id,temperature,timestamp.rowtime,rt.rowtime");
                
                .withScheam(new Schema()
                            .field("id",DataTypes.STRING())
                            .field("timestamp",DataTypes.BIGINT())
                            .rowtime(new Rowtime()
                                    .timestampsFromField("timestamp")//从字段中提取时间戳
                                    .watermarksPeriodicBound(1000)//watermark延迟一秒
                                    )
                            .field("temp",DataTypes.DOUBLE())
                )
        
                String sinkDDL=
                    "create table jdbcOutputTable("+
                    " id varchar(20) not null, "+
                    " ts bigint, "+
                    " temperature double, "+
                    " rt as TO_TIMESTAMP(FROM_UNIXTIME(ts)), "+
                    " watermark for rt as rt - interval '1' second "+
                    ") with ("+
                    " 'connector.type' = 'filesystem' "+
                    " 'connector.path' = '/sensor.txt' "+
                    " 'format.type' = 'csv' )";
                tableEnv.sqlUpdate(sinkDDL)    //执行DDL创建表
    
        分组窗口
            时间语义,要配合窗口操作才能发挥作用
            TableAPI和SQL中有两种窗口:
                Group Windows(分组窗口)
                    按窗口对表进行分组,窗口的别名必须在group by 子句中,像常规分组字段一样引用
                    Table table = in put.window([w:GroupWindow] as "w") //定义窗口别名w
                                       .groupBy("w,a")      //按字段a 和窗口w分组
                                       .select("a,b.sum");//聚合
                    滚动窗口(Tumbling window)
                        .window(Tumble.over("10.minutes").on("rowtime").as("w"))
                        .window(Tumble.over("10.minutes").on("proctime").as("w"))
                        .window(Tumble.over("10.rows").on("proctime").as("w"))
                    滑动窗口(Sliding window)
                        .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
                        .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
                        .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
                    会话窗口(session window)
                        .window(Session.withGap("10.minutes").on("rowtime").as("w"))
                        .window(Session.withGap("10.minutes").on("proctime").as("w"))
                    SQL中的Group window    
                        TUMBLE(time_attr,interval)参数1时间字段、窗口长度
                        滚动窗口    
                        HOP(time_attr,interval,interval)时间字段、窗口滑动步长、窗口长度
                        滑动窗口
                        SESSION(time_attr,interval)时间字段、窗口间隔
                        会话窗口
                    代码操作:
                        Table dataTable = tableEvn.fromDateStream(dataStream,"id,timestamp as ts, temperature");
                        1.1
                        Table resultTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
                                                     .groupBy("id,tw")
                                                     .select("id,id.count,temp.avg,tw.end")
                        1.2
                        String Sql = "select id,count(id) as cnt,avg(temp) as avgTemp,tumble_end(rt,interval '10' second) "+
                                    "from sensor group by id,tumble(rt,interval '10' second)";
                        Table SqlresultTable = tableEnv.SqlQuery(Sql);
                        tableEnv.toAppendStream(resultTable.Row.class).pring("result");
                        tableEnv.toRetractStream(SqlresultTable.Row.class).pring("sql");
                
                        env.execute();
        开窗函数
                Over Windows
                    使用window(w:overwindows*),并在select()方法中通过别名来引用
                    Table table = input.window([w:OverWindow] as "w") //定义窗口别名w
                                       .select("a,b.sum over w,c.min over w");//聚合
                    无界Over window
                        可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over window
                        无界Over window使用常亮指定的
                        //无界的事件时间 over window
                        .window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))  ///partitionBy分区/preceding前面多少
                        //无界的处理时间 over window
                        .window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))  //preceding前面多少
                        //无界的事件时间Row-count over window
                        .window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))  //preceding前面多少
                        //无界的处理时间Row-count over window
                        .window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))  //preceding前面多少
                    有界Over window
                        可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over window
                        无界Over window使用常亮指定的
                        //无界的事件时间 over window
                        .window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))  ///partitionBy分区/preceding前面多少
                        //无界的处理时间 over window
                        .window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))  //preceding前面多少
                        //无界的事件时间Row-count over window
                        .window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))  //preceding前面多少
                        //无界的处理时间Row-count over window
                        .window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))  //preceding前面多少
                    SQL中的Over window    
                        所有聚合必须在同一窗口上定义,也就是说必须是相同的分区、排序和范围
                        Order By必须在单一的时间属性上指定
                        Select count(amount) OVER(
                            PARTITION BY user
                            Order by proctime
                            rows between 2 preceding and current row
                        )
                        from Orders
                    代码操作:    
                        Table overresultTable = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
                                .select("id,rt,id.count over ow,temp.avg over ow")
                        
                        String Sql = "select id,rt,count(id) over ow,avg(temp) over ow "+
                                    "from sensor "+
                                    "window ow as (partition by id order by rt rows between 2 preceding and current row)";
                        
                        Table overSqlresultTable = tableEnv.SqlQuery(Sql);
                        tableEnv.toAppendStream(overresultTable.Row.class).pring("result");
                        tableEnv.toRetractStream(overSqlresultTable.Row.class).pring("sql");
        
                        env.execute();
        系统内置函数
            比较函数
                SQL:
                    value1 = value2
                    value1 > value2
                TableAPI:
                    ANY1 === ANY2
                    ANY1 > ANY2
            逻辑函数
                SQL:
                    boolean1 or boolean2
                    boolean is false
                    not boolean
                TableAPI:
                    boolean1 || boolean2
                    boolean.isFalse
                    !boolean
            算数函数
                SQL:
                    numeric1 +numeric2
                    power(numeric1,numeric2)
                TableAPI:
                    numeric1 + numeric2
                    numeric1.power(numeric2)
            字符串函数
                SQL:
                    string1 || string2 字符串拼接
                    upper(string)        转大写
                    char_length(string) 字符串长度
                TableAPI:
                    String1 + String2    
                    String.upperCase()
                    String.charLength()
            时间函数
                SQL:
                    Date string              日期YYYY-MM-DD HH:mm:ss
                    timestamp string    时间戳
                    current_time        当前时间
                    interval string range 时间间隔
                TableAPI:
                    string.toDate
                    string.toTimestamp
                    currentTime()
                    numeric.days
                    numeric.minutes
            聚合函数
                SQL:
                    count(*)        计数
                    sum(expression) 求和
                    rank()            排序取行号
                    row_number()
                TableAPI:
                    field.count
                    field.sum()
        标量函数(ScalarFnction)
            自定义函数UDF
                用户定义的函数必须先注册,然后才能在查询中使用
                函数通过调用registerFunction()方法在TableEnvironment中注册,当用户定义的函数被注册时,
                他被插入到TableEnvironment的函数目录中,这样TableApi或SQL解析器就可以识别并正确的解释它
            标量函数Scalar functions
                定义的标量函数,可以将0 1或多个标量值,映射到新的标量值
                必须在org.apache.flink.table.functions中扩展基类ScalarFuntion 并实现(一个或多个)求值(eval)方法
                求值方法必须公开声明命名为eval
                public static class HashCode extends ScalarFnction{
                    private int factor = 13;
                    public HashCode(int factor){
                        this.factor = factor;
                    }
                    public int eval(String s){
                        return s.hashCode() * factor;
                    }
                }
                //求id的hash值
                HashCode hashCode = new HashCode(23);
                //需要在环境中注册UDF
                tableEnv.registerFunction("HashCode" hashCode);
                Table resultTable = sensorTable.select("id,ts,hashCode(id)");
                
                //SQL
                tableEnv.createTemporaryView("sensor",sensorTable);
                String Sql = "select id,ts,hashCode(id) from second ";
                Table SqlresultTable = tableEnv.SqlQuery(Sql);
                
                tableEnv.toAppendStream(resultTable.Row.class).pring("result");
                tableEnv.toAppendStream(SqlresultTable.Row.class).pring("sql");
    
                env.execute();
        表函数(TableFunction)
            表函数的行为由求值方法决定,求值方法必须是public并命名为eval
            public static class Split extends TableFunction<Tuple2<String,Integer>>{
                private String separator = ",";
                public Split(String,separator){
                    this.separator = separator;
                }
                public void eval(String str){
                    for(String s : str.split(separator)){
                        collect(new Tuple2<String,Integer>(s,s.length()));
                    }
                }
            }
            
                Split split = new Split("_");
                //需要在环境中注册UDF
                tableEnv.registerFunction("split" split);
                Table resultTable = sensorTable.joinLateral("split(id) as (word,length)")
                                               .select("id,ts,word,length");
                
                //SQL
                tableEnv.createTemporaryView("sensor",sensorTable);
                String Sql = "select id,ts,word,length from second,lateral table(split(id)) as splitid(word,length) ";
                Table SqlresultTable = tableEnv.SqlQuery(Sql);
                
                tableEnv.toAppendStream(resultTable.Row.class).pring("result");
                tableEnv.toAppendStream(SqlresultTable.Row.class).pring("sql");
    
                env.execute();
        
        聚合函数(Aggregate Function)
            可以把一个表中的数据,聚合成一个标量值
            用户定义的聚合函数通过继承AggregateFunction实现必须实现的方法:
                createAccumulator()
                accumulate()
                getValue()
            AggregateFunctiond的工作原理
                首先需要一个累加器Accumulator,用来保存聚合中间结果的数据结构,通过createAccumulator()来创建
                随后,对每个输入行调用函数的accumulate()方法来更新累加器
                处理完所有行后,将调用函数的getValue()方法来计算并返回结果
                public static class AvgTemp extends AggregateFunction<Double,Tuple2<Double,Integer>>{
                    @Override
                    public Double getValue(){
                        return accumulator.f0 / accumulator.f1;
                    }
                    
                    @Override
                    public Tuple2<Double,Integer> createAccumulator(){
                        return new Tuple2<>(0.0,0);
                    }
                    
                    //必须实现一个accumulate方法,来数据之后更新状态
                    public void accumulate(Tuple2<Double,Integer> accumulator,Double temp){
                        accumulator.f0 += temp;
                        accumulator.f1 += 1;
                    }
                }
                
                AvgTemp avgTemp = new AvgTemp();
                //需要在环境中注册UDF
                tableEnv.registerFunction("avgTemp" avgTemp);
                Table resultTable = sensorTable.groupBy("id")
                                               .aggregate("avgTemp(temp) as avgtemp")
                                               .select("id,avgtemp");
                
                //SQL
                tableEnv.createTemporaryView("sensor",sensorTable);
                String Sql = "select id,avgTemp(temp) from second group by id ";
                Table SqlresultTable = tableEnv.SqlQuery(Sql);
                
                tableEnv.toRetractStream(resultTable.Row.class).pring("result");
                tableEnv.toRetractStream(SqlresultTable.Row.class).pring("sql");
    
                env.execute();
            
        表聚合函数(Tabl Aggregate Function)
            可以输出多个结果,可以把一个表中的数据聚合为具有多行多列的结果表
            通过继承TableAggregateFunction来实现
            必须实现的方法:
                createAccumulator()
                accumulate()
                emitValue()
            Table resultTable = sensorTable.groupBy("id")
                                   .flatAggregate("avgTemp(temp) as avgtemp")
                                   .select("id,avgtemp");
                
    11.Flink CEP简介 用户行为分析 用户画像
        Flink CEP高级API是复杂事件处理(Complex Event Processing)的库,理解为Flink SQL一个层级
        CEP允许在无休止的事件流中检测事件模式,让我们有机会掌握数据的重要部分。
        一个或多个简单事件构成的事件流通过一定的规则匹配,输出用户想得到的结果 ---复杂事件处理
  • 相关阅读:
    Keep it simple & stupid
    BZOJ 2631: tree( LCT )
    BZOJ 2843: 极地旅行社( LCT )
    BZOJ 2002: [Hnoi2010]Bounce 弹飞绵羊( LCT )
    BZOJ 1742: [Usaco2005 nov]Grazing on the Run 边跑边吃草( dp )
    BZOJ 3531: [Sdoi2014]旅行( 树链剖分 )
    BZOJ 1269: [AHOI2006]文本编辑器editor( splay )
    BZOJ 2016: [Usaco2010]Chocolate Eating( 二分答案 )
    BZOJ 1734: [Usaco2005 feb]Aggressive cows 愤怒的牛( 二分答案 )
    BZOJ 2101: [Usaco2010 Dec]Treasure Chest 藏宝箱( dp )
  • 原文地址:https://www.cnblogs.com/Bkxk/p/14209414.html
Copyright © 2011-2022 走看看