【Debugging background】
There are currently two test Kafka clusters, both running version 0.10.x. One has Kerberos+Sentry authentication enabled, the other does not.
The goal is to consume from Kafka with Spark Streaming for real-time analysis.
【Overall conclusions】
Experiment 1: Spark 1.6.x jars, spark-streaming-kafka 0.8.x, no Kerberos+Sentry; with createStream the brokers are obtained from ZooKeeper and the connection succeeds.
Experiment 2: Spark 1.6.x jars, spark-streaming-kafka 0.8.x, with Kerberos+Sentry; with createStream the brokers cannot be obtained from ZooKeeper and the connection fails with a NullPointerException.
Experiment 3: Spark 1.6.x jars, spark-streaming-kafka 0.8.x, with Kerberos+Sentry; with createDirectStream and the brokers set directly, the connection fails with an EOFException.
Experiment 4: Spark 2.1.x jars, spark-streaming-kafka-0-10, with Kerberos+Sentry; with createDirectStream and the brokers set directly, the connection succeeds after modifying the "KafkaUtils" source.
PS: Spark 2.1.x jars, spark-streaming-kafka-0-10, no Kerberos+Sentry; with createDirectStream and the brokers set directly, the connection succeeds.
【Experiment 1】Runs normally
(1) Kafka environment: no Kerberos+Sentry authentication
(2) Jars used:
(3) Core code:
package com.xx.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class PrintRecMsg {
    public static void main(String[] args) {
        // topic name -> number of receiver threads
        Map<String, Integer> topicmap = new HashMap<>();
        topicmap.put(args[0], 2);

        SparkConf sparkConf = new SparkConf().setAppName("PrintRecMsg").setMaster("local[2]");
        final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

        String zkQuorum = "host1:2181,host2:2181,host3:2181";
        String group = "mygroup";

        // Receiver-based stream: only the ZooKeeper quorum, consumer group and topic map are
        // needed; the brokers are discovered through ZooKeeper.
        JavaPairReceiverInputDStream<String, String> lines =
                KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);

        JavaDStream<String> msg = lines.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return tuple2._1() + "," + tuple2._2();
            }
        });
        msg.print(20);

        jssc.start();
        jssc.awaitTermination();
    }
}
(4) Analysis:
The stream needs only three Kafka-related parameters: the ZooKeeper quorum, the consumer group, and the topic map.
createStream goes through ZooKeeper to look up the cluster's broker information and then consumes from those brokers.
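As a minimal illustration of what that lookup depends on, the broker registrations can be dumped with a plain ZooKeeper client. The quorum below is the one used in the code above, and /brokers/ids is the registration path examined later in this post; the helper itself is only a sketch, not part of the Spark or Kafka API:

import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import scala.collection.JavaConverters._

object ListBrokersFromZk {
  def main(args: Array[String]): Unit = {
    val connected = new CountDownLatch(1)
    val zk = new ZooKeeper("host1:2181,host2:2181,host3:2181", 30000, new Watcher {
      override def process(event: WatchedEvent): Unit =
        if (event.getState == KeeperState.SyncConnected) connected.countDown()
    })
    connected.await()
    // Every child of /brokers/ids is a broker id; its data is that broker's registration JSON,
    // which is what createStream's consumer eventually parses into Broker objects.
    zk.getChildren("/brokers/ids", false).asScala.foreach { id =>
      val data = zk.getData(s"/brokers/ids/$id", false, null)
      println(s"broker $id => ${new String(data, "UTF-8")}")
    }
    zk.close()
  }
}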
【Experiment 2】Fails to run
(1) Kafka environment: Kafka with Kerberos+Sentry authentication
(2) Jars used: (same as above)
(3) Core code: extra Kafka parameters are needed, so a different overload of "createStream" is used.
public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]");
    final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

    Map<String, Integer> topicmap = new HashMap<>();
    topicmap.put("gaoweiurl", 2);

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("zookeeper.connect", "host1:2181,host2:2181,host3:2181");
    kafkaParams.put("group.id", "mygroup");
    kafkaParams.put("auto.offset.reset", "largest");
    /** The following parameters are for Kerberos+Sentry authentication **/
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParams.put("sasl.mechanism", "GSSAPI");
    kafkaParams.put("sasl.kerberos.service.name", "kafka");
    System.setProperty("java.security.auth.login.config", "/xx/xx/kafka-jaas.conf");
    System.setProperty("java.security.krb5.conf", "/xx/xx/krb5.conf");

    JavaPairInputDStream<String, String> lines = KafkaUtils.createStream(jssc,
            String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topicmap, StorageLevel.MEMORY_AND_DISK_SER_2());

    JavaDStream<String> msg = lines.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) throws Exception {
            return tuple2._1() + "," + tuple2._2();
        }
    });
    msg.print(20);

    jssc.start();
    jssc.awaitTermination();
}
(4) Problem and analysis
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101733441-924180974.png)
ZooKeeper connects fine, and the logs show that the Kerberos+Sentry authentication has succeeded.
But as soon as message consumption starts, the same error keeps being thrown:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101734191-82989956.png)
Working down the exception stack line by line through the source code, first:
==> "at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)"
is where the NullPointerException is thrown. Looking into the code:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101734363-1000379096.png)
it is clear that the host passed in is null.
==> "at kafka.cluster.Broker.connectionString(Broker.scala:62)"
shows that the call comes from "connectionString(host, port)" on a Broker object.
This method is a member of Broker:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101734613-1137618582.png)
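Pieced together, the two frames behave roughly as below. This is a paraphrase from memory, not the real kafka.cluster.Broker or org.apache.kafka.common.utils.Utils source, but it reproduces the failure: the IPv6 check dereferences the host, so a null host dies with a NullPointerException.

object FormatAddressSketch {
  // Paraphrase of Utils.formatAddress: wrap IPv6 literals in brackets, otherwise host:port.
  def formatAddress(host: String, port: Int): String =
    if (host.contains(":")) "[" + host + "]:" + port   // NPE here when host == null
    else host + ":" + port

  // Stand-in for kafka.cluster.Broker: connectionString just formats host and port.
  case class Broker(id: Int, host: String, port: Int) {
    def connectionString: String = formatAddress(host, port)
  }

  def main(args: Array[String]): Unit =
    // a broker registered without a host, as on the Kerberos+Sentry cluster
    println(Broker(86, null, 9092).connectionString)   // throws NullPointerException
}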
So the Broker is built with a null host; who creates these Broker objects?
==> "at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)"
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101734753-1335459370.png)
This line shows that the "brokers" collection is created first, and that "fetchTopicMetadata" then
runs brokers.map(_.connectionString); the brokers' "host" is null, hence the NullPointerException.
The effect is now clear, but the place where the brokers are created has not been found yet,
and the exception stack goes cold here. What remains is to locate where the brokers are built.
Next, read the exception stack from the bottom up:
==> "at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)"
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101734925-1578014910.png)
Once the "LeaderFinderThread" thread starts, it runs its doWork() method,
which creates the brokers and then triggers the earlier NullPointerException through "ClientUtils.fetchTopicMetadata".
So how does "getAllBrokersInCluster(zkClient)" produce those brokers?
This example obtains its brokers through ZooKeeper, so this is almost certainly where the Kafka broker information is read from ZooKeeper.
Via
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101735034-81512843.png)
we land in the "getAllBrokersInCluster" method:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101735144-1929369646.png)
where BrokerIdsPath is:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101735378-1120463696.png)
Next,
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101735488-1272368281.png)
In short, it reads the data under "brokers/ids/<id>", returns the brokerInfo, and then creates the Broker.
Back to Broker: look at the "createBroker" method:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101735644-1932647216.png)
Finally, here is the broker registration of the Kafka cluster with Kerberos+Sentry configured:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101735784-1704496152.png)
Now compare it with the broker registration of the Kafka cluster without Kerberos+Sentry:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101735894-914756410.png)
The difference is the whole story: with Kerberos+Sentry enabled, the registration no longer carries a usable host for the old client, which is exactly the null host behind the NullPointerException.
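To make the comparison concrete, the two registrations differ roughly as sketched below. The values are illustrative, not copied from the actual znodes, and the field set is as I recall the 0.10 registration format: with only a SASL_PLAINTEXT listener configured, the legacy "host"/"port" fields that the 0.8.x client reads stay empty, and the address only appears under "endpoints".

object BrokerRegistrationExamples {
  // What /brokers/ids/<id> looks like with only a SASL_PLAINTEXT listener (host is null):
  val withKerberosSentry: String =
    """{"jmx_port":-1,"endpoints":["SASL_PLAINTEXT://node86:9092"],"host":null,"port":-1,"version":3}"""
  // ... and with a plain PLAINTEXT listener (host/port populated, so old clients are happy):
  val withoutKerberosSentry: String =
    """{"jmx_port":-1,"endpoints":["PLAINTEXT://node86:9092"],"host":"node86","port":9092,"version":3}"""
}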
【Experiment 3】Fails to run
(1) Kafka environment: Kafka with Kerberos+Sentry authentication
(2) Jars used: (same as above)
(3) Core code: use createDirectStream and connect to the brokers directly, without going through ZooKeeper.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class PrintDirectMsgDirect {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]");
        final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

        int numThreads = 2;
        Map<String, Integer> topicmap = new HashMap<>();
        topicmap.put("gaoweiurl", numThreads);
        Set<String> topicSet = new HashSet<>();
        topicSet.add("gaoweiurl");

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        // The brokers are set directly; ZooKeeper is not involved.
        kafkaParams.put("bootstrap.servers", "node86:9092,node99:9092,node101:9092");
        kafkaParams.put("metadata.broker.list", "node86:9092,node99:9092,node101:9092");
        kafkaParams.put("group.id", "spark-executor-kafka_shjs_wlpt");
        kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("enable.auto.commit", "true");
        // Kerberos+Sentry related parameters
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.mechanism", "GSSAPI");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");
        System.setProperty("java.security.auth.login.config", "D:\\Kerberos\\kafka-jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\Kerberos\\krb5.conf");

        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topicSet);

        JavaDStream<String> msg = lines.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return tuple2._1() + "," + tuple2._2();
            }
        });
        msg.print(20);

        jssc.start();
        jssc.awaitTermination();
    }
}
(4) Result and analysis
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101736159-1807974586.png)
Searching for the error turned up the explanation:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101736394-1846275050.png)
Spark 1.x does not support "Secure Kafka".
【Experiment 4】Fails at first, then succeeds
Switched to higher-version Spark, Spark Streaming, and Spark Streaming Kafka jars.
(1) Kafka environment: Kafka with Kerberos+Sentry authentication
(2) Jars used: all high-version
<spark.version>2.1.0</spark.version>
<spark-streaming-kafka.version>2.1.0</spark-streaming-kafka.version>

<!-- Spark core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- Spark Streaming -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- Spark Streaming Kafka 0-10 integration -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
    <version>${spark-streaming-kafka.version}</version>
</dependency>
(3) Core code:
package com.ustcinfo.ishare.bdp.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
  * @Description: entry point for all Spark jobs
  * @author: Beethoven.S
  * @date: 2017/9/14 13:50
  * @e-mail: sheng.gang@ustcinfo.com
  */
object sparkJobExecutor {
  /** krb5.conf configuration file **/
  val KRB5_CONF: String = "D:\\Kerberos\\krb5.conf"
  /** JAAS configuration file **/
  val KAFKA_JAAS_CONF: String = "D:\\Kerberos\\kafka-jaas.conf"
  /** Kafka broker addresses, comma separated **/
  val KAFKA_BROKERS: String = "node86:9092,node99:9092,node101:9092"

  def main(args: Array[String]) {
    /** Make the JAAS configuration needed for Kerberos available to the JVM **/
    System.setProperty("java.security.auth.login.config", KAFKA_JAAS_CONF)
    /** Make the krb5 configuration available to the JVM **/
    System.setProperty("java.security.krb5.conf", KRB5_CONF)

    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("sparkStreaming")
    val streamingContext = new StreamingContext(sparkConf, Seconds(10))

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("gaoweiurl"), Map(
        "bootstrap.servers" -> KAFKA_BROKERS,
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "group.id" -> "kafka_shjs_wlpt",
        "enable.auto.commit" -> "true",
        "auto.offset.reset" -> "earliest",
        "sasl.kerberos.service.name" -> "kafka",
        "security.protocol" -> "SASL_PLAINTEXT"
      ))
    )

    stream.foreachRDD(kv => {
      println("============> ")
      kv.foreach(x => println("RDD==> " + x))
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
With no data being written everything looks fine, but as soon as messages start arriving the following error appears:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101737097-36679807.png)
Sure enough, when Spark Streaming starts listening and connecting, it uses exactly the consumer group id configured in the code:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101737284-1868129524.png)
But once messages arrive and the data is read through the RDD, the groupId is silently given a "spark-executor-" prefix.
(The group was deliberately set to "spark-executor-kafka_shjs_wlpt" to check whether an existing prefix would stop another one from being added;
it did not, and the group ended up as "spark-executor-spark-executor-kafka_shjs_wlpt".)
Then this line turned up in the Spark logs:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101737456-974031613.png)
Tracking it down in the "KafkaUtils" source leads to the culprit:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101737691-505683269.png)
and it does this whenever messages are received and an RDD is created:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101737894-170506199.png)
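The behaviour boils down to a helper in KafkaUtils that rewrites the parameters before executor-side consumers are built. The sketch below is a paraphrase from memory of the 2.1.0 code, not a verbatim copy, with a small main added to show the double prefix:

import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig

object FixKafkaParamsSketch {
  // Roughly what KafkaUtils.fixKafkaParams does before executors build their consumers.
  def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
    // executors must not auto-commit or resolve offsets on their own
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    // driver and executors are forced into different consumer groups, unconditionally
    val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
  }

  def main(args: Array[String]): Unit = {
    val params = new ju.HashMap[String, Object]()
    params.put(ConsumerConfig.GROUP_ID_CONFIG, "spark-executor-kafka_shjs_wlpt")
    fixKafkaParams(params)
    // prints spark-executor-spark-executor-kafka_shjs_wlpt: the prefix is applied again
    println(params.get(ConsumerConfig.GROUP_ID_CONFIG))
  }
}

In the patched copy described next, the line that builds the prefixed group id is what gets changed, so the configured group id is passed through untouched.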
The fix is to override this source file:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101738175-1618008308.png)
Note that the package path of the overriding file must be exactly the same as the original source path; the file references private[kafka010] members, so a copy placed in any other package fails to compile with Scala private-access errors.
Run it again:
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101738363-1971698453.png)
and the Kafka data is read normally.
![](https://images2017.cnblogs.com/blog/1076786/201712/1076786-20171207101738534-619728863.png)