zoukankan      html  css  js  c++  java
  • flink支持的数据类型讲解(可序列化) 和 内置累加器的运用

    flink支持的数据类型
    Flink对DataSet和DataStream中可使用的类型加了一些约束。原因是系统可以通过分析这些类型来确定有效的执行策略和选择不同的序列化方式。有7种不同的数据类型:
    1.java Tuple 和 Scala Case类:
    2.java POJO
    (指那些没有从任何类集成,也没有实现任何接口,更没有被其他框架侵入的java对象
    1.是公共类 2.无参构造是公共的 3.所有的属性是可获得的 4.自断必须是flink支持的。Flink会用Avro来序列化任意对象。Flink会分析POJO类型结构获知POJO字段。POJO类型比一般类型好用。此外,Flink访问POJO要比一般类型更高效)
    3.基本类型
    Flink支持java和scala所有的基本数据类型,比如integer,String和Double
    4.通用类
    Flink支持大多数的java scala类,包含不能序列化字段的类在增加一些限制后也可以支持。遵循java Bean规范的类一般都可以使用
    所有不能视为POJO的类Flink都会当做一般类处理。这些数据类型被视作黑箱,其内容是不可见的。通用类使用Kryo进行序列化和反序列化
    5.值
    通过实现org.apache.flinktypes.Value接口的read和write方法提供自定义代码来序列化和反序列化,而不是使用通用的序列化框架
    Flink预定义的值类型与原生数据类型是一一对应的(例如:ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,Char
    Value,BooleanValue)。这些值类型作为原生数据类型的可变变体,他们的值是可以改变的。允许程序重用对象从而缓解GC压力
    6.hadoop Writables
    7.特殊类型
    scala的Either Option和try
    java API有自己Either实现
    延伸支持
    java 范型的类型擦除机制

    Flink累加器的实现
    累加器的内置类型(计数)IntCounter,LongCounter和DoubleCounter
    Histogram

    如何使用累加器
    一.在自定义的转换操作里面创建累加器对象
    private IntCounter numLines=new IntCounter();
    二.注册累加器对象,通常在rich function的open()方法中。这里你还需要定义累加器的名字
    (继承RichFlatMapFunction实现open和close方法,只执行一次里面进行注册)
    getRuntimeContext().addAccumlator("num-lines",this.numLines)
    三.在operator函数的任何地方使用累加器,包括在open()和close()方法中
    this.numLines.add(1);
    第四步:结果存储在JobExecutionResult里:
    JobExecutionResult JobExecutionResult = env.execute(“Flink Batch java API Skeleton”)
    myJobExecutionResult.getAccumlatorResult("num-lines")

  • 相关阅读:
    DevExpress_Winform_使用汇总
    SVN的可视化日志统计工具StatSVN
    jenkins搭建.Net项目自动构建
    关闭tab时,定位到导航栏对应的激活tab
    DevExpress.XtraNavBar.NavBarControl 模拟单击导航的选项事件
    选中Tab控件单击事件,点击tab头,定位到导航栏的选项,为选中状态 ,DevExpress.XtraNavBar.NavBarControl 导航控件定位或选中状态 另导航的选项为选中状态.
    WCF服务部署到IIS问题汇总
    iOS多线程编程
    最新版SDWebImage的使用
    前端入门系列之CSS
  • 原文地址:https://www.cnblogs.com/yaohaitao/p/11885728.html
Copyright © 2011-2022 走看看