  • spark-streaming-kafka integration issues

    name := "test"
    version := "0.0.2"
    scalaVersion := "2.11.8"
    val sparkVersion = "2.2.0"
    libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"// % Provided
    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10-assembly_2.11" % "2.2.0" //% Provided
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"// % Provided
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.Seconds
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.SparkConf
    
    object stu_live_test {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("test")
            val ssc = new StreamingContext(conf, Seconds(2))
            println("hello")
            val kafkaParams = Map[String, Object](
                "bootstrap.servers" -> ip, // ip: Kafka broker address(es), e.g. "host1:9092,host2:9092"
                "group.id" -> "test_kafka1102",
                "key.deserializer" -> classOf[StringDeserializer],
                "value.deserializer" -> classOf[StringDeserializer],
                "auto.offset.reset" -> "latest",
                "enable.auto.commit" -> (false: java.lang.Boolean)
                // "partition.assignment.strategy" -> "range"
            );
            val tops = "artsuseronline"
            val topics = tops.split(",").toSet
            println(topics);
    
            val stream = KafkaUtils.createDirectStream[String, String](
                ssc,
                PreferConsistent,
                Subscribe[String, String](topics,kafkaParams));
            println("****************************9999");
            val lines = stream.map(record => record.toString()); // force each ConsumerRecord to a String (see the serialization note below)
      //      println(lines);
            lines.print();
            println("dfsdfsdf");
            ssc.start();
            ssc.awaitTermination();
    
        }
    }

    Above are the sbt build definition and the Scala code; package it with sbt and it can be run directly.

    Environment: Spark 2.2.0, Scala 2.11.8, Kafka 0.10.
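
    A minimal sketch of the corresponding submit command, assuming sbt's default artifact name for this build (test_2.11-0.0.2.jar under target/scala-2.11/) and a local master; adjust both to your environment:

    spark-submit --class stu_live_test --master "local[2]" target/scala-2.11/test_2.11-0.0.2.jar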

    The problem hit along the way:

    Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

    Searching around, the consensus was that this is a version-conflict problem.

    Suggested fixes found online include: export SPARK_KAFKA_VERSION=0.10, adding a dependency on the Kafka 0.10 client, and so on.
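
    As a sketch of the dependency suggestion in build.sbt (0.10.0.1 is assumed here because it is the client version spark-streaming-kafka-0-10 2.2.0 was built against; verify it against your broker):

    libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"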

    Earlier, to talk to Kafka 0.8, spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar had been downloaded into spark/jars.

    Deleting it and adding spark-streaming-kafka-0-10_2.11-2.2.0.jar failed; adding the kafka-clients 0.10 jar on top of that also failed.

    After removing those jars and adding spark-streaming-kafka-0-10-assembly_2.11-2.2.0.jar instead, data finally came through, but a serialization failure was reported; forcing the output to a String inside the map transformation made it print successfully.
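
    The likely cause is that ConsumerRecord is not serializable, so anything that has to ship records between JVMs (printing, shuffles, checkpointing) breaks; a minimal sketch of the usual workaround, projecting out plain strings right after the stream is created:

    val lines = stream.map(record => (record.key, record.value)) // keep only serializable fields
    lines.print()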

    When the Kafka 0.8 and 0.10 jars are both present, the 0.8 connector works fine but the 0.10 connector fails.

    The streaming-kafka jar should therefore be supplied with the job itself rather than living in the spark/jars directory.
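
    For example (a sketch, reusing the assumed submit command from above), the assembly jar can be passed per job with --jars:

    spark-submit --class stu_live_test --master "local[2]" --jars spark-streaming-kafka-0-10-assembly_2.11-2.2.0.jar target/scala-2.11/test_2.11-0.0.2.jar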

    The code above consumes from Kafka 0.10 without SASL authentication.

    The following adds SASL authentication.

    An extra kafka_client_jass.conf file is needed, containing the username and password, for example:

    KafkaClient {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="name"
            password="psw";
    };

    kafkaParams gains the following SASL entries:

            val kafkaParams = Map[String, Object](
                "bootstrap.servers" -> ip, // ip: Kafka broker address(es)
                "group.id" -> "test_kafka11",
                "key.deserializer" -> classOf[StringDeserializer],
                "value.deserializer" -> classOf[StringDeserializer],
                "auto.offset.reset" -> "latest",
                "sasl.plain.username" -> name, // name: SASL username placeholder
                "sasl.plain.password" -> psw,  // psw: SASL password placeholder
                "security.protocol" -> "SASL_PLAINTEXT",
                "sasl.mechanism" -> "PLAIN",
                "enable.auto.commit" -> (false: java.lang.Boolean)
            );

    Add the JAAS config to the spark-submit command:

    spark-submit --class scala_class_name --driver-java-options="-Djava.security.auth.login.config=kafka_client_jass.conf" xxxxx.jar
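
    Note that --driver-java-options only covers the driver JVM; since the executors also create Kafka consumers for a direct stream, a common approach (an assumption to adapt to your deployment mode) is to ship the file with --files and set the executor JVM option as well:

    spark-submit --class scala_class_name --files kafka_client_jass.conf --driver-java-options="-Djava.security.auth.login.config=kafka_client_jass.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jass.conf" xxxxx.jar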

  • Original post: https://www.cnblogs.com/supermanwx/p/9916643.html