  • [Spark][Streaming] Example: Spark reading network input

    An example of Spark reading input from the network:

    I experimented by following these URLs:

    https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream
    http://www.cnblogs.com/FG123/p/5324743.html

    I found that if I first run nc -lk 9999, then start the Spark program,
    and then stop nc, the Spark program reports an error:

    Something like:

    -------------------------------------------
    Time: 2017-10-28 19:32:02
    -------------------------------------------

    17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)
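    The stack trace shows the direction of the connection: Spark's socket receiver is the TCP client (Socket.<init> inside SocketReceiver.receive) and nc -lk 9999 is the server, so once nc is stopped every connection attempt is refused and the receiver is restarted after 2000 ms. Below is a minimal Python 3 sketch of that client-side behavior, assuming newline-delimited UTF-8 text. The function read_lines_with_retry and its parameters are hypothetical, and this is not Spark's implementation: Spark keeps restarting the receiver instead of giving up after a fixed number of attempts.

```python
import socket
import time


def read_lines_with_retry(host, port, max_attempts=3, retry_delay=2.0):
    """Hypothetical sketch of a line-reading TCP client with retries.

    Connects to host:port as a client (like Spark's socket receiver),
    reads until the server closes the connection and returns the lines.
    On ConnectionRefusedError it waits retry_delay seconds and retries,
    loosely mirroring "Restarting receiver with delay 2000ms".
    Returns None if every attempt is refused.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            with socket.create_connection((host, port)) as conn:
                chunks = []
                while True:
                    data = conn.recv(4096)
                    if not data:  # server closed the connection
                        break
                    chunks.append(data)
                # Decode as UTF-8 and split on newlines
                return b"".join(chunks).decode("utf-8").splitlines()
        except ConnectionRefusedError:
            # Nothing is listening on host:port (e.g. nc was stopped)
            print("attempt %d: connection refused" % attempt)
            if attempt < max_attempts:
                time.sleep(retry_delay)
    return None
```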

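    This shows that the two sides had established communication. However, the expected word count output never appeared. My guess was that there were not enough threads taking part in the computation, so I made the following change: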

    sc = SparkContext("local[2]", "streamwordcount")

    to:

    sc = SparkContext("local[3]", "streamwordcount")
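    Background for the guess above, per the Spark Streaming programming guide: in local mode each receiver permanently occupies one of the N threads of local[N], so "local" or "local[1]" leaves no thread for processing, and the guide recommends N greater than the number of receivers. With the single socketTextStream receiver used here, local[2] should already leave one processing thread and local[3] leaves two. The hypothetical helper below only does that arithmetic, assuming one thread per receiver:

```python
import re


def processing_threads(master, num_receivers):
    """Hypothetical helper: threads left for batch processing in local mode.

    Assumes each receiver occupies one of the N threads of a "local[N]"
    master (or the single thread of "local"). local[*] is not handled.
    """
    if master == "local":
        total = 1
    else:
        match = re.fullmatch(r"local\[(\d+)\]", master)
        if match is None:
            raise ValueError("expected 'local' or 'local[N]', got %r" % master)
        total = int(match.group(1))
    return total - num_receivers
```

    For the program here (one socket receiver), processing_threads("local[2]", 1) is 1 and processing_threads("local[3]", 1) is 2.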

    The complete program is:

    [training@localhost ab]$ cat test.py
    # showing remote messages

    from __future__ import print_function
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        # Create a local SparkContext with 3 execution threads
        sc = SparkContext("local[3]", "streamwordcount")

        # Create a StreamingContext with a 2-second batch interval
        ssc = StreamingContext(sc, 2)

        # Receive newline-delimited text from the TCP server at localhost:9999
        lines = ssc.socketTextStream("localhost", 9999)

        # Use flatMap and split to break the lines received in each 2 s batch into words
        words = lines.flatMap(lambda line: line.split(" "))

        # Count the occurrences of each word within the batch
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)

        # Print the first 10 elements of each batch's result (pprint's default)
        wordCounts.pprint()

        # Start the Spark Streaming application and block until it is stopped
        ssc.start()
        ssc.awaitTermination()
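    To see what each 2-second batch should print, here is a Spark-free sketch of the same pipeline in plain Python 3; the name batch_word_count is hypothetical. flatMap splits lines into words, map pairs each word with 1, and reduceByKey sums the 1s per word. It is applied to the lines of one batch only, because reduceByKey on a DStream works batch by batch.

```python
from itertools import chain


def batch_word_count(lines):
    """Plain-Python sketch of the DStream pipeline for a single batch."""
    # flatMap: split every line on " " and flatten into one stream of words
    words = chain.from_iterable(line.split(" ") for line in lines)
    # map: turn each word into a (word, 1) pair
    pairs = ((word, 1) for word in words)
    # reduceByKey: add up the values that share the same key
    counts = {}
    for word, one in pairs:
        counts[word] = counts.get(word, 0) + one
    return counts
```

    For example, batch_word_count(["kkk qqq kkk"]) gives {'kkk': 2, 'qqq': 1}, matching the 19:34:18 batch in the output further down. Since counts are not carried over between batches, a word such as sss shows up with count 1 in several later batches.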

    Run nc again:

    [training@localhost ~]$ nc -lk 9999
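    nc -lk 9999 simply listens on port 9999 (-k keeps it listening after a client disconnects) and sends whatever is typed to the connected client. For repeatable input instead of typing, a minimal Python 3 stand-in is sketched below. The names open_listener and serve_lines are hypothetical, and unlike nc -lk it sends a fixed list of lines to each client and then closes that connection. Each line ends with "\n" because socketTextStream interprets the received bytes as UTF-8, newline-delimited lines.

```python
import socket


def open_listener(host="localhost", port=9999):
    """Hypothetical helper: create a listening TCP socket, like nc -l."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts on the same port
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(1)
    return srv


def serve_lines(srv, lines, clients=1):
    """Hypothetical helper: for each of `clients` connections, send all
    lines as newline-terminated UTF-8 text, then close the connection."""
    for _ in range(clients):
        conn, _addr = srv.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
```

    For example, serve_lines(open_listener(), ["aaa bbb ccc", "ddd aaa sss"]) would wait for the Spark receiver on localhost:9999 and send it two lines. After the connection closes, expect the receiver to try to reconnect.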

    Run the Spark program:

    [training@localhost ~]$ spark-submit /home/training/ab/test.py

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

    Type some data into the nc window:

    aaa bbb ccc
    ddd aaa sss
    sss bbb bbb

    kkk jjj mmm
    ooo kkk jjj
    mmm ccc ddd
    eee fff sss
    rrr nnn ooo
    ppp sss zzz
    mmm sss ttt
    kkk sss ttt
    rrr ooo ppp
    kkk qqq kkk
    lll nnn jjj
    rrr ooo sss
    kkk aaa ddd
    aaa aaa fff
    eee sss nnn
    ooo ppp qqq
    qqq sss eee
    sss mmm nnn

    After a short while, the Spark program's window shows the following output:

    ------------------------------------------- 
    Time: 2017-10-28 19:33:50
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:52
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:54
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:56
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:58
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:00
    -------------------------------------------
    (u'', 1)
    (u'mmm', 2)
    (u'bbb', 3)
    (u'nnn', 1)
    (u'ccc', 2)
    (u'rrr', 1)
    (u'sss', 3)
    (u'fff', 1)
    (u'aaa', 2)
    (u'ooo', 2)
    ...

    ------------------------------------------- 
    Time: 2017-10-28 19:34:02
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:04
    -------------------------------------------
    (u'ppp', 1)
    (u'sss', 1)
    (u'zzz', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:06
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:08
    -------------------------------------------
    (u'mmm', 1)
    (u'sss', 1)
    (u'ttt', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:10
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:12
    -------------------------------------------
    (u'sss', 1)
    (u'ttt', 1)
    (u'kkk', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:14
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:16
    -------------------------------------------
    (u'ppp', 1)
    (u'rrr', 1)
    (u'ooo', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:18
    -------------------------------------------
    (u'qqq', 1)
    (u'kkk', 2)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:20
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:22
    -------------------------------------------
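    The (u'', 1) entry in the 19:34:00 batch most likely comes from the blank line typed into nc: str.split(" ") on an empty line returns [''], so the empty string is counted as a word. Calling split() with no argument splits on runs of whitespace and drops empty tokens; in the DStream that would be lines.flatMap(lambda line: line.split()). The two (hypothetically named) functions below only contrast the two behaviors. Separately, the trailing ... in that batch is pprint's truncation: by default it prints only the first 10 elements of each batch.

```python
def words_split_on_space(line):
    # What test.py does: split on every single space character,
    # so an empty line yields [""] and double spaces yield "" tokens
    return line.split(" ")


def words_split_on_whitespace(line):
    # Alternative: split on runs of whitespace and drop empty tokens
    return line.split()
```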

  • Original article: https://www.cnblogs.com/gaojian/p/7749538.html