zoukankan      html  css  js  c++  java
  • kafka学习笔记03-消息生产者producer

    kafka学习笔记03-消息生产者producer

    发送消息整体流程示意图

    消息发送的流程示意图:

    kafka发送流程图

    (From:High-level overview of Kafka producer components – Kafka the Definitive Guide Book , 中文书名:kafka权威指南)

    一条消息写入 kafka,构造这条消息结构名称叫 ProduceRecord,ProduceRecord 的结构如上图。

    大致流程说明:

    1. 先找到 kafka 集群的 bootstrap server,一般推荐一个kafka集群最少设置 2 个bootstrap server
    2. 找到一些发送数据需要的元信息,如 topics,partitions,replica-factor 等等信息
    3. 然后找到 broker 中的 leader topic ,把消息写入到 topic 中的 partiton 里

    当然,这当中还有消息的序列化(serializer),分区器(partitioner)对数据的分区分配等步骤。

    数据分区分配机制-数据负载均衡

    生产者的数据主要发送到 topic 的分区(partition)里,一个 topic 可以有多个分区,同一个topic下的不同分区存储的消息不同,那怎么确定消息发送到哪一个分区partition?

    这就需要一种算法来确定数据发送到哪个分区(partition)。

    也就是将数据进行均匀分布,分配算法分配数据时不要导致某一个partition数据分配太多,而某一个分区数据又太少。

    怎么做到生产数据的负载均衡,kafka 里的 partitioner (分区器)来负责客户端生产层面的负载均衡。

    1. 如果提供 key 值
      partitioner 会根据 key 的哈希值(采用Murmur2Hash算法)对 partition 数量取模,根据该值决定把消息发送到哪个 partition 上,(hash(key) mod numpartitons)。

    2. 如果没有提供 key 值
      key 为 空(null,无值)时,kafka 2.4 之前有一种策略,轮询算法, 2. 4 之后,又提供了因为一种算法 黏性分区策略
      key 为 null 时,第一次调用时会随机生成一个整数,后面每次在这个整数上自增,然后这个值 对 partition 数量取模,这个就是轮询算法 - roundrobin。
      kafka 2.4 之前默认的策略就是这个轮询主题的所有分区,将消息以轮询的方式发送到每一个分区上。
      kafka 2.4 之后,社区又引进了 Sticky Partitioning Strategy(黏性分区策略),该策略能显著降低指定分区过程中的延时。具体信息看这里 KIP-480: Sticky Partitioner

    3. 如果提供了 partition:
      如果你指定了 partition分区,那么就用指定的这个分区,不用 hash(key) 的分区算法。

    每个消息在被添加到分区partition时,会分配一个 offset ,叫偏移量,它是消息在分区中的唯一编号,也可以理解为数据库中某一张表的唯一id,kafka 通过 offset 保证了消息在某一分区的顺序,offset 不跨区,它只在一个分区内消息是有序。

    比如有一个 topic 命名为:topic:student,配置了 3 个分区,分区为:p0,p1,p2,如下图:

    image-20210724184113969

    old :表示存储的旧数据,也就是 offset 值小的数据。
    new: 表示存储的新数据,新写入的数据,offset 值大的数据。
    consumer: 消费者,可以消费 partition 分区的数据。这个可以看作一个消费组只有一个consumer的情况。
    consumer group: 消费组,它可以保证每个分区只被组内的一个consumer(消费者)消费。

    生产者的一些参数配置

    生产端参数配置:http://kafka.apache.org/documentation/#producerconfigs

    key.serializer
    key 的序列化

    value.serializer
    值的序列化

    acks
    acks 指的是有多少个副本接收到数据后发送 ack 成功,生产者才会认为数据接收成功。

    1. acks = 0,只管发不等服务端确认消息,不负责对发送的消息进行确认是否接收成功。也就是说发送过程中出了问题,导致服务器没有收到消息,生产者无从得知,消息也就丢失了。并且 retries 配置也不会起作用,每次 offset 返回的值都是 -1。
    2. acks = 1,只要集群的 Leader 接收消息并返回一条 ack 确认消息,就表明成功发送
    3. acks = all,这个不光要集群的 Leader 接收消息后发送 ack 确认消息,followers 也要发送确认消息,所有的 ack 发送成功,才表明数据发送成功。所以它的延迟比其他2项高。
      它还可以设置为 -1, 即是 acks = -1。

    buffer.memory:
    设置生产者的内存缓冲区,缓冲要发送给服务器的消息。

    compression.type
    对消息启用的压缩算法。默认情况下消息不会被压缩。
    该参数可以设置为 snappy、gzip 和 lz4

    retries
    发送消息遇到错误,重试的次数。

    batch.size
    该参数指定了一个批次可以使用的内存大小,按照字节数计算
    当有多个消息发到同一个分区时,生产者会把它放到同一个批次里。

    bootstrap.servers
    连接到 kafka cluster 列表,也就是 broker 列表。默认为空。
    设置:host1:port1,host2:port2,...

    linger.ms
    在批量发送前的等待时间

    client.id
    可以是任意字符串,服务器用来识别消息来源。可以设置为空,client.id=""

    更多配置参数参考这里:http://kafka.apache.org/documentation/#producerconfigs

    == just do it ==
  • 相关阅读:
    用Azure VM + Azure Database for MySQL搭建Web服务
    Azure镜像的跨区域复制—Shared Image Gallery(共享映像库)初探
    Azure上几种常见的VM复制操作
    Exchange 2016与国内版O365混合部署(6):混合后的操作和验证
    Exchange 2016与国内版O365混合部署(5):运行AAD Connect及混合部署向导
    Exchange 2016与国内版O365混合部署(4):配置Exchange 公网证书
    Exchange 2016与国内版O365混合部署(3):安装Exchange2016并配置邮件的外网收发
    Exchange 2016与国内版O365混合部署(2):搭建域环境
    Exchange 2016与国内版O365混合部署(1):过程总览
    Cross-Tanant 步骤
  • 原文地址:https://www.cnblogs.com/jiujuan/p/15055979.html
Copyright © 2011-2022 走看看