  • Creating an RDD queue stream

    import time

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        sc = SparkContext(appName="PythonStreamingQueueStream")
        # Batch interval of 1 second
        ssc = StreamingContext(sc, 1)

        # Create the queue through which RDDs can be pushed to
        # a QueueInputDStream
        rddQueue = []
        for i in range(5):
            rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]

        # Create the QueueInputDStream and use it to do some processing
        inputStream = ssc.queueStream(rddQueue)
        # Key each number by its last digit, then count occurrences per key
        mappedStream = inputStream.map(lambda x: (x % 10, 1))
        reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
        reducedStream.pprint()

        ssc.start()
        # Let the stream run long enough to drain the five queued RDDs
        time.sleep(6)
        ssc.stop(stopSparkContext=True, stopGraceFully=True)

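    Once the program is running, each batch prints output similar to the following. The sample shows counts of 10, which is what RDDs of 100 elements would produce; with range(1, 1001) as in the code above, every key's count is 100, and PySpark prints each pair as (4, 100). The order of the keys may vary.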

    -------------------------------------------                                     
    Time: 1479522100000 ms
    -------------------------------------------
    (4,10)
    (0,10)
    (6,10)
    (8,10)
    (2,10)
    (1,10)
    (3,10)
    (7,10)
    (9,10)
    (5,10)
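
The per-key counts can be checked without Spark. The sketch below is a plain-Python stand-in for the map and reduceByKey steps, not PySpark itself; the helper name count_by_last_digit is hypothetical, and it assumes each queued RDD holds the integers 1 to 1000 as in the code above.

```python
from collections import Counter


def count_by_last_digit(values):
    """Mimic map(lambda x: (x % 10, 1)) followed by reduceByKey(a + b)."""
    counts = Counter()
    for x in values:
        # Each element contributes 1 to the key x % 10
        counts[x % 10] += 1
    return dict(counts)


# One queued RDD in the example holds the integers 1..1000
batch_counts = count_by_last_digit(range(1, 1001))

# Print the pairs in key order (Spark itself does not guarantee an order)
for key in sorted(batch_counts):
    print((key, batch_counts[key]))
```

Passing range(1, 101) instead yields 10 per key.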
  • Original article: https://www.cnblogs.com/SoftwareBuilding/p/9449889.html