name := "test"

version := "0.0.2"

scalaVersion := "2.11.8"

val sparkVersion = "2.2.0"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % sparkVersion // % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10-assembly_2.11" % sparkVersion // % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % sparkVersion // % Provided
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object stu_live_test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(2))

    val ip = "host:9092" // placeholder: the real broker list is elided in this post

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> ip,
      "group.id" -> "test_kafka1102",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
      // "partition.assignment.strategy" -> "range"
    )

    val tops = "artsuseronline"
    val topics = tops.split(",").toSet

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // force each ConsumerRecord to a String here; printing the record itself fails to serialize
    val lines = stream.map(record => record.toString())
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
The above is the sbt build definition together with the Scala code; package it with sbt and it is ready to run.
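For reference, the package step (the jar path is assumed from the name, version, and scalaVersion settings above, not stated in the original notes):

sbt package
# -> target/scala-2.11/test_2.11-0.0.2.jar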
Environment: Spark 2.2.0, Scala 2.11.8, Kafka 0.10.

Problems encountered along the way:
Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
Searching for this error indicates it is a version-conflict problem.

Suggested fixes found online include setting export SPARK_KAFKA_VERSION=0.10, adding a kafka-clients 0.10 dependency, and so on.
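Before reaching for those, it is worth confirming which kafka-clients jar the driver actually loads. A minimal sketch (my addition, not from the original troubleshooting) that can be pasted into main or a spark-shell:

// prints the jar KafkaConsumer was loaded from, exposing any stale 0.8 client on the classpath
println(classOf[org.apache.kafka.clients.consumer.KafkaConsumer[_, _]]
  .getProtectionDomain.getCodeSource.getLocation)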
Earlier, to connect to Kafka 0.8, spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar had been downloaded into spark/jars.

I deleted that jar and added spark-streaming-kafka-0-10_2.11-2.2.0.jar: the run failed. I added the kafka_client_10.jar package: the run failed again.

I then deleted the jars above and added spark-streaming-kafka-0-10-assembly_2.11-2.2.0.jar. The job produced data but reported a serialization failure, so I force-cast the output to a String inside the map transformation, and the output succeeded.
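The serialization failure comes from handing the non-serializable ConsumerRecord objects to downstream operators. A minimal sketch of the same fix (assuming the StringDeserializer config above, so record.value() is already a String):

// keep only the serializable String payload instead of the whole ConsumerRecord
val lines = stream.map(record => record.value())
lines.print()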
When the Kafka 0.8 and 0.10 jars exist side by side, the 0.8 code path runs without problems but the 0.10 path breaks.

Conclusion: the streaming-kafka jar should be supplied to the job on its own and must not live in the spark/jars environment, e.g. as shown below.
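One way to supply it per job (a sketch; the application jar path is assumed from the build settings above) is the --jars flag of spark-submit:

spark-submit --class stu_live_test \
  --jars spark-streaming-kafka-0-10-assembly_2.11-2.2.0.jar \
  target/scala-2.11/test_2.11-0.0.2.jar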
The code above consumes from Kafka 0.10 without SASL authentication.

The following adds SASL authentication.

An extra JAAS file, kafka_client_jass.conf, is needed; put the username and password information in it, as follows:
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="name"
  password="psw";
};
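For a quick local test, the driver JVM can also be pointed at this file programmatically before the Kafka consumer is created (an assumption on my part, not part of the original notes; for real submission use the spark-submit option shown below):

// must run before the direct stream / consumer is constructed
System.setProperty("java.security.auth.login.config", "kafka_client_jass.conf")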
Add the SASL properties to kafkaParams as follows:
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> ip,
  "group.id" -> "test_kafka11",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "latest",
  "sasl.plain.username" -> name, // name / psw: placeholders, the real credentials are elided
  "sasl.plain.password" -> psw,
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.mechanism" -> "PLAIN",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
Add the config when submitting:

spark-submit --class scala_class_name \
  --driver-java-options="-Djava.security.auth.login.config=kafka_client_jass.conf" \
  xxxxx.jar
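Note that with the direct stream the executors open Kafka connections as well, so in cluster deployments the JAAS file must reach them too. A hedged variant (assumed, not something verified in these notes) ships the file with --files and points the executor JVMs at it:

spark-submit --class scala_class_name \
  --files kafka_client_jass.conf \
  --driver-java-options="-Djava.security.auth.login.config=kafka_client_jass.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jass.conf" \
  xxxxx.jar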