什么是kafka?
Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark等都支持与Kafka集成。
活动流数据是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。
近年来,活动和运营数据处理已经成为了网站软件产品特性中一个至关重要的组成部分,这就需要一套稍微更加复杂的基础设施对其提供支持。现在最新版本已经到2.x.x
kafka的架构
一个典型的Kafka体系架构包括若干Producer(可以是服务器日志,业务数据,页面前端产生的page view等等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer (Group),以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。Producer使用push(推)模式将消息发布到broker,Consumer使用pull(拉)模式从broker订阅并消费消息。
kafka的优点
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输
- 同时支持离线数据处理和实时数据处理
- Scale out:支持在线水平扩展
Kafka与传统MQ区别:
- 更快!单机上万TPS,高吞吐量、低延迟、高并发
- 持久性、可靠性。传统的MQ,消息被消化掉后会被mq删除,而kafka中消息被消化后不会被删除,而是到配置的expire时间后,才删除
- 传统的MQ,消息的Offset是由MQ维护,而kafka中消息的Offset是由客户端自己维护
- 分布式,可扩展性容错性,把写入压力均摊到各个节点。可以通过增加节点降低压力
为何要用消息系统
- 解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
- 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
- 灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
- 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
- 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。
- 缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
- 异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
常用Message Queue对比
- RabbitMQ
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
- Redis
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
- ZeroMQ
ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。
- ActiveMQ
ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
- Kafka/Jafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
关键概念
- Broker(服务代理)
Kafka集群包含一个或多个服务器,这种服务器被称为broker。每个broker通常就是一台物理机器,在上面运行kafka server的一个实例,所有这些broker实例组成kafka的服务器集群。每个broker会给自己分配一个唯一的broker id。broker集群是通过zookeeper集群来管理的。每个broker都会注册到zookeeper上,有某个机器挂了,有新的机器加入,zookeeper都会收到通知。在0.9.0中,producer/consumer已经不会依赖Zookeeper来获取集群的配置信息,而是通过任意一个broker来获取整个集群的配置信息。
- Topic(话题)
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,Topic这是一个逻辑上的概念,而Partition是物理上的概念。(物理上,不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。所以我们在谈论topic往往和partition一起讨论,两者有紧密的联系:
每个topic将被分成多个partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”。
- Partition(片)
Parition是物理上的概念,每个Topic包含一个或多个Partition,不同Partition可位于不同节点(kafka server实例)。同时每个Partition在物理上对应一个本地文件夹,每个Partition包含一个或多个Segment,每个Segment包含一个数据文件和一个与之对应的索引文件。在逻辑上,可以把一个Partition当作一个非常长的数组,可通过这个“数组”的索引(offset)去访问其数据。
每一条消息被发送到broker中,会根据partition规则(有默认规则,当然也可以自定义规则)选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创建topic时可以在$KAFKA_HOME/config/server.properties中指定这个partition的数量,当然可以在topic创建之后去修改partition的数量。
在发送一条消息时,可以指定这个消息的key,producer根据这个key和partition机制来判断这个消息发送到哪个partition。partition机制可以通过指定producer的partition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。
一方面,由于不同Partition可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于Partition在物理上对应一个文件夹,即使多个Partition位于同一个节点,也可通过配置让同一节点上的不同Partition置于不同的disk drive上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。
利用多磁盘的具体方法是,将不同磁盘mount到不同目录,然后在server.properties中,将log.dirs设置为多目录(用逗号分隔)。Kafka会自动将所有Partition尽可能均匀分配到不同目录也即不同目录(也即不同disk)上。注:虽然物理上最小单位是Segment,但Kafka并不提供同一Partition内不同Segment间的并行处理。因为对于写而言,每次只会写Partition内的一个Segment,而对于读而言,也只会顺序读取同一Partition内的不同Segment。
- Producer(生产者)
负责发布消息到Kafka broker,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。使用的更多的是第二种。
- Consumer(消费者)
消息消费者,向Kafka broker读取消息的客户端。在消费消息时只需要指明topic名称即可,不需要关心具体的消息存在了哪个borker上。注意,一个topic只能被一个group组中的一个cusmer来消费,但是可以被多个group组来同时消费;consumer只能修改被commit状态的消息;
发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到所有的consumer中。Consumers可以加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的机器上。如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。更常见的是,每个topic都有若干数量的consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。
由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个
相比传统的消息系统,Kafka可以很好的保证有序性。
传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。
在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。
Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。
- Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
-
replica/leader/follower
每个topic的partion的所有消息,都不是只存1份,而是在多个broker上冗余存储,从而提高系统的可靠性。这多台机器就叫一个replica集合。
在这个replica集合中,需要选出1个leader,剩下的是follower。也就是master/slave。
发送消息的时候,只会发送给leader,然后leader再把消息同步给followers(以pull的方式,followers去leader上pull,而不是leader push给followers)。
那这里面就有一个问题:leader收到消息之后,是直接返回给producer呢,还是等所有followers都写完消息之后,再返回? 关于这个请看下面博客中的kafka机制内容
关键点:这里replica/leader/follower都是逻辑概念,并且是相对”partion”来讲的,而不是”topic”。也就说,同一个topic的不同partion,对于的replica集合可以是不一样的。
比如 :
“abc-0” <1,3,5> //abc_0的replica集合是borker 1, 3, 5, leader是1, follower是3, 5
“abc-1” <1,3,7> //abc_1的replica集合是broker 1, 3, 7,leader是1, follower是3, 7
“abc_2” <3,7,9>
“abc_3” <1,7,9>
“abc_4” <1,3,5>
持久化文件删除策略
什么是数据分片
复制备份
kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定.leader处理所有的read-write请求,follower需要和leader保持同步.Follower和consumer一样,消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.(不同于其他分布式存储,比如hbase需要"多数派"存活才行)
leader选举
使用场景
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
- 消息系统:解耦和生产者和消费者、缓存消息等;
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
- 流式处理:比如spark streaming和storm;
消息队列的各种策略和语义
对于消息队列的使用,表面上看起来很简单,一端往里面放,一端从里面取。但就在这一放一取中,存在着诸多策略。
Producer的策略
是否ACK
所谓ACK,是指服务器收到消息之后,是存下来之后,再给客户端返回,还是直接返回。很显然,是否ACK,是影响性能的一个重要指标。在kafka中,request.required.acks有3个取值,分别对应3种策略:
request.required.acks
//0: 不等服务器ack就返回了,性能最高,可能丢数据
//1. leader确认消息存下来了,再返回
//all: leader和当前ISR中所有replica都确认消息存下来了,再返回(这种方式最可靠)
备注:在0.9.0以前的版本,是用-1表示all
同步发送 vs 异步发送
所谓异步发送,就是指客户端有个本地缓冲区,消息先存放到本地缓冲区,然后有后台线程来发送。
在0.8.2和0.8.2之前的版本中,同步发送和异步发送是分开实现的,用的Scala语言。从0.8.2开始,引入了1套新的Java版的client api。在这套api中,同步实际上是用异步间接实现的:
在异步发送下,有以下4个参数需要配置:
(1)队列的最大长度
buffer.memory //缺省为33554432, 即32M
(2)队列满了,客户端是阻塞,还是抛异常出来(缺省是true)
block.on.buffer.full
//true: 阻塞消息
//false:抛异常
(3)发送的时候,可以批量发送的数据量
batch.size //缺省16384字节,即16K
(4)最长等多长时间,批量发送
linger.ms //缺省是0
//类似TCP/IP协议中的linger algorithm,> 0 表示发送的请求,会在队列中积攥,然后批量发送。
很显然,异步发送可以提高发送的性能,但一旦客户端挂了,就可能丢数据。
对于RabbitMQ, ActiveMQ,他们都强调可靠性,因此不允许非ACK的发送,也没有异步发送模式。Kafka提供了这个灵活性,允许使用者在性能与可靠性之间做权衡。
(5)消息的最大长度
max.request.size //缺省是1048576,即1M
这个参数会影响batch的大小,如果单个消息的大小 > batch的最大值(16k),那么batch会相应的增大
Consumer的策略
Push vs Pull
所有的消息队列都要面对一个问题,是broker把消息Push给消费者呢,还是消费者主动去broker Pull消息?
kafka选择了pull的方式,为什么呢? 因为pull的方式更灵活:消息发送频率应该如何,消息是否可以延迟然后batch发送,这些信息只有消费者自己最清楚!
因此把控制权交给消费者,消费者自己控制消费的速率,当消费者处理消息很慢时,它可以选择减缓消费速率;当处理消息很快时,它可以选择加快消费速率。而在push的方式下,要实现这种灵活的控制策略,就需要额外的协议,让消费者告诉broker,要减缓还是加快消费速率,这增加了实现的复杂性。
另外pull的方式下,消费者可以很容易的自适应控制消息是batch的发送,还是最低限度的减少延迟,每来1个就发送1个。
消费的confirm
在消费端,所有消息队列都要解决的一个问题就是“消费确认问题”:消费者拿到一个消息,然后处理这个消息的时候挂了,如果这个时候broker认为这个消息已经消费了,那这条消息就丢失了。
一个解决办法就是,消费者在消费完之后,再往broker发个confirm消息。broker收到confirm消息之后,再把消息删除。
要实现这个,broker就要维护每个消息的状态,已发送/已消费,很显然,这会增大broker的实现难度。同时,这还有另外一个问题,就是消费者消费完消息,发送confirm的时候,挂了。这个时候会出现重复消费的问题。
kafka没有直接解决这个问题,而是引入offset回退机制,变相解决了这个问题。在kafka里面,消息会存放一个星期,才会被删除。并且在一个partion里面,消息是按序号递增的顺序存放的,因此消费者可以回退到某一个历史的offset,进行重新消费。
当然,对于重复消费的问题,需要消费者去解决。
broker的策略
消息的顺序问题
在某些业务场景下,需要消息的顺序不能乱:发送顺序和消费顺序要严格一致。而在kafka中,同一个topic,被分成了多个partition,这多个partition之间是互相独立的。
之所以要分成多个partition,是为了提高并发度,多个partition并行的进行发送/消费,但这却没有办法保证消息的顺序问题。
一个解决办法是,一个topic只用一个partition,但这样很显然限制了灵活性。
还有一个办法就是,所有发送的消息,用同一个key,这样同样的key会落在一个partition里面。
消息的刷盘机制
我们都知道,操作系统本身是有page cache的。即使我们用无缓冲的io,消息也不会立即落到磁盘上,而是在操作系统的page cache里面。操作系统会控制page cache里面的内容,什么时候写回到磁盘。在应用层,对应的就是fsync函数。
我们可以指定每条消息都调用一次fsync存盘,但这会较低性能,也增大了磁盘IO。也可以让操作系统去控制存盘。
消息的不重不漏 – Exactly Once
一个完美的消息队列,应该做到消息的“不重不漏”,这里面包含了4重语义:
消息不会重复存储;
消息不会重复消费;
消息不会丢失存储;
消息不会丢失消费。
先说第1个:重复存储。发送者发送一个消息之后,服务器返回超时了。那请问,这条消息是存储成功了,还是没有呢?
要解决这个问题:发送者需要给每条消息增加一个primary key,同时服务器要记录所有发送过的消息,用于判重。很显然,要实现这个,代价很大
重复消费:上面说过了,要避免这个,消费者需要消息confirm。但同样,会引入其他一些问题,比如消费完了,发送confirm的时候,挂了怎么办? 一个消息一直处于已发送,但没有confirm状态怎么办?
丢失存储:这个已经解决
丢失消费:同丢失存储一样,需要confirm。
总结一下:真正做到不重不漏,exactly once,是很难的。这个需要broker、producer、consumer和业务方的协调配合。
在kafka里面,是保证消息不漏,也就是at least once。至于重复消费问题,需要业务自己去保证,比如业务加判重表。
常遇到的问题
- 日志过大时处理方式
- 因为kafka写消息的书读非常开,但是在消费时候的速度受到业务的影响,常出现消息积压的问题?如何解决?
由于一些博客,在kafka介绍方面已经非常的完善,所以这里只收集一些有内涵的文章来供大家学习参考:
学习链接