zoukankan      html  css  js  c++  java
  • 大数据面试(kafka)

    kfaka

    1.怎么解决kafka数据丢失的问题?

    kafka有两种发送数据的模式,异步和同步,默认选择的是同步发送消息。

    同步:在同步模式如果ack消息确认机制为1只保证主节点写入成功,在进行主从复制如果主节点宕机,从节点将没有数据,数据就会丢失。

    所以设置ack消息确认机制为-1,消息写入主节点和从节点才算成功。

    异步:在异步模式当缓冲区满了,如果ack=0就会清空缓冲池消息。

    所以在kafka配置文件设置成不限制阻塞超时时间,一直等待就保证数据不会丢失。

    同步和异步区别:

    同:发送消息到分区再进行主从复制,客户端收到服务器确认。

    异:发送消息到缓冲区,然后写入集群中,速度比较快。

    ack=0只发送一次,不管是否成功。

    ack=1主写入就成功。

    ack=-1主副本都写入成功。

    2.kafka消费组?

    消息组里面有多个消费者,消费topic下面所有分区,

    每个分区只能由同一个消费组内一个消费者来消费。

    3.kafka为什么高吞吐量。

    1)顺序读写,不断追加到文件。

    2)零拷贝,数据从内核到内核再到网卡。

    3)分区,并行消费。

    4)批量发送消息。

    5)数据压缩,生产者对数据进行压缩,消费者解压。

    4.zookeeper在kafka中的作用?

     Kafka集群的一些重要信息都记录在ZK中,比如集群的所有代理节点、主题的所有分区、分区的副本信息(副本集(AR)、主副本(leader)、同步的副本集(ISR))。外部事件会更新ZK的数据,ZK中的数据一旦发生变化,控制器都要做出不同的相应处理。

    5.kafka为什么是有序的?

    1)kafka默认保证同一个分区内消息是有序的,设置topic只用一个分区这样就可以保证全局消息有序,但是并发量低。

    2)指定消息发送到同一个分区,保证消息有序。

    6. sparkStreaming kafka保证数据不丢失(at-least)?

    spark streaming流式处理kafka中的数据,首先是把数据接收过来,然后转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:

     1.利用Receiver接收数据。

     此方法使用Receiver接收数据。Receiver是使用Kafka高阶API接口实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据。

     在默认配置下,此方法可能会在失败时丢失数据。为确保零数据丢失,必须在Spark Streaming中另外启用预写日志(Write Ahead Logs)。这将同步保存所有收到的Kafka数据到分布式文件系统(例如HDFS)上,以便在发生故障时可以恢复所有数据。 

    ( StorageLevel.MEMORY_AND_DISK_SER)在Receiver的方式中,Kafka中的topic partition与Spark Streaming中生成的RDD partition无关。所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。

     但是这并没有增加Spark    在处理数据上的并行度。

     2.直接从kafka读取数据,Direct Stream方法。

     定期向Kafka查询每个主题的每个分区中的最新偏移量(offsets)。当Spark Streaming启动处理数据的作业时,利用Kafka的低阶API读取Kafka定义的偏移范围的数据。

     Direct方式依靠checkpoint机制来保证。每次streaming 消费了kafka的数据后,将消费的kafka offsets更新到checkpoint。当你的程序挂掉或者升级的时候,就可以接着上次的读取,实现数据的零丢失。

     优点:
     这种方法相较于Receiver方式的优势在于:
     简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
     高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
     精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值(偏移量),这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二   种  方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

     缺点:
     Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本

    /**
      * Kafka 0.10的Spark Streaming集成(spark获取kafka数据的最新方式)
      */
    object KafkaDirectStream {
    
      def main(args: Array[String]): Unit = {
        //创建SparkConf,如果将任务提交到集群中,那么要去掉.setMaster("local[2]")
    
        val conf = new SparkConf().setAppName("DirectStream").setMaster("local[2]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        //创建一个StreamingContext,其里面包含了一个SparkContext
        val streamingContext = new StreamingContext(sc, Seconds(5))
    
        //配置kafka的参数
        /**
          * Kafka服务监听端口
          * 指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
          * 指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
          * 消费者ID,随意指定
          * 指定从latest(最新)还是smallest(最早)处开始读取数据
          * 如果true,consumer定期地往zookeeper写入每个分区的offset
          */
        val kafkaParams = Map[String, Object](
    
          "bootstrap.servers" -> "192.168.2.210:9092",    //kafka机器IP:端口
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "g1",
          "auto.offset.reset" -> "latest",
          "partition.assignment.strategy" -> "org.apache.kafka.clients.consumer.RangeAssignor",
          "enable.auto.commit" -> (false: java.lang.Boolean)
    
        )
    
        //要监听的Topic,可以同时监听多个
        val topics = Array("test")
    
        //在Kafka中记录读取偏移量
        val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          //位置策略(可用的Executor上均匀分配分区)
          LocationStrategies.PreferConsistent,
          //消费策略(订阅固定的主题集合)
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )
    
    
        //迭代DStream中的RDD(KafkaRDD),将每一个时间点对应的RDD取出来
        stream.foreachRDD { rdd =>
          //获取该RDD对应的偏移量
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          //取出对应的数据
          rdd.foreach{ line =>
            println(line.key() + " " + line.value())
          }
    
          //异步更新偏移量到kafka中
          // some time later, after outputs have completed
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
    View Code

    7. sparkStreaming kafka保证数据不重复(exactly-once)?

    1.幂等操作:重复执行不会产生问题,不需要做额外的工作即可保证数据不重复。 

    2.业务代码需要自身添加事物操作:

    dstream.foreachRDD {(rdd, time) =
      rdd.foreachPartition { partitionIterator =>
        val partitionId = TaskContext.get.partitionId()
        val uniqueId = generateUniqueId(time.milliseconds,partitionId)
        //use this uniqueId to transationally commit the data in partitionIterator   使用此唯一ID以跨方式提交分区运算符中的数据
     }
    }

    就是说针对每个partition的数据,产生一个uniqueId,只有这个partition的所有数据被完全消费,则算成功,否则算失效,要回滚。下次重复执行这个uniqueId时,如果已经被执行成功,则skip掉。

    8.kafka的topic partition ar isr osr分别是什么?

    broker:kafka集群中的服务器,一台服务器就一个broker。

    topic: 消息类别。

    partition: 一个topic包含一个或多个partition分区,物理概念。partition有一个leader副本,其余的都是follower,leader负责读与写,follower同步leader的数据。

    produce: 消息生产者。

    consumer: 消息消费者。

    consumer group: 每个consumer属于特定的consumer group,一个topic中每一个分区同一个时间只能被一个消费组里面一个线程消费。

    offset: 每一个消息添加到分区时都会被分配一个offset,它是消息在分区中唯一编号,kafka通过offset保证消息在分区内顺序是有序的。

    ar:分区中的所有副本。

    isr:所有与leader副本保持一定程度同步的副本, ISR 集合是 AR 集合的一个子集。消息会先发送到leader副本,然后follower副本才能从leader中拉取消息进行同步。同步期间,follow副本相对于leader副本而言会有一定程度的滞后。前面所说的 ”一定程度同步“ 是指可忍受的滞后范围,这个范围可以通过参数进行配置。于leader副本同步滞后过多的副本(不包括leader副本)将组成 OSR (Out-of-Sync Replied)由此可见,AR = ISR + OSR。正常情况下,所有的follower副本都应该与leader 副本保持 一定程度的同步,即AR=ISR,OSR集合为空。

    osr:leader副本同步滞后过多的副本。

    9.kafka分布式分区存储?

    https://www.cnblogs.com/yitianyouyitian/p/10287293.html

    4个broker 4个分区 2个副本 

    一个topic包含4个Partition,2 Replication(拷贝),也就是说全部的消息被放在了4个分区存储,为了高可用,将4个分区做了2份冗余,然后根据分配算法.将总共8份数据,分配到broker集群上.
    结果就是每个broker上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用.比如图中的Broker1,宕机了.那么剩下的三台broker依然保留了全量的分区数据.
    所以还能使用,如果再宕机一台,那么数据不完整了.当然你可以设置更多的冗余,比如设置了冗余是4,那么每台机器就有了0123完整的数据,宕机几台都行.需要在存储占用和高可用之间做衡量.

    10.kafka文件存储机制?

    Kafka中读写message有如下特点:
    写message
    消息从java堆转入page cache(即物理内存)。
    由异步线程刷盘,消息从page cache刷入磁盘。
    读message
    消息直接从page cache转入socket发送出去。
    当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁
    盘Load消息到page cache,然后直接从socket发出去

    11.kafka过期消息删除机制?

    kafka作为一个消息中间件,是需要定期处理数据的,否则磁盘就爆了。
    1)根据数据的时间长短进行清理,例如数据在磁盘中超过多久会被清理(默认是7天)
    2)根据文件大小的方式给进行清理,例如数据大小超过多大时,删除数据(大小是按照每个partition的大小来界定的)。

  • 相关阅读:
    eclipse如何与git 配合工作。
    git托管代码(二)
    PPC2003 安装 CFNET 3.5成功
    我的Window Mobile WCF 項目 第三篇 WM窗体设计
    我的Window Mobile WCF 項目 第一篇Mobile开发和WinForm开发的区别
    我的Window Mobile WCF 項目 第七天
    我的Window Mobile WCF 項目 第二篇 WindowsMobile访问WCF
    WCF 用vs2010 和 vs2008的简单对比测试
    vs2010beta1 和 搜狗输入法 冲突,按下 Ctrl 键就报错,重装搜狗解决
    我的Window Mobile WCF 項目 第六天 (二)
  • 原文地址:https://www.cnblogs.com/chong-zuo3322/p/12778562.html
Copyright © 2011-2022 走看看