  • [Spark][Streaming] Example of Spark reading network input

    An example of Spark reading input from the network:

    I ran the experiment based on the following URLs:

    https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream
    http://www.cnblogs.com/FG123/p/5324743.html

    I found that if I first run nc -lk 9999, then start the Spark program,
    and then stop nc, the Spark program reports an error.

    The error looks like this:

    -------------------------------------------
    Time: 2017-10-28 19:32:02
    -------------------------------------------

    17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)

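    This shows that the two had established communication. However, the expected word count output did not appear. My guess was that too few worker threads were available for the computation, so I made the following change: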

    sc = SparkContext("local[2]", "streamwordcount")

    改为:

    sc = SparkContext("local[3]", "streamwordcount")
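    For reference, the Spark Streaming programming guide recommends a specific master setting for local runs. The master should be local[n] with n greater than the number of receivers, because each receiver (such as the socket receiver used here) occupies one thread for as long as it runs. As a result, local or local[1] leaves no thread for processing the received data. By that rule, local[2] should already be enough for this single receiver, and local[3] adds headroom. The helper below is a hypothetical check written for this note, not a PySpark API. It only understands the local, local[N] and local[*] forms, and it targets Python 3.

```python
import os
import re


def local_thread_count(master):
    """Return the number of worker threads a local master string requests.

    Hypothetical helper (not part of PySpark). Handles only "local",
    "local[N]" and "local[*]"; anything else raises ValueError.
    """
    match = re.fullmatch(r"local(?:\[(\d+|\*)\])?", master)
    if match is None:
        raise ValueError("unsupported master string: %r" % master)
    threads = match.group(1)
    if threads is None:      # plain "local" runs a single thread
        return 1
    if threads == "*":       # "local[*]" uses one thread per logical core
        return os.cpu_count() or 1
    return int(threads)


def leaves_processing_thread(master, num_receivers=1):
    """True if at least one thread remains after every receiver takes one."""
    return local_thread_count(master) > num_receivers


for master in ("local", "local[1]", "local[2]", "local[3]"):
    print(master, leaves_processing_thread(master))
# local False
# local[1] False
# local[2] True
# local[3] True
```

    With two socket streams, for example, num_receivers=2 would require at least local[3].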

    The complete program is as follows:

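    [training@localhost ab]$ cat test.py
    # showing remote messages

    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        # Create a local SparkContext with 3 worker threads
        sc = SparkContext("local[3]", "streamwordcount")

        # Create a StreamingContext with a batch interval of 2 seconds
        ssc = StreamingContext(sc, 2)

        # Receive newline-delimited text from the TCP socket on localhost:9999
        lines = ssc.socketTextStream("localhost", 9999)

        # Use flatMap and split to break the text received in each 2-second batch into words
        words = lines.flatMap(lambda line: line.split(" "))

        # Pair each word with 1, then sum the counts per word within the batch
        pairs = words.map(lambda word: (word, 1))
        wordCounts = pairs.reduceByKey(lambda x, y: x + y)

        # Print the first 10 elements of each batch's result
        wordCounts.pprint()

        # Start the Spark Streaming application
        ssc.start()

        # Block until the streaming computation is stopped
        ssc.awaitTermination()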
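    socketTextStream opens a TCP connection to the given host and port. According to the PySpark documentation, it interprets the received bytes as UTF-8 text split into lines on \n. The error log earlier also shows the receiver retrying the connection with a 2000 ms delay when nothing is listening. The plain-Python sketch below imitates that client side to make the behaviour concrete. It is an illustration only, not Spark's receiver code, and connect_with_retry, read_lines and their parameters are hypothetical names.

```python
import socket
import time


def connect_with_retry(host, port, retry_delay=2.0, max_attempts=3):
    """Open a TCP connection, sleeping retry_delay seconds between refused attempts.

    Hypothetical helper: returns the connected socket, or None if every
    attempt was refused (Spark's receiver instead keeps restarting itself).
    """
    for attempt in range(max_attempts):
        try:
            return socket.create_connection((host, port))
        except ConnectionRefusedError:
            if attempt + 1 < max_attempts:
                time.sleep(retry_delay)
    return None


def read_lines(sock):
    """Yield UTF-8, newline-delimited records until the peer closes the connection."""
    with sock, sock.makefile("r", encoding="utf-8", newline="\n") as stream:
        for line in stream:
            yield line.rstrip("\n")  # drop the delimiter, keep the record text
```

    For example, with nc -lk 9999 running, sock = connect_with_retry("localhost", 9999) followed by a loop over read_lines(sock) (when sock is not None) prints each line typed into nc as one record.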

    Run the nc program again:

    [training@localhost ~]$ nc -lk 9999
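    Where nc is not installed, a small Python server can play the same role. The sketch below is a hypothetical stand-in for nc -lk 9999 and was not part of the original experiment. It listens on a port and sends each string it is given to the connected client (here, the Spark receiver), one line at a time with a trailing \n. If the client disconnects, it waits for the next one, similar to nc's -k flag. The names serve_lines, max_clients and on_listen are illustrative assumptions.

```python
import socket


def serve_lines(lines, host="localhost", port=9999, max_clients=None, on_listen=None):
    """Hypothetical stand-in for `nc -lk PORT`: send lines to whoever connects.

    Each string from `lines` is sent as UTF-8 with a trailing newline. If a
    client disconnects, the server accepts the next one (like nc's -k flag).
    Returns once `lines` is exhausted or `max_clients` connections were served.
    """
    lines = iter(lines)  # shared iterator: a new client resumes where the last one stopped
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        if on_listen is not None:
            on_listen(server.getsockname()[1])  # report the bound port (useful with port=0)
        served = 0
        while max_clients is None or served < max_clients:
            client, _addr = server.accept()
            served += 1
            with client:
                try:
                    for line in lines:
                        client.sendall((line.rstrip("\n") + "\n").encode("utf-8"))
                except (BrokenPipeError, ConnectionResetError):
                    continue  # client went away mid-stream; wait for another one
            return  # every line has been sent
```

    Calling serve_lines(sys.stdin) is assumed to forward lines typed in the terminal much as nc -lk 9999 does. A blank line is sent as an empty record, just as with nc.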

    Run the Spark program:

    [training@localhost ~]$ spark-submit /home/training/ab/test.py

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

    Type some data into the nc window:

    aaa bbb ccc
    ddd aaa sss
    sss bbb bbb

    kkk jjj mmm
    ooo kkk jjj
    mmm ccc ddd
    eee fff sss
    rrr nnn ooo
    ppp sss zzz
    mmm sss ttt
    kkk sss ttt
    rrr ooo ppp
    kkk qqq kkk
    lll nnn jjj
    rrr ooo sss
    kkk aaa ddd
    aaa aaa fff
    eee sss nnn
    ooo ppp qqq
    qqq sss eee
    sss mmm nnn

    After a short while, the Spark program's window shows the following output:

    ------------------------------------------- 
    Time: 2017-10-28 19:33:50
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:52
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:54
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:56
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:33:58
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:00
    -------------------------------------------
    (u'', 1)
    (u'mmm', 2)
    (u'bbb', 3)
    (u'nnn', 1)
    (u'ccc', 2)
    (u'rrr', 1)
    (u'sss', 3)
    (u'fff', 1)
    (u'aaa', 2)
    (u'ooo', 2)
    ...

    ------------------------------------------- 
    Time: 2017-10-28 19:34:02
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:04
    -------------------------------------------
    (u'ppp', 1)
    (u'sss', 1)
    (u'zzz', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:06
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:08
    -------------------------------------------
    (u'mmm', 1)
    (u'sss', 1)
    (u'ttt', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:10
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:12
    -------------------------------------------
    (u'sss', 1)
    (u'ttt', 1)
    (u'kkk', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:14
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:16
    -------------------------------------------
    (u'ppp', 1)
    (u'rrr', 1)
    (u'ooo', 1)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:18
    -------------------------------------------
    (u'qqq', 1)
    (u'kkk', 2)

    ------------------------------------------- 
    Time: 2017-10-28 19:34:20
    -------------------------------------------

    ------------------------------------------- 
    Time: 2017-10-28 19:34:22
    -------------------------------------------
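    Each block of output above covers only the lines that arrived within that 2-second batch. The counts are not cumulative: the single line "kkk qqq kkk" alone produces (u'qqq', 1) and (u'kkk', 2) at 19:34:18. The (u'', 1) entry most likely comes from the blank line typed into nc, because splitting an empty string on a single space yields one empty token. The pure-Python sketch below involves no Spark, and its function name count_batch is hypothetical. It reproduces the flatMap, map and reduceByKey steps for one batch so that both effects can be checked directly.

```python
from collections import Counter


def count_batch(lines, sep=" "):
    """Word counts for one micro-batch, mirroring flatMap -> map -> reduceByKey."""
    words = [word for line in lines for word in line.split(sep)]  # flatMap
    pairs = [(word, 1) for word in words]                          # map
    counts = Counter()
    for word, one in pairs:                                        # reduceByKey(x + y)
        counts[word] += one
    return dict(counts)


batch = ["aaa bbb ccc", "ddd aaa sss", "sss bbb bbb", ""]
print(count_batch(batch))
# {'aaa': 2, 'bbb': 3, 'ccc': 1, 'ddd': 1, 'sss': 2, '': 1}
```

    Passing sep=None makes Python's str.split discard empty tokens, so the blank line no longer contributes a '' entry. The same effect in the Spark program would come from line.split() instead of line.split(" ").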

  • Original article: https://www.cnblogs.com/gaojian/p/7749538.html