zoukankan      html  css  js  c++  java
  • Kafka-处理请求(生产请求、获取请求)

    Kafka-处理请求

    broker的大部分工作室处理客户端、分区副本和控制器发送给分区首领的请求。kafka提供了一个二进制协议(基于TCP),指定了请求消息的格式以及broker如何对请求作出响应--包括成功处理请求或在处理请求过程中遇到错误。客户端发起连接并发送请求,broker处理请求并作出响应。broker按照请求到达的顺序来处理它们--这种顺序保证让kafka具有了消息队列的特性,同时保证保存的消息也是有序的。

    所有的请求消息都包含一个标准消息头:

    Request type:也就是API key

    Request version:(broker可以处理不同版本的客户端请求,并根据客户端版本作出不同的响应)

    Correlation ID:一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里(用于诊断问题)

    broker会在它所监听的每一个端口上运行一个Acceptor线程,这个线程会创建一个连接,并把它交给processor线程去处理。processor线程(也被叫做网络线程)的数量是可以配置的。网络线程负责从客户端获取请求消息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。

    请求消息被放到请求队列后,IO线程会负责处理它们。

    生产请求:生产者发送的请求,它包含客户端要写入broker的消息。

    获取请求:在消费者和跟随者副本需要从broker读取消息时发送的请求。

    kafka处理请求的内部流程:

     

    生产请求和获取请求都必须发送给分区的首领副本。如果broker收到一个针对特定分区的请求,而该分区的首领在另一个broker上,那么发送请求的客户端会收到一个非分区首领的错误响应。当针对特定分区的获取请求被发送到一个不含有该分区首领的broker上,也会出现同样的错误。kafka客户端要自己负责把生产请求和获取请求发送到正确的broker上。

    客户端使用了另一种请求类型,也就是元数据请求。这种请求包含了客户端感兴趣的主体列表。服务器端的响应消息里指明了这些主题所包含的分区、每个分区都有哪些副本,以及哪个副本是首领。元数据请求可以发送给任意一个broker,因为所有broker都缓存了这些信息。

    一般情况下,客户端会把这些信息缓存起来,并直接往目标broker上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通过metadata.max.age.ms参数来配置),从而知道元数据是否发生了变化--比如,在新broker加入集群时,部分副本会被移动到新的broker上。另外,如果客户端收到非首领错误,它会在尝试重发请求之前先刷新元数据,因为这个错误说明了客户端正在使用过期的元数据信息,之前的请求被发到了错误的broker上。

    1.生产请求

    acks配置参数:指定了需要多少个broker确认才可以认为一个消息写入是成功的。不同的配置对写入成功的界定是不一样的,如果acks=1,那么只要首领收到消息就认为写入成功;如果acks=all,那么需要所有同步副本收到消息才算写入成功;如果acks=0,那么生产者在把消息发出去之后,完全不需要等待broker的响应。

    包含首领副本的broker在收到生产请求时,会对请求做一些验证。

    a.发送数据的用户是否有主题写入权限。

    b.请求里包含的acks值是否有效(只允许出现01all

    c.如果acks=all,是否有足够多的同步副本保证消息已经被安全写入(我们可以对broker进行配置,如果同步副本的数量不足,broker可以拒绝处理新消息)

    之后,消息被写入本地磁盘。在Linux系统上,消息会被写到文件系统缓存里,并不保证它们何时会被刷新到磁盘上。kafka不会一直等待数据被写到磁盘上--它依赖复制功能来保证消息的持久性。

    在消息被写入分区的首领之后,broker开始检查acks配置参数--如果acks被设为01,那么broker立即返回响应;如果acks被设为all,那么请求会被保存在一个叫做炼狱的缓冲区里,直到首领发现所有跟随者副本都复制了消息,响应才会被返回给客户端。

    2.获取请求

    Broker处理获取请求的方式与处理生产请求的方式很相似。客户端发送请求,向broker请求主题分区里具有特定偏移量的消息。客户端还可以指定broker最多可以从一个分区里返回多少数据。这个限制是非常重要的,因为客户端需要为broker返回的数据分配足够的内存。如果没有这个限制,broker返回的大量数据有可能耗尽客户端的内存。

    请求需要先到达指定的分区首领上,然后客户端通过查询元数据来确保请求的路由是正确的。首领在收到请求时,它会先检查请求是否有效:比如,指定的偏移量在分区上是否存在,如果客户端请求的是已经被删除的数据,或者请求的偏移量不存在,那么broker将返回一个错误。

    如果请求的偏移量存在,broker将按照客户端指定的数量上限从分区里读取消息,再把消息返回给客户端。kafka使用零复制技术向客户端发送消息:也就是说,kafka直接把消息从文件(更确切地说是Linux文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。这是kafka与其它大部分数据库系统不一样的地方,其它数据库在将数据发送给客户端之前会先把它们保存在本地文件缓存里。这项技术避免了字节复制,也不需要管理内存缓冲区,从而获得更好的性能。

    客户端除了可以设置broker返回数据的上限,也可以设置下限。例如,如果把下限设置为10KB,就是在告诉broker,等到有10KB数据的时候再发送给客户端。在主题消息流量不是很大的情况下,这样可以减少CPU和网络开销。客户端发送一个请求,broker等到有足够的数据时才把它们返回给客户端,然后客户端再发出请求,而不是让客户端每隔几毫秒就发送一次请求,每次只能得到很少的数据甚至没有数据。如下图所示,来回传送次数更少,因此开销更小。

     

    客户端一直等待broker积累数据也是不现实的,在等待了一段时间之后,就可以把可用的数据拿回处理,而不是一直等下去。所以客户端可以定义一个超时时间,告诉broker,如果无法在X毫秒内积累满足要求的数据量,那么就把当前这些数据返回。

    并不是所有保存在分区首领上的数据都可以被客户端读取。大部分客户端只能读取已经被写入所有同步副本的消息(跟随者副本也不行,尽管它们也是消费者--否则复制功能就无法工作)。分区首领知道每个消息会被复制到哪个副本上,在消息还没有被写入所有同步副本之前,是不会发送给消费者的--尝试获取这些消息的请求会得到空的响应而不是错误。

    因为还没有被足够多副本复制的消息被认为是不安全的。如果首领发生崩溃,另一个副本称为新首领,那么这些消息就丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。所以我们会等到所有同步副本复制了这些消息,才允许消费者读取它们。这也意味着,如果broker间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数replica.lag.time.max.ms来设置,它指定了副本在复制消息时可被允许的最大延迟时间。

  • 相关阅读:
    linux下通过命令行重启服务,查看id,更改tv密码
    windows渗透相关、hideadmin工具隐藏用户账号、添加隐藏用户
    windows服务隐藏 以及进程隐藏
    nat32 winh命令远程执行难点
    anydesk命令行使用
    html页面,能用鼠标滚轮滑动,但是不能触屏滑动
    Java 全局统一异常捕获
    git 常用操作
    vue v-for强制刷新
    flutter-TextField垂直居中
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/12740991.html
Copyright © 2011-2022 走看看