  • [Spark][Streaming] Example: Spark reading network input

    An example of Spark reading input from the network:

    I ran the experiment by following these URLs:

    https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream
    http://www.cnblogs.com/FG123/p/5324743.html

    I found that after first running nc -lk 9999 and then starting the Spark program,
    stopping nc makes the Spark program report an error:

    Something like this:

    -------------------------------------------
    Time: 2017-10-28 19:32:02
    -------------------------------------------

    17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)
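    The "Restarting receiver with delay 2000ms" line means that once nc is gone, the socket receiver keeps trying to reconnect to localhost:9999 after a 2000 ms delay, and each attempt is refused. The Python 3 sketch below only imitates that connect-and-retry pattern with the standard socket module; connect_with_retry and its parameters are hypothetical names, and this is not Spark's actual SocketReceiver code.

```python
import socket
import time


def connect_with_retry(host, port, max_attempts=3, delay_s=2.0):
    """Hypothetical helper: try to open a TCP connection to host:port,
    sleeping delay_s seconds between failed attempts.

    Returns the connected socket, or None if every attempt failed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return socket.create_connection((host, port), timeout=5)
        except OSError as exc:  # ConnectionRefusedError is a subclass of OSError
            print("attempt %d failed (%s)" % (attempt, exc))
            if attempt < max_attempts:
                time.sleep(delay_s)  # comparable to "Restarting receiver with delay 2000ms"
    return None
```

    With nc stopped and nothing else on the port, connect_with_retry("localhost", 9999) should print three failures and return None; once nc -lk 9999 is running again, it should return a connected socket.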

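    This log shows that the two sides had already established a connection. However, the expected word count output did not appear. My guess was that there were not enough threads available for the computation, so I made the following change: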

    sc = SparkContext("local[2]", "streamwordcount")

    to:

    sc = SparkContext("local[3]", "streamwordcount")
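    The Spark Streaming programming guide states the rule behind this kind of change: when running locally with a receiver-based input such as socketTextStream, each receiver occupies one thread for the whole run, so the master must be local[n] with n greater than the number of receivers, otherwise no thread is left to process the received data. By that rule a single receiver needs at least local[2], and local[3] leaves two threads for processing. The helper below is a hypothetical sketch that applies the rule to a master string; it only understands "local" and "local[N]".

```python
import re


def has_processing_thread(master, num_receivers):
    """Hypothetical helper: return True if a local master URL leaves at least
    one thread for processing after every receiver has taken its thread."""
    if master == "local":
        threads = 1
    else:
        match = re.fullmatch(r"local\[(\d+)\]", master)
        if match is None:
            raise ValueError("unsupported master URL: %r" % master)
        threads = int(match.group(1))
    # Each receiver permanently occupies one thread.
    return threads > num_receivers


print(has_processing_thread("local[2]", 1))  # True
print(has_processing_thread("local[1]", 1))  # False
```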

    The complete program:

    [training@localhost ab]$ cat test.py
    # Showing remote messages

    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":

        # Create a local SparkContext with 3 worker threads
        sc = SparkContext("local[3]", "streamwordcount")

        # Create a StreamingContext with a batch interval of 2 seconds
        ssc = StreamingContext(sc, 2)

        # Receive newline-delimited text from the TCP server at localhost:9999
        lines = ssc.socketTextStream("localhost", 9999)

        # Use flatMap and split to break the lines received in each batch into words
        words = lines.flatMap(lambda line: line.split(" "))

        # Count the occurrences of each word within the batch
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)

        # Print the first 10 results of each batch
        wordCounts.pprint()

        # Start the Spark Streaming application and wait until it is stopped
        ssc.start()
        ssc.awaitTermination()
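    To see what each 2-second batch computes, the same flatMap / map / reduceByKey chain can be mimicked on a plain Python list. This is an illustration that runs without Spark, not PySpark itself: reduce_by_key and batch_word_count are hypothetical helpers. Each batch is counted on its own, which is why the counts in the output further down do not accumulate from one batch to the next.

```python
def reduce_by_key(pairs, func):
    """Merge the values of equal keys with func, in the spirit of reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


def batch_word_count(lines):
    """Apply the program's pipeline to the lines received in one batch."""
    words = [word for line in lines for word in line.split(" ")]  # flatMap
    pairs = [(word, 1) for word in words]                           # map
    return reduce_by_key(pairs, lambda x, y: x + y)                 # reduceByKey


print(batch_word_count(["kkk qqq kkk"]))  # {'kkk': 2, 'qqq': 1}
```

    This corresponds to the 19:34:18 batch in the output, which contains only qqq 1 and kkk 2 and so must come from the input line kkk qqq kkk.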

    Run nc again:

    [training@localhost ~]$ nc -lk 9999
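    socketTextStream reads the bytes it receives over TCP as UTF-8 text made of newline-delimited lines, and nc -lk 9999 simply listens on port 9999 and forwards what is typed (-k keeps it listening for new connections after a client disconnects). If nc is not available, a rough Python 3 stand-in for a single connection could look like the sketch below; make_listener and serve_lines are hypothetical helpers, and unlike nc -k this version serves only one client per call.

```python
import socket


def make_listener(host="localhost", port=9999):
    """Open a TCP socket listening on host:port (port=0 picks a free port)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen(1)
    return sock


def serve_lines(listener, lines):
    """Accept one client and send each line as UTF-8 text followed by a
    newline, the format socketTextStream expects; then close the client."""
    conn, _addr = listener.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))
```

    For example, after import sys, serve_lines(make_listener(), (line.rstrip("\n") for line in sys.stdin)) would forward typed lines to the Spark receiver once it connects.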

    Run the Spark program:

    [training@localhost ~]$ spark-submit /home/training/ab/test.py

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

    Type some data into the nc window:

    aaa bbb ccc
    ddd aaa sss
    sss bbb bbb

    kkk jjj mmm
    ooo kkk jjj
    mmm ccc ddd
    eee fff sss
    rrr nnn ooo
    ppp sss zzz
    mmm sss ttt
    kkk sss ttt
    rrr ooo ppp
    kkk qqq kkk
    lll nnn jjj
    rrr ooo sss
    kkk aaa ddd
    aaa aaa fff
    eee sss nnn
    ooo ppp qqq
    qqq sss eee
    sss mmm nnn
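    Note the empty line in the input. The program splits with line.split(" "), and in Python an empty string split on " " gives [''] rather than [], so an empty line (or a double space) produces an empty-string word; this is the most likely source of the (u'', 1) entry in the 19:34:00 batch below. Calling split() with no argument drops empty strings, so using lambda line: line.split() in the flatMap (or adding words.filter(lambda w: w)) should avoid that entry. A quick check in plain Python 3, where split_words is a hypothetical helper:

```python
def split_words(line):
    """Hypothetical helper: split on runs of whitespace, dropping empty strings."""
    return line.split()


# What the program's lambda does with an empty line and with a double space:
print("".split(" "))            # ['']
print("aaa  bbb".split(" "))    # ['aaa', '', 'bbb']

# What split() with no argument does instead:
print(split_words(""))          # []
print(split_words("aaa  bbb"))  # ['aaa', 'bbb']
```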

    After a short while, the Spark program's window shows the following output:

    ------------------------------------------- 
    Time: 2017-10-28 19:33:50
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:52
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:54
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:56
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:58
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:00
    -------------------------------------------
    (u'', 1)
    (u'mmm', 2)
    (u'bbb', 3)
    (u'nnn', 1)
    (u'ccc', 2)
    (u'rrr', 1)
    (u'sss', 3)
    (u'fff', 1)
    (u'aaa', 2)
    (u'ooo', 2)
    ...

    ------------------------------------------- 
    Time: 2017-10-28 19:34:02
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:04
    -------------------------------------------
    (u'ppp', 1)
    (u'sss', 1)
    (u'zzz', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:06
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:08
    -------------------------------------------
    (u'mmm', 1)
    (u'sss', 1)
    (u'ttt', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:10
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:12
    -------------------------------------------
    (u'sss', 1)
    (u'ttt', 1)
    (u'kkk', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:14
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:16
    -------------------------------------------
    (u'ppp', 1)
    (u'rrr', 1)
    (u'ooo', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:18
    -------------------------------------------
    (u'qqq', 1)
    (u'kkk', 2)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:20
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:22
    -------------------------------------------
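    Each block above is one 2-second batch printed by pprint(), which by default prints at most 10 records per batch and then ... if the batch held more, as in the 19:34:00 batch. A block with no records is a batch in which no data arrived. The plain-Python sketch below reproduces that layout for a list of records; format_batch is a hypothetical name, modelled on the way PySpark's pprint takes num + 1 records and prints only the first num.

```python
def format_batch(time_str, records, num=10):
    """Hypothetical helper: lay out one batch like the output above, with a
    time header, at most num records, "..." if there were more, then a blank line."""
    taken = records[: num + 1]
    out = [
        "-------------------------------------------",
        "Time: %s" % time_str,
        "-------------------------------------------",
    ]
    out.extend(str(record) for record in taken[:num])
    if len(taken) > num:
        out.append("...")
    out.append("")
    return "\n".join(out)


print(format_batch("2017-10-28 19:34:18", [("qqq", 1), ("kkk", 2)]))
```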

  • Original article: https://www.cnblogs.com/gaojian/p/7749538.html