zoukankan      html  css  js  c++  java
  • Kafka Consumer

    Consumer

    High Level Consumer

    简要:很多应用场景下,客户程序知识希望从Kafka顺序读取并处理数据,并不太关心具体的offset

    同时也希望提供一些语义,例如同一条消息只被一个Consumer消费(单播)或被所有Consumer消费(广播)。

    Kafka High Level API提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节,并提供丰富的语义。

    Consumer Group

    High Level Consumer将从某个Patition读取的最后一条消息的offset存在Zookeeper中(从0.8.2开始同时支持将offset存在Zookeeper中和专用的Kafka topic中)。

    这里就有一个问题:如果消费一条消息就将其offset存在Topic中,那么存在Topicoffset数据量将和被消费的数据量是一样的。如果我们想要知道哪条消息被哪个Consumer消费,那么我们就得再起一个Consumerpull这个存有offsetTopic。而且消息被删除后,可能会影响到offset。这样Kafka不是通过时间等去回收offset,而是通过Compaction的方式,将key想同的value只保留最后一条offset

    这个offset基于客户端程序提供给Kafka的名字来保存,这个名字被称为Consumer Group

    Consumer Group是整个Kafka集群全局唯一的,而非针对某个Topic

    每个High level Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group

    消息被消费后,并不会被删除,只是相应的offset加一。

    对于每条消息,在同一个Consumer Group里只会被一个Consumer消费。

    不同Consumer Group可消费同一条消息。

    (假设一个Broker里就一个Patition

     

    Kafka的设计理念之一就是同时提供对离线批处理和在线流处理的支持

    可同时使用Hadoop系统进行离线批处理,Storm或其它流处理系统进行流处理

    可使用KafkaMirror Maker将消息从一个数据中心镜像到另一个数据中心

    验证  消息的单播和广播方式

    1.重新部署集群,目的是将数据清空,启动KafkaZookeeper集群

     

    2.创建Topic,设置TopicPatiton的数目为3个,replication1

     

    3.我们设置2Consumer Group,一组2Consumer,另外一组1Consumer

     

    4.Producer发送3条消息test0,test1,test2,发现

     

    High Level Consumer Rebalance

     

    Consumer启动及Rebalance流程

    High Level Consumer启动时将其ID注册到其Consumer Group下,在Zookeeper上的路径为/consumers/[consumer group]/ids/[consumer id]

    /consumers/[consumer group]/ids上注册Watch,为了监控Consumer挂了。

    /brokers/ids上注册Watch,为了监控Broker是否挂了

    如果Consumer通过Topic Filter创建消息流,则它会同时在/brokers/topics上创建Watch

    强制自己在其Consumer Group内启动Rebalace

    Consumer Rebalance算法

    将目标Topic下所有Partition排序,存于集合P

    对某个Consumer Group下所有Consumer排序,存于集合C中,第iConsumer记为Ci

    N=size(P)/size(C),向上取整

    解除Ci对原来分配的Partition的消费权(i0开始)

    将第i*N(I+1)*N-1Partitoon分配给Ci

    Consumer rebalance算法缺陷及改进

    任何Broker或者Consumer的增减都会促发所有的ConsumerRebalance

    Split Brain(脑裂)每个Consumer分别单独通过Zookeeper判断哪些BrokerConsumer宕机,同时Consumer在同一时刻从Zookeeper”看”到的view可能不一样,这是由Zookeeper特性决定的

    调整结果不可控,所有Consumer分别进行Rebalance,彼此不知道对应的Rebalance是否成功。

     

    Low Level Consumer

    使用Low Level ConsumerSimple Consumer)的主要原因是,用户希望比Consumer Group更好的控制数据的消费,如

    同一条消息读多次,方便Replay

    只消费某个Topic的部分Partiton

    管理事务,从而确保每条消息被处理一次(Exactly once

     

    High Level Consumer相对,Low Level Consumer要求用户做大量的额外工作

    在应用程序中跟踪处理offset,并决定下一条消费哪条消息

    获知每个PartitionLeader

    处理Leader的变化

    处理多Consumer的协作

  • 相关阅读:
    【并查集】亲戚
    【图论】Car的旅行线路 NOIP 2001
    【贪心】排座椅
    【DP】花店橱窗布置
    【NOIP】NOIP考纲总结+NOIP考前经验谈
    【NOIP】考前须知
    NOIP 2016 PJ T4 魔法阵
    NOIP 2016 PJ T3 海港
    【高精度】麦森数 NOIP 2003
    【带权并查集】食物链 NOIP 2001
  • 原文地址:https://www.cnblogs.com/WardSea/p/7428751.html
Copyright © 2011-2022 走看看