zoukankan      html  css  js  c++  java
  • Kafka Producer源码简述

      接着上文kafka的简述,这一章我们一探kafka生产者是如何发送消息到消息服务器的。

     代码的入口还是从

    kafkaTemplate.send开始

     

    最终我们就会到

    org.springframework.kafka.core.KafkaTemplate#doSend方法

    这里的关键就是

    org.apache.kafka.clients.producer.Producer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback)

    我们再一路点击下去,一直到

     org.apache.kafka.clients.producer.KafkaProducer#doSend方法

     

    这里将步骤分为五步

    1.更新Metadata,Metadata用于存储部分topic数据
    2.将发送内容序列化
    3.如果我们有多个分区的话,在这里会根据算法选择相应的分区
    4.向accumulator写入数据,accumulator是一种ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;结构,在这里对发送数据做零时缓存 
    5.缓存的够多了,唤醒线程发送数据。

    所以看到这里我们就明白了,kafka不是直接将数据发送到服务器。而是缓存到内存中,知道大于batchsize才去做发送

    接下来我们看下sender线程做了什么

    直接来到

    org.apache.kafka.clients.producer.internals.Sender#run(long)

    1.连接的获取,

    org.apache.kafka.clients.NetworkClient#initiateConnect

    具体的connect代码如下

    首先与kafka serve端建立了一个non blocking 的SocketChannel,然后将该channel注册到一个java.nio.channels.Selector上面,并注册OP_CONNECT事件。

    接下来,我们再看下消息的发送

    首先调用

    client.send(request, now);

    这个方法最终会调用

    org.apache.kafka.common.network.KafkaChannel#setSend

    为每个request注册

    OP_WRITE事件

     同时把send传递进来

    接下来调用 

    this.client.poll(pollTimeout, now);

    这个的调用链是

    org.apache.kafka.common.network.Selector#poll----> org.apache.kafka.common.network.Selector#pollSelectionKeys--->

    这里的

    key.isWritable()

    就是我们上文注册写事件,当所有的都准备好了,我们调用channel将消息发送到服务端

    到这里我们就知道了kafka发送消息的大致流程。本文并没有对细节深入,只想对kafka做出快速的了解。




  • 相关阅读:
    字典转模型
    iOS开发之---传值大全
    UITableViewCell重用机制
    通知/代理/block 三者比对
    内存的那些事
    C++
    C#接口实现案例
    4.2 C#-----------------------------操作符的重载------------------------------------------
    C#抽象类和抽象方法的实现
    C#----析构函数
  • 原文地址:https://www.cnblogs.com/xmzJava/p/9536351.html
Copyright © 2011-2022 走看看