zoukankan      html  css  js  c++  java
  • Kafka Producer相关代码分析【转】

    来源:https://www.zybuluo.com/jewes/note/63925

    @jewes 2015-01-17 20:36 字数 1967 阅读 1093

    Kafka Producer相关代码分析

    kafka


    Kafka Producer将用户的消息发送到Kafka集群(准确讲是发送到Broker)。本文将分析Producer相关的代码实现。

     

    类kafka.producer.Producer

    如果你自己实现Kafka客户端来发送消息的话,你就是用到这个类提供的接口来发送消息。(如果你对如何利用Producer API来发送消息还不是很熟悉的话,可以参看官方的例子)。这个类提供了同步和异步两种方式来发送消息。

    异步发送消息是基于同步发送消息的接口来实现的。异步发送消息的实现很简单,客户端消息发送过来以后,先放入到一个队列中然后就返回了。Producer再开启一个线程(ProducerSendThread)不断从队列中取出消息,然后调用同步发送消息的接口将消息发送给Broker。

    Producer发送同步消息是委托给EventHandler做的,EventHandler是个接口,具体实现为DefaultEventHandler。它们的简化类图如下: 
    Producer类图

    可以看到,Producer类中的成员producerSendThread和queue是为了发送异步消息的,eventHandler是为了发送同步消息的,当然异步消息也需要它。KeyedMessage是封装了用户发送的消息。Seq是Scala中的序列,可以看成是Java中的List。

    KeyedMessage的简化类图如下: 
    KeyMessage类图

     

    类DefaultEventHandler

    DefaultEventHandler是接口EventHandler唯一的实现。从上一节可以看到Producer发送给EventHandler的消息格式为KeyedMessage。我们来看看在将KeyedMessage发送给Broker之前需要做哪些工作。

     

    序列化

    KeyedMessage中的KV是由用户指定的自定义类型,而在发送给broker的时候是以二进制流来发送,因此还需要将用户自定类型的数据转换为二进制流。在初始化Producer的时候需要配置serializer.class,它就是用来处理这个事情的。此外,还要把多条Message组合成MessageSet并按照用户指定的压缩方式进行压缩。

     

    找到对应的broker

    KeyedMessage中指定了该Message的topic,而一个topic可以有多个partition,每个partition有多个replica,由多个Broker来管理,这些Broker中有一个leader。只有leader broker能够响应客户端的读写请求。

    由此可见,在将KeyedMessage发送给broker之前,必须找到该条Message对应的leader broker,具体步骤为: 
    1. 找出该topic的所有partition, 
    2. 找出该KeyedMessage应该发送到的那个partition,在初始化Producer的时候配置partitioner.class就是用来对Message进行分区的 
    3. 找出对应partition所在的leader broker。

    最后,DefaultEventHandler将序列化后的Message封装成ProducerRequest,它自身并没有将ProducerRequest发送给broker的逻辑,而是将其交给SyncProducer来继续后面发送的流程。

     

    ProducerPool, SyncProducer和BlockingChannel

    它们在一起是完成最后的数据发送任务。先来看它们的类图: 
    ProducerPool等类图 
    ProducerPool中有一个HashMap,其key为brokerid,value为连接到这个broker的SyncProducer。因此ProducerPool的更准确名字应该为SyncProducerPool。

    BlockingChannel可以看成是一个Socket客户端,它有两个成员变量分别是机器名和端口号。它的connect方法会打开到对应机器的socket。它的send方法可以发送RequestOrResponse,它是真正发送数据的地方。

    SyncProducer提供了两个send方法,分别用来发送ProducerRequest和TopicMetadataRequest。它内部是调用了blockingChannel来发送数据的。

     

    小结

    Producer发送数据的简化序列图如下:序列图
    从图中可见,各个类的职责明确,BlockingChannel负责最底层的数据发送,SyncProducer负责将Request发送到一个指定的Broker那里,DefaultEventHandler负责数据转换和选择正确的Broker,直接给客户端使用的Producer则在此基础上提供了同步和异步两种发送方式。

  • 相关阅读:
    May LeetCoding Challenge22 之 比较器comparator、map按Value排成逆序、桶排序
    May LeetCoding Challenge21 之 动态规划的min使用
    May LeetCoding Challenge20 之 二叉树中序遍历
    May LeetCoding Challenge19 之 单调栈2.0
    May LeetCoding Challenge18 之 滑动窗口2.0
    May LeetCoding Challenge17 之 滑动窗口
    May LeetCoding Challenge16 之 链表重组
    APT常用命令
    DDCTF-misc-流量分析
    Wireshark学习笔记
  • 原文地址:https://www.cnblogs.com/the-tops/p/5786726.html
Copyright © 2011-2022 走看看