  • [Spark][Streaming] An example of Spark reading network input

    An example of Spark Streaming reading input from the network:

    I experimented by following these URLs:

    https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream
    http://www.cnblogs.com/FG123/p/5324743.html

    I found that if I first run nc -lk 9999 and then start the Spark program,
    stopping nc makes the Spark program report an error.

    The error looks like this:

    -------------------------------------------
    Time: 2017-10-28 19:32:02
    -------------------------------------------

    17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)
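
    The trace is produced by Spark's SocketReceiver: once nothing listens on localhost:9999 any more, every connection attempt is refused and the receiver restarts itself after 2000 ms. The sketch below imitates that reconnect loop in plain Python so the failure can be reproduced without Spark. connect_with_retry and its parameters are hypothetical names chosen for illustration, not part of Spark's API.

```python
import socket
import time


def connect_with_retry(host, port, attempts=3, delay_s=2.0):
    """Try to open a TCP connection, sleeping delay_s between failed attempts.

    Hypothetical helper: it only imitates the "Restarting receiver with delay
    2000ms" behaviour seen in the Spark log; it is not Spark's own code.
    Returns the connected socket, or None if every attempt was refused.
    """
    for attempt in range(1, attempts + 1):
        try:
            return socket.create_connection((host, port), timeout=5)
        except ConnectionRefusedError as exc:
            # Same root cause as the java.net.ConnectException in the trace:
            # nothing is listening on host:port (for example, nc was stopped).
            print(f"attempt {attempt}: {exc}")
            if attempt < attempts:
                time.sleep(delay_s)
    return None
```

    For example, connect_with_retry("localhost", 9999) returns a connected socket while nc -lk 9999 is running, and None (after printing one line per refused attempt) once nc has been stopped.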

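    This shows that the two sides had already established communication. However, the expected word count output never appeared. My guess was that too few threads were available for the computation, so I made the following change: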

    sc = SparkContext("local[2]", "streamwordcount")

    was changed to:

    sc = SparkContext("local[3]", "streamwordcount")
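
    For reference, the Spark Streaming Programming Guide advises against the master URLs "local" and "local[1]" for local runs: each receiver (here, the one created by socketTextStream) permanently occupies one thread, so local[n] needs n greater than the number of receivers. The helpers below encode that rule; local_thread_count and has_spare_thread are hypothetical names, and only the "local" and "local[N]" forms are handled. By this rule local[2] should already leave one thread for processing a single socket stream, so the missing output in the first run may also be explained by nc having been stopped before any data arrived.

```python
import re


def local_thread_count(master):
    """Return the number of worker threads a local master URL requests.

    Only "local" (1 thread) and "local[N]" are handled in this sketch;
    other forms such as "local[*]" raise ValueError.
    """
    if master == "local":
        return 1
    m = re.fullmatch(r"local\[(\d+)\]", master)
    if m is None:
        raise ValueError(f"unsupported master URL in this sketch: {master!r}")
    return int(m.group(1))


def has_spare_thread(master, num_receivers=1):
    """Guideline from the Spark Streaming guide: local[n] needs
    n > number of receivers, since each receiver keeps one thread busy."""
    return local_thread_count(master) > num_receivers


for master in ("local", "local[2]", "local[3]"):
    print(master, has_spare_thread(master))
```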

    The complete program:

    [training@localhost ab]$ cat test.py
    # Show word counts of remote messages

    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":

        sc = SparkContext("local[3]", "streamwordcount")
        # Create a local SparkContext with 3 worker threads

        ssc = StreamingContext(sc, 2)
        # Create the StreamingContext; the batch interval is set to 2 seconds

        lines = ssc.socketTextStream("localhost", 9999)
        # Receive newline-delimited text from localhost:9999

        words = lines.flatMap(lambda line: line.split(" "))
        # Use flatMap and split to break the text received in each 2-second batch into words

        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)

        wordCounts.pprint()
        # Print the first 10 (word, count) pairs of every batch

        ssc.start()
        # Start the Spark Streaming application

        ssc.awaitTermination()
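
    To see what the three transformations do to a single batch, here is a plain-Python sketch of the same flatMap / map / reduceByKey chain that runs without Spark (word_count_batch is a hypothetical helper). Note that line.split(" ") turns an empty line into [''], which is likely where the (u'', 1) entry in the output below comes from; line.split() with no argument would drop such empty strings.

```python
from collections import Counter


def word_count_batch(lines):
    """Plain-Python equivalent of the DStream pipeline for ONE batch:
    flatMap(split(" ")) -> map((word, 1)) -> reduceByKey(x + y)."""
    words = [w for line in lines for w in line.split(" ")]  # flatMap
    pairs = [(w, 1) for w in words]                         # map
    counts = Counter()
    for word, one in pairs:                                 # reduceByKey
        counts[word] += one
    return dict(counts)


print(word_count_batch(["ppp sss zzz"]))
print(word_count_batch(["aaa bbb ccc", "", "aaa"]))
```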

    Run nc again:

    [training@localhost ~]$ nc -lk 9999
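
    If nc is not available, a small Python server can play its role. This is a sketch under simplifying assumptions: make_server and serve_lines are hypothetical helpers, and unlike nc -lk they send a fixed list of lines instead of forwarding what you type. Each line is newline-terminated, because socketTextStream interprets the received bytes as UTF-8, newline-delimited lines.

```python
import socket


def make_server(host="localhost", port=9999):
    """Create a listening TCP socket, roughly what `nc -l` does."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(1)
    return srv


def serve_lines(srv, lines, max_clients=1):
    """Accept up to max_clients connections one after another (like `nc -k`
    keeps listening) and send each client the given lines, newline-terminated."""
    for _ in range(max_clients):
        conn, _addr = srv.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
```

    For example, serve_lines(make_server(), ["aaa bbb ccc", "ddd aaa sss"]) waits for the Spark receiver to connect on localhost:9999 and then sends it those two lines.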

    Run the Spark program:

    [training@localhost ~]$ spark-submit /home/training/ab/test.py

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

    Type some data into the nc window:

    aaa bbb ccc
    ddd aaa sss
    sss bbb bbb

    kkk jjj mmm
    ooo kkk jjj
    mmm ccc ddd
    eee fff sss
    rrr nnn ooo
    ppp sss zzz
    mmm sss ttt
    kkk sss ttt
    rrr ooo ppp
    kkk qqq kkk
    lll nnn jjj
    rrr ooo sss
    kkk aaa ddd
    aaa aaa fff
    eee sss nnn
    ooo ppp qqq
    qqq sss eee
    sss mmm nnn

    After a short while, the Spark program's window shows the following output:

    ------------------------------------------- 
    Time: 2017-10-28 19:33:50
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:52
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:54
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:56
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:58
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:00
    -------------------------------------------
    (u'', 1)
    (u'mmm', 2)
    (u'bbb', 3)
    (u'nnn', 1)
    (u'ccc', 2)
    (u'rrr', 1)
    (u'sss', 3)
    (u'fff', 1)
    (u'aaa', 2)
    (u'ooo', 2)
    ...

    ------------------------------------------- 
    Time: 2017-10-28 19:34:02
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:04
    -------------------------------------------
    (u'ppp', 1)
    (u'sss', 1)
    (u'zzz', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:06
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:08
    -------------------------------------------
    (u'mmm', 1)
    (u'sss', 1)
    (u'ttt', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:10
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:12
    -------------------------------------------
    (u'sss', 1)
    (u'ttt', 1)
    (u'kkk', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:14
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:16
    -------------------------------------------
    (u'ppp', 1)
    (u'rrr', 1)
    (u'ooo', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:18
    -------------------------------------------
    (u'qqq', 1)
    (u'kkk', 2)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:20
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:22
    -------------------------------------------
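
    The output also shows that each 2-second batch is counted on its own: 'sss' appears in many input lines, yet the batches at 19:34:04, 19:34:08 and 19:34:12 each report ('sss', 1), and batches in which nothing arrived print only their Time header. (pprint() shows at most 10 elements per batch by default, hence the "..." at 19:34:00.) The sketch below models that stateless, per-batch behaviour in plain Python, assuming each line is tagged with a hypothetical arrival time in seconds; counts_per_batch is an illustrative name. Running totals across batches would instead need a stateful operation such as updateStateByKey.

```python
from collections import Counter


def counts_per_batch(events, interval_s=2.0):
    """Group (arrival_time_s, line) events into fixed windows of interval_s
    seconds and word-count each window independently, like a stateless DStream.

    Returns one dict per batch, from the first to the last non-empty batch;
    a batch with no input comes out as {} (pprint() would show only its header).
    """
    if not events:
        return []
    batches = {}
    for t, line in events:
        idx = int(t // interval_s)  # which batch this line falls into
        batches.setdefault(idx, Counter()).update(line.split(" "))
    first, last = min(batches), max(batches)
    return [dict(batches.get(i, Counter())) for i in range(first, last + 1)]


print(counts_per_batch([(0.1, "ppp sss zzz"), (4.1, "mmm sss ttt")]))
```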

  • Original article: https://www.cnblogs.com/gaojian/p/7749538.html