zoukankan      html  css  js  c++  java
  • Flink--3种分区方式

    partitionByHash

    // partitionByHash
    // Hash-partitions the records on field 1 (0-based, i.e. the Long column), so all
    // records sharing that value land in the same parallel partition.
    val data: Seq[(Int, Long, String)] = Seq(
      (1, 1L, "Hi"),
      (2, 2L, "Hello"),
      (3, 2L, "Hello world"),
      (4, 3L, "Hello world, how are you?"),
      (5, 3L, "I am fine."),
      (6, 3L, "Luke Skywalker"),
      (7, 4L, "Comment#1"),
      (8, 4L, "Comment#2"),
      (9, 4L, "Comment#3"),
      (10, 4L, "Comment#4"),
      (11, 5L, "Comment#5"),
      (12, 5L, "Comment#6"),
      (13, 5L, "Comment#7"),
      (14, 5L, "Comment#8"),
      (15, 5L, "Comment#9"),
      (16, 6L, "Comment#10"),
      (17, 6L, "Comment#11"),
      (18, 6L, "Comment#12"),
      (19, 6L, "Comment#13"),
      (20, 6L, "Comment#14"),
      (21, 6L, "Comment#15")
    )
    // Shuffle the input so the effect of partitioning is visible in the output files.
    val collection = env.fromCollection(Random.shuffle(data))
    // mapPartition is invoked once per partition; it forwards the records unchanged and
    // marks where per-partition processing would go.
    val partitioned = collection.partitionByHash(1).mapPartition(records => records)

    // OVERWRITE (consistent with the range-partition example): with NO_OVERWRITE the
    // job fails on re-run because the "hashPartition" output path already exists.
    partitioned.writeAsText("hashPartition", WriteMode.OVERWRITE)
    env.execute()

    Range-Partition

    // Range-Partition
    // Range-partitions the records on the key returned by the selector (field 0, the Int
    // id), then writes each partition's contents to the "rangePartition" output.
    val records: Seq[(Int, Long, String)] = Seq(
      (1, 1L, "Hi"),
      (2, 2L, "Hello"),
      (3, 2L, "Hello world"),
      (4, 3L, "Hello world, how are you?"),
      (5, 3L, "I am fine."),
      (6, 3L, "Luke Skywalker"),
      (7, 4L, "Comment#1"),
      (8, 4L, "Comment#2"),
      (9, 4L, "Comment#3"),
      (10, 4L, "Comment#4"),
      (11, 5L, "Comment#5"),
      (12, 5L, "Comment#6"),
      (13, 5L, "Comment#7"),
      (14, 5L, "Comment#8"),
      (15, 5L, "Comment#9"),
      (16, 6L, "Comment#10"),
      (17, 6L, "Comment#11"),
      (18, 6L, "Comment#12"),
      (19, 6L, "Comment#13"),
      (20, 6L, "Comment#14"),
      (21, 6L, "Comment#15")
    )
    val source = env.fromCollection(Random.shuffle(records))
    // Per-partition pass-through: every record is emitted as-is.
    val rangePartitioned = source
      .partitionByRange(record => record._1)
      .mapPartition(partition => partition)
    rangePartitioned.writeAsText("rangePartition", WriteMode.OVERWRITE)
    env.execute()

    sortPartition

    根据指定字段的值，对每个分区内部的数据进行本地排序（不会重新分区，结果整体上不一定有序）;

     // Sort Partition
        // sortPartition sorts the records locally *within* each partition; it does not
        // repartition, so with more than one partition the collected result is generally
        // not globally sorted.
        val data: Seq[(Int, Long, String)] = Seq(
          (1, 1L, "Hi"),
          (2, 2L, "Hello"),
          (3, 2L, "Hello world"),
          (4, 3L, "Hello world, how are you?"),
          (5, 3L, "I am fine."),
          (6, 3L, "Luke Skywalker"),
          (7, 4L, "Comment#1"),
          (8, 4L, "Comment#2"),
          (9, 4L, "Comment#3"),
          (10, 4L, "Comment#4"),
          (11, 5L, "Comment#5"),
          (12, 5L, "Comment#6"),
          (13, 5L, "Comment#7"),
          (14, 5L, "Comment#8"),
          (15, 5L, "Comment#9"),
          (16, 6L, "Comment#10"),
          (17, 6L, "Comment#11"),
          (18, 6L, "Comment#12"),
          (19, 6L, "Comment#13"),
          (20, 6L, "Comment#14"),
          (21, 6L, "Comment#15")
        )
        val ds = env.fromCollection(Random.shuffle(data))
        val result = ds
          // Identity map used only to give this operator a parallelism of 2.
          // NOTE(review): setParallelism applies to this map alone; sortPartition below
          // runs with the environment's default parallelism unless set — confirm intent.
          .map { x => x }.setParallelism(2)
          // Arg 1: 0-based index of the field to sort by (1 = the Long column);
          // arg 2: sort direction. This sorts within partitions, it does not partition.
          .sortPartition(1, Order.DESCENDING)
          // Identity pass-through per partition.
          .mapPartition(line => line)
          .collect()
        println(result)
      }
    }
  • 相关阅读:
    Javascript之DOM的三大节点及部分用法
    Javascript之全局变量和局部变量部分讲解
    《TCP/IP详解 卷1:协议》系列分享专栏
    说一说MySQL的锁机制
    《TCP/IP详解 卷1:协议》第3章 IP:网际协议
    PHP连接MySql闪断自动重连的方法
    关于MySQL的锁机制详解
    React 源码中的依赖注入方法
    《Mysql高级知识》系列分享专栏
    《AngularJS学习整理》系列分享专栏
  • 原文地址:https://www.cnblogs.com/niutao/p/10548434.html
Copyright © 2011-2022 走看看