  • python3+spark2.1+kafka0.8+sparkStreaming

    Python code:

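    import ast
    import time
    from operator import add

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils


    def to_date_sn(message):
        # Fields are separated by \u0001; each field's name and value by \u0002.
        fields = message.split("\u0001")
        millis = float(fields[0].split("\u0002")[1])  # first field: timestamp in ms
        sn = fields[1].split("\u0002")[1]  # second field: the value to count by
        return time.strftime("%Y-%m-%d", time.localtime(millis / 1000)) + "|" + sn


    # A receiver-based stream occupies one thread, so local mode needs at least two.
    sc = SparkContext(master="local[2]", appName="PythonSparkStreamingRokidDtSnCount")
    ssc = StreamingContext(sc, 2)  # 2-second batches
    zkQuorum = 'localhost:2181'
    topic = {'rokid': 1}  # topic name -> number of consumer threads
    groupid = "test-consumer-group"
    # createStream yields (key, value) pairs; the value is already decoded as UTF-8.
    lines = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
    valuestr = lines.flatMap(lambda kv: kv[1].split("\n"))
    # literal_eval parses a dict literal without running arbitrary code as eval would.
    valuedict = valuestr.map(ast.literal_eval)
    message = valuedict.map(lambda x: x["message"])
    rdd2 = message.map(lambda m: (to_date_sn(m), 1))
    rdd3 = rdd2.reduceByKey(add)
    rdd3.saveAsTextFiles("/tmp/wordcount")
    rdd3.pprint()
    ssc.start()
    ssc.awaitTermination()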
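
The key-building step can be tried out locally without Spark or Kafka. The following is a minimal sketch, assuming the field separators are the control characters \u0001 and \u0002, that the first field carries a timestamp in milliseconds, and that the second field carries the value to count by. The field names `ts` and `sn`, the sample messages, and the helper names `date_sn_key` and `count_keys` are hypothetical and are not taken from the original post.

```python
import time
from collections import Counter

FIELD_SEP = "\u0001"  # assumed separator between fields of one message
KV_SEP = "\u0002"     # assumed separator between a field's name and its value


def date_sn_key(message):
    """Build a 'YYYY-MM-DD|value' key from the first two fields of a message."""
    fields = message.split(FIELD_SEP)
    millis = float(fields[0].split(KV_SEP)[1])  # first field: epoch milliseconds
    value = fields[1].split(KV_SEP)[1]          # second field: value to count by
    day = time.strftime("%Y-%m-%d", time.localtime(millis / 1000))
    return day + "|" + value


def count_keys(messages):
    """Local stand-in for map(...).reduceByKey(add) over a single batch."""
    return Counter(date_sn_key(m) for m in messages)


# Hypothetical sample messages, made up for illustration only.
sample = [
    "ts\u00021493600000000\u0001sn\u0002A001",
    "ts\u00021493600000000\u0001sn\u0002A001",
    "ts\u00021493600000000\u0001sn\u0002B002",
]
counts = count_keys(sample)
for key, n in sorted(counts.items()):
    print(key, n)
```

The date part of each key depends on the local time zone of the machine, because the conversion uses `time.localtime`, just as the streaming job does.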

    Run the Spark Streaming job:

    spark/bin/spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar ReadFromKafkaStreaming.py

    The spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar file can be downloaded from:
    http://search.maven.org

    This is intended as a reference for getting started.

  • Original article: https://www.cnblogs.com/zhzhang/p/6792643.html