  • spark-streaming-kafka integration problems

    name := "test"
    version := "0.0.2"
    scalaVersion := "2.11.8"
    val sparkVersion = "2.2.0"
    libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion // % Provided
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10-assembly" % sparkVersion // % Provided
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion // % Provided
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.Seconds
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.SparkConf
    
    object stu_live_test {
        def main(args: Array[String]): Unit = {
            val ip = "broker-host:9092" // placeholder: replace with your Kafka bootstrap servers
            val conf = new SparkConf().setAppName("test")
            val ssc = new StreamingContext(conf, Seconds(2))
            println("hello")
            val kafkaParams = Map[String, Object](
                "bootstrap.servers" -> ip,
                "group.id" -> "test_kafka1102",
                "key.deserializer" -> classOf[StringDeserializer],
                "value.deserializer" -> classOf[StringDeserializer],
                "auto.offset.reset" -> "latest",
                "enable.auto.commit" -> (false: java.lang.Boolean)
       //         "partition.assignment.strategy" -> "range"
            )
            val tops = "artsuseronline"
            val topics = tops.split(",").toSet
            println(topics)

            val stream = KafkaUtils.createDirectStream[String, String](
                ssc,
                PreferConsistent,
                Subscribe[String, String](topics, kafkaParams))
            println("****************************9999")
            // ConsumerRecord is not serializable, so convert each record to a String first
            val lines = stream.map(record => record.toString())
      //      println(lines)
            lines.print()
            println("dfsdfsdf")
            ssc.start()
            ssc.awaitTermination()
        }
    }

    Above are the sbt build definition and the Scala code; package with sbt and it is ready to run.

    Environment: Spark 2.2.0, Scala 2.11.8, Kafka 0.10.
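A side note on the `(false: java.lang.Boolean)` ascription in `kafkaParams` above: `Map[String, Object]` can only hold reference types, and a bare Scala `Boolean` is an `AnyVal`, so it must be boxed explicitly. A minimal standalone sketch (the `ParamsDemo` object and its entries are illustrative, not part of the original code):

```scala
object ParamsDemo {
    // Map[String, Object] requires AnyRef values; a bare `false` would not
    // compile here, so it is ascribed to java.lang.Boolean to force boxing.
    val params = Map[String, Object](
        "enable.auto.commit" -> (false: java.lang.Boolean),
        "auto.offset.reset" -> "latest"
    )

    def main(args: Array[String]): Unit =
        println(params("enable.auto.commit")) // prints: false
}
```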

    The problems hit along the way were as follows:

    Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

    Searching suggests this is a version-conflict problem.

    Fixes found online include: `export SPARK_KAFKA_VERSION=0.10`, adding a kafka-client 0.10 dependency, and so on.

    Earlier, to talk to Kafka 0.8, spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar had been downloaded into spark/jars.

    Deleting that jar and adding spark-streaming-kafka-0-10_2.11-2.2.0.jar failed; also adding kafka_client_10.jar still failed.

    After removing the jars above and adding spark-streaming-kafka-0-10-assembly_2.11-2.2.0.jar, data came through but serialization failed; forcing the output to String inside the map transformation made it work.

    When the Kafka 0.8 and 0.10 jars coexist, code using the 0.8 connector runs fine, but the 0.10 connector breaks.

    The streaming-kafka jar should be supplied per job; it must not live in the spark/jars directory.
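In other words, attach the connector to the job at submit time instead of installing it cluster-wide. A hedged sketch of such a submit command (the application jar name `test_2.11-0.0.2.jar` is inferred from the sbt settings above and may differ on your machine):

```shell
# Keep spark/jars free of streaming-kafka assemblies; ship the 0-10
# assembly only with the jobs that need it, via --jars.
spark-submit \
    --class stu_live_test \
    --jars spark-streaming-kafka-0-10-assembly_2.11-2.2.0.jar \
    test_2.11-0.0.2.jar
```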

    The above is the consumer code for Kafka 0.10 without SASL authentication.

    Below is the version with SASL authentication added.

    An extra kafka_client_jass.conf file is required, containing the username and password, like so:

    KafkaClient {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="name"
            password="psw";
    };

    Extend kafkaParams as follows:

        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> ip,
            "group.id" -> "test_kafka11",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "auto.offset.reset" -> "latest",
            "sasl.plain.username" -> name, // name / psw: placeholders for your credentials
            "sasl.plain.password" -> psw,
            "security.protocol" -> "SASL_PLAINTEXT",
            "sasl.mechanism" -> "PLAIN",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )

    Then add the config when submitting:

    spark-submit --class scala_class_name --driver-java-options="-Djava.security.auth.login.config=kafka_client_jass.conf" xxxxx.jar
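The driver option above is enough in client mode, where the JAAS file is read from the local working directory. When the consumer also runs on executors (e.g. a YARN cluster deployment), the file typically has to be shipped and referenced on the executor side too. A hedged sketch, assuming kafka_client_jass.conf sits in the submit directory:

```shell
# --files distributes the JAAS file to each executor's working directory;
# spark.executor.extraJavaOptions points the executor JVMs at it.
spark-submit --class scala_class_name \
    --files kafka_client_jass.conf \
    --driver-java-options "-Djava.security.auth.login.config=kafka_client_jass.conf" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jass.conf" \
    xxxxx.jar
```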

  • Original post: https://www.cnblogs.com/supermanwx/p/9916643.html