  • Handling the "partition.assignment.strategy" exception when connecting Spark Streaming to Kafka

      Server environment: Spark 2.4.4 + Scala 2.11.12 + Kafka 2.2.2

      Because the business logic was fairly simple and Kafka only had a fixed set of topics, we had always run the real-time streaming job with the following command:

    spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 --py-files /data/service/xxx.zip /data/service/xxx.py

      The code used KafkaUtils from pyspark.streaming.kafka to connect Spark Streaming to Kafka, and it ran for a long time without any problems.

      As new business came in, a new feature needed Kafka to consume from dynamic topics matched by a regular expression. After digging through the KafkaUtils source and related material, I found that it does not support dynamic topics; that requires spark-streaming-kafka-0-10.
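
      For context, the old fixed-topics job looked roughly like the sketch below (the app name, batch interval, and topic name are hypothetical); KafkaUtils.createDirectStream only accepts an explicit list of topic names, which is why a regex subscription is impossible with the 0-8 API:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="FixedTopicsJob")  # hypothetical app name
    ssc = StreamingContext(sc, 10)  # hypothetical 10-second batch interval

    # createDirectStream takes an explicit topic list; the 0-8 Python API
    # has no regex/pattern-based subscription
    stream = KafkaUtils.createDirectStream(
        ssc, ["event.log.app"],  # fixed topic names only
        {"metadata.broker.list": "master:9092"})
    stream.pprint()

    ssc.start()
    ssc.awaitTermination()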

      After reading http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html and http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html, I decided to implement it with Structured Streaming.

      Implementation code:

    import sys
    
    from pyspark.sql import SparkSession
    
    def process_row(row):
        # Write row to storage
        pass
    
    if __name__ == "__main__":
        if len(sys.argv) != 4:
            print("""
            Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
            """, file=sys.stderr)
            sys.exit(-1)
    
        bootstrapServers = sys.argv[1]
        subscribeType = sys.argv[2]
        topics = sys.argv[3]
    
        spark = SparkSession \
            .builder \
            .appName("StructuredKafkaWordCount") \
            .getOrCreate()

        # Create a streaming DataFrame representing the stream of input lines from Kafka
        ds = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", bootstrapServers) \
            .option(subscribeType, topics) \
            .load() \
            .selectExpr("CAST(value AS STRING)")  # cast the binary value column to a string
    
        ds.printSchema()
        query = ds.writeStream.foreach(process_row).start()
        query.awaitTermination()
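
      For reference, the 0-10 source supports three mutually exclusive subscription options; the sketch below shows all three (topic names and partitions are hypothetical):

    # Exactly one subscription option may be set on a given reader
    reader = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "master:9092")

    ds = reader.option("subscribePattern", "event.log.*").load()  # Java regex over topic names
    # ds = reader.option("subscribe", "event.log.app").load()  # fixed list of topic names
    # ds = reader.option("assign", '{"event.log.app": [0, 1]}').load()  # explicit topic partitions (JSON)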

      Submit the job with the following command (the three positional arguments map to bootstrap-servers, subscribe-type, and topics):

    spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 /data/service/demo.py master:9092 subscribePattern event.log.*

      Every submission failed with the following error:

    org.apache.kafka.common.config.ConfigException: 
    Missing required configuration "partition.assignment.strategy" which has no default value

      Most of the material I found said to add an option configuring the Kafka partition assignment strategy, changing readStream to:

        ds = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", bootstrapServers) \
            .option("kafka.partition.assignment.strategy", "range") \
            .option(subscribeType, topics) \
            .load()
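
      Note that the 0.10+ consumer expects partition.assignment.strategy to be a fully qualified assignor class name, not the old consumer's short name "range", so even on a clean classpath the snippet above would need to look roughly like this:

        # Sketch only: the new (0.10+) consumer takes a fully qualified
        # assignor class name, not the old short name "range"
        ds = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", bootstrapServers) \
            .option("kafka.partition.assignment.strategy",
                    "org.apache.kafka.clients.consumer.RangeAssignor") \
            .option(subscribeType, topics) \
            .load()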

      After rerunning, the exception changed to a failure to connect to Kafka at all. I spent an entire day on this, nearly going crazy, and still hadn't solved it.

      Fortunately I finally found https://xbuba.com/questions/44959483, where an expert suggested the problem might be a conflict between the Kafka 0.8 jar and the Kafka 0.10 jar.

      Searching with the following commands:

    find / -name 'spark-streaming-kafka*'
    find / -name 'spark-sql-kafka*'

      revealed that the /root/.ivy2/cache/org.apache.spark/ directory contained both a spark-streaming-kafka-0-8_2.11 and a spark-sql-kafka-0-10_2.11 folder along with their jar files.

      After deleting spark-streaming-kafka-0-8_2.11, the code ran normally.

      Since the old script still used Kafka 0.8, and both versions needed to run side by side, all of the Kafka 0.8 and Kafka 0.10 jars under /root/.ivy2/cache/org.apache.spark/ had to be cleared out.
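
      Assuming the default ivy cache location, the cleanup looks roughly like this (exact directory names can vary with the resolved versions):

    rm -rf /root/.ivy2/cache/org.apache.spark/spark-streaming-kafka-0-8_2.11
    rm -rf /root/.ivy2/cache/org.apache.spark/spark-sql-kafka-0-10_2.11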

      Then download the matching spark-streaming-kafka-0-8 and spark-sql-kafka-0-10 jars from https://repo1.maven.org/maven2/org/apache/spark/ and change the spark-submit commands to use --jars. Unlike --packages, which resolves dependencies into the shared ivy cache (the root of the conflict), --jars points each job at an explicit local jar, so the two versions stay isolated:

    spark-submit --jars /data/service/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar --py-files /data/service/xxx.zip /data/service/xxx.py
    spark-submit --jars /data/service/spark-sql-kafka-0-10_2.11-2.4.4.jar /data/service/demo.py master:9092 subscribePattern event.log.*

      After these changes both scripts ran without problems. (PS: I originally wanted to start the old script with the org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 package as well, but it failed immediately with an error saying to use org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 instead; that makes sense, since pyspark.streaming.kafka's KafkaUtils is backed by the 0-8 integration, while spark-sql-kafka-0-10 only provides the Structured Streaming source.)
