zoukankan      html  css  js  c++  java
  • 用Spark做key排序

    #直接调用sortByKey()函数就可以做到
    from pyspark import SparkContext

    sc = SparkContext('local','Sort')
    list = ["7","4","8","2","5"]
    textFile = sc.parallelize(list)
    textRDD =textFile.map(lambda word : (word, 1))
    textRDD.sortByKey().sortByKey().foreach(print)
    sortByKey:是对键值进行排序
    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """
    Sorts this RDD, which is assumed to consist of (key, value) pairs.
    # noqa

    >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    >>> sc.parallelize(tmp).sortByKey().first()
    ('1', 3)
    >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
    >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
    >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
    [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
    """
    if numPartitions is None:
    numPartitions = self._defaultReducePartitions()

    memory = self._memory_limit()
    serializer = self._jrdd_deserializer
    sortBy: 是自定义排序
    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
    """
    Sorts this RDD by the given keyfunc

    >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
    [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    """
    return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
     
     
    问题:当创建的RDd是从本地读取时,sortByKey排序出错
    实验数据:
          18
          2    
          89
          34
          67
          892
          34
          34
    代码:
      
    from pyspark import SparkContext

    sc = SparkContext('local','Sort')
    textFile = sc.textFile("file:///usr/local/spark/mycode/TestPackage/sort.txt")
    textRDD =textFile.flatMap(lambda line : line.split(" ")).map(lambda word : (word, 1))
    textRDD.sortByKey().foreach(print)
     
    效果:
    ('18', 1)
    ('2', 1)
    ('34', 1)
    ('34', 1)
    ('34', 1)
    ('67', 1)
    ('89', 1)
    ('892', 1)
     
        
     
  • 相关阅读:
    学习wxPython的一个例子
    Prismatic Joint(移动关节)
    利用Revolute Joint创建Motor
    FlashDevelop+OMSF第一个例子,关于编译常量的问题
    模拟从上向下看的汽车(Top Down Car Simulation)
    记一笔:As3监听键盘组合键如:Ctrl+Z,Ctrl+Y等
    渐变填充beginGradientFill或者lineGradientStyle的参数说明
    Revolute Joints(转动关节)
    播放本地视频的截图测试
    内建函数chr,str,ord的使用细节
  • 原文地址:https://www.cnblogs.com/SoftwareBuilding/p/9400552.html
Copyright © 2011-2022 走看看