zoukankan      html  css  js  c++  java
  • kafka 0.8.1 新producer 源码简单分析

     

    1 背景

    最近由于项目需要,需要使用kafka的producer。但是对于c++,kafka官方并没有很好的支持。

    在kafka官网上可以找到0.8.x的客户端。可以使用的客户端有C版本客户端,此客户端虽然目前看来还较为活跃,但是代码问题还是较多的,而且对于c++的支持并不是很好。

    还有c++版本,虽然该客户端是按照c++的思路设计,但是最近更新时间为2013年12月19日,已经很久没有更新了。

    从官方了解到,kafka作者对于现有的producer和consumer的设计是不太满意的。他们打算在kafka 0.9版本里发布新的producer与consumer。

    其中新的producer已经被包含到了kafka0.8.1的源码里,官方描述如下。

    3.4 New Producer Configs

    We are working on a replacement for our existing producer. The code is available in trunk now and can be considered beta quality. Below is the configuration for the new producer

    现在新producer还是属于beta版。但是在kafka0.9版本里,新producer与consumer都会成为稳定版,并提供了更多的功能。旧版的producer是由scala实现,为java提供调用api。而新版的producer直接是用java实现的。

    具体文档在这https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

    2 producer基本类的介绍

    源码树如下

    image

    其中,org.apache.kafka.clients.tools包下的ProducerPerformance.java里包含了producer的最基本用法。

    该程序原本是有三个参数的,直接给三个参数硬编码赋值后,代码如下:

    public static void main(String[] args) throws Exception {
            String url = "10.134.58.155:9092";
            int numRecords = 100;
            int recordSize = 100;
            Properties props = new Properties();
            props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
            props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
            props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
            props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
    
            KafkaProducer producer = new KafkaProducer(props);
            Callback callback = new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null)
                        e.printStackTrace();
                }
            };
            byte[] payload = new byte[recordSize];
            Arrays.fill(payload, (byte) 1);
            ProducerRecord record = new ProducerRecord("test6", payload);
            long start = System.currentTimeMillis();
            long maxLatency = -1L;
            long totalLatency = 0;
            int reportingInterval = 1;
            for (int i = 0; i < numRecords; i++) {
                long sendStart = System.currentTimeMillis();
                producer.send(record, callback);
                long sendEllapsed = System.currentTimeMillis() - sendStart;
                maxLatency = Math.max(maxLatency, sendEllapsed);
                totalLatency += sendEllapsed;
                if (i % reportingInterval == 0) {
                    System.out.printf("%d  max latency = %d ms, avg latency = %.5f
    ",
                                      i,
                                      maxLatency,
                                      (totalLatency / (double) reportingInterval));
                    totalLatency = 0L;
                    maxLatency = -1L;
                }
            }
            long ellapsed = System.currentTimeMillis() - start;
            double msgsSec = 1000.0 * numRecords / (double) ellapsed;
            double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
            System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec);
            producer.close();
        }

    可以看到,运行producer需要三个基本的类ProducerConfig,KafkaProducer,ProducerRecord,另外还有回调函数的类Callback。

    ProducerConfig类包含了kafka的各种配置信息,并提供了默认的配置。

    ProducerRecord类是向broker发送的消息载体,包括topic,partition,key和value属性。

    上面这两个类都很简单。

    producer所有操作都包含在KafkaProducer类中。

    这个类由Partitioner,Metadata,RecordAccumulator,Sender,Metrics这些类组成。

    Partitioner是用来计算一个消息的分片的类。

    Metadata顾名思义保存的是kafka集群的元数据,metadata的更新和topic有关。

    RecordAccumulator类似于一个队列,所有producer发出的消息都先送到队列中,等待处理。

    Sender类使用NIO方式实现了producer消息的发送与接收,sender是一个守护线程,监听读写事件,并

    Metrics类,kafka本来是被用于分布式的日志收集与监控的,Metrics类可以注册一些关注的内容,供监控使用。

    3源码分析

    我们以发送一条消息来分析producer的工作过程。

    发送一条消息可以分为异步的两个过程。

    入队过程

    @Override
        public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
            try {
                Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
                int partition = partitioner.partition(record, cluster);
                ensureValidSize(record.key(), record.value());
                TopicPartition tp = new TopicPartition(record.topic(), partition);
                FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
                this.sender.wakeup();
                return future;
            } catch (Exception e) {
                if (callback != null)
                    callback.onCompletion(null, e);
                return new FutureFailure(e);
            }
        }

    该send函数首先根据topic获取集群的基本数据,如果topic不存在,该函数会阻塞,并更新metadata。

    接下来获取分区,并将数据写入该TopicPartition下的队列中。

    public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // check if we have an in-progress batch
            Deque<RecordBatch> dq = dequeFor(tp);
            synchronized (dq) {
                RecordBatch batch = dq.peekLast();
                if (batch != null) {
                    FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback);
                    if (future != null)
                        return future;
                }
            }
    
            // we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            ByteBuffer buffer = free.allocate(size);
            synchronized (dq) {
                RecordBatch first = dq.peekLast();
                if (first != null) {
                    FutureRecordMetadata future = first.tryAppend(key, value, compression, callback);
                    if (future != null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
                        // often...
                        free.deallocate(buffer);
                        return future;
                    }
                }
                RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
                dq.addLast(batch);
                return future;
            }
        }

    这个函数上面有一大段关于send函数的用法,简单来说,send函数可以实现简单的阻塞式发送(利用Future.get()方法),以及利用回调函数,实现非阻塞发送。

    因为这个是一个向套接字写数据的过程,所以入队之后,立刻调用wakeup函数,唤醒阻塞在读数据的sender上,并发送数据。

    出队过程

    该过程是由守护线程完成的,守护线程不断循环在run函数上

    public int run(long now) {
            Cluster cluster = metadata.fetch();
            // get the list of partitions with data ready to send
            List<TopicPartition> ready = this.accumulator.ready(now);
    
            // prune the list of ready topics to eliminate any that we aren't ready to send yet
            List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
    
            // should we update our metadata?
            List<NetworkSend> sends = new ArrayList<NetworkSend>(sendable.size());
            InFlightRequest metadataReq = maybeMetadataRequest(cluster, now);
            if (metadataReq != null) {
                sends.add(metadataReq.request);
                this.inFlightRequests.add(metadataReq);
            }
    
            // create produce requests
            List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
            List<InFlightRequest> requests = collate(cluster, batches);
            for (int i = 0; i < requests.size(); i++) {
                InFlightRequest request = requests.get(i);
                this.inFlightRequests.add(request);
                sends.add(request.request);
            }
    
            // do the I/O
            try {
                this.selector.poll(5L, sends);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
            // handle responses, connections, and disconnections
            handleSends(this.selector.completedSends());
            handleResponses(this.selector.completedReceives(), now);
            handleDisconnects(this.selector.disconnected());
            handleConnects(this.selector.connected());
    
            return ready.size();
        }

    代码注释很清晰了。。

    handleSends实现了入队过程中的future以及回调。

    后续的一些对网络协议的封装就不再赘述。下一篇,我会接着分析kafka producer的c客户端librdkafka

    第一次写博客或许写的不是很清楚,望大家可以多提提意见,谢谢。

  • 相关阅读:
    mysql索引类型 normal, unique, full text
    16.信号量互斥编程
    15.信号通信编程
    14.有名管道通信
    13.无名管道通讯编程
    12.多进程程序的操作
    11.进程控制理论
    10.时间编程
    9. 库函数方式文件编程
    8.Linux文件编程
  • 原文地址:https://www.cnblogs.com/xhcqwl/p/3893415.html
Copyright © 2011-2022 走看看