zoukankan      html  css  js  c++  java
  • Flink--输入数据集Data Sources

    flink在批处理中常见的source

    flink在批处理中常见的source主要有两大类。

    1.基于本地集合的source(Collection-based-source)
    
    2.基于文件的source(File-based-source)

    在flink最常见的创建DataSet方式有三种。

    1.使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。
    
    2.使用env.fromCollection(),这种方式支持多种Collection的具体类型
    
    3.使用env.generateSequence()方法创建基于Sequence的DataSet

    基于本地集合的

    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
    import scala.collection.immutable.{Queue, Stack}
    import scala.collection.mutable
    import scala.collection.mutable.{ArrayBuffer, ListBuffer}
    
    object DataSource001 {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        //0.用element创建DataSet(fromElements)
        val ds0: DataSet[String] = env.fromElements("spark", "flink")
        ds0.print()
    
        //1.用Tuple创建DataSet(fromElements)
        val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
        ds1.print()
    
        //2.用Array创建DataSet
        val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
        ds2.print()
    
        //3.用ArrayBuffer创建DataSet
        val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
        ds3.print()
    
        //4.用List创建DataSet
        val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
        ds4.print()
    
        //5.用List创建DataSet
        val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
        ds5.print()
    
        //6.用Vector创建DataSet
        val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
        ds6.print()
    
        //7.用Queue创建DataSet
        val ds7: DataSet[String] = env.fromCollection(Queue("spark", "flink"))
        ds7.print()
    
        //8.用Stack创建DataSet
        val ds8: DataSet[String] = env.fromCollection(Stack("spark", "flink"))
        ds8.print()
    
        //9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)
        val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
        ds9.print()
    
        //10.用Seq创建DataSet
        val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
        ds10.print()
    
        //11.用Set创建DataSet
        val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
        ds11.print()
    
        //12.用Iterable创建DataSet
        val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
        ds12.print()
    
        //13.用ArraySeq创建DataSet
        val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
        ds13.print()
    
        //14.用ArrayStack创建DataSet
        val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
        ds14.print()
    
        //15.用Map创建DataSet
        val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
        ds15.print()
    
        //16.用Range创建DataSet
        val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
        ds16.print()
    
        //17.用fromElements创建DataSet
        val ds17: DataSet[Long] =  env.generateSequence(1,9)
        ds17.print()
      }
    }
    View Code

    基于文件的source(File-based-source)

    (1):读取本地文件
    //TODO 使用readTextFile读取本地文件
    //TODO 初始化环境
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //TODO 加载数据
    val datas: DataSet[String] = environment.readTextFile("data.txt")
    //TODO 指定数据的转化
    val flatmap_data: DataSet[String] = datas.flatMap(line => line.split("\W+"))
    val tuple_data: DataSet[(String, Int)] = flatmap_data.map(line => (line , 1))
    val groupData: GroupedDataSet[(String, Int)] = tuple_data.groupBy(line => line._1)
    val result: DataSet[(String, Int)] = groupData.reduce((x, y) => (x._1 , x._2+y._2))
    result.print()
    View Code
    (2):读取hdfs数据
    //TODO readTextFile读取hdfs数据
    //todo 初始化环境
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //TODO 加载数据
    
    val file: DataSet[String] = environment.readTextFile("hdfs://hadoop01:9000/README.txt")
    val flatData: DataSet[String] = file.flatMap(line => line.split("\W+"))
    val map_data: DataSet[(String, Int)] = flatData.map(line => (line , 1))
    val groupdata: GroupedDataSet[(String, Int)] = map_data.groupBy(line => line._1)
    val result_data: DataSet[(String, Int)] = groupdata.reduce((x, y) => (x._1 , x._2+y._2))
    result_data.print()
    View Code
    (3):读取CSV数据
    //TODO 读取csv数据
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val path = "data2.csv"
    val ds3 = environment.readCsvFile[(String, String, String, String,String,Int,Int,Int)](
      filePath = path,
      lineDelimiter = "
    ",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3 , 4 , 5 , 6 , 7))
    val first = ds3.groupBy(0 , 1).first(50)
    first.print()
    View Code

    基于文件的source(遍历目录)

    flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。

    对于从文件中读取数据,当读取的数个文件夹的时候,嵌套的文件默认是不会被读取的,只会读取第一个文件,其他的都会被忽略。所以我们需要使用recursive.file.enumeration进行递归读取

    val env = ExecutionEnvironment.getExecutionEnvironment
    val parameters = new Configuration
    // recursive.file.enumeration 开启递归
    parameters.setBoolean("recursive.file.enumeration", true)
    val ds1 = env.readTextFile("test").withParameters(parameters)
    ds1.print()
    View Code

    读取压缩文件

    对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

    //TODO  读取压缩文件
    val env = ExecutionEnvironment.getExecutionEnvironment
    val file = env.readTextFile("test/data1/zookeeper.out.gz").print()
    
    
    tar -czvf ***.tar.gz
    View Code
  • 相关阅读:
    mac下面xcode+ndk7配置cocos2dx & box2d的跨ios和android平台的游戏教程
    如何在macox下面配置集成ios和android游戏教程
    Cocos2d-x win7 + vs2010 配置图文详解(亲测)
    cocos2d-x学习资源汇总(持续更新。。。)
    我常用的iphone开发学习网站[原创]
    「C」关键字、标识符、注释、内存分析、数据、常量、变量
    「Foundation」集合
    「Foundation」字符串
    「Foundation」结构体
    「OC」block 和 protocol
  • 原文地址:https://www.cnblogs.com/niutao/p/10548451.html
Copyright © 2011-2022 走看看