zoukankan      html  css  js  c++  java
  • kafka概念

    一、结构与概念解释

    1.基础概念

    topics: kafka通过topics维护各类信息。

    producer:发布消息到Kafka topic的进程。

    consumer:订阅kafka topic进程和处理订阅的消息的进程。

    broker:kafka集群的每个server叫broker.

    提供了语言无关、高性能、简单的client-server的链接方式。

    2.Topics and Logs

    (1)topic是发送消息的类别名称。每个partition是持续添加的有序的不可变的消息序列-commit log. partition内部的消息分配唯一的Id number--offset.

    (2)无论是否消费,kafka在一段可配置时间内会保留所有提交的消息。如设置2天,发布后两天内都可以消费,两天后就会腾出空间。kafka的性能是恒定量,多保留些数据不是问题。

    (3)每个消费者实际需要保留的元数据信息是消费者处理log的位置-offset.  offset由consumer来控制。 通常随着读取messages,consumer会线性增长offset,  但实际上这个值由consumer控制,可以以喜欢的任何顺序来消费,如可以设置成一个old offset,重新处理消息。

    (4)这些组合,造成kafka consumer非常轻量级,基本上不会对其他consumer造成影响。(如我们可以用命令行tail等操作 察看内容,不会改变已经存在的consumer的消费行为).

    这些特性好处:允许日志量超出单台服务器,每个独立分区必须适应拥有它的server,一个topic可以有多个partition(持有任意数量的数据);他们作为并行单元执行。

    3. Distribution

    (1)日志分区分布在Kafka集群上,每个server处理一部分分区的数据和请求,每个partition都可以有一个配置大小的容错server.

    (2)每个Partition会有一个Leader,有0个或多个followers. leader处理这个partition的所有读写请求,followers被动的复制leader.  leader fails,其中一个followers自动成为leader.每个server都做一部分partition的leader,其他partition的follower,所以负载在集群上是均匀的。

    4.producers

    producer负责选择哪个消息给topic中的哪个partition。这可以用轮询的方式简单的balance load,或者基于语义partition function(如根据消息Key).

    5.consumers

    (1)消息传统两种使用模式:队列和发布-订阅。 队列模式下,几个消费者可以从服务器读取,消息只会到一个消费者;发布订阅--消息会广博到所有消费者。

    (2)消费者给自己一个consumer group 名称,发布给topic每个消息传递到每个consumer订阅组中的一个实例中。consumer progress可以独立线程或者独立机器。

    如果所有的consumer instance都有相同的consumer group, 类似于传统的queue,负载到consumers.

    如果所有的consumer instance都有不同的consumer group, 类似于发布-订阅,广播到所有的consumers.

    更常见的是有多个consumer group,每个逻辑订阅组一个名字。每个组为了扩展性和容错性由多个consumer instance组成。这超出了发布订阅模式语义,每个订阅者是consumer群,不是单个进程。

    (3)kafka比常见的消息系统有更强的排序保证。

    传统队列系统,并行consumer会让本来有序的队列,处理完的速度并不相同。

    kafka可以保证顺序。通过分配partition给consumer group中的consumer保证每个分区由consumer中的一个consumer消费。(这样保证consumer是partiton的唯一reader,按序消费数据,因为分区多,还是能平衡负载.这不能有比分区更多的partition.)

    当然Kafka只提供了分区内部的有序性,不能跨partition. 每个分区的有序性,结合按Key分partition的能力对大多应用都够用了。全序---one topic-one partition-one consumer,失去了并行。

    6. Guarantees

    (1)partition会按照publish message的顺序排放消息。

    (2)consumer看到消息的顺序与在Log存储的顺序相同。

    (3)对于复制因子N,容忍N-1 server宕机不影响服务。

    二、使用场景

    1.messaging消息队列

    可以用来替换传统消息代理(解耦生产者消费者,缓存未处理消息),kafka与以往的软件比更大的吞吐量、内置的分区和分区复制机制、容错机制。更适合做大型消息处理的解决方案。

    根据经验消息系统经常需要吞吐量不高,但是需要端到端低延迟、经常以来Kafka提供的持久化保证。(与activemq和rabbitmq对比).

    2.website activity tracking 网站活动跟踪

    (1)kafka的原始用途:通过重建用户跟踪管道为一组实时的发布-订阅feeds。这意味着网站活动(pv,search,用户的其他活动)根据类型被发布到中心topics的一个topic. 这些feed广泛用于大量用例的订阅包括实时处理、实时监控、load to hadoop或者离线数据仓库为离线处理和报告。

    (2)活动跟踪需要高吞吐量,许多活动消息由每个用户pv生成。

    3.Metrics 度量.

    kafka经常用于运营监测数据;包括聚合统计信息从分布式应用来生成运营数据中心化。

    4.log aggregation日至聚合。 分散文件放入集中位置(file server/hdfs)处理

    kafka做日志聚集的替代。kafka抽出文件内容,给日志和事件数据一个清洁提取做为消息流。这允许低延迟处理和易于支持多个数据源和多个消费者消费。对比中心化聚合系统scribe/flume,kafka提供更好的性能,更强的持久化保证,由于replication,更低的端到端延迟。

    5.stream processing流处理

    许多用户做阶段性的数据处理(数据消耗原始数据,然后聚合、补充、或者重新转换为Kafka topic进一步处理)。

    例如:文章推荐的一个处理流可能从rss抓取文章内容 然后发布到article topic;进一步处理可能规范化或者重复数据删除生成一个清洗过的article content;最后阶段可能试图匹配这个内容给用户。

    这从个人topic创建了实时数据流图;storm和Samza是实现这类转换的工具。

    6.event sourcing 事件溯源。

    事件溯源是一种设计模式,状态改变被记录为一系列记录。Kafka支持大量的日志数据,非常适合做这种类型数据的后端。

    7.Commit Log

    kafka可以做为一种分布式系统外部commit-log工具。日志帮助复制节点间的数据,为失败节点重新存储数据的重同步机制。日志压缩功能帮助支持这种用途,类似于apache bookkeeper项目。

    三、配置与搭建。

    1.单机

    (1)解压

    (2)tar -xzf kafka_2.9.2-0.8.1.tgz

    (3)启动:bin/zookeeper-server-start.sh config/zookeeper.properties

    bin/kafka-server-start.sh config/server.properties

    (4)建立topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    bin/kafka-topics.sh --list --zookeeper localhost:2181

    也可以配置broker自动创建topic,当提交到不存在的topic时。

    (5)发送消息

    Kafka提供了命令行工具从文件获取输入或者标准输入 ,作为消息发送到kafka集群,默认一行一个消息。

    标准输入: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    (6)启动consumer.

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    所有命令行工具都有使用说明,不跟参数就会显示使用说明。

    2.建立集群。对kafka来说单实例,就是集群中起了一个实例。多broker就是多启动几个代理就行。

    (1)配置

    cp config/server.properties config/server-1.properties

    cp config/server.properties config/server-2.properties

    config/server-1.properties:
        broker.id=1
        port=9093
        log.dir=/tmp/kafka-logs-1
     
    config/server-2.properties:
        broker.id=2
        port=9094

       log.dir=/tmp/kafka-logs-2

    broker.id是集群中每个节点的唯一和永久名称。重载端口号和日志位置(因为我们在同一台机器上)。

    (2)启动另外两台。

    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &

    (3)创建一个新topic,副本设置为3.

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

    (4)看每个broker做什么。

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

    Topic:my-replicated-topic  PartitionCount:1   ReplicationFactor:3  Configs:

       Topic: my-replicated-topic   Partition: 0  Leader: 2  Replicas: 2,1,0   Isr: 2,1,0

    第一行是所有Partition的总结。后面的行是每个partition一行。

    Leader---node编号。Replicas:node list(所有的列表-包括不是alive)."isr" is the set of "in-sync" replicas.(alive 和追赶leader的replicate).

    (5)发送新消息:

    
    

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

    (6)测试容错性,杀掉server-1. (Leader).

    ps -ef |grep server-1.properties                 kill.

     bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

    Topic:my-replicated-topic  PartitionCount:1  Replication   Factor:3   Configs:

           Topic: my-replicated-topic   Partition: 0   Leader: 2   Replicas: 2,1,0   Isr: 2,0

    (7)生态:

    有大量的主发行版意外的与kafka整合的工具,生态系统页列出了很多,包括:数据流系统、hadoop集成、监控、部署工具。

    3.API:

    (1)Producer API

    class kafka.javaapi.producer.Producer<K,V>
    {

     public Producer(ProducerConfig config);

    //发送数据到一个topic,用key partition,使用同步或者异步的Producer.message是封装了key-message数据的生产者数据对象。

         public void send(KeyedMessage<K,V> message);

     //发送数据到多个topic.
     public void send(List<KeyedMessage<K,V>> messages);
    
      public void close(); //关闭生产者池到所有broker的链接。
    

    }

    (2)high level consumer api:

    class Consumer { //config最少要指定,consumer的group id和zookeeper的连接。
    

    public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(config: ConsumerConfig);

    }
    public interface kafka.javaapi.consumer.ConsumerConnector {

    //为每个topic创建type T创建消息流。topicCountMap是topic和#stream的map对;decoder把message转换为T;返回topic和kafka stream的列表(列表大小是#stream,每个stream支持迭代message/offset对)。

    public <K,V> Map<String, List<KafkaStream<K,V>>>    createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

    //用默认的decoder为每个topic生成类型T的事件流列表。

    public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);

    //为匹配通配符的topics,生成消息流。numStreams返回流的数量。

    public <K,V> List<KafkaStream<K,V>>    createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

    //用默认decoder.

    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

    //返回一个stream.

    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);

    
    

    //Commit the offsets of all topic/partitions connected by this connector

    public void commitOffsets();

    //关闭连接。

    public void shutdown();}

    (3)class kafka.javaapi.consumer.SimpleConsumer {

    //从topic获取消息集合

    public FetchResponse fetch(request: kafka.javaapi.FetchRequest);

    //获取topic的metadata.   request指定versionid/clientid/seqence of topics.

    public kafka.javaapi.TopicMetadataResponse send(request: kafka.javaapi.TopicMetadataRequest);

    //返回有效的offset列表(知道maxsize),before given time.

    public kafka.javaapi.OffsetResponse getOffsetsBefore(request: OffsetRequest);

    public void close();}

    大多数应用high level api足够好,一些应用想要一些特性(high level consumer没有暴露).(如重启consumer重设定初始offset.) 这就可以用我们底层的SimpleConsumer,逻辑有点复杂。

    (4)kafka hadoop consumer api.

    基本用例:聚合和加载数据到hadoop.支持这个特性,提供一个hadoop-based的consumer,生成大量的map任务,并行的从kafka集群拉取数据。提供了十分快速的拉取hadoop数据的能力。(我们能填满网络,用很少的kafka集群)。

    四、配置项多详见http://kafka.apache.org/documentation.html#gettingStarted

    1.broker configs:  分必要的配置和topic-level配置。

    (1)必要的配置:broker.id / log.dirs/zookeeper.connect.

    scala class:kafka.server.KafkaConfig 类中有详细内容。.

    
    

    (2)topic-level配置。主题相关的配置既有全局的默认值,也有可选的每个主题的覆盖。

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1         --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1 (可选的覆盖通过--config指定).

    后来修改可选覆盖。-alter.

    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic     --config max.message.bytes=128000

    移除覆盖

    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic     --deleteConfig max.message.bytes

    2. Consumer Configs

    (1)group.id:consumer进程属于的组字符串唯一标识,通过设置相同的id表示多个进程属于相同consumer group.

    zookeeper.connect.           hostname1:port1,hostname2:port2,hostname3:port3.

    consumer.id不设置自动生成。

    (2)在scala的kafka.consumer.ConsumerConfig.

    3. Producer Configs

    (1)必要的配置:

    metadata.broker.list  为了启动,producer用它来获取元数据(topic/partition/replica).  建立链接发送实际数据,建立的链接基于metadata返回的broker信息,The format is host1:port1,host2:port2,可以是brokers子集,也可以是vip.

    request.required.acks. 控制何时认为producer请求完成,配置有多少broker 确认数据到Log。0--表示不等待确认(最低的延迟和最弱的持久化保证);1--leader接收数据后返回确认信息;-1---所有的in-sync replicas 接收。

    producer.type:sync. 是否异步. async(后台线程,允许客户端机器扔掉数据)/sync.

    serializer.class:kafka.serializer.DefaultEncoder.          The default encoder takes a byte[] and returns the same byte[].

    (2)kafka.producer.ProducerConfig.

    4.new Producer configs:未来会替换成新的配置,现在是beta版。

    bootstrap.servers

    acks

    ...

    四、设计

    1.Motivation

    2.Persistence

    (1)不要怕存储慢。顺序读跟内存差不了多少。jmm:内存耗费大,经常是硬盘存储的两倍;java随着堆数据增加,回收越来月琐碎和缓慢。

    (2)常量存取时间。B树在硬盘的存取时间并不快(随机取数);简单读取和追加可以在O(1)。  常数时间让我们可以存放更多的数据,而不是尽快删除,有了更多的灵活性。

    3.Efficiency

    低效的两个操作:小io和字节拷贝(byte copying).

    小io通过批量操作解决。

    字节拷贝通过解决。高负载下这个消耗显着,通过在producer/broker/consumer都使用标准的的二进制格式来避免这一点。broker维护的消息日志只是一些文件的目录,每个文件由一系列消息集组成,消息集以生产者和消费者都使用的二进制格式写到磁盘上。维护通用格式允许最大优化:持久化日志块的网络传输。

    正常拷贝4步2系统调用,这种sendfile可以让pagecache 直接到network.只需要在最后拷贝1次。

    我们希望一个topic多个消费者,用上面的zero-copy优化,数据拷贝到pagecache一次,在每次消费重用。(可以以接近网络限制的速度消耗掉)。

    端到端批量压缩:支持snappy和gzip.(通过一个集合压缩,提高压缩比)。

    4.The Producer

    (1)Load balancing负载均衡。kafka让producer直接发送数据到broker(partiton leader),不用任何路由。让producer这样做,需要请求元数据信息:哪些server还存活,一个topic的哪些partition是leader。

    客户端控制将消息发送到哪个分区,可以用随机实现(实现一种随机的负载均衡或者语义分区),暴漏了语义分析的接口(提供key, hash 到一个partition).这可以反过来让consumer做出本地的假设,这种风格被消费者设计成本地敏感的处理。

    (2)Asynchronous send 异步发送。可以配置积累指定数量的消息或者指定的延迟。

    5.The Consumer

    可以指定offset,获取之后的一段日志块,必要时可以重新消费。

    (1)push and pull.   consumer-pull 还是broker -push. -------kafka遵从了从producer push,从consumer pull设计。而scribe/flume采用的是push设计。

    两种设计都有优缺点:目标都是让消费者最大速率,而push方式可能会造成消费者不堪重负。  另一个优点:适合于消费者批处理聚合数据。(push一次一个,pull修复)

    不足:没有数据时,消费者在无用的忙碌,等待数据到来;避免这种情况,允许消费者阻塞在一个长poll等待直到数据到来。

    更可靠(不需要生产者的持久性)。

    (2)consume position.

    追踪哪些被消费是消息系统的关键点;许多消息系统扩展性差立即删除是务实的选择。

    broker和consumer达成协议--哪些被消耗并不明显。如果每次消费者消费掉后就删掉数据(消费者死掉,可能会丢失数据,为解决这个问题--消息系统都加了确认机制);这种策略解决了丢数据问题,引发了新的问题:在发送确认前失败会导致处理两次消息;性能问题--对每个消息保留多个状态。

    kafka的处理机制不同:topic分为完全有序的partiiton集合,每个partition在一个时刻只有一个consumer消费。这意味着-consumer在每个partiiton的位置仅仅是一个单一的整数(下一个要处理的消息位置),这让标记什么已经被消耗的状态耗费小(每个分区一个状态);这个状态可以周期性的checkpoint,使得消息确认确认很便宜。

    另外的好处:倒回,重新消费数据。

    (3)offline dataload

    扩展的持久性,提供批量加载可能性(批量到hadoop或数据仓库)。

    在hadoop应用,分散加载在map任务中,one for each node/topic/partition combination,充分并行;hadoop提供了任务管理,重跑不会重复数据,只是从原来的位置重新启动。

    6.Message Delivery Semantics

    (1)多个可能的消息语义保证:至多一次(失败不重传);至少一次(不会丢失,可能重传);精确一次。  

    值得拆成另外两个问题:发布消息的持久化保证和消费消息的保证。

    (2)持久化:kafka只要一个broker存活就可以继续用。遇到网络错误,可以类似数据的key保证,使得生产者重试是幂等的(在将来版本添加)。大多数用例不需要这么完全的保证。

    (3)消费消息:consumer控制position. 如果consumer从不失败,可以内存保存位置;如果可能失败--几个方案更新:

    ----可以在日志中保存处理的位置,然后处理消息。有可能保存了offset,但是获取处理消息结果前进程失败。会丢失几条消息----at most once语义保证。

    ---可以读取消息、处理消息、保存position.  处理完了没有保存position.  ----at least once.

    ---exactly once:不是消息系统特性,需要与实际存储结果协调consumer的位置。经典的两阶段commit.保存结果和保存position时。可以简单的处理将Offset保存在某个地方(因为很多系统不支持两阶段commit).  这方面的一个例子,我们etl工具在hdfs存储offset,随着读取数据,offset或者都更新或者都没有。

    保证至少一次投递:禁用生产者重试、提交offset先于处理数据。exactly-once:需要目标系统的协作;但是Kafka提供offset让这个实现直接。

    7.Replication

    复制的单元是topic partition.

    kafka node存活条件:必须能在zookeeper维持链接;如果是slave必须去复制Leader. 满足这两个条件叫'in sync',而不是叫'alive‘。leader跟踪in sync node. (死/卡/跟不上,会移除这个node).只能处理那些突然失败。

    commited---所有in syc replicas都提交。只有commited log才给consumer. 生产者需要在延迟和持久性权衡---设置request.required.acks 是生产者的偏好。

    (1)Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)

    复制Log让kafka专注队列实现即可。选了确认的数量,选取Leader的比较数量-quorums.

       对这种commit决定和leader选举权衡的通用方案-使用大多数。 好处--取决于比较快的机器的反馈。(很多这类优秀算法用于zookeeper的zab,raft, viewstamped replication;kafka的实际实现是用的ms的pacificA).缺点:不用很多失败就没有候选人了。(一个失败需要3个副本),这也是只有zookeeper这类软件使用,hadoop namenode高可用并没有采用原因。

       kafka采取了略微不同的算法:kafka维护一个isr---可以跟上leader的列表。这个列表的单元才会选取leader. 这种方式可以容忍N-1的失败。复制因子和吞吐量的权衡。

       kafka重要设计区别:kafka不要求失败的节点所有数据完整无误的恢复。建立在稳定存储(在失败恢复不丢数据)假设之上。两个主要问题:硬盘错误是最经常出现的问题,不会让数据不完整;即使这没有问题,没有调用fsync提高性能,我们的协议让replica追上的时候再加入isr.

    (2)Unclean leader election: What if they all die? 等待一个isr副本存活;选择第一个活过来的replica。现在版本用后面的,以后改成可配置。  

    (3)Replica Management  副本管理。平衡压力。

    可用性关键窗口--优化leader选举。由一个broker选为controller负责选举,批量选举,提高速度。

    8.Log Compaction 。  让kafka总是能为每个message key保留至少最后已知值(在一个topic partition的数据日至中)。表达用例:应用坏掉后的状态重建,应用重启重新加载cache.

    之前的保存策略---固定时间周期/到达设定的大小,适合于临时事件数据。对一类数据---有主键的/可变的数据(如数据库改变)。

        123 => bill@microsoft.com
    
        123 => bill@gatesfoundation.org

       123 => bill@gmail.com

    保证至少保留最近一次修改,可以重建状态,不用保留所有变化。

    (1)使用场景:

    数据库更改订阅。变化的改变需要反应到cache中/hadoop/solr中,只需要最新日志。(但是如果重建--需要完整数据集).

    事件溯源.

    高可用日志.Samza.

    (2)这些场景实时处理改变,但是有时候机器坏掉需要重建,重新处理,重新加载;实时处理和重建恢复两种情况,log compaction在相同的后端topic都支持。日志的这种使用风格,见http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying。

    (3)思想很简单:保存所有变化日志,就能随时重建;但是这个假设的完整log并不适用,即便对稳定的数据集随着日志没有边界的增长更新一条记录会有多次。更简单的保持策略:扔掉old update缩减空间,但是日志不再是简单的恢复当前状态。

    log compaction是一种更细的每条记录的保存机制,而不是比较粗的基于时间的保存。思路是:有选择的删除记录,我们有每个记录更新的更新;这种方式每条记录有最新的状态。

    retention policy可以每个topic设置;可以看到一个集群上的topic可以有基于时间保留的、基于大小保留的、基于compaction保留的。

    这个架构起源于Linkedin 最早的和最成功的的底层架构-一个数据库改变缓存服务(databus).不向其他基于Log的存储系统--kafka专门为订阅和快速线性读写组织数据。

    (4)log-compaction基础.

    log compaction打删除标记,后台自动删除。 多次修改也在后台通过复制log进行,可以通过控制吞吐量避免影响producer和consumer.

    (5) log compaction 提供的保证。

    每个跟上头部的consumer会有顺序的offset.

    log顺序还会保证(不会重排序,只会删除某些日志)。

    offset不会变,它是log的永久标识。

    任何从0offset读的处理会看到至少所有记录的最后状态。【假设reader到达log头(且在一个时间范围内 delete.retention.ms 配置参数),所有的删除标记会被看见;】这很重要,删除标记不会删除,只要reader没读过它。

    任何consumer从日志头处理,会看到所有记录的最终状态。

    (6)Log Compaction Details

    log compaction有 log cleaner处理,后台线程池重新复制日志线程段,removing之前已经出现的log.每个compactor进程工作原理:选择head-tail中元素占比最多的log;为每个Key创建简洁的摘要在日志头;从头到尾重新复制log(删除后面还会出现的Log);summary of the log head仅仅是一个压缩存储的hash表。

    (7)配置logcleaner. log.cleaner.enable=true; log.cleanup.policy=compact;

    log compaction限制:不能配置还有多少日志没有compact;还不能与压缩的topic兼容。

    五、实现

    (一).API

    1.producer:新producer包装了同步和异步producer的底层。

    (1)可以处理多个producer的queueing/buffering请求,batch data的异步调度,kafka.producer.Producer提供了多个异步调度能力(producer.type=async)。(在序列化和调度之前)。 调度的规模可以配置---queue.time; batch.size ;这两个配置到了就一起发送。kafka.producer.async.ProducerSendThread解队列数据,kafka.producer.EventHandler发送数据到合适的broker的合适partition.

    一个定制的事件处理可以通过event.handler配置。在producer pipeline中注入callback很重要(注入定制的log/跟踪代码;定制的监控)。 通过实现kafka.producer.async.CallbackHandler接口和配置callback.handler参数给class.

    (2)处理序列化数据,通过用户定义的encoder.

    interface Encoder<T> {
      public Message toMessage(T data);
    }  默认的是不操作的kafka.serializer.DefaultEncoder。
    (3)提供基于zookeeper的自动broker发现。
    通过  zk.connect 参数获得。对一些应用对zookeeper的依赖不合适,这种情况下可以配置  broker.list  。这种情况下broker随机选择一个broker的partition,这种情况下broker down请求就失败。
    (4)通过自定义的partitioner 软负载。 路由决定受kafka.producer.Partitioner影响。
    interface Partitioner<T> {
       int partition(T key, int numPartitions);
    }使用key和分多少区,返回一个partitionid. 这个id被用作index.默认hash(key)%numPartitions。若key 为null,用随机数。可以用partitioner.class插入。
    2.consumer. 
    (1)low-level。 维持对一个broker的连接和相应网络关闭请求,完全无状态,每次传送offset,允许用户保存他们选择的元数据。
    class SimpleConsumer {
        public ByteBufferMessageSet fetch(FetchRequest request); 
        public MultiFetchResponse multifetch(List<FetchRequest> fetches);
        public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);}
    底层api用于实现高层api,也会被一些我们的离线consumer使用(如hadoop consumer)-对维护状态有特别请求。
    (2)high-level.隐藏broker细节,允许不考虑底层拓扑离开集群consumer,也会维护已经消费的状态。高层api提供订阅topic,匹配filter expression的能力。
    ConsumerConnector connector = Consumer.create(consumerConfig);
    interface ConsumerConnector {
        public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
        public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
        public commitOffsets();
        public shutdown();}
        这个api围绕iterator,由KafkaStream 实现,每个KafkaStream 代表从一个或多个kafka server的一个或多个partitions的消息流。每个流用于单线程处理,因此client可以在create call中提供想要的流的数量。这样一个流可以代表多个server patitions的merge(相应的处理线程的数量),但是一个partiiton仅会到一个stream.
    createMessageStreams 调用会注册topic消费,这会导致consumer/broker的重新分配。这个api鼓励创建多个topic streams,最小化这个rebalancing.createMessageStreamsByFilter 注册watchers 发现新的topic符合filter.返回的每个流的message可能来自于多个topic.
    (二)网络层。网络层是很直接的NIO服务器,不会详细说。sendfile实现通过MessageSet 接口的writeTo方法。这使得基于文件消息集 使用有效的transferTo实现而不是in-process缓存写。
        线程模型是一个单一的acceptor和N个 processor线程每个处理固定大小的连接。(设计经过了广泛测试高效。)协议简单,易于其他语言实现。

    (三)消息层
    消息由固定大小的header和变长的opaque字节数组负载。header由一个标准的版本和CRC32校验码(检测corruption和trunction)。留下opaque payload是正确决定:现在的序列化库有很大的进步,任何选择都不能满足所有情况。不用说kafka的特定应用会要求一个特殊的序列化类型,messageSet接口是messages上的简单迭代器有特殊的方法来通过NIOchannel块读写.

    (四)消息格式
    /**
         * A message. The format of an N byte message is the following:
         *
         * If magic byte is 0
         *
         * 1. 1 byte "magic" identifier to allow format changes
         *
         * 2. 4 byte CRC32 of the payload
         *
         * 3. N - 5 byte payload
         *
         * If magic byte is 1
         *
         * 1. 1 byte "magic" identifier to allow format changes
         *
         * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
         *
         * 3. 4 byte CRC32 of the payload
         *
         * 4. N - 6 byte payload
         *
         */
    (五)log
    1.一个my_topic的2个分区的topic.由两个目录组成 my_topic_0 andmy_topic_1 ,内有很多文件包含topic的消息。log文件的格式是一系列Log entries.
    (1)每个log entry组成:
        4byte的整数N,存储消息长度
        N个消息字节。
    (2)每个消息由64bit的offset 给出byte position.   给到这个topic这个partition的消息流头部。硬盘存储格式。
    message length : 4 bytes (value: 1+4+n)
    "magic" value  : 1 byte
    crc            : 4 bytes
    payload        : n bytes
    mesage格式由标准接口维护。message set可以在producer/broker/client传输不用recopying和转换。

    (3)每个logfile的命名由本file包含的第一条消息的offset命名。    第一个文件就是 00000000000.kafka。
    并且每个额外的文件从以前文件有一个整数名大约S字节;S是配置的最大log file size.
    (4)使用消息的offset作为消息的id是不正常的。初步的想法是使用producer生成的GUID,在每个broker维护一个GUID-offset的hash.但是因为每个consumer必须维护每个server的ID,GUID的全局唯一性没有价值。维护guid-offset需要重量级的索引结构(必须同步到磁盘,必须要一个持久化的随机访问结构),为了简化lookup结构,我们决定用一个简单的每个parition的atomic counter,需要加上Node_id和partition_id一起才能标识消息;这让查找结构简单(尽管每个consumer的多个查找是可能的);这样一旦用了counter跳跃用offset很自然,毕竟单调递增整数对一个partiiton是唯一的。因为offset隐藏的consumer api后,这个细节完全是实现细节。
    2.写
    log 允许serial append,追加到最后一个文件,到达一个配置的大小就会滚动到一个新文件。两个配置参数:一个M条消息fush到磁盘;S多少秒后强制刷新。持久化保证(最多丢失M条消息或者S秒内的文件)。

    3.读
    (1)给64bit的offet和S-byte的最大chunksize(块大小),这回返回一个迭代器包含在S-byte buffer中。S要大于任何一个消息的大小;如果特别大的消息--读取会尝试多次,每次buffer size翻倍,直到消息完全读出;maximum message和buffer size可以指定让server拒绝大于指定大小的消息,并且给一个最大限制需要读取一个完整消息;可能buffer里以部分消息结束,易于通过大小delimiting检测出。
    (2)实际读,从segment-file定位offset,计算基于文件的offset,从文件的那个offset读取,通过为每个文件维护的内存范围搜索通过简单的二分查找。
    (3)log提供获取最新写入日志的能力(让客户端从现在订阅),在consumer没有消费SLA指定天数内的数据中也是有用的。试图消费不存在的offset,给出OutOfRangeException,可以失败,也可以复位。
    (4)sent to consumer格式。
    MessageSetSend (fetch result)
     
    total length     : 4 bytes
    error code       : 2 bytes
    message 1        : x bytes
    ...
    message n        : x bytes
    MultiMessageSetSend (multiFetch result)
     
    total length       : 4 bytes
    error code         : 2 bytes
    messageSetSend 1
    ...
    messageSetSend n
    4.删除
    一个时间删除一个日志段;log manager允许可插入的日志删除策略,选择哪些文件适合删除;现在的策略是N天前Log删除或者NGb大小。避免删除时阻塞读取,会删除时用copy-on-write segment-list实现提供一致性视图,允许二分查找静态snapshot视图。

    5.保证
    如果大小和以及offset小于文件的长度以及CRC32符合message entry有效。
    需要处理两类损坏:由于crash为写入的块截断;无用的块添加到文件末尾。(oS不保证导致)

    (六)Distribution分布
    1.ZooKeeper Directories . 下面给出consumer和broker的zookeeper结构。
    2.Notation
    path的元素记为 [xyz] ,表示xyz不固定,对每一个值都是一个znode.​ [0...5]  代表子目录0 1 2 3 4.  -> 表示znode的内容。 /hello -> world  表示hello节点的内容是world.

    3.Broker Node Registry
    /brokers/ids/[0...N] --> host:port      (ephemeral node)
     配置在配置文件中。这种方式让broker迁移到另一个节点不影响consumer.
     4.Broker Topic Registry 
    /brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)            [0..N]是broker.
     5.Consumers and Consumer Groups
    consumer也注册在zookeeper中,用于平衡数据消费,跟踪他们消费的每个patition的offset.
    多个consumer可以组成一个组,一起消费一个topic.一个组内每个consumer给一个共享的group_id. group_id需要在consumer的配置文件中指出.
    一个组内的多个consumer尽可能公平的分割partition,每个partition由一个consumer消费。
     6.Consumer Id Registry
    /consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)
    group_id全局唯一的,consumer_id是临时的唯一的id.(hostname:uuid).
    group内的每个consumers,用consumer_id创建一个znode. value of znode由 <topic, #streams>.的map组成。这个Id只是简单表示group内哪个是活跃的。如果consumer死掉,就会消失。
     7.Consumer Offset Tracking
    /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)
    已经消耗的最大offset.

     8.Partition Owner registry.
    /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
    一个partition由一个consumer消耗。consumer必须建立对指定partition的占有关系。
     9.Broker node registration
    broker nodes基本上是独立的,仅仅发布他们有什么消息。当一个broker加入,注册自己在broker下写下hostname:port. broker还会注册其上的topic列表,broker topic注册的逻辑分区。new topics 动态创建。
     10.Consumer registration algorithm(consumer启动)
    (1)在group下注册自己consumer_id.
    (2)对改变(加入或离开)注册一个watch,每个改变都会引发reblancing.
    (3)注册broker改变(加入或离开)的watch,在broker_Id注册下。每次改变引发reblancing.
    (4)如果consumer用filter创建  message stream .还会创建watch对改变(topic变化)。(Each change will trigger re-evaluation of the available topics)
    (5)强制自己在一个组内平衡。
     11.Consumer rebalancing algorithm 允许组内的所有consumer对哪个cosumer消耗哪个partition达成共识。
    consumer reblance发生时机:broker node的添加删除和同一组内的其他consumer.  给定的topic和组,broker partitions会在组内的consumer均匀分配,一个partiiton总是由一个consumer来消费,设计简化了实现。(如果让并发访问一个partition,会有竞争和锁定问题)。如果consumers比partition多,一些consumer永远不会有数据。在reblancing过程中,我们我们试图分配partition给一个consumer,降低连接的broker数量。    

    Each consumer does the following during rebalancing:

       1. For each topic T that Ci
    subscribes to
       2.   let P
    T
    be all partitions producing topic T
       3.   let C
    G be all consumers in the same group as Ci
    that consume topic T
       4.   sort P
    T
    (so partitions on the same broker are clustered together)
       5.   sort C
    G   6.   let i be the index position of Ci in CG and let N = size(PT)/size(CG
    )
       7.   assign partitions from i*N to (i+1)*N - 1 to consumer C
    i   8.   remove current entries owned by Ci
    from the partition owner registry
       9.   add newly assigned partitions to the partition owner registry
            (we may need to re-try this until the original partition owner releases its ownership)

    一个consumer会引发同组的其他consumer同时reblancing.
     六、操作

    1.基本操作

    (1)Adding and removing topics

     bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
           --partitions 20 --replication-factor 3 --config x=y
    如果设置成自动创建,自动创建的一些参数需要设定。
    (2)Modifying topics
     bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name  --partitions 40
    add config: 
        bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
    remove config:
        bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
    delete topic: 
        bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
    现在Kafka不支持降低partiitons,以及不支持修改replication factor.
    (3)Graceful shutdown关闭。 用关闭命令不是kill,会自动同步;用 controlled.shutdown.enable=true控制leadership迁移。、
    (4)Balancing leadership 。重启后的机器只能做Follower不利于读写。
    bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot。
    设置auto.leader.rebalance.enable=true为自动的。

    (5)Mirroring data between clusters 。读取一个或多个源到一个目的地。
    用例:数据中心备份。
    bin/kafka-run-class.sh kafka.tools.MirrorMaker
           --consumer.config consumer-1.properties --consumer.config consumer-2.properties
           --producer.config producer.properties --whitelist my-topic           (可以启动多个线程,提高吞吐量和容错性)。
    目标集群的partition/replication/offset都可以不同。  --whitelist 指定要复制的topic.(可以使正则表达式)。  --blacklist 黑名单。

    目标配置auto.create.topics.enable=true 
    (6)Checking consumer position
    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
    (7)Expanding your cluster
    添加节点很简单,就是给一个broker.id, 启动就可以了。但是不会自动分配partition,只能move过去,不能工作直到新的topic建立。手动迁移的过程会完全自动化。
    kafka新迁移的partition也是follower,允许完全复制那个partition的数据,加入in-sync 列表(一个已经存在的replica 会删除它的数据);
    partition reassignment可以用来在brokers间移动partition.0.8.1还不能自动获得分布且计算出如何移动。
    这个工具可以在3个互相排斥的模式下运行:--generate(对指定的topics移动所有的partition到新的brokers.)/--execute(指定--reassignment-json-file用自定义的重新分配方案 )/--verify(确认最后一个-execute的所有Partition的重新分配状态).
    ---Automatically migrating data to new machines 可以移动一些topics离开现在的broker集合到新加的brokers. 移动整个topic而不是partition. 需要提供topic-list和broker-list.
        移动完了这些topic只在目标的topic列表里。 接收的topic-list以json文件列出。 
        > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate 
        这时候还没有移动。上面产生移动方案。
        bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute 移动方案存在json文件中。原来的replica-replica保存,为了rollaback时用。
        bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify  确认迁移的状态。
     ------Custom partition assignment and migration
        cat custom-reassignment.json
         bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
        bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
    (8)Decommissioning brokers
    重新分配工具还不能自动生成重新分配计划为了decommissioning. 必须为这个decommison broker生成移动计划。 有点繁琐,需要将所有的replica都要移动。这个移动计划会在0.8.2添加进来。

    (9)Increasing replication factor
    cat increase-replication-factor.json
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

    2.datacenters.数据中心。

    建议kafka之间复制,应用都采取本地策略。可以采取一个所有中心的聚合集群。

    提高tcp socket包大小,用 socket.send.buffer.bytes和socket.receive.buffer.bytes配置。不建议一个集群跨机房。

    3.kafka配置

    (1)重要的client配置。

    最重要的生产者配置:

    • compression 压缩
    • sync vs async production 生产者提交方式
    • batch size (for async producers) 批量大小

    最重要的consumer size是fetch size.    

    (2)A Production Server Config

     Here is our server production server configuration:
    # Replication configurations
    num.replica.fetchers=4
    replica.fetch.max.bytes=1048576
    replica.fetch.wait.max.ms=500
    replica.high.watermark.checkpoint.interval.ms=5000
    replica.socket.timeout.ms=30000
    replica.socket.receive.buffer.bytes=65536
    replica.lag.time.max.ms=10000
    replica.lag.max.messages=4000
     
    controller.socket.timeout.ms=30000
    controller.message.queue.size=10
     
    # Log configuration
    num.partitions=8
    message.max.bytes=1000000
    auto.create.topics.enable=true
    log.index.interval.bytes=4096
    log.index.size.max.bytes=10485760
    log.retention.hours=168
    log.flush.interval.ms=10000
    log.flush.interval.messages=20000
    log.flush.scheduler.interval.ms=2000
    log.roll.hours=168
    log.cleanup.interval.mins=30
    log.segment.bytes=1073741824
     
    # ZK configuration
    zk.connection.timeout.ms=6000
    zk.sync.time.ms=2000
     
    # Socket server configuration
    num.io.threads=8
    num.network.threads=8
    socket.request.max.bytes=104857600
    socket.receive.buffer.bytes=1048576
    socket.send.buffer.bytes=1048576
    queued.max.requests=16
    fetch.purgatory.purge.interval.requests=100
    producer.purgatory.purge.interval.requests=100

    Our client configuration varies a fair amount between different use cases.

    (3)java 版本 1.6以后
    java -server -Xms3072m -Xmx3072m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
         -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSConcurrentMTEnabled -XX:+CMSScavengeBeforeRemark
         -XX:CMSInitiatingOccupancyFraction=30
         -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
         -Xloggc:logs/gc.log -Djava.awt.headless=true
         -Dcom.sun.management.jmxremote -classpath <long list of jars> the.actual.Class

    4.硬件和系统os.

    (1)内存:需要用大量的内存来缓存活跃的reader和writer.可以做内存的简单估计,假设希望缓存30s计算内存需要写吞吐量*30.

    硬盘吞吐量比较重要,linkedin用8*7200磁盘,硬盘是瓶颈,越多硬盘越好。依赖你配置的刷新策略,可能不会从贵的硬盘得到好处(更频繁flush,RPM SAS磁盘会好一些)

    (2)OS:linux/unix. 系统层的配置修改。提升文件句柄数量(因为有大量的topic/连接);提升最大的socket buffer size确保数据中心间数据传输。

    5.磁盘和文件系统

    (1)用多个磁盘,不要让Kafka和系统常用磁盘共享。0.8后可以raid,或者format和加载每个磁盘作为自己的目录。

    可以提供round-bin 数据目录,每个partition会完整的在其中一个目录。  (或者用raid)

    (2)Application vs. OS Flush Management

    建议用默认的flush策略(整个禁用application fync). 意味着使用Kafka和os的flush策略。

     (3)Understanding Linux OS Flush Behavior
    linux pdfflush是可配置的,
     cat /proc/meminfo
     (4)Ext4 Notes。调优
    data=writeback。 默认是data=ordered.  
    Disabling journaling
    commit=num_secs.  小,较少丢数据概率;大,吞吐量。
    nobh (order设置)
    delalloc(知道物理写入的时候才分配块)帮助顺序读写。

    6.监控

    (1) kafka使用 Yammer Metrics ,配置成可插入的配置连上监控系统。

    DescriptionMbean nameNormal value
    Message in rate "kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics"  
    Byte in rate "kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics"  
    Request rate "kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics"  
    Byte out rate "kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics"  
    Log flush rate and time "kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats"  
    # of under replicated partitions (|ISR| < |all replicas|) "kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager" 0
    Is controller active on broker "kafka.controller":name="ActiveControllerCount",type="KafkaController" only one broker in the cluster should have 1
    Leader election rate "kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats" non-zero when there are broker failures
    Unclean leader election rate "kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats" 0
    Partition counts "kafka.server":name="PartitionCount",type="ReplicaManager" mostly even across brokers
    Leader replica counts "kafka.server":name="LeaderCount",type="ReplicaManager" mostly even across brokers
    ISR shrink rate "kafka.server":name="ISRShrinksPerSec",type="ReplicaManager" If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0.
    ISR expansion rate "kafka.server":name="ISRExpandsPerSec",type="ReplicaManager" See above
    Max lag in messages btw follower and leader replicas "kafka.server":name="([-.w]+)-MaxLag",type="ReplicaFetcherManager" < replica.lag.max.messages
    Lag in messages per follower replica "kafka.server":name="([-.w]+)-ConsumerLag",type="FetcherLagMetrics" < replica.lag.max.messages
    Requests waiting in the producer purgatory "kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory" non-zero if ack=-1 is used
    Requests waiting in the fetch purgatory "kafka.server":name="PurgatorySize",type="FetchRequestPurgatory" size depends on fetch.wait.max.ms in the consumer
    Request total time "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics" broken into queue, local, remote and response send time
    Time the request waiting in the request queue "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics"  
    Time the request being processed at the leader "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics"  
    Time the request waits for the follower "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics" non-zero for produce requests when ack=-1
    Time to send the response "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics"  
    Number of messages the consumer lags behind the producer by "kafka.consumer":name="([-.w]+)-MaxLag",type="ConsumerFetcherManager"
     
     
    (2)推荐监控gc time, server status --cpu/io等。 在client端监控 the message/byte rate (global and per topic), request rate/size/time, and on the consumer side, max lag in messages among all partitions and min fetch request rate。
    (3)audit:数据传输的正确性,衡量每条消息发送到消费的延迟,完整性没获得报警。
    7.zookeeper.
    (1)稳定版本:3.3.4
    (2)投入运行,安装。
    物理/硬件/网络布局:不在一个机架。
    io隔离:与应用在不用的磁盘上。
    应用隔离
    小心使用虚拟化
    ZooKeeper configuration and monitoring配置监控。确保有足够的heap size. JMZ和4 letter命令有用。
    不要过度建设集群
    使用3-5个node的集群。
  • 相关阅读:
    ASP.NET FAQ
    IIS的默认站点的ASP.NET选项消失的处理方法
    AJAX.NET使用基础
    关于CRM系统中员工,招商经理,招商专员等和代理商对于进销存系统信息查询的限制设计
    重装MSDTC
    做电子商务需要注意的问题
    .net开发的过程
    MVC3出现“提供程序未返回 ProviderManifestToken 字符串”的解决办法
    处理“数据库连接出错,请检查数据库名称及路径是否正确。”
    引用不到using System.Data.Entity.Database;(MVC3)
  • 原文地址:https://www.cnblogs.com/zlingh/p/4362645.html
Copyright © 2011-2022 走看看