zoukankan      html  css  js  c++  java
  • kafka+SparkStreaming以及Kafka+Kerberos+Sentry+SparkStreaming代码调试问题与分析

    【调试背景】
    目前测试kafka集群有两套,版本为 0.10.x。有一套是添加了Kerberos+Sentry认证,另一套没有添加。
    现在需要通过sparkStreaming接入kafka做实时分析。
    【总体结论】
    实验1:1.6.x版本spark的jar包,0.8.x.x的spark-streaming-kafka,无Kerberos+Sentry认证,用createStream,可以从zk中获取broker,接入成功;
    实验2:1.6.x版本spark的jar包,0.8.x.x的spark-streaming-kafka,有Kerberos+Sentry认证,用createStream,无法zk中获取broker,接入失败,报空指针;
    实验3:1.6.x版本spark的jar包,0.8.x.x的spark-streaming-kafka,有Kerberos+Sentry认证,用createDirectStream,直接设置broker,接入失败,报EOFException;
    实验4:2.1.x版本spark的jar包,010版本的spark-streaming-kafka,有Kerberos+Sentry认证,用createDirectStream,直接设置broker,需要修改“KafkaUtils”源码,接入成功;
    PS     :2.1.x版本spark的jar包,010版本的spark-streaming-kafka,无Kerberos+Sentry认证,用createDirectStream,直接设置broker,接入成功;
    【实验1】可以正常运行
    (1)kafka环境:无Kerberos+Sentry认证
    (2)使用jar包:

     
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.10</artifactId>
                <version>1.6.1</version>
            </dependency>
    (3)核心代码:
    package com.xx.kafka;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    import scala.Tuple2;
    
    public class PrintRecMsg {
        
        public static void main(String[] args) {
     
            Map<String, Integer> topicmap = new HashMap<>();
            topicmap.put(args[0], 2);
            SparkConf sparkConf = new SparkConf().setAppName("PrintRecMsg").setMaster("local[2]");
    
            final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
            String zkQuorum ="host1:2181,host2:2181,host3:2181";
            String group = "mygroup";
            
            JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);
    
            JavaDStream<String> msg = lines.map(new Function<Tuple2<String,String>, String>() {
    
                @Override
                public String call(Tuple2<String, String> tuple2) throws Exception {
                    return tuple2._1() + "," + tuple2._2();
                }
            });
            msg.print(20);
            jssc.start();
            jssc.awaitTermination();
        }
    }

     

     
    (4)分析:
    创建的流只需要三个和Kafka有关的参数:zk集群地址,消费者组,topicMap。
    createStream是走的Zookeeper去获取对应集群broker信息,然后进行消费。
     
    【实验2】无法运行
    (1)kafka环境:kafka+Kerberos+Sentry认证
    (2)使用jar包:(同上)
    (3)核心代码:由于需要添加额外的kafka参数,因此采用了另一个“createStream”的重载方法。
    public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]"); 
            final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); 
    
            Map<String, Integer> topicmap = new HashMap<>();
            topicmap.put("gaoweiurl", 2);
            
            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("zookeeper.connect", "host1:2181,host2:2181,host3:2181";);
            kafkaParams.put("group.id", "mygroup");
            kafkaParams.put("auto.offset.reset", "largest");
            /** 以下是和Kerberos+sentry认证相关  **/
            kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
            kafkaParams.put("sasl.mechanism", "GSSAPI");
            kafkaParams.put("sasl.kerberos.service.name", "kafka");
            System.setProperty("java.security.auth.login.config", "/xx/xx/kafka-jaas.conf");
            System.setProperty("java.security.krb5.conf", "/xx/xx/krb5.conf");
            
            JavaPairInputDStream<String, String> lines = KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap, StorageLevel.MEMORY_AND_DISK_SER_2());
            
            JavaDStream<String> msg = lines.map(new Function<Tuple2<String, String>, String>() {
    
                @Override
                public String call(Tuple2<String, String> tuple2) throws Exception {
                    return tuple2._1() + "," + tuple2._2();
                }
            });
            msg.print(20);
            jssc.start();
            jssc.awaitTermination();
        } 
     (4)结果及分析:
    (4)问题及分析
     
     
     
    Zookeeper可以正常连接,而且日志显示,已经成功通过Kerberos+sentry认证。
    但是在开始消费消息的时候,一直报一个错误:
    通过异常堆栈一行行的查找源码,首先:
    ==>“at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)”
    抛出了空指针异常。进入代码发现:
    说明传入host为空。
    ==>“at kafka.cluster.Broker.connectionString(Broker.scala:62)”
    说明是创建Broker对象的时候,调用“connectionString(host,port)”出现的。
    查看这个方法,是Broker的一个成员方法
    创建Broker的时候,入参host为空,那么是谁创建Broker对象呢?
    ==>“at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)”
     
    这行代码说明,是有人先创建了“brokers”,然后在执行“fetchTopicMetadata”方法的时候,
    执行broker.map(_.connectionString),而brokers的“host”为空,所以空指针异常。
    这里的结果已经明确了,但是,还没有找到brokers对象创建的地方。
    异常堆栈在这个地方的时候就断了。剩下的就是找到brokers的创建位置。
    接下来从下往上看异常:
    ==>“at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)”
    线程“LeaderFinderThread”启动之后,开始执行doWork()方法。
    然后创建了brokers,然后通过“ClientUtlis.fetchTopicMetadata”触发了前面的空指针错误。
    那么“getAllBrokersInCluster(zkClient)”方法是怎么生成brokers的呢?
    首先,这个例子是通过Zookeeper获取broker的,这个地方基本上可以确定是通过zk获取kafka的broker信息。
    通过
    进入“getAllBrokersInCluster”方法:
    其中,BrokerIdspath为:
    接下来,
    总之,就是会读取“brokers/ids/id”中的信息,返回brokerInfo,然后创建broker。
    回到Broker,进入“createBroker”方法查看:
    最终,看看配置了Kerberos+Sentry的kafka的broker信息吧,
    呵呵,比较下不加Kerberos+sentry的kafka的broker信息:
    呵呵。
     
    【实验3】无法运行
    (1)kafka环境:kafka+Kerberos+Sentry认证
    (2)使用jar包:(同上)
    (3)核心代码:使用createDirectStream,不用Zookeeper,直接连接broker。
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    import kafka.serializer.StringDecoder;
    import scala.Tuple2;
    
    public class PrintDirectMsgDirect {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]");
            final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
    
            int numThreads = 2;
            Map<String, Integer> topicmap = new HashMap<>();
            topicmap.put("gaoweiurl", numThreads);
            Set<String> topicSet = new HashSet<>();
            topicSet.add("gaoweiurl");
            
            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("bootstrap.servers", "node86:9092,node99:9092,node101:9092");
            kafkaParams.put("metadata.broker.list", "node86:9092,node99:9092,node101:9092");
            kafkaParams.put("group.id", "spark-executor-kafka_shjs_wlpt");
            kafkaParams.put("auto.offset.reset", "smallest");
            kafkaParams.put("enable.auto.commit", "true");
            kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
            kafkaParams.put("sasl.mechanism", "GSSAPI");
            kafkaParams.put("sasl.kerberos.service.name", "kafka");
            System.setProperty("java.security.auth.login.config", "D:\Kerberos\kafka-jaas.conf");
            System.setProperty("java.security.krb5.conf", "D:\Kerberos\krb5.conf");
            JavaPairInputDStream<String, String> lines= KafkaUtils.createDirectStream(jssc,  String.class, String.class,
                    StringDecoder.class, StringDecoder.class, kafkaParams, topicSet);
        
            JavaDStream<String> msg = lines.map(new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) throws Exception {
                    return tuple2._1() + "," + tuple2._2();
                }
            });
            msg.print(20);
            jssc.start();
            jssc.awaitTermination();
        }
    }
    (4)结果及分析
    然后,百度了下,呵呵。
    Spark的1.x版本不支持“Secure Kafka”。
    【实验4】失败后成功
    换了高版本的spark和sparkStreaming和sparkStreamingKafka的jar包
    (1)kafka环境:kafka+Kerberos+Sentry认证
    (2)使用jar包:全部是高版本
     <spark.version>2.1.0</spark.version>
            <spark-streaming-kafka.version>2.1.0</spark-streaming-kafka.version> 
    <!-- spark core 核心依赖包 -->
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.10</artifactId>
                    <version>${spark.version}</version>
                </dependency>
    
                <!-- spark streaming 依赖包 -->
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming_2.10</artifactId>
                    <version>${spark.version}</version>
                </dependency>
    
                <!-- spark streaming kafka 依赖包 -->
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
                    <version>${spark-streaming-kafka.version}</version>
                </dependency>
    (3)核心代码:
    package com.ustcinfo.ishare.bdp.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    /**
     * @Description: 所有spark任务的总入口
     * @author: Beethoven.S
     * @date: 2017/9/14 13:50 
     * @e-mail: sheng.gang@ustcinfo.com
     */
    
    object sparkJobExecutor {
    
        /** krb5.conf配置文件 **/
        val KRB5_CONF: String = "D:\Kerberos\krb5.conf"
        /** JAAS配置文件 **/
        val KAFKA_JAAS_CONF: String = "D:\Kerberos\kafka-jaas.conf"
        /** kafka broker地址,多个broker用逗号分开 **/
        val KAFKA_BROKERS: String = "node86:9092,node99:9092,node101:9092"
    
        def main(args: Array[String]) {
            /** 添加Kerberos认证所需的JAAS配置文件到运行时环境 **/
            System.setProperty("java.security.auth.login.config", KAFKA_JAAS_CONF)
            /** 添加krb5配置文件到运行时环境 **/
            System.setProperty("java.security.krb5.conf", KRB5_CONF)
    
            val sparkConf = new SparkConf().setMaster("local[4]").setAppName("sparkStreaming")
            val streamingContext = new StreamingContext(sparkConf, Seconds(10))
            val stream = KafkaUtils.createDirectStream[String, String](
                streamingContext,
                PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](Array("gaoweiurl"), Map(
                    "bootstrap.servers" -> "node86:9092,node99:9092,node101:9092",
                    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
                    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
                    "group.id" -> "kafka_shjs_wlpt",
                    "enable.auto.commit" -> "true",
                    "auto.offset.reset" -> "earliest",
                    "sasl.kerberos.service.name" -> "kafka",
                    "security.protocol" -> "SASL_PLAINTEXT"
                ))
            )
            stream.foreachRDD(kv => {
                println("============> ")
                kv.foreach(x => println("RDD==> " + x))
            })
    
    
            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }
     
    然后,在没有数据写入的时候,很正常,但是一旦开始接收数据之后,就会出现如下错误:
    没错,sparkStreaming开始监听并且连接的时候,用的消费者组ID确实是我代码中配置的:
    但是,一旦开始接收消息,通过RDD读取数据的时候,groupId居然被自动添加了“spark-executor-”的前缀。
    (我这个开始设置的“spark-executor-kafka_shjs_wlpt”这个就是看看是不是有了前缀就不再加的,结果,
    还是自动的添加了前缀,成了“spark-executor-spark-executor-kafka_shjs_wlpt”)。
    然后,在spark运行的时候发现了这句话:
     
    找到“KafkaUtils”代码中,追踪到万恶之源:
    而且是,只要收到消息,创建RDD的时候会这么干:
     
    解决方式就是覆盖这个源码:
     
    注意覆盖的时候,包的名称路径必须要和源码路径一模一样,否则会出现scala的私有依赖引用问题。
    然后再次,执行:
    kafka的数据可以正常读取。
     
     
  • 相关阅读:
    Extjs 动态改变列名
    ext+dwr DynamicGridPanel 封装 态创建ext grid
    extjs 动态表单模板二
    extjs 动态表单模板
    EXTJS 学习笔记(共17项,20090924更新)
    Ext核心API详解Ext.tree.TreePanel
    ExtJS GridPanel动态加载列
    ExtJs之combobox详解
    在 LotusScript 中为自定义对象模拟事件
    Firefox下如何将回车键转为Tab键
  • 原文地址:https://www.cnblogs.com/shenggang/p/7997792.html
Copyright © 2011-2022 走看看