zoukankan      html  css  js  c++  java
  • Linkedin Kafka Design

    http://kafka.apache.org/07/design.html

    中文版的设计文档, http://www.oschina.net/translate/kafka-design

    Overview

    Use cases for activity stream and operational data

    • "动态汇总(News feed)"功能。将你朋友的各种活动信息广播给你
    • 相关性以及排序。通过使用计数评级(count rating)、投票(votes)或者点击率( click-through)判定一组给定的条目中那一项是最相关的.
    • 安全:网站需要屏蔽行为不端的网络爬虫(crawler),对API的使用进行速率限制,探测出扩散垃圾信息的企图,并支撑其它的行为探测和预防体系,以切断网站的某些不正常活动。
    • 运营监控:大多数网站都需要某种形式的实时且随机应变的方式,对网站运行效率进行监控并在有问题出现的情况下能触发警告。
    • 报表和批处理: 将数据装载到数据仓库或者Hadoop系统中进行离线分析,然后针对业务行为做出相应的报表,这种做法很普遍。

    Deployment

    The following diagram gives a simplified view of the deployment topology at LinkedIn.

    image

    It is not intended that a single Kafka cluster span data centers, but Kafka is intended to support multi-datacenter data flow topologies. This is done by allowing mirroring or "syncing" between clusters. This feature is very simple, the mirror cluster simply acts as a consumer of the source cluster. This means it is possible for a single cluster to join data from many datacenters into a single location. Here is an example of a possible multi-datacenter topology aimed at supporting batch loads:

    image

    基于pull的机制, 确实非常方便多个datacenter之间的备份和同步

    Major Design Elements

    There is a small number of major design decisions that make Kafka different from most other messaging systems:

    1. Kafka is designed for persistent messages as the common case, 使用磁盘缓存(page cache)而非进程缓存(memory cache)来buffer message 
    2. Throughput rather than features are the primary design constraint, 强调系统Throughput而非功能, 用的时候会发现功能真的很少...
    3. State about what has been consumed is maintained as part of the consumer not the server, Pull模式, Consumer记录状态, Server无状态
    4. Kafka is explicitly distributed. It is assumed that producers, brokers, and consumers are all spread over multiple machines, 完全的分布式系统

    Message Persistence and Caching

    Don't fear the filesystem!

    大家普遍认为磁盘很慢, 其实主要慢在磁盘寻道的机械操作时间, 现在磁盘吞吐的速度已经很快, 可以和网络一样快
    为了弥补, 寻道速度和吞吐速度之间巨大的Gap, OS都会使用磁盘缓存

    现代操作系变得越来越积极地将主内存用作磁盘缓存。所有现代的操作系统都会乐于将所有空闲内存转做磁盘缓存,即时在需要回收这些内存的情况下会付出一些性能方面的代价

    利用磁盘缓存可以实现read-ahead and write-behind techniques

    预读(read-ahead)就是提前将一个比较大的磁盘块中内容读入内存
    后写(write-behind)是将一些较小的逻辑写入操作合并起来组成比较大的物理写入操作

    但是这些优化都只能用于线性读写, 对于随机读写无能为力

    实际测试, 线性读写的效率其实很高, 超出人们的想象, 比随机读写快10000倍

    Kafka relies heavily on the filesystem for storing and caching messages.

    The key fact about disk performance is that, As a result the performance of linear writes on a 6 7200rpm SATA RAID-5 array is about 300MB/sec but the performance of random writes is only about 50k/sec—a difference of nearly 10000X.

    甚至ACM Queue article发现,在某些情况下线性磁盘访问能够比随即内存访问还要快!

    进程memory cache也并不完美, 尤其对线性读写

    1. 效率问题

    内存cache的空间效率问题, 尤其对于JVM, overhead过高

    a. The memory overhead of objects is very high, often doubling the size of the data stored (or worse).

    b. Java garbage collection becomes increasingly sketchy and expensive as the in-heap data increases.

    2. 重复cache问题

    对于线性读写, 如果在进程memory也保留一份cache, 会和OS的磁盘页面缓存(pagecache)重复, 导致浪费

    3. 进程重启的cache rebuild

    OS的磁盘页面缓存即使在服务重启之后会仍然保持有效,而不象进程内缓存,进程重启后还需要在内存中进行缓存重建(10G的缓存重建时间可能需要10分钟)

    4. 代码逻辑复杂化

    使用OS的磁盘页面缓存大大简化了代码,因为对缓存和文件系统之间的一致性进行维护的所有逻辑现在都是在OS中实现的,比我们在进程中做那种一次性的缓存更加高效,准确性也更高

    Kafka cache方案, 充分利用disk cache

    传统方案, maintain as much as possible in-memory and flush to the filesystem only when necessary

    Kafka方案,

    不在进程memory内做任何cache, 直接将数据发送给disk的pagecache, 但并不立即flush, write-behind
    All data is immediately transferred into the kernel's pagecache where the OS can flush it later.
    出于效率考虑, 实现基于配置的批量flush策略 
    Then we add a configuration driven flush policy to allow the user of the system to control how often data is flushed to the physical disk (every N messages or every M seconds) to put a bound on the amount of data "at risk" in the event of a hard crash.
    这种基于pagecache方式的理论基础
    This style of pagecache-centric design is described in an article on the design of Varnish here

    Message检索问题

    BTree是目前最通用的数据结构, 在消息系统中它可以用来广泛支持多种不同的事务性或非事务性语义. 它的确也带来了一个非常高的处理开销.

    更进一步讲, BTrees需要一种非常复杂的页面级或行级锁定机制才能避免在每次操作时锁定一整颗树. 实现这种机制就要为行级锁定付出非常高昂的代价, 否则就必须对所有的读取操作进行串行化(serialize).

    Intuitively a persistent queue could be built on simple reads and appends to files as is commonly the case with logging solutions. Though this structure would not support the rich semantics of a BTree implementation, but it has the advantage that all operations are O(1) and reads do not block writes or each other.

    其实Kafka根本不需要那么复杂的支持, 只需要实现顺序读写, 不需要支持对消息的随机读取
    所以直接使用最简单的设计, log file soluiton, 支持append, 和顺序读取. 复杂度当然是O(1), 而且写数据也不干扰读, 是够简单的.

    优点是, 读写效率和message总量没有线性关系, 如果是Btree就不行, message越多btree就越大, 读写效率就会变得越低.
    所以其他message system无法处理online analysis, 也是因为当message queue中的message过多时, 会导致效率大幅降低, 所以必须快速的把message push出去来减少queue的数据. 而对于kafka就无所谓, 可以保留大量的message, 对于online analyiss就算分析的慢点也没有关系, 而且现在kafka往往会过7天才删除过期的message, 这也会带来很多方便.

    这也很好体现Kafka的throughput优先的设计思路.

    Maximizing Efficiency

    There are two common causes of inefficiency: two many network requests, and excessive byte copying.

    message set, 降低网络请求量

    To encourage efficiency, the APIs are built around a "message set" abstraction that naturally groups messages. This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time.

    zero-copy support, 优化excessive byte copying效率

    通常下从文件到将数据发送出去的路径,

    将数据从文件传输到socket的数据路径:

    - 操作系统将数据从磁盘中读取到内核空间里的页面缓存 
    - 应用程序将数据从内核空间读入到用户空间的缓冲区
    - 应用程序将读到的数据写回内核空间并放入socke的缓冲区
    - 操作系统将数据从socket的缓冲区拷贝到NIC(网络借口卡,即网卡)的缓冲区,自此数据才能通过网络发送出去

    而使用sendfile(os系统操作), 可以避免这些重复的拷贝操作,让OS直接将数据从磁盘页面缓存发送到网络缓冲区

    对于最常见的场景, topic的多个consumers

    采用前文所述的零拷贝优化方案,数据只需拷贝到页面缓存中一次, 然后每次发送给consumer时都对它进行重复使用即可, 而无须先保存到内存中, 然后在consume该消息时每次都需要将其拷贝到内核空间中.
    消息consume的速度就能接近网络连接的极限.
    可以看出充分利用disk page cache的高效性...

    End-to-end Batch Compression

    Of course the user can always send compressed messages without any support needed from Kafka, but this can lead to very poor compression ratios as much of the redundancy is due to repetition between messages (e.g. field names in JSON or user agents in web logs or common string values). Efficient compression requires compressing multiple messages together rather than compressing each message individually.

    Ideally this would be possible in an end-to-end fashion—that is, data would be compressed prior to sending by the producer and remain compressed on the server, only being decompressed by the eventual consumers.

    Kafka supports GZIP and Snappy compression protocols.

    两点,

    首先多条压缩比单条压缩要合算, 你知道Gzip原理就明白了

    第二是, 在broker上不用解压, Consumer用的时候再解压

    Consumer

    Message delivery semantics

    So clearly there are multiple possible message delivery guarantees that could be provided:

    • At most once—this handles the first case described. Messages are immediately marked as consumed, so they can't be given out twice, but many failure scenarios may lead to losing messages.
    • At least once—this is the second case where we guarantee each message will be delivered at least once, but in failure cases maybe delivered twice.
    • Exactly once—this is what people actually want, each message is delivered once and only once.

    最多一次, 很容易丢数据

    恰好一次, 很好, 但是太难实现

    Algorithms that provide exactly once semantics exist, two- or three-phase commits and Paxos variants being examples, but they come with some drawbacks. They typically require multiple round trips and may have poor guarantees of liveness.

    所以kafka折衷选择at least once, 至少一次

    Consumer state, 由consumer保存, 即stateless broker

    In Kafka, the consumers are responsible for maintaining state information (offset) on what has been consumed.

    通常Consumer将state保存在Zookeeper上, 这样会带来分布式一致性问题?
    consume保存成功state, 但是数据没有处理就发生crash, 这样这部分数据就被丢失
    我们解决这个问题的思路,
    取消自动commit state, 改为在处理完consume的数据后, 手工commit state
    这样有个问题是, 当发生consumer rebalance时会导致不同的consumer读到重复数据
    可以通过减小每次consumer的fetch size, 来部分解决这个问题

    另一种思路, 将state和数据处理结果存放在同一个datastore中, 这样需要使用low-level的consumer API

    事务性系统,
    - 将state和数据更新放在同一个transaction中, 更新到OLTP数据库

    非事务性系统,
    - 搜索索引可以将state同它的索引段(index segment)存储在一起
    - Hadoop从Kafka中并行加载, 每个Mapper在map任务结束前, 将它使用的最后一个消息的偏移量存入HDFS

    Push vs. pull

    In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer.

    Some recent systems, such as scribe and flume, focusing on log aggregation, follow a very different push based path where each node acts as a broker and data is pushed downstream.

    Producer

    Automatic producer load balancing

    Kafka supports client-side load balancing for message producers or use of a dedicated load balancer to balance TCP connections.

    1. A dedicated layer-4 load balancer works by balancing TCP connections over Kafka brokers

    四到七层负载均衡,就是在对后台的服务器进行负载均衡时,依据四层的信息或七层的信息来决定怎么样转发流量

    所谓layer-4(四层)就是基于IP+端口的负载均衡;七层就是基于URL等应用层信息的负载均衡;同理,还有基于MAC地址的二层负载均衡和基于IP地址的三层负载均衡。换句换说,二层负载均衡会通过一个虚拟MAC地址接收请求,然后再分配到真实的MAC地址;三层负载均衡会通过一个虚拟IP地址接收请求,然后再分配到真实的IP地址;四层通过虚拟IP+端口接收请求,然后再分配到真实的服务器;七层通过虚拟的URL或主机名接收请求,然后再分配到真实的服务器。

    In this configuration all messages from a given producer go to a single broker.
    The advantage of using a level-4 load balancer is that each producer only needs a single TCP connection, and no connection to zookeeper is needed.
    The disadvantage is that the balancing is done at the TCP connection level, and hence it may not be well balanced (if some producers produce many more messages then others, evenly dividing up the connections per broker may not result in evenly dividing up the messages per broker).

    简单的根据producer的ip+port来选择broker, 一个producer只对应单个broker
    好处, 简单, 只需要维护单个TCP链接, 不需连接zookeeper
    坏处, 无法真正做到负载均衡, 每个producer的情况可能差别很大

    2. Client-side zookeeper-based load balancing solves some of these problems
    It allows the producer to dynamically discover new brokers, and balance load on a per-request basis. Likewise it allows the producer to partition data according to some key instead of randomly, which enables stickiness on the consumer (e.g. partitioning data consumption by user id). This feature is called "semantic partitioning".

    应该属于layer-7的load balance, 完全根据内容进行划分, 称为"语义划分". 如何根据语义划分, 可以定义partitioner, 比如随机, 或hash
    好处, 负载均衡效果好, 取决于partitioner的设计
    坏处, 复杂, 需要维护所有brokers的TCP链接池, 需要通过Zookeeper watchers监控zookeeper上broker和topic的动态状况

    - a new broker comes up
    - a broker goes down
    - a new topic is registered
    - a broker gets registered for an existing topic

    Asynchronous send

    Asynchronous non-blocking operations are fundamental to scaling messaging systems. In Kafka, the producer provides an option to use asynchronous dispatch of produce requests (producer.type=async). This allows buffering of produce requests in a in-memory queue and batch sends that are triggered by a time interval or a pre-configured batch size.

    为了提高网络效率, 屯一批message后一起发送, 否则单条发overhead会很高 

    Semantic partitioning

    The partitioning function can be customized by providing an implementation of the kafka.producer.Partitioner interface, default being the random partitioner.

    For the example above, the key would be member_id and the partitioning function would be hash(member_id)%num_partitions.

    Implementation Details

    The following gives a brief description of some relevant lower-level implementation details for some parts of the system described in the above section.

    API Design

    Producer APIs

    The Producer API that wraps the 2 low-level producers - kafka.producer.SyncProducer and

    kafka.producer.async.AsyncProducer. 
    
    class Producer {
    	
      /* Sends the data, partitioned by key to the topic using either the */
      /* synchronous or the asynchronous producer */
      public void send(kafka.javaapi.producer.ProducerData producerData);
    
      /* Sends a list of data, partitioned by key to the topic using either */
      /* the synchronous or the asynchronous producer */
      public void send(java.util.List< kafka.javaapi.producer.ProducerData> producerData);
    
      /* Closes the producer and cleans up */	
      public void close();
    
    }

    这个看上去很简单, 通过简单的send函数, 可以handle单条或多条的message发送.

    其实Producer的初始化需要一些参数, 参考API文档

    new Producer(config: ProducerConfig, encoder: Encoder[V], eventHandler: EventHandler[V], cbkHandler: CallbackHandler[V], partitioner: Partitioner[K]) 

    1. Asynchronous dispatch

    kafka.producer.Producer provides the ability to batch multiple produce requests (producer.type=async), before serializing and dispatching them to the appropriate kafka broker partition.
    异步, 为了提高网络效率...

    kafka.producer.EventHandler, serialize and send the data to the appropriate kafka broker partition

    异步就是批量发送, 但是如何把一堆message进行serialize, 并决定发到那个parittion, 这个逻辑和单条发送都是不一样的. 所以就可以定义eventhandler来处理.

    kafka.producer.async.CallbackHandler, plugging in custom logging/tracing code or custom monitoring logic

    提供一个log和monitor的接口

    2. encoder: Encoder[V]

    handles the serialization of data through a user-specified Encoder -

    interface Encoder<T> {
      public Message toMessage(T data);
    }

    The default is the no-op kafka.serializer.DefaultEncoder

    我的理解就是, 在发送message前, 可能需要将message转化为统一的格式, 所以这儿提供这个接口, 默认是什么都不做no-op

    比如最常用的是, 压缩, 可想而知在consumer肯定也会有decode的配置...

    3.zk.connect config parameter

    The zookeeper based broker discovery and load balancing can be used by specifying the zookeeper connection url through the zk.connect config parameter.

    如果不想用zookeeper, 也可以直接用broker.list, 问题是万一有broker挂了, producer的request会被block

    The producer can take in a static list of brokers through the broker.list config parameter. Each produce requests gets routed to a random broker partition in this case. If that broker is down, the produce request fails.

    4. kafka.producer.Partitioner

    The routing decision is influenced by the kafka.producer.Partitioner.

    interface Partitioner<T> {
       int partition(T key, int numPartitions);
    }

    The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is hash(key)%numPartitions. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the partitioner.class config parameter.

    这个好理解, 通过T key, int numPartitions就可以定义一个简单的partitioner

    如果不定义, 默认就是随机, random broker partition

    当然可能要custom partitioning strategy, 也很简单, 自己定义一个partitioner class 传给partitioner.class config parameter

    Consumer APIs

    We have 2 levels of consumer APIs.
    The low-level "simple" API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is completely stateless, with the offset being passed in on every request, allowing the user to maintain this metadata however they choose.

    The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed.

    底层API维护了一个同单个broker建立的连接. 该API完全是无状态的, 每个请求都带有一个偏移量作为参数, 从而允许用户以自己选择的任意方式来读取数据

    高层API对使用者隐藏了代理的具体细节, 让使用者可运行于集群中的机器之上而无需关心底层的拓扑结构, 它还维护着数据使用的状态.

    Low-level API
    class SimpleConsumer {
    	
      /* Send fetch request to a broker and get back a set of messages. */ 
      public ByteBufferMessageSet fetch(FetchRequest request);
    
      /* Send a list of fetch requests to a broker and get back a response set. */ 
      public MultiFetchResponse multifetch(List<FetchRequest> fetches);
    
      /**
       * Get a list of valid offsets (up to maxSize) before the given time.
       * The result is a list of offsets, in descending order.
       * @param time: time in millisecs,
       *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
       *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
       */
      public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
    }

    The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state.

    直接通过offset从broker上取数据, FetchRequest (including topic, partition, offset). 通过getOffsetsBefore, 你可以得到offset list

    low-level API是实现high-level API的基础
    并且如果你对consumer state维护有特殊要求的话, 就只能使用low-level API, 比如hadoop consumer

    High-level API
    /* create a connection to the cluster */ 
    ConsumerConnector connector = Consumer.create(consumerConfig);
    
    interface ConsumerConnector {
    	
      /**
       * This method is used to get a list of KafkaMessageStreams, which are iterators over topic.
       *  Input: a map of <topic, #streams>
       *  Output: a map of <topic, list of message streams>
       *          Each message stream supports a message iterator.
       */
      public Map<String,List<KafkaMessageStream>> createMessageStreams(Map<String,Int> topicCountMap); 
    
    
      /**
       * You can also obtain a list of KafkaStreams, that iterate over messages
       * from topics that match a TopicFilter. (A TopicFilter encapsulates a
       * whitelist or a blacklist which is a standard Java regex.)
       */
      public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
      /* Commit the offsets of all messages consumed so far. */
      public commitOffsets()
      
      /* Shut down the connector */
      public shutdown()
    }

    对于kafka, 一个consumer可以同时handle一个topic中的多个partition, 所以为了提高效率, 我们可以定义多个threads(streams)去consume, 你也可以定义单个线程, 那么该线程就需要依次遍历所有partition.

    但是每个partition最多只能有一个stream去consume, 所以thread数目的定义不能大于consumer所handle的partition数, 多了也浪费.

    对于createMessageStreams函数, 可以一次性创建对于多个topic的consumer streams , 这样做的目的, 上面说了, 为了效率, 因为每次都需要rebalancing the consumer/broker assignment, 所以尽量只做一次rebalance.

    所以这个函数的参数是个map(topic name : stream number)

    而返回值也是个map(topic name: stream list)

    其中的KafkaMessageStream, 都是一个iterator

    那么对于每个topic, 用户得到一堆iterator(取决于定义的stream num), 于是遍历每个iterator去取得新数据即可.

    很简单的接口, 不用去care, consumer和broker的状态变化.

    createMessageStreamsByFilter, 提供一种方式, 你可以不一一指定需要consume的topic, 而是定义一个filter来选取所有满足的topic

    Messages

    Messages consist of a fixed-size header and variable length opaque(不透明) byte array payload.
    The header contains a format version and a CRC32 checksum to detect corruption or truncation.
    Leaving the payload opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The MessageSet interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO Channel.

    Message Format

            /** 
    	 * A message. The format of an N byte message is the following: 
    	 * 
    	 * If magic byte is 0 
    	 * 
    	 * 1. 1 byte "magic" identifier to allow format changes 
    	 * 
    	 * 2. 4 byte CRC32 of the payload 
    	 * 
    	 * 3. N - 5 byte payload 
    	 * 
    	 * If magic byte is 1 
    	 * 
    	 * 1. 1 byte "magic" identifier to allow format changes 
    	 * 
    	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
    	 * 
    	 * 3. 4 byte CRC32 of the payload 
    	 * 
    	 * 4. N - 6 byte payload 
    	 * 
    	 */

    Log File

    message作为Kafka的基本单位 –> 使用Log file来组织messages –> 使用目录来组织log file (目录名 = topicname + partitionid)

    A log for a topic named "my_topic" with two partitions consists of two directories (namely my_topic_0 and my_topic_1) populated with data files containing the messages for that topic.

    Log file由log entries组成, 即message, message在磁盘上的构成如下

    On-disk format of a message
    message length : 4 bytes (value: 1+4+n) 
    "magic" value  : 1 byte
    crc            : 4 bytes
    payload        : n bytes

    发现在message构成里面, 没有id, 怎样标识每个message?

    通常的思路是创建一个GUID作为消息id
    而kafka的方式是, 直接使用message的64-bit integer offset (表示从partition开始位置) 来标识这个message
    这个方式的好处是显而易见的, 节省空间, 不用额外存储id, 不需要特别的索引去记录id和offset的对应关系
    局限就是, offset必须是不变的

    并且每个log file都有一个max log file size, 当文件大小超出, 需要产生新的log file (filename由第一个message的id(offset)决定)
    所以第一个文件必然是00000000000.kafka

    image

    Active Segment List, 并不需要在磁盘上保存这样的索引结构, broker启动时, 遍历一遍topic的目录, 临时在内存里面创建, 以便于迅速查找
    每个segment就是一个log file, 包含的message的范围就是当前segment name和下一个segment name之间区间(因为filename就是首个message的id)

    操作, 图画的很清晰
    Append, 只会append在最新的那个segment上, 并且当达到文件大小上限, 创建新的segment
    Delete, 从最old的segment开始
    Read, 不会因为append或delete有任何影响

    大家来找茬, 这张图有错误...
    明显的错误就是下面那个segment file中的messageid都是错的, copy下来没有改...

    Distribution, 分布式

    Zookeeper Directories

    The following gives the zookeeper structures and algorithms used for co-ordination between consumers and brokers.

    Broker Node Registry
    /brokers/ids/[0...N] --> host:port (ephemeral node)

    Since the broker registers itself in zookeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).

    Broker Topic Registry
    /brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)

    Each broker registers itself under the topics it maintains and stores the number of partitions for that topic.

    记录每个broker记录它所包含的topic, 和该topic的分区数 

    Consumer Id Registry

    In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory.

    /consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

    Each of the consumers in the group registers under its group and creates a znode with its consumer_id.

    The value of the znode contains a map of <topic, #streams>. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies.

    记录每个consumer, 订阅的所有topic, #streams数目 

    Consumer Offset Tracking

    Consumers track the maximum offset they have consumed in each partition. This value is stored in a zookeeper directory

    /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value (persistent node)
    这个是唯一一个需要persistent, 可以看到记录的是groupid, 而非consumerid 
    Partition Owner registry

    Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.

    /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)

    记录了每个partition, 对于某一个group, 被那个consumer所owner 

    Broker node registration

    The broker nodes are basically independent, so they only publish information about what they have.

    When a broker joins,

    1. It registers itself under the broker node registry directory and writes information about its host name and port.

    2. The broker also register the list of existing topics and their logical partitions in the broker topic registry.

    增加broker, 其实就是topic的partition增加, producer通过zookeeper触发partitiioner从新进行负载均衡

    同时consumer, 通过zookeeper的触发rebalance

    Consumer registration algorithm

    When a consumer starts, it does the following:

    1. Register itself in the consumer id registry under its group.
    2. Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.
    3. Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry. Each change triggers rebalancing among all consumers in all consumer groups.
    4. Force itself to rebalance within in its consumer group.

    Consumer rebalancing algorithm

    参考, http://www.cnblogs.com/fxjwind/archive/2013/03/19/2969655.html

  • 相关阅读:
    GC算法 垃圾收集器
    Distinct
    生产者消费者实现
    单例模式(七种实现方法)
    【JUC】JDK1.8源码分析之AbstractQueuedSynchronizer
    【JUC】JDK1.8源码分析之ConcurrentHashMap
    【集合框架】JDK1.8源码分析之HashMap
    一个整数,它加上100后是一个完全平方数, 再加上168又是一个完全平方数,请问该数是多少?
    猴子吃桃问题:猴子第一天摘下若干个桃子, 当即吃了一半,还不过瘾,又多吃了一个; 第二天早上又将剩下的桃子吃掉一半,又多吃了一个。 * 以后每天早上都吃了前一天剩下的一半零一个。 到第10天早上想再吃时,见只剩下一个桃子了。 求第一天共摘了多少。 * 1.程序分析:采取逆向思维的方法,从后往前推断。
    输出9*9口诀 输出9*9乘法表
  • 原文地址:https://www.cnblogs.com/fxjwind/p/2975573.html
Copyright © 2011-2022 走看看