zoukankan      html  css  js  c++  java
  • Flink-- 数据输出Data Sinks

    flink在批处理中常见的sink

    1.基于本地集合的sink(Collection-based-sink)
    
    2.基于文件的sink(File-based-sink)

    基于本地集合的sink(Collection-based-sink)

    //1.定义环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2.定义数据 stu(age,name,height)
    val stu: DataSet[(Int, String, Double)] = env.fromElements(
      (19, "zhangsan", 178.8),
      (17, "lisi", 168.8),
      (18, "wangwu", 184.8),
      (21, "zhaoliu", 164.8)
    )
    //3.TODO sink到标准输出
    stu.print
    
    //3.TODO sink到标准error输出
    stu.printToErr()
    
    //4.TODO sink到本地Collection
    print(stu.collect())
    View Code

    基于文件的sink(File-based-sink)

    flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。

    flink支持多种文件的存储格式,包括text文件,CSV文件等。

    Ø writeAsText():TextOuputFormat - 将元素作为字符串写入行。字符串是通过调用每个元素的toString()方法获得的。

    1、将数据写入本地文件
    //0.主意:不论是本地还是hdfs.若Parallelism>1将把path当成目录名称,若Parallelism=1将把path当成文件名。
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1: DataSource[Map[Int, String]] = env.fromElements(Map(1 -> "spark" , 2 -> "flink"))
    //1.TODO 写入到本地,文本文档,NO_OVERWRITE模式下如果文件已经存在,则报错,OVERWRITE模式下如果文件已经存在,则覆盖
    ds1.setParallelism(1).writeAsText("test/data1/aa", WriteMode.OVERWRITE)
    env.execute()
    View Code
    2、将数据写入HDFS
    //TODO writeAsText将数据写入HDFS
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1: DataSource[Map[Int, String]] = env.fromElements(Map(1 -> "spark" , 2 -> "flink"))
    ds1.setParallelism(1).writeAsText("hdfs://hadoop01:9000/a", WriteMode.OVERWRITE)
    env.execute()
    View Code

    可以使用sortPartition对数据进行排序后再sink到外部系统。

    //TODO 使用sortPartition对数据进行排序后再sink到外部系统
    val env = ExecutionEnvironment.getExecutionEnvironment
    //stu(age,name,height)
    val stu: DataSet[(Int, String, Double)] = env.fromElements(
      (19, "zhangsan", 178.8),
      (17, "lisi", 168.8),
      (18, "wangwu", 184.8),
      (21, "zhaoliu", 164.8)
    )
    //1.以age从小到大升序排列(0->9)
    stu.sortPartition(0, Order.ASCENDING).print
    //2.以name从大到小降序排列(z->a)
    stu.sortPartition(1, Order.ASCENDING).print
    //3.以age升序,height降序排列
    stu.sortPartition(0, Order.ASCENDING).sortPartition(2, Order.DESCENDING).print
    //4.所有字段升序排列
    stu.sortPartition("_", Order.ASCENDING).print
    //5.以Student.name升序
    //5.1准备数据
    case class Student(name: String, age: Int)
    val ds1: DataSet[(Student, Double)] = env.fromElements(
      (Student("zhangsan", 18), 178.5),
      (Student("lisi", 19), 176.5),
      (Student("wangwu", 17), 168.5)
    )
    val ds2 = ds1.sortPartition("_1.age", Order.ASCENDING).setParallelism(1)
    //5.2写入到hdfs,文本文档
    val outPath1="hdfs://hadoop01:9000/Student001.txt"
    ds2.writeAsText(outPath1, WriteMode.OVERWRITE)
    env.execute()
    //5.3写入到hdfs,CSV文档
    val outPath2="hdfs://hadoop01:9000/Student002.csv"
    ds2.writeAsCsv(outPath2, "
    ", "|||",WriteMode.OVERWRITE)
    env.execute()
    View Code
  • 相关阅读:
    【2017 Multi-University Training Contest
    【“玲珑杯”ACM比赛 Round #20 H】康娜的数学课
    【hdu 6181】Two Paths
    Cache coherence protocol
    C#实现图片的无损压缩
    收集一些常用的正则表达式
    收集一些常用的正则表达式
    收集一些常用的正则表达式
    Sql中存储过程的定义、修改和删除操作
    Sql中存储过程的定义、修改和删除操作
  • 原文地址:https://www.cnblogs.com/niutao/p/10548466.html
Copyright © 2011-2022 走看看