zoukankan      html  css  js  c++  java
  • Kafka 新版Producer Java版代码阅读

    Kafka 新版Producer Java版代码阅读

    Kafka在0.8.2.1出了新版Producer,支持ack(仅Java版,因为通过JavaClient实现的)。因此对代码进行了简单阅读,并记录如下:

    接口如下:

    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) 
    

    封装一个Record之后,每次调用同时传入一个callback。该函数在Kafka返回结果时被调用。

    根据官方example的调用方式:

    public void onCompletion(RecordMetadata metadata, Exception exception) {
    	if (metadata != null) {
        		//means success
        	} else {
        		//means fail
          	exception.printStackTrace();
          	}
        }
    

    ======================

    1. 如果我来做,怎么做?

    我觉得如果我来设计,至少需要考虑如下几个问题:

    • 发送的时候callback是否跟着发到Kafka Server?

    • Kafka支持了batch send,ack的时候是一个一个ack还是batch ack?同样,如果是batch ack,call back怎么调用?

    • 每次callback都会单独使用一个线程调用么?还是共享一个线程?

    • 如果Callback不发送到KafkaServer,在客户端是怎样存储的?进程fail掉的时候是否会丢ack?

    ======================

    2. 带着这几个问题,我们来看人家怎么做的

    ======================

    2.1 基本逻辑

    先从Kafka Producer的send方法看起,

    send的全部代码就是这样,简单来说做了这样几件事

    1. 判断partition

    2. 序列化消息

    3. 判断消息大小是否符合格式

    4. 重点是第四步:

       RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
      

    accumulator 是每个Producer单独持有唯一一个的,每次调用appen之后返会一个包含有(FutureRecordMetadata)的执行result.

    追进去看一下这个append方法,注释是这样说的

    	Add a record to the accumulator, return the append result。The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created.
    

    简单来说这个方法就是把一个message序列化之后加入到accumulator的发送队列里,等会再详细介绍Acculator。

    1. 第五步,调用sender的awake方法

       if (result.batchIsFull || result.newBatchCreated) {
       	this.sender.wakeup();
       }
      

    看到这里感觉啥都没干啊,所以我们需要进一步看一下Acculator以及Sender究竟在做什么。

    ======================

    2.2 Accumulator

    在Producer里通过Accumulator的append方法把消息加入异步发送队列,我们先看看Accumulator的实现。

    private final BufferPool free;
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    private final IncompleteRecordBatches incomplete;
    

    Accumulator里有三个结构必须要说一下,BufferPool用于管理batch发送的缓存,等会细说,batches显然是一个以partition为Key的map,value是一个double-ended-queue,每个queue里的元素是一个RecordBatch,显然是用来做发送缓冲的。最后还有一个Incoplete,用于记录未完成的所有batch。

    Accumulator的append方法代码比较长,简要说一下做了这样几个事情

    1. 根据partition从batches里找到deque,然后peeklast().tryappend(),也就是调用了RecordBatch的tryappend方法

       	Deque<RecordBatch> dq = dequeFor(tp);
           synchronized (dq) {
               RecordBatch last = dq.peekLast();
               if (last != null) {
                   FutureRecordMetadata future = last.tryAppend(key, value, callback);
                   //you.meng   futrue==true means no more room for the new coming message
                   if (future != null)
                       return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
               }
           }
      

      这个tryappend方法比较简单,就是看看recordbatch里面地方够不够,不够就返回null,够就加上,Recordbatch里用一个List 来存储每个msg的callback。但整个BatchRecord封装成一个future返回。代码如下:

       public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
       	if (!this.records.hasRoomFor(key, value)) {
           	return null;
       	} else {
           	this.records.append(0L, key, value);
           	this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
           	FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
           	if (callback != null)
               	thunks.add(new Thunk(callback, future));
           	this.recordCount++;
           	return future;
       	}
       }
      

    由此可知Kafka对Batch消息的确认是一次批量确认,但callback应该是一次批量确认之后一个一个发送的。

    1. 第一步如果成功吧msg加入batch显然后面啥都不用做了,如果返回是null,则需要从新来一个RecordBatch。然后先申请空间

       ByteBuffer buffer = free.allocate(size);
      

      注意,这个申请空间是有可能block的(当然也要看用户设置),所以在申请空间之后,可能已经过了很久,物是人非了,所以代码很小心的从新调用了一遍batches.get(partition).peeklast.tryappend。

      //哈哈 自从用了scala 妈妈再也不担心我读不懂长代码了。

      如果这个时候tryappend发现有地方了,这时候释放空间,加进去拉倒。

       free.deallocate(buffer);
      

      当然也可能依然坑爹的tryappend返回null,即表示notEnoughRoom for new msg,那么进入第三步

    2. 最后只有两种情况没有讨论了,要不就是partition下面 d-e-queue是空的,要不就是现有的空间都不够了。所以第二部申请的空间(buffer)必须用了啊,然后我们新来做一个RecordMessage吧。

       MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
       RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
       FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
       dq.addLast(batch);
       incomplete.add(batch);
      

    我们用新申请的空间buffer生成了新的MemoryRecord,然后做出来batch,加入d-e-queue,加入未完成队列。

    看到这里已经相对清晰了,我们捋一捋几个悬而未解的问题。

    • free.allocate(size) 还有free.deallocate(buffer)是咋做的?
    • MemoryRecords怎么使用的buffer?
    • 那个坑爹的Sender怎么awake的?
    • callback什么时候被调用的?

    根据这几个问题,我们逐一分析一下:

    ======================

    2.3 Buffered pool

    ======================

    2.4 MemoryRecords && Compressor && RecordBatch
    2.4.1 MemoryRecords

    在accumulator中我们看到,MemoryRecord 对 bytebuffer进行了封装,而后RecordBatch对MemoeryRecord进行封装

    	MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
    	RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
    

    先看Memory Recored,Memory Record继承自Record接口,其定义4byte的size,8byte的offest所以每个Record size<2^32 位 文件小于2^64 位。

    /**
    * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
    * for the in-memory representation.
    */
    public interface Records extends Iterable<LogEntry> {
    
    int SIZE_LENGTH = 4;
    int OFFSET_LENGTH = 8;
    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
    

    MemoryRecord中还持有Compressor以及buffer,主要被调用append方法将buffer中的数据写入compressor

    /**
     * Append the given record and offset to the buffer
     */
    public void append(long offset, Record record) {
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");
    
        int size = record.size();
        compressor.putLong(offset);
        compressor.putInt(size);
        compressor.put(record.buffer());
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        record.buffer().rewind();
    }
    
    2.4.2 RecordBatch

    先跳过Compressor,我们看一下MemoryRecord继续被向上封装成了RecordBatch。因为MemoryRecord只有对IO的操作,并没有对Kafka逻辑的支持,因此RecordBatch在其基础之上封装了一些计数参数之外还增加了几个变量:

    public final MemoryRecords records;
    public final TopicPartition topicPartition;
    public final ProduceRequestResult produceFuture;
    private final List<Thunk> thunks;
    

    即对一次producer的batch提交过程进行了封装,包括发送的topicPartition,提交batch返回的produceFutrue以及存储这个batch里所有msg对应callback的thunks。

    RecordBatch的tryAppend方法已经在2.2节介绍,除此之外,RecordBatch还有一个Done方法,看名字也知道用于对batch的返回结果进行确认:如果没有exception就直接调用thunks list里所有的callback,异常就按异常格式调用。

    看了一下done的调用序列,序列的根都在Sender中,我们等会再来看sender。

    2.4.3 Compressor

    //TBD

    ======================

    2.5 callback && Future && ProduceRequestResult && FutureRecordMetadata

    Futrue FutureTask Callable 的概念就不再复述了,请自行查阅。

    2.5.1 ProduceRequestResult (short for PRResult)

    ProduceRequestResult是象征意义上的返回结果,但事实上该Result是在Client端生成的,其构造函数只有一种空构造函数,参数只有这几个。

    	private final CountDownLatch latch = new CountDownLatch(1);
    	private volatile TopicPartition topicPartition;
    	private volatile long baseOffset = -1L;
    	private volatile RuntimeException error;
    	
    	public ProduceRequestResult() {
    	}
    

    看到

    latch = new CountDownLatch(1);
    

    就很容易明白这个PRResult的作用了,因为支持message.get()的阻塞等待,因此需要对产生的结果进行阻塞控制,只有当Server端回复结果之后才能让message.get()方法进入结束阻塞。而这个过程的实现,就是使用PRResult的latch实现的。

    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
        this.error = error;
        this.latch.countDown();
    }
    

    这是PRResult的Done方法,可以看到,该Result的latch在初始化就自动生成,直到Done方法被调用才能解除阻塞,其他任何wait在latch上的方法都将被阻塞。

    我们看看有哪些方法调用了latch.await吧: 追到根 发现使用了该方法的包括:

    	FutureRecordMetadata.get 
    	KakfaProducer.flush
    

    第一个是官方定义的清清楚楚的接口,第二个是flush,简单易懂,不再介绍。

    2.5.2 FutureRecordMetadata &&RecordMetadata

    如果说PRResult只是具有了一个阻塞功能的结果存储器,那么FutureRecordMetaData就是在其基础上有封装了执行过程。

    	public final class FutureRecordMetadata implements Future<RecordMetadata> {
    	private final ProduceRequestResult result;
    private final long relativeOffset;
    

    FRMetaData持有result的同时,继承了Future,所以调用FRMetaData的get方法时,通过实现其封装的PRResult.await()进行阻塞。直到PRResult被Done,latch.countDown()被调用为止。

    对多线程以及阻塞感兴趣更多的可以参考FutrueTask。

    @Override
    public RecordMetadata get() throws InterruptedException, ExecutionException {
        this.result.await();
        return valueOrError();
    }
    
    RecordMetadata value() {
        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
    }
    

    ======================

    2.6 前部分小结

    简要总结一下

    1. Accumulator:消息的总控端,负责对发送,接收进行实际控制,但并非线程类
    2. 消息的封装,Batch封装(RecordBatch/MemoryRecords/Comporessor)
    3. 消息对内存的使用(Buffer Pool)
    4. 调用结果返回(FutureRecordMetadata/ProduceRequestResult/Callback)

    基本涉及了发送过程的全部静态实现,唯独缺少了多线程控制。当然,在这上面的实现中,也多次涉及到了sender.run(time) 以及sender.wakeup()等方法。

    所以在最后,我们来看看Sender的实现。

    ======================

    2.7 Sender

    Sender 主要持有如下对象

    public class Sender implements Runnable {
    
    
        /* the state of each nodes connection */
    private final KafkaClient client;
    
    /* the record accumulator that batches records */
    private final RecordAccumulator accumulator;
    
    /* the metadata for the client */
    private final Metadata metadata;
    
    /* the maximum request size to attempt to send to the server */
    private final int maxRequestSize;
    
    /* the number of acknowledgements to request from the server */
    private final short acks;
    
    /* the max time in ms for the server to wait for acknowlegements */
    private final int requestTimeout;
    
    /* the number of times to retry a failed request before giving up */
    private final int retries;
    
    /* the clock instance used for getting the time */
    private final Time time;
    
    /* true while the sender thread is still running */
    private volatile boolean running;
    
    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    private volatile boolean forceClose;
    
    /* metrics */
    private final SenderMetrics sensors;
    
    /* param clientId of the client */
    private String clientId;
    

    看半天鸡毛用没有,对吧。值得一提的是,sender持有的accumulator和KafkaProducer持有的accumulator是同一个。而且Sender是继承自线程的,其唯一的一次初始化是在new KafkaProducer的时候,且被KafkaProducer持有,被KafkaProducer的iothread装了起来。

    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    this.ioThread.start();
    

    首先找一下sender怎么被使用的吧,ioThread只有在KProducer.close调用了一下。而sender在KProducer的send,flush等方法里多次被调用wakeup()方法.

    下面我们仔细看一下他的run.() , run.(time)以及wakeup()方法吧。

    //是不是俺的写法带点scala风格拉?

    run方法内部调用了run(time)方法,二者不分家。先看run

    // main loop, runs until close is called
    while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
    

    带停止标志位的循环执行,后面就剩下关闭相关操作了。

    	// okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }
    

    关闭过程其实很值得看看,没啥可说的。看看run(time)和wakeup()吧。

    	/**
     * Wake up the selector associated with this send thread
     */
    public void wakeup() {
        this.client.wakeup();
    }
    

    然后就没了,client是NetworkClient,封装了网络的NIOSelector的wakeup,我觉得这个问题还是开一篇单讲了。读者就就按照注释的意思理解吧。

    ======================

    2.7 KafkaProducer

    经历了这一切,我们从新回到KafkaProducer 来。似乎只剩下了一个close方法。
    没啥好说的,就到这里吧。

  • 相关阅读:
    WPF进程之间通讯
    win7切换到classic主题后,控件问题
    How to host win32 in wpf?
    WPF 个人经验总结:需要注意的地方
    ListView 的三种数据绑定方式
    用DebugVIew 跟踪调试WPF
    屏幕变小后,wpf窗口被截掉的问题。
    WPF中DPI 的问题
    css中元素居中总结
    arcmap vba 根据DEM高程值生成Shp高程字段
  • 原文地址:https://www.cnblogs.com/mengyou0304/p/4759747.html
Copyright © 2011-2022 走看看