zoukankan      html  css  js  c++  java
  • kafka消费者

    1.消费方式

      consumer采用pull(拉)模式从broker中读取数据。

      push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能的以最快速度传递消息,但是这样很容易造成consumer来不及处理消息。典型的表现就是拒绝服务以及网络拥塞,而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

      pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时间为timeout。

    2.分区分配策略

      一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。

      kafka有两种分配策略,一是RoundRobin,二是Range;

      2.1 分区分配的条件

        1.同一个消费组内消费者的新增,关闭或崩溃

        2.订阅的主题新增分区

      2.2 RoundRobin

        使用RoundRobin策略有两个前提条件必须满足:

          1.同一个consumer group里面的所有消费者的num.streams必须相等;

          2.每个消费者订阅的主题必须相同;

        RoundRobin策略的工作原理:将所有主题的分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,分发给每个消费者。(其实就是按分区名hash排序后平均分配给每一个消费者的线程)

                                                                                 

      2.3 Range

        是对每个主题而言。首先按照分区序号排序,然后将消费者排序。分区数/消费者数=m,如果m!=0,前m个消费者多消费一个分区(每个主题)

        

    3.offset的维护

      由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到哪个offset,以便故障恢复后继续消费; 

      我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同。

      首先来说说消费者如果是根据javaapi来消费,也就是【kafka.javaapi.consumer.ConsumerConnector】,我们会配置参数【zookeeper.connect】来消费。这种情况下,消费者的offset会更新到zookeeper的       

      【consumers/{group}/offsets/{topic}/{partition}】目录下,例如:

    [zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
    1
    cZxid = 0x20006d28a
    ctime = Wed Apr 12 18:20:51 CST 2017
    mZxid = 0x30132b0ed
    mtime = Tue Aug 22 18:53:22 CST 2017
    pZxid = 0x20006d28a
    cversion = 0
    dataVersion = 5758
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 4
    numChildren = 0

      kafka0.9版本之前,consumer消费者默认将offset保存到zookeeper中,0.9版本开始,consumer默认将offset保存到kafka一个内置的topic中,该topic为_consumer_offsets;

      

  • 相关阅读:
    MySQL主主同步方案
    Mysql增量备份与恢复
    配置合适的存储引擎
    基于Amoeba读写分离
    部署myaql主从异步复制
    MySQL完全备份操作
    echo 命令详解
    ELK 基本部署
    zabbix 简介
    基于 Git Tag 发布及回滚代码
  • 原文地址:https://www.cnblogs.com/wnwn/p/12396228.html
Copyright © 2011-2022 走看看