zoukankan      html  css  js  c++  java
  • Flink--3种分区方式

    partitionByHash

    //TODO partitionByHash
    // Demo: hash-partition the sample records on tuple field 1 (0-based, i.e. the Long
    // value) and write every parallel partition to the "hashPartition" output path.
    // An immutable Seq literal replaces the mutable list filled with 21 `+=` calls.
    val data: Seq[(Int, Long, String)] = Seq(
      (1, 1L, "Hi"),
      (2, 2L, "Hello"),
      (3, 2L, "Hello world"),
      (4, 3L, "Hello world, how are you?"),
      (5, 3L, "I am fine."),
      (6, 3L, "Luke Skywalker"),
      (7, 4L, "Comment#1"),
      (8, 4L, "Comment#2"),
      (9, 4L, "Comment#3"),
      (10, 4L, "Comment#4"),
      (11, 5L, "Comment#5"),
      (12, 5L, "Comment#6"),
      (13, 5L, "Comment#7"),
      (14, 5L, "Comment#8"),
      (15, 5L, "Comment#9"),
      (16, 6L, "Comment#10"),
      (17, 6L, "Comment#11"),
      (18, 6L, "Comment#12"),
      (19, 6L, "Comment#13"),
      (20, 6L, "Comment#14"),
      (21, 6L, "Comment#15")
    )
    // Randomize the input order before handing the records to Flink.
    val input = env.fromCollection(Random.shuffle(data))
    // partitionByHash(1): records with equal values in field 1 end up in the same
    // partition; mapPartition forwards each partition's records unchanged.
    val hashPartitioned = input.partitionByHash(1).mapPartition(partition => partition)

    // OVERWRITE (as in the range-partition example) so that re-running the job does not
    // fail because the "hashPartition" output already exists from a previous run.
    hashPartitioned.writeAsText("hashPartition", WriteMode.OVERWRITE)
    env.execute()

    Range-Partition

    //TODO Range-Partition
    // The same 21 sample records, created in a single MutableList(...) call.
    val data = mutable.MutableList[(Int, Long, String)](
      (1, 1L, "Hi"),
      (2, 2L, "Hello"),
      (3, 2L, "Hello world"),
      (4, 3L, "Hello world, how are you?"),
      (5, 3L, "I am fine."),
      (6, 3L, "Luke Skywalker"),
      (7, 4L, "Comment#1"),
      (8, 4L, "Comment#2"),
      (9, 4L, "Comment#3"),
      (10, 4L, "Comment#4"),
      (11, 5L, "Comment#5"),
      (12, 5L, "Comment#6"),
      (13, 5L, "Comment#7"),
      (14, 5L, "Comment#8"),
      (15, 5L, "Comment#9"),
      (16, 6L, "Comment#10"),
      (17, 6L, "Comment#11"),
      (18, 6L, "Comment#12"),
      (19, 6L, "Comment#13"),
      (20, 6L, "Comment#14"),
      (21, 6L, "Comment#15")
    )
    val source = env.fromCollection(Random.shuffle(data))
    // Range-partition on the Int id (field _1); every partition is then passed through
    // mapPartition, which rebuilds each tuple from its three components.
    val rangePartitioned = source
      .partitionByRange(record => record._1)
      .mapPartition { records =>
        records.map { case (id, num, text) => (id, num, text) }
      }
    rangePartitioned.writeAsText("rangePartition", WriteMode.OVERWRITE)
    env.execute()

    sortPartition

    按指定字段的值对每个分区内部的数据进行排序（只在各分区内排序，不做全局排序）。

     //TODO Sort Partition
        // Demo: sort the records inside each parallel partition on tuple field 1
        // (0-based, i.e. the Long value) in descending order, then collect to the client.
        // An immutable Seq literal replaces the mutable list filled with 21 `+=` calls.
        val data: Seq[(Int, Long, String)] = Seq(
          (1, 1L, "Hi"),
          (2, 2L, "Hello"),
          (3, 2L, "Hello world"),
          (4, 3L, "Hello world, how are you?"),
          (5, 3L, "I am fine."),
          (6, 3L, "Luke Skywalker"),
          (7, 4L, "Comment#1"),
          (8, 4L, "Comment#2"),
          (9, 4L, "Comment#3"),
          (10, 4L, "Comment#4"),
          (11, 5L, "Comment#5"),
          (12, 5L, "Comment#6"),
          (13, 5L, "Comment#7"),
          (14, 5L, "Comment#8"),
          (15, 5L, "Comment#9"),
          (16, 6L, "Comment#10"),
          (17, 6L, "Comment#11"),
          (18, 6L, "Comment#12"),
          (19, 6L, "Comment#13"),
          (20, 6L, "Comment#14"),
          (21, 6L, "Comment#15")
        )
        val ds = env.fromCollection(Random.shuffle(data))
        val result = ds
          // Identity map that runs with parallelism 2; setParallelism applies to this
          // map operator only, not to the operators that follow it.
          .map { x => x }.setParallelism(2)
          // sortPartition(field, order): the first argument is the tuple field to SORT by
          // (here field 1, the Long). It does not partition the data by that field and
          // does not produce a global order; each partition is sorted independently.
          .sortPartition(1, Order.DESCENDING)
          .mapPartition(line => line)
          // collect() runs the job and returns all partitions' records together, so the
          // printed sequence is not guaranteed to be sorted as a whole.
          .collect()
        println(result)
      }
    }
  • 相关阅读:
    数组和切片
    if else,for循环,switch语句
    数据库介绍以及MySQL数据库的使用
    Django Rest Framework的认证组件、权限组件以及频率组件
    Django Rest Framework框架的CBV与FBV分析
    事务介绍
    celery介绍
    多道技术 进程 线程 协程 GIL锁 同步异步 高并发的解决方案 生产者消费者模型
    win10安装mysql8.0版本
    安装VMware Tools的注意事项
  • 原文地址:https://www.cnblogs.com/niutao/p/10548434.html
Copyright © 2011-2022 走看看