pyspark streaming

    1. An example

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    # create sc with two worker threads
    sc = SparkContext("local[2]", "test")
    # create a local StreamingContext with a batch interval of 1 second
    ssc = StreamingContext(sc, 1)
    # create a DStream that connects to localhost:9999
    lines = ssc.socketTextStream("localhost", 9999)
    words = lines.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda x: (x, 1))
    wordcount = pairs.reduceByKey(lambda x, y: x + y)
    # print the first 10 elements of each RDD in the DStream
    wordcount.pprint()
    ssc.start()
    ssc.awaitTermination()
    

    How to run:
    1. On Linux, first check whether port 9999 is already in use:

    netstat -ntpl | grep 9999
    

    2. Open port 9999:

    nc -lk 9999
    

    On Windows 10, use:

    nc -l -p 9999
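
    If nc is not available, a rough Python stand-in for the server side is sketched below (my own illustration, not part of the original setup): it listens on port 9999, waits for the Spark job to connect, and forwards each typed line.

    import socket

    # minimal substitute for `nc -lk 9999`: listen, accept one client, relay stdin lines
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(("localhost", 9999))
    srv.listen(1)
    conn, _ = srv.accept()   # blocks until the streaming job connects
    try:
        while True:
            line = input()   # type words here, press Enter to send
            conn.sendall((line + "\n").encode())
    except (EOFError, KeyboardInterrupt):
        conn.close()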
    

    3. Run the script in a new window, type strings into the earlier (nc) window, and watch the counts printed in the new window:

    -------------------------------------------
    Time: 2021-10-21 15:49:17
    -------------------------------------------
    ('kaka', 2)
    ('tt', 1)
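
    The same pipeline also supports sliding windows over multiple batches. Below is a minimal sketch (an addition for illustration, not from the original example), assuming a writable checkpoint directory /tmp/ssc-checkpoint, which reduceByKeyAndWindow requires when an inverse function is supplied. It counts words over the last 30 seconds, recomputed every 10 seconds.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "window-test")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("/tmp/ssc-checkpoint")  # required for windowed stateful ops

    pairs = (ssc.socketTextStream("localhost", 9999)
             .flatMap(lambda line: line.split(" "))
             .map(lambda w: (w, 1)))
    # 30s window sliding every 10s; the inverse function subtracts batches
    # that fall out of the window instead of recomputing from scratch
    windowed = pairs.reduceByKeyAndWindow(lambda a, b: a + b,
                                          lambda a, b: a - b,
                                          30, 10)
    windowed.pprint()
    ssc.start()
    ssc.awaitTermination()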
    

    2. Connecting Spark to Kafka for online computation

    A wordcount example:

    # kafka_spark.py
    import findspark

    findspark.init()

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    if __name__ == "__main__":
        sc = SparkContext(appName="streamingkafka")
        sc.setLogLevel("WARN")  # cut down on shell log noise
        ssc = StreamingContext(sc, 5)  # 5-second batch interval: the interval (in seconds) at which streaming data is divided into batches
        brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092"
        topic = "static_online"
        # consume Kafka with Streaming's direct (receiver-less) mode
        # args: the StreamingContext, a list of topic names, the Kafka broker list
        kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
        # each record is a (key, value) tuple; keep only the message value
        lines_rdd = kafka_streaming_rdd.map(lambda x: x[1])
        counts = (lines_rdd.flatMap(lambda line: line.split(" "))
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a, b: a + b))
        # print the wordcount results to the current shell
        counts.pprint()
        ssc.start()
        ssc.awaitTermination()
    

    Submit the job (the assembly jar bundles the Kafka 0.8 connector that KafkaUtils needs):

    spark-submit  --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar  kafka_spark.py
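
    Since the script already calls findspark, it can alternatively be launched with plain python by pointing PYSPARK_SUBMIT_ARGS at the same jar. A sketch, assuming the jar sits in the working directory; the variable must be set before the SparkContext is created:

    import os

    # the trailing 'pyspark-shell' token is required by PySpark
    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        "--jars spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar pyspark-shell"
    )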
    

    Then start a Kafka producer and send data to the static_online topic:

    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import traceback
    import json

    bootstrap_servers = ['hadoop102:9092', 'hadoop103:9092', 'hadoop104:9092']

    def producer_demo():
        # assume key/value messages (a key is optional), serialized as JSON
        producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            key_serializer=lambda k: json.dumps(k).encode(),
            value_serializer=lambda v: json.dumps(v).encode())
        # send 300 messages
        for i in range(0, 300):
            future = producer.send(
                'static_online',
                key='count_num',  # messages with the same key go to the same partition
                value=str(i) + "@qq.22com",
                partition=1)  # send to partition 1 (the topic must have at least 2 partitions)
            print("send {}".format(str(i) + "@qq.22com"))
            try:
                future.get(timeout=10)  # block until the broker acknowledges the send
            except KafkaError:  # a failed send raises a KafkaError
                print(traceback.format_exc())

    if __name__ == '__main__':
        producer_demo()
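
    To sanity-check that messages actually reach the topic, independently of Spark, a minimal kafka-python consumer can run in another shell. A sketch (same brokers as above; reads from the beginning of the topic):

    from kafka import KafkaConsumer
    import json

    consumer = KafkaConsumer(
        'static_online',
        bootstrap_servers=['hadoop102:9092', 'hadoop103:9092', 'hadoop104:9092'],
        auto_offset_reset='earliest',  # start from the oldest available message
        value_deserializer=lambda v: json.loads(v.decode()))
    for msg in consumer:
        print(msg.partition, msg.offset, msg.value)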
    

