zoukankan      html  css  js  c++  java
  • MQ(-)kafka的原理

        目前消息队列有很多成熟的中间件,主要作用无异于异步、解耦、削峰。但是各有特色,我们要做的是在不同应用场景选择使用方便、成本低廉的框架!

    1.  kafka原理

         从17coding技术博客上盗的图,博客上面对原理说的很明白,从下图可以很清晰的看清楚各模块的职责。

       

    kafka均衡算法

        有一个topic ,然后这个topic的partition和他们所在的broker的图如下:

     

    1.A=(partition数量/同分组消费者总个数)
    2.M=对上面所得到的A值小数点第一位向上取整
    3.计算出该消费者拉取数据的patition合集:Ci = [P(M*i ),P((i + 1) * M -1)]

    按照如图所示,那么这里:

    A=6/8=0.75

    M=1

    C0=[P(1*0),P((0+1)*1-1)]=[P0,P0]
    同理:

    C1=[P(1*1),P((1+1)*1-1)]=[P1,P1]
    C2=[P(1*2),P((2+1)*1-1)]=[P2,P2]
    C3=[P(1*3),P((3+1)*1-1)]=[P3,P3]
    C4=[P(1*4),P((4+1)*1-1)]=[P4,P4]
    C5=[P(1*5),P((5+1)*1-1)]=[P5,P5]
    C6=[P(1*6),P((6+1)*1-1)]=[P6,P6]
    C7=[P(1*7),P((7+1)*1-1)]=[P7,P7]

    那么按照上面的算法:
    C0消费者消费P0的数据
    C1消费者消费P1的数据
    C2消费者消费P2的数据
    C3消费者消费P3的数据
    C4消费者消费P4的数据
    C5消费者消费P5的数据

    C6消费者消费P6的数据
    C7消费者消费P7的数据

     

    1.按照如上的算法,所以如果kafka的消费组需要增加组员,最多增加到和partition数量一致,超过的组员只会占用资源,而不起作用;

    2.kafka的partition的个数一定要大于消费组组员的个数,并且partition的个数对于消费组组员取模一定要为0,不然有些消费者会占用资源却不起作用;

    3.如果需要增加消费组的组员个数,那么也需要根据上面的算法,调整partition的个数。二者最好呈整倍关系

    Zookeeper和Kafka的关系

    1.在Kafka的设计中,选择了使用Zookeeper来进行所有Broker的管理,体现在zookeeper上会有一个专门用来进行Broker服务器列表记录的点,节点路径为/brokers/ids

    2.生产者负载均衡

     生产者需要将消息合理的发送到分布式Broker上,这就面临如何进行生产者负载均衡问题。
    对于生产者的负载均衡,Kafka支持传统的4层负载均衡,zookeeper同时也支持zookeeper方式来实现负载均衡。
    (1)传统的4层负载均衡
    根据生产者的IP地址和端口来为其定一个相关联的Broker,通常一个生产者只会对应单个Broker,只需要维护单个TCP链接。这样的方案有很多弊端,因为在系统实际运行过程中,每个生产者生成的消息量,以及每个Broker的消息存储量都不一样,那么会导致不同的Broker接收到的消息量非常不均匀,而且生产者也无法感知Broker的新增与删除。
    (2)使用zookeeper进行负载均衡
    很简单,生产者通过监听zookeeper上Broker节点感知Broker,Topic的状态,变更,来实现动态负载均衡机制,当然这个机制Kafka已经结合zookeeper实现了。

    3.消费者的负载均衡和生产负载均衡类似

    4.记录消息分区于消费者的关系,都是通过创建修改zookeeper上相应的节点实现

    5.记录消息消费进度Offset记录,都是通过创建修改zookeeper上相应的节点实现

    Kafka的IO  持久化 

    Kafka 会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度 Kafka 采用了两个技术, 顺序写入和 MMFile(Memory Mapped File)。 

    顺序写入

       每一个 Partition 其实都是一个文件 ,收到消息后 Kafka 会把数据插入到文件末尾(虚框部分)

    这种方法有一个缺陷——没有办法删除数据 ,所以 Kafka 是不会删除数据的,它会把所有的数据都保留下来,每个

    消费者(Consumer)对每个 Topic 都有一个 Offset 用来表示读取到了第几条数据 。

    为了避免磁盘被撑满的情况,Kakfa 提供了两种策略来删除数据:

    1. 基于时间 (默认七天)
    2. 基于 Partition 文件大小

    Memory Mapped Files

      mmf (Memory Mapped Files)直接利用操作系统的Page来实现文件到物理内存的映射,完成之后对物理内存的操作会直接同步到硬盘。mmf 通过内存映射的方式大大提高了IO速率,省去了用户空间到内核空间的复制。它的缺点显而易见–不可靠,当发生宕机而数据未同步到硬盘时,数据会丢失,Kafka 提供了produce.type参数来控制是否主动的进行刷新,如果 Kafka 写入到 mmf 后立即flush再返回给生产者则为同步模式,反之为异步模式。

    Kafka 提供了一个参数 producer.type 来控制是不是主动 Flush:

    • 如果 Kafka 写入到 mmf 之后就立即 Flush,然后再返回 Producer 叫同步 (Sync)。

    • 如果 Kafka 写入 mmf 之后立即返回 Producer 不调用 Flush 叫异步 (Async)。

    kafka的IO优化

    基于 Sendfile 实现零拷贝(Zero Copy)

    作为一个消息系统,不可避免的便是消息的拷贝,常规的操作,一条消息,需要从创建者的socket到应用,再到操作系统内核,然后才能落盘。同样,一条消息发送给消费者也要从磁盘到内核到应用再到接收者的socket,中间经过了多次不是很有必要的拷贝。

    传统 Read/Write 方式进行网络文件传输,在传输过程中,文件数据实际上是经过了四次 Copy 操作,其具体流程细节如下:

    • 调用 Read 函数,文件数据被 Copy 到内核缓冲区。

    • Read 函数返回,文件数据从内核缓冲区 Copy 到用户缓冲区

    • Write 函数调用,将文件数据从用户缓冲区 Copy 到内核与 Socket 相关的缓冲区。

    • 数据从 Socket 缓冲区 Copy 到相关协议引擎。

    硬盘—>内核 buf—>用户 buf—>Socket 相关缓冲区—>协议引擎

    而 Sendfile 系统调用则提供了一种减少以上多次 Copy,提升文件传输性能的方法。在内核版本 2.1 中,引入了 Sendfile 系统调用,以简化网络上和两个本地文件之间的数据传输。Sendfile 的引入不仅减少了数据复制,还减少了上下文切换。相较传统 Read/Write 方式,2.1 版本内核引进的 Sendfile 已经减少了内核缓冲区到 User 缓冲区,再由 User 缓冲区到 Socket 相关缓冲区的文件 Copy。而在内核版本 2.4 之后,文件描述符结果被改变,Sendfile 实现了更简单的方式,再次减少了一次 Copy 操作。

    Kafka 把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候 Kafka 直接把文件发送给消费者,配合 mmap 作为文件读写方式,直接把它传给 Sendfile。

     批量发送

    Kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到 Kafka 。

    • 等消息条数到固定条数。
    • 一段时间发送一次。

    数据压缩

    Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩。
    压缩的好处就是减少传输的数据量,减轻对网络传输的压力。

    Producer压缩之后,在Consumer需进行解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU,所以这个成本很值得。

    注意:批量发送和数据压缩一起使用,单条做数据压缩的话,效果不明显 ❗

    Kafka 如何保证消息队列不丢失

    ACK 机制

    acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。我们可以配置 acks = all ,代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。

    设置分区

    为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本,以确保消息队列的安全性。

    关闭 unclean leader 选举

    多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

    发送消息

    为了得到更好的性能,Kafka 支持在生产者一侧进行本地buffer,也就是累积到一定的条数才发送,如果这里设置不当是会丢消息的。

    生产者端设置: producer.type=async, sync,默认是 sync。

    当设置为 async,会大幅提升性能,因为生产者会在本地缓冲消息,并适时批量发送。

    如果对可靠性要求高,那么这里可以设置为 sync 同步发送。

    一般时候我们还需要设置:min.insync.replicas> 1 ,消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数,与acks配合使用。

    但如果出现两者相等,我们还需要设置 replication.factor = min.insync.replicas + 1 ,避免在一个副本挂掉,整个分区无法工作的情况!

    Consumer 端丢失消息

    consumer端丢失消息的情形比较简单:

    如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。

    由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。

    为了避免数据丢失,现给出几点建议:设置 enable.auto.commit=false

    关闭自动提交位移,在消息被完整处理之后再手动提交位移。

  • 相关阅读:
    [tensorflow] tf.gather使用方法
    Tensorflow Dataset.from_generator使用示例
    np.random.rand()函数
    python类
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    KNN算法
    Qt编写数据可视化大屏界面电子看板11-自定义控件
    Qt编写数据可视化大屏界面电子看板10-改造QCustomPlot
    Qt编写数据可视化大屏界面电子看板9-曲线效果
    闲谈Monaco Editor-基本使用
  • 原文地址:https://www.cnblogs.com/heshana/p/13810237.html
Copyright © 2011-2022 走看看