zoukankan      html  css  js  c++  java
  • MQ(-)kafka的原理

        目前消息队列有很多成熟的中间件,主要作用无异于异步、解耦、削峰。但是各有特色,我们要做的是在不同应用场景选择使用方便、成本低廉的框架!

    1.  kafka原理

         从17coding技术博客上盗的图,博客上面对原理说的很明白,从下图可以很清晰的看清楚各模块的职责。

       

    kafka均衡算法

        有一个topic ,然后这个topic的partition和他们所在的broker的图如下:

     

    1.A=(partition数量/同分组消费者总个数)
    2.M=对上面所得到的A值小数点第一位向上取整
    3.计算出该消费者拉取数据的patition合集:Ci = [P(M*i ),P((i + 1) * M -1)]

    按照如图所示,那么这里:

    A=6/8=0.75

    M=1

    C0=[P(1*0),P((0+1)*1-1)]=[P0,P0]
    同理:

    C1=[P(1*1),P((1+1)*1-1)]=[P1,P1]
    C2=[P(1*2),P((2+1)*1-1)]=[P2,P2]
    C3=[P(1*3),P((3+1)*1-1)]=[P3,P3]
    C4=[P(1*4),P((4+1)*1-1)]=[P4,P4]
    C5=[P(1*5),P((5+1)*1-1)]=[P5,P5]
    C6=[P(1*6),P((6+1)*1-1)]=[P6,P6]
    C7=[P(1*7),P((7+1)*1-1)]=[P7,P7]

    那么按照上面的算法:
    C0消费者消费P0的数据
    C1消费者消费P1的数据
    C2消费者消费P2的数据
    C3消费者消费P3的数据
    C4消费者消费P4的数据
    C5消费者消费P5的数据

    C6消费者消费P6的数据
    C7消费者消费P7的数据

     

    1.按照如上的算法,所以如果kafka的消费组需要增加组员,最多增加到和partition数量一致,超过的组员只会占用资源,而不起作用;

    2.kafka的partition的个数一定要大于消费组组员的个数,并且partition的个数对于消费组组员取模一定要为0,不然有些消费者会占用资源却不起作用;

    3.如果需要增加消费组的组员个数,那么也需要根据上面的算法,调整partition的个数。二者最好呈整倍关系

    Zookeeper和Kafka的关系

    1.在Kafka的设计中,选择了使用Zookeeper来进行所有Broker的管理,体现在zookeeper上会有一个专门用来进行Broker服务器列表记录的点,节点路径为/brokers/ids

    2.生产者负载均衡

     生产者需要将消息合理的发送到分布式Broker上,这就面临如何进行生产者负载均衡问题。
    对于生产者的负载均衡,Kafka支持传统的4层负载均衡,zookeeper同时也支持zookeeper方式来实现负载均衡。
    (1)传统的4层负载均衡
    根据生产者的IP地址和端口来为其定一个相关联的Broker,通常一个生产者只会对应单个Broker,只需要维护单个TCP链接。这样的方案有很多弊端,因为在系统实际运行过程中,每个生产者生成的消息量,以及每个Broker的消息存储量都不一样,那么会导致不同的Broker接收到的消息量非常不均匀,而且生产者也无法感知Broker的新增与删除。
    (2)使用zookeeper进行负载均衡
    很简单,生产者通过监听zookeeper上Broker节点感知Broker,Topic的状态,变更,来实现动态负载均衡机制,当然这个机制Kafka已经结合zookeeper实现了。

    3.消费者的负载均衡和生产负载均衡类似

    4.记录消息分区于消费者的关系,都是通过创建修改zookeeper上相应的节点实现

    5.记录消息消费进度Offset记录,都是通过创建修改zookeeper上相应的节点实现

    Kafka的IO  持久化 

    Kafka 会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度 Kafka 采用了两个技术, 顺序写入和 MMFile(Memory Mapped File)。 

    顺序写入

       每一个 Partition 其实都是一个文件 ,收到消息后 Kafka 会把数据插入到文件末尾(虚框部分)

    这种方法有一个缺陷——没有办法删除数据 ,所以 Kafka 是不会删除数据的,它会把所有的数据都保留下来,每个

    消费者(Consumer)对每个 Topic 都有一个 Offset 用来表示读取到了第几条数据 。

    为了避免磁盘被撑满的情况,Kakfa 提供了两种策略来删除数据:

    1. 基于时间 (默认七天)
    2. 基于 Partition 文件大小

    Memory Mapped Files

      mmf (Memory Mapped Files)直接利用操作系统的Page来实现文件到物理内存的映射,完成之后对物理内存的操作会直接同步到硬盘。mmf 通过内存映射的方式大大提高了IO速率,省去了用户空间到内核空间的复制。它的缺点显而易见–不可靠,当发生宕机而数据未同步到硬盘时,数据会丢失,Kafka 提供了produce.type参数来控制是否主动的进行刷新,如果 Kafka 写入到 mmf 后立即flush再返回给生产者则为同步模式,反之为异步模式。

    Kafka 提供了一个参数 producer.type 来控制是不是主动 Flush:

    • 如果 Kafka 写入到 mmf 之后就立即 Flush,然后再返回 Producer 叫同步 (Sync)。

    • 如果 Kafka 写入 mmf 之后立即返回 Producer 不调用 Flush 叫异步 (Async)。

    kafka的IO优化

    基于 Sendfile 实现零拷贝(Zero Copy)

    作为一个消息系统,不可避免的便是消息的拷贝,常规的操作,一条消息,需要从创建者的socket到应用,再到操作系统内核,然后才能落盘。同样,一条消息发送给消费者也要从磁盘到内核到应用再到接收者的socket,中间经过了多次不是很有必要的拷贝。

    传统 Read/Write 方式进行网络文件传输,在传输过程中,文件数据实际上是经过了四次 Copy 操作,其具体流程细节如下:

    • 调用 Read 函数,文件数据被 Copy 到内核缓冲区。

    • Read 函数返回,文件数据从内核缓冲区 Copy 到用户缓冲区

    • Write 函数调用,将文件数据从用户缓冲区 Copy 到内核与 Socket 相关的缓冲区。

    • 数据从 Socket 缓冲区 Copy 到相关协议引擎。

    硬盘—>内核 buf—>用户 buf—>Socket 相关缓冲区—>协议引擎

    而 Sendfile 系统调用则提供了一种减少以上多次 Copy,提升文件传输性能的方法。在内核版本 2.1 中,引入了 Sendfile 系统调用,以简化网络上和两个本地文件之间的数据传输。Sendfile 的引入不仅减少了数据复制,还减少了上下文切换。相较传统 Read/Write 方式,2.1 版本内核引进的 Sendfile 已经减少了内核缓冲区到 User 缓冲区,再由 User 缓冲区到 Socket 相关缓冲区的文件 Copy。而在内核版本 2.4 之后,文件描述符结果被改变,Sendfile 实现了更简单的方式,再次减少了一次 Copy 操作。

    Kafka 把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候 Kafka 直接把文件发送给消费者,配合 mmap 作为文件读写方式,直接把它传给 Sendfile。

     批量发送

    Kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到 Kafka 。

    • 等消息条数到固定条数。
    • 一段时间发送一次。

    数据压缩

    Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩。
    压缩的好处就是减少传输的数据量,减轻对网络传输的压力。

    Producer压缩之后,在Consumer需进行解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU,所以这个成本很值得。

    注意:批量发送和数据压缩一起使用,单条做数据压缩的话,效果不明显 ❗

    Kafka 如何保证消息队列不丢失

    ACK 机制

    acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。我们可以配置 acks = all ,代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。

    设置分区

    为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本,以确保消息队列的安全性。

    关闭 unclean leader 选举

    多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

    发送消息

    为了得到更好的性能,Kafka 支持在生产者一侧进行本地buffer,也就是累积到一定的条数才发送,如果这里设置不当是会丢消息的。

    生产者端设置: producer.type=async, sync,默认是 sync。

    当设置为 async,会大幅提升性能,因为生产者会在本地缓冲消息,并适时批量发送。

    如果对可靠性要求高,那么这里可以设置为 sync 同步发送。

    一般时候我们还需要设置:min.insync.replicas> 1 ,消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数,与acks配合使用。

    但如果出现两者相等,我们还需要设置 replication.factor = min.insync.replicas + 1 ,避免在一个副本挂掉,整个分区无法工作的情况!

    Consumer 端丢失消息

    consumer端丢失消息的情形比较简单:

    如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。

    由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。

    为了避免数据丢失,现给出几点建议:设置 enable.auto.commit=false

    关闭自动提交位移,在消息被完整处理之后再手动提交位移。

  • 相关阅读:
    字符编码相关
    函数之形参与实参
    文件操作模式
    函数对象,名称空间,作用域,和闭包
    吴裕雄天生自然SPRINGBOOT开发实战处理'spring.datasource.url' is not specified and no embedded datasource could be autoconfigured
    吴裕雄天生自然SPRINGBOOT开发实战处理XXXX that could not be found.
    吴裕雄天生自然SPRINGBOOT开发实战SpringBoot HTML表单登录
    吴裕雄天生自然SPRINGBOOT开发实战SpringBoot REST示例
    吴裕雄天生自然SpringBoot开发实战学习笔记处理 Could not write metadata for '/Servers'.metadata\.plugins\org.eclipse.core.resources\.projects\Servers\.markers.snap (系统找不到指定的路径。)
    吴裕雄天生自然SPRINGBOOT开发实战SpringBoot Tomcat部署
  • 原文地址:https://www.cnblogs.com/heshana/p/13810237.html
Copyright © 2011-2022 走看看