zoukankan      html  css  js  c++  java
  • 大数据面试(kafka)

    kfaka

    1.怎么解决kafka数据丢失的问题?

    kafka有两种发送数据的模式,异步和同步,默认选择的是同步发送消息。

    同步:在同步模式如果ack消息确认机制为1只保证主节点写入成功,在进行主从复制如果主节点宕机,从节点将没有数据,数据就会丢失。

    所以设置ack消息确认机制为-1,消息写入主节点和从节点才算成功。

    异步:在异步模式当缓冲区满了,如果ack=0就会清空缓冲池消息。

    所以在kafka配置文件设置成不限制阻塞超时时间,一直等待就保证数据不会丢失。

    同步和异步区别:

    同:发送消息到分区再进行主从复制,客户端收到服务器确认。

    异:发送消息到缓冲区,然后写入集群中,速度比较快。

    ack=0只发送一次,不管是否成功。

    ack=1主写入就成功。

    ack=-1主副本都写入成功。

    2.kafka消费组?

    消息组里面有多个消费者,消费topic下面所有分区,

    每个分区只能由同一个消费组内一个消费者来消费。

    3.kafka为什么高吞吐量。

    1)顺序读写,不断追加到文件。

    2)零拷贝,数据从内核到内核再到网卡。

    3)分区,并行消费。

    4)批量发送消息。

    5)数据压缩,生产者对数据进行压缩,消费者解压。

    4.zookeeper在kafka中的作用?

     Kafka集群的一些重要信息都记录在ZK中,比如集群的所有代理节点、主题的所有分区、分区的副本信息(副本集(AR)、主副本(leader)、同步的副本集(ISR))。外部事件会更新ZK的数据,ZK中的数据一旦发生变化,控制器都要做出不同的相应处理。

    5.kafka为什么是有序的?

    1)kafka默认保证同一个分区内消息是有序的,设置topic只用一个分区这样就可以保证全局消息有序,但是并发量低。

    2)指定消息发送到同一个分区,保证消息有序。

    6. sparkStreaming kafka保证数据不丢失(at-least)?

    spark streaming流式处理kafka中的数据,首先是把数据接收过来,然后转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:

     1.利用Receiver接收数据。

     此方法使用Receiver接收数据。Receiver是使用Kafka高阶API接口实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据。

     在默认配置下,此方法可能会在失败时丢失数据。为确保零数据丢失,必须在Spark Streaming中另外启用预写日志(Write Ahead Logs)。这将同步保存所有收到的Kafka数据到分布式文件系统(例如HDFS)上,以便在发生故障时可以恢复所有数据。 

    ( StorageLevel.MEMORY_AND_DISK_SER)在Receiver的方式中,Kafka中的topic partition与Spark Streaming中生成的RDD partition无关。所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。

     但是这并没有增加Spark    在处理数据上的并行度。

     2.直接从kafka读取数据,Direct Stream方法。

     定期向Kafka查询每个主题的每个分区中的最新偏移量(offsets)。当Spark Streaming启动处理数据的作业时,利用Kafka的低阶API读取Kafka定义的偏移范围的数据。

     Direct方式依靠checkpoint机制来保证。每次streaming 消费了kafka的数据后,将消费的kafka offsets更新到checkpoint。当你的程序挂掉或者升级的时候,就可以接着上次的读取,实现数据的零丢失。

     优点:
     这种方法相较于Receiver方式的优势在于:
     简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
     高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
     精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值(偏移量),这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二   种  方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

     缺点:
     Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本

    /**
      * Kafka 0.10的Spark Streaming集成(spark获取kafka数据的最新方式)
      */
    object KafkaDirectStream {
    
      def main(args: Array[String]): Unit = {
        //创建SparkConf,如果将任务提交到集群中,那么要去掉.setMaster("local[2]")
    
        val conf = new SparkConf().setAppName("DirectStream").setMaster("local[2]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        //创建一个StreamingContext,其里面包含了一个SparkContext
        val streamingContext = new StreamingContext(sc, Seconds(5))
    
        //配置kafka的参数
        /**
          * Kafka服务监听端口
          * 指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
          * 指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
          * 消费者ID,随意指定
          * 指定从latest(最新)还是smallest(最早)处开始读取数据
          * 如果true,consumer定期地往zookeeper写入每个分区的offset
          */
        val kafkaParams = Map[String, Object](
    
          "bootstrap.servers" -> "192.168.2.210:9092",    //kafka机器IP:端口
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "g1",
          "auto.offset.reset" -> "latest",
          "partition.assignment.strategy" -> "org.apache.kafka.clients.consumer.RangeAssignor",
          "enable.auto.commit" -> (false: java.lang.Boolean)
    
        )
    
        //要监听的Topic,可以同时监听多个
        val topics = Array("test")
    
        //在Kafka中记录读取偏移量
        val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          //位置策略(可用的Executor上均匀分配分区)
          LocationStrategies.PreferConsistent,
          //消费策略(订阅固定的主题集合)
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )
    
    
        //迭代DStream中的RDD(KafkaRDD),将每一个时间点对应的RDD取出来
        stream.foreachRDD { rdd =>
          //获取该RDD对应的偏移量
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          //取出对应的数据
          rdd.foreach{ line =>
            println(line.key() + " " + line.value())
          }
    
          //异步更新偏移量到kafka中
          // some time later, after outputs have completed
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
    View Code

    7. sparkStreaming kafka保证数据不重复(exactly-once)?

    1.幂等操作:重复执行不会产生问题,不需要做额外的工作即可保证数据不重复。 

    2.业务代码需要自身添加事物操作:

    dstream.foreachRDD {(rdd, time) =
      rdd.foreachPartition { partitionIterator =>
        val partitionId = TaskContext.get.partitionId()
        val uniqueId = generateUniqueId(time.milliseconds,partitionId)
        //use this uniqueId to transationally commit the data in partitionIterator   使用此唯一ID以跨方式提交分区运算符中的数据
     }
    }

    就是说针对每个partition的数据,产生一个uniqueId,只有这个partition的所有数据被完全消费,则算成功,否则算失效,要回滚。下次重复执行这个uniqueId时,如果已经被执行成功,则skip掉。

    8.kafka的topic partition ar isr osr分别是什么?

    broker:kafka集群中的服务器,一台服务器就一个broker。

    topic: 消息类别。

    partition: 一个topic包含一个或多个partition分区,物理概念。partition有一个leader副本,其余的都是follower,leader负责读与写,follower同步leader的数据。

    produce: 消息生产者。

    consumer: 消息消费者。

    consumer group: 每个consumer属于特定的consumer group,一个topic中每一个分区同一个时间只能被一个消费组里面一个线程消费。

    offset: 每一个消息添加到分区时都会被分配一个offset,它是消息在分区中唯一编号,kafka通过offset保证消息在分区内顺序是有序的。

    ar:分区中的所有副本。

    isr:所有与leader副本保持一定程度同步的副本, ISR 集合是 AR 集合的一个子集。消息会先发送到leader副本,然后follower副本才能从leader中拉取消息进行同步。同步期间,follow副本相对于leader副本而言会有一定程度的滞后。前面所说的 ”一定程度同步“ 是指可忍受的滞后范围,这个范围可以通过参数进行配置。于leader副本同步滞后过多的副本(不包括leader副本)将组成 OSR (Out-of-Sync Replied)由此可见,AR = ISR + OSR。正常情况下,所有的follower副本都应该与leader 副本保持 一定程度的同步,即AR=ISR,OSR集合为空。

    osr:leader副本同步滞后过多的副本。

    9.kafka分布式分区存储?

    https://www.cnblogs.com/yitianyouyitian/p/10287293.html

    4个broker 4个分区 2个副本 

    一个topic包含4个Partition,2 Replication(拷贝),也就是说全部的消息被放在了4个分区存储,为了高可用,将4个分区做了2份冗余,然后根据分配算法.将总共8份数据,分配到broker集群上.
    结果就是每个broker上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用.比如图中的Broker1,宕机了.那么剩下的三台broker依然保留了全量的分区数据.
    所以还能使用,如果再宕机一台,那么数据不完整了.当然你可以设置更多的冗余,比如设置了冗余是4,那么每台机器就有了0123完整的数据,宕机几台都行.需要在存储占用和高可用之间做衡量.

    10.kafka文件存储机制?

    Kafka中读写message有如下特点:
    写message
    消息从java堆转入page cache(即物理内存)。
    由异步线程刷盘,消息从page cache刷入磁盘。
    读message
    消息直接从page cache转入socket发送出去。
    当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁
    盘Load消息到page cache,然后直接从socket发出去

    11.kafka过期消息删除机制?

    kafka作为一个消息中间件,是需要定期处理数据的,否则磁盘就爆了。
    1)根据数据的时间长短进行清理,例如数据在磁盘中超过多久会被清理(默认是7天)
    2)根据文件大小的方式给进行清理,例如数据大小超过多大时,删除数据(大小是按照每个partition的大小来界定的)。

  • 相关阅读:
    用Ajax将checkbox选中的值发送给后台
    checkbox选中selec才可选和显示隐藏密码
    删除表格当前行
    Ajax本地取模板--完善窗口隐藏与共用
    Ajax向服务器请求对表单和表格进行操作
    用原生Dom实现向表格中添加数据
    正则判断表单输入
    隐藏窗口弹出及拖动效果
    原生DOM操作两个栗子,关于折叠内容和批量删除
    学习JS处理全选过程中遇到很多问题,所以把这个写了出来,希望对需要的人有帮助
  • 原文地址:https://www.cnblogs.com/chong-zuo3322/p/12778562.html
Copyright © 2011-2022 走看看