zoukankan      html  css  js  c++  java
  • kafka生产者客户端

    kafka的生产者

    1. 生产者客户端开发

    ​ 熟悉kafka的朋友都应该知道kafka客户端有新旧版本,老版本采用scala编写,新版本采用java编写。随着kafka版本的升级,旧版本客户端已经快被完全替代了。因此,我们以新客户端为例进行介绍。

    ​ 客户端开发的步骤如下:

    • ​ 配置生产者客户端参数及创建相应的生产者实例

    • ​ 构建待发送的信息

    • ​ 发送信息

    • ​ 关闭生产者实例

    代码如下:

      public class ProducerFastStart {
      public static final String brokerList="node112:9092,node113:9092,node114:9092";
      public static final String topic = "topic-demo";

      public static void main(String[] args) {
      //配置生产者客户端参数
          Properties prop = new Properties();
          prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          prop.put("bootstrap.servers",brokerList);
          //创建生产者客户端
          KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
          //构建所要发送到的生产者消息
          ProducerRecord record = new ProducerRecord(topic, "hello,Kafka");
          try {
              producer.send(record);//发送消息
          } catch (Exception e) {
              e.printStackTrace();
          }finally {
              producer.close();//关闭生产者客户端
          }
      }
    }

    需要maven依赖如下:

      
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.1</version>
    </dependency>

    ​ 这里有必要对构建的消息对象ProduceRecord进行说明,ProduceRecord对象包括以下几个属性:

    ​ topic和partititon用来指定消息发送到主题分区。header是指消息头部,从0.11.x这个版本引进的。Key是指消息的键,可通过分区号让消息发往特定的分区【可以使key相同的消息发送到同一分区】,有key的消息还可以支持日志压缩的功能。value为消息体,一般不为空,如果为空则表示特定的消息——墓碑消息。timestamp指消息的时间戳,有两种类型CreateTime和LogAppendTime,前者表示消息创建时间,后者表示消息追加到日志文件的时间。

      
    public class ProducerRecord<K, V> {
      private final String topic;//主题
      private final Integer partition;//分区号
      private final Headers headers;//消息头部,其实就是一个Iterable<Header>
      private final K key;//消息的key
      private final V value;//消息的value
      private final Long timestamp;//消息的时间戳
    }

    ​ 通过以下这种方式创建ProduceRecord对象,只是指定了最基本的两个属性,topic和value。ProducerRecord包括多个构造函数,可灵活使用。

    1.1 必要的参数配置

    ​ bootstrap.servers:指定客户端连接的broker地址清单。

    ​ Key.serializer和value.serializer用于指定消息的key和value的序列化器。

    ​ client.id指定KafkaProducer的id,默认系统会自动生成。

    ​ 由于参数的名称特别多,而且是字符串容易写错,因此客户端提供了一个类ProducerConfig,包括所有的参数名称。同样需要注意,由于key和value的序列化器需要类的全限定名,可通过一下方式改进。

      
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

    Producer是thread safe,通常情况下,多个线程共享一个Producer实例要比使用多个Producer实例效率要高。生产者包括一个缓冲区空间池,其中保存尚未传输到服务器的记录,以及一个后台I/O线程,该线程负责将这些记录转换为请求并将它们传输到集群。使用后不关闭生产商将泄漏这些资源。

    1.2 消息的发送

    ​ 发送消息主要有三种方式:发后即忘(fire-and-forget)、同步sync和一部async。

    ​ (1)fire-and-forget

    ​ 只管往kafka中发送消息,不管消息是否到达。大多数情况下,这种方式不会出现问题,但当发生不可重试异常时,会造成数据丢失。性能最高,可靠性最差。以下这种方式就是fire-and-forget:

      
    try {
          producer.send(record);
      } catch (Exception e) {
          e.printStackTrace();
      }
    }

    ​ (2)sync

    ​ send()方法是有返回值的,是一个Future对象。

      
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
      return send(record, null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
      ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
      return doSend(interceptedRecord, callback);
    }

    ​ 实际上send方法本身是异步,可以通过调用Future对象的get方法阻塞等待Kafka的响应,直到消息发送成功或抛出异常【对异常可以做响应的处理】,实现同步。可以从RecordMetadata获取发送成功的ProduceRecord的相关元数据,包括topic、partition、offset、timestamp等。当然也可以通过Future的get(long timeout, TimeUnit unit)实现超时阻塞。

      
    try {
          //producer.send(record).get();
          Future<RecordMetadata> future = producer.send(record);
          RecordMetadata rm = future.get();
          System.out.println(rm.topic()+"-"+rm.partition()+"-"+rm.offset()+"-"+rm.timestamp());
      } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
      }

    ​ 另外,KafkaProducer一般会产生两类异常:可重试异常和不可重试异常。可充实异常有NetworkException、LeaderNotAvaliableException、UnKnownTopicOrPartitonException、NotEnoughRepliasException、NotCoordinatorException。对于可重试异常,可以通过配置retires属性,进行特定次数的重试,重试成功不会抛出异常,重试失败抛出异常。

      
    prop.put(ProducerConfig.RETRIES_CONFIG, 10);

    ​ 对于不可重复异常,如RecordTooLargeException,发生后支持抛出异常。

    ​ 同步方式可靠性高,要么消息发送成功,要么发生异常,可捕获进行处理。不过同步方式的性能要差一些,需要阻塞等待消息发送完之后才能发送下一条消息。

    (3)async

    ​ send()方法也是重载的,可以传入一个CallBack回调函数,kafka在响应时调用该函数来实现异步发送的确认。onCompletion这两个方法是互斥的,要么exception为null,要么metadata为null。另外需要注意的是回调函数能够保证分区有序。即如果record1先于record2先发送 ,则对应的callback1先于callback2被调用。

      
    producer.send(record, new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
          if(exception != null) {
              exception.printStackTrace();
          }else {
              System.out.println(metadata.topic() + "-" + metadata.partition());
          }
        }
    });

    ​ 对于producer的close方法也是重载,可以实现超时强行关闭,但是一般不这样使用。

      
    public void close()
    public void close(long timeout, TimeUnit timeUnit)

    1.3 序列化

    ​ 生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka集群,同样消费者必须通过与之对应的反序列化器进行解析。kafka-client提供了多种数据类型对象的序列化器,父接口为org.apache.kafka.common.serialization.Serializer接口。

      
    public interface Serializer<T> extends Closeable {

      void configure(Map<String, ?> configs, boolean isKey);//通常重写为空方法

      byte[] serialize(String topic, T data);//序列化方法
     
      default byte[] serialize(String topic, Headers headers, T data) {
          return serialize(topic, data);
      }

      @Override
      void close();//通常重写为空方法
    }

    ​ 自定义序列化器:

    ​ 这里为了方面使用了lombok框架,maven依赖如下,注意在idea中还要安装响应的插件,否则注解不生效。

      
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.20</version>
      <scope>provided</scope>
    </dependency>
      
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder
    public class Company {
      private String name;
      private String address;
    }

    ​ 创建Company对象的序列化器:

      
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.util.Map;

    public class CompanySerializer implements Serializer<Company> {
      @Override
      public void configure(Map<String, ?> configs, boolean isKey) {
    //空实现
      }

      @Override
      public byte[] serialize(String topic, Company data) {
          if(data == null) {
              return null;
          }
          byte[] name, address;
          try {
              if(data.getName() == null) {
                  name = new byte[0];
              }else {
                  name = data.getName().getBytes("UTF-8");
              }

              if(data.getAddress() == null) {
                  address = new byte[0];
              }else {
                  address = data.getAddress().getBytes("UTF-8");
              }
    //分别用4个字节用来存储name的长度和address的长度,然后是name和address,定义一种规则方便反序列化的保证正确
              ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
              buffer.putInt(name.length);
              buffer.put(name);
              buffer.putInt(address.length);
              buffer.put(address);
              return buffer.array();
          } catch (UnsupportedEncodingException e) {
              e.printStackTrace();
          }

          return new byte[0];

      }

      @Override
      public void close() {
    //空实现
      }
    }

    ​ 使用的话只需要设置prop的key.serializer等设置为CompanySerializer即可。

    1.4 分区器

    ​ 消息通过send()方法发送到broker的过程中,有可能经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才会发往broker。拦截器不是必须的,序列化器是必须的。消息进过序列化之后就要确定发往那个分区。如果ProducerRecord中指定了partition字段,则不需要分区器的作用,如果没有,则需需要依赖于分区器,根据Producerrecord的key进行分区。

    1.4.1 默认分区器

    ​ 分区器的父接口为org.apache.kafka.clients.producer.Partitioner接口。

      
    public interface Partitioner extends Configurable, Closeable {

      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

      /**
        * This is called when partitioner is closed.
        */
      public void close();

    }

    默认分区器为:org.apache.kafka.clients.producer.internals.DefalutPartitoner,,源码如下:

      
    public class DefaultPartitioner implements Partitioner {
    //线程安全的HashMap,用于存放每个topic关联的一个原子对象
      private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

      public void configure(Map<String, ?> configs) {}

      /**
        * Compute the partition for the given record.
        *
        * @param topic The topic name
        * @param key The key to partition on (or null if no key)
        * @param keyBytes serialized key to partition on (or null if no key)
        * @param value The value to partition on or null
        * @param valueBytes serialized value to partition on or null
        * @param cluster The current cluster metadata
        */
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
      //获取kafka集群中对应topic的所有分区。
          List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
          int numPartitions = partitions.size();
          //如果Record的key为null
          if (keyBytes == null) {
          //取出当前Topic的原子变量值并加1,实际就是轮训
              int nextValue = nextValue(topic);
              //获取当前可用的分区
              List<PartitionInfo> availablePa
              rtitions = cluster.availablePartitionsForTopic(topic);
              //如果存在可用的分区,则在可用分区中进行轮训
              if (availablePartitions.size() > 0) {
                  int part = Utils.toPositive(nextValue) % availablePartitions.size();
                  return availablePartitions.get(part).partition();
              } else {
                  // 如果没有可用分区就在所有分区中进行轮训
                  return Utils.toPositive(nextValue) % numPartitions;
              }
          } else {//如果Record的key不为null
              // 根据key进行hash值的计算,放入到对应分区,保证相同的key永远放入到同一分区。
              return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
          }
      }

    //nextValue()方法就是为每个topic的产生一个随机值,便于轮训
      private int nextValue(String topic) {
          AtomicInteger counter = topicCounterMap.get(topic);
          if (null == counter) {
              counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
              AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
              if (currentCounter != null) {
                  counter = currentCounter;
              }
          }
          return counter.getAndIncrement();
      }

      public void close() {}

    }

    默认分区器的逻辑就是:

    ​ 如果key不为空,则进行对key进行hash计算分区

    ​ 如果为空,且存在可用分区,则在可用分区中轮训,不存在可用分区,则在所有分区中轮训。

    1.4.2 自定义分区器

    ​ 可通过实现partitioner接口,自定义分区器。

    ​ 分区逻辑就是,有key进行hash分区,无key在所有分区中轮训

      
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    public class CustomPartitioner implements Partitioner {
      private final AtomicInteger counter = new AtomicInteger(0);
      @Override
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
          List<PartitionInfo> partitioners = cluster.availablePartitionsForTopic(topic);
          int numPartitions = partitioners.size();
          if (null == keyBytes) {
              return counter.getAndIncrement() % numPartitions;
          } else {
              return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
          }
      }

      @Override
      public void close() {

      }

      @Override
      public void configure(Map<String, ?> configs) {

      }
    }

    分区器的配置:

      
    prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

    1.5 生产者拦截器

    ​ 拦截器是在Kafka0.10.0.0版本出现的,有生产者拦截器和消费者拦截器两种。

    ​ 生产者拦截器将消息进行序列化和计算分区之前进行"拦截",这里所谓拦截主要体现在两个方面:

    ​ (1)为消息提供定制化的操作

    ​ (2)可以用来在发送回掉逻辑前做一些定制化的需求。

    ​ 拦截器通过自定义实现org.apache.kafka.clients.producer.ProducerInterceptor。onSend()方法对消息进行相应的定制化操作;onAcknowledgement()方法会在消息在应答之前或消息发送失败时被调用,因此此方法优先于callback方法执行。

      
    public interface ProducerInterceptor<K, V> extends Configurable {
      public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
      public void onAcknowledgement(RecordMetadata metadata, Exception exception);
      public void close();
    }

    public class ProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String,String> {
      private volatile long sendSuccess = 0;
      private volatile long sendFaliure = 0;
      @Override
      public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
          String modifiedValue = "prefix-" + record.value();
          return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue);
      }

      @Override
      public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
          if(metadata == null) {
              sendFaliure++;
          }else {
              sendSuccess++;
          }
      }

      @Override
      public void close() {
          System.out.println("发送成功率为:" + (double) sendSuccess / (sendSuccess + sendFaliure));
      }

      @Override
      public void configure(Map<String, ?> configs) {

      }
    }

    ​ 拦截器的使用:

      
    prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class.getName());

    ​ 多个拦截器之间用“,”隔开,注意多个拦截器是有顺序的。

    2. 原理分析

    ​ KafkaProudcer在真正把消息发往Kafka集群时,会依次经历拦截器、序列化器、和分区器,然后缓存到消息累加器RecordAccumulator中,Sender线程负责从RecordAccumulator中获取消息并发送到Kafka集群。

    2.1 消息发送到RecordAccumulator

    ​ KafkaProducer调用send方法发送ProducerRecord,首先会通过拦截器链进行定制化操作,然后调用了doSend方法。发放如下:

      
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
          TopicPartition tp = null;
          try {
              throwIfProducerClosed();
              // first make sure the metadata for the topic is available
              ClusterAndWaitTime clusterAndWaitTime;
              try {
              //1. 阻塞式获取metaData
                  clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
              } catch (KafkaException e) {
                  if (metadata.isClosed())
                      throw new KafkaException("Producer closed while send in progress", e);
                  throw e;
              }
              long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
              Cluster cluster = clusterAndWaitTime.cluster;
              byte[] serializedKey;
              try {
              //2.对key进行序列化
                  serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
              } catch (ClassCastException cce) {
                  throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                          " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                          " specified in key.serializer", cce);
              }
              byte[] serializedValue;
              try {
              //3.value进行序列化
                  serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
              } catch (ClassCastException cce) {
                  throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                          " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                          " specified in value.serializer", cce);
              }
              //根据消息和集群metaData计算发往的分区
              int partition = partition(record, serializedKey, serializedValue, cluster);
              //创建主题分区
              tp = new TopicPartition(record.topic(), partition);

              setReadOnly(record.headers());
              Header[] headers = record.headers().toArray();
    //估算消息记录的总长度
              int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                      compressionType, serializedKey, serializedValue, headers);
              //消息长度的有效性检验
              ensureValidRecordSize(serializedSize);
              long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
              log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
              // producer callback will make sure to call both 'callback' and interceptor callback
              //创建CallBack对象
              Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

              if (transactionManager != null && transactionManager.isTransactional())
                  transactionManager.maybeAddPartitionToTransaction(tp);
    //消息记录提交到Accumulator。
              RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                      serializedValue, headers, interceptCallback, remainingWaitMs);
              if (result.batchIsFull || result.newBatchCreated) {
                  log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                  this.sender.wakeup();
              }
              return result.future;
              // handling exceptions and record the errors;
              // for API exceptions return them in the future,
              // for other exceptions throw directly
          } catch (ApiException e) {
              log.debug("Exception occurred during message send:", e);
              if (callback != null)
                  callback.onCompletion(null, e);
              this.errors.record();
              this.interceptors.onSendError(record, tp, e);
              return new FutureFailure(e);
          } catch (InterruptedException e) {
              this.errors.record();
              this.interceptors.onSendError(record, tp, e);
              throw new InterruptException(e);
          } catch (BufferExhaustedException e) {
              this.errors.record();
              this.metrics.sensor("buffer-exhausted-records").record();
              this.interceptors.onSendError(record, tp, e);
              throw e;
          } catch (KafkaException e) {
              this.errors.record();
              this.interceptors.onSendError(record, tp, e);
              throw e;
          } catch (Exception e) {
              // we notify interceptor about all exceptions, since onSend is called before anything else in this method
              this.interceptors.onSendError(record, tp, e);
              throw e;
          }
      }

    (1)waitOnMetadata阻塞式获取metaData,超过${max.block.ms}时间依旧未获取到,则抛TimeoutException,消息发送失败。

    (2)对key和value进行序列化

    (3)根据消息和集群metaData计算发往的分区,,并创建主题分区对象。

    (4)估算消息记录的总长度的上限,并对消息记录的总长度进行检验。如果上限大于{max.request.size},抛出RecordTooLargeException异常。如果上限大于{buffer.memory},也会抛出RecordTooLargeException异常。

      
    private void ensureValidRecordSize(int size) {
      if (size > this.maxRequestSize)
          throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the maximum request size you have configured with the " + ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
          if (size > this.totalMemorySize)
              throw new RecordTooLargeException("The message is " + size +" bytes when serialized which is larger than the total memory buffer you have configured with the " +
                      ProducerConfig.BUFFER_MEMORY_CONFIG +
                      " configuration.");
      }

    (5)将消息记录append到RecordAccumulator。

    ​ RecordAccumulator对象就是为了缓存消息便于Sender线程可以批量发送,减少网络传输的资源消耗,提升性能。既然是缓存,就肯定有大小。RecordAccumulator可由参数{buffer.memory}指定,默认是32M。那它内部数据的组织形式是怎样的呢。它内部有个private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches属性,是一个Map对象,key为主题分区,value是ProducerBatch的双端队列,正如图上所示,它对消息记录按照分区进行缓存,每个分区对应一个ProducerBatch的双端队列。那ProducerBatch又是什么呢。ProducerBatch就是ProducerRecord的批次,可以包括一个或多个ProduerRecord,ProducerBatch的大小可以通过batch.size这个参数设置,不过当一个ProducerRecord的大小超过batch.size的大小时,就会生产一个新的ProducerBatch,这个ProducerBatch的大小就是该ProducerRecord的大小。也许你会产生一个疑问,既然ProducerBatch的大小不一定等于batch.size,那么为什么还要使用这个参数,其实是为了更好的管理内存,在kafka中通过java.io.ByteBuffer实现消息内存的创建和释放,不过为了减少频繁的创建和释放内存空间,RecordAccumulator内部使用了BufferPool实现对特定大小的ByteBuffer进行管理,实现复用,特定大小就是通过batch.size这个参数进行设置,同样如果当前ProducerBatch的大小超过batch.size,那个这个ByteBuffer不能实现复用。

    ​ RecordAccumulator通过append方法将ProducerRecord追加到具体的ProducerBatch中,过程如下:

    ​ (1)记录当前正在进行append消息的线程数,方便当客户端调用 KafkaProducer.close()强制关闭发送消息操作时放弃未处理完的请求,释放资源

    ​ (2)getOrCreateDeque,获得或创建主题分区对应的ProducerBatch的双端队列。

    ​ (3)tryAppend(timestamp, key, value, headers, callback, dq),尝试将消息append到双端队列。

      
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,Callback callback, Deque<ProducerBatch> deque) {
    //从双端队列中获得最后一个ProducerBatch
          ProducerBatch last = deque.peekLast();
          if (last != null) {//如果ProducerBatch存在,则尝试将消息追加到这个ProducerBatch中。
              FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
              //如果追加不成功,关闭这个batch的记录追加
              if (future == null)
                  last.closeForRecordAppends();
              else//追加陈宫返回一个RecordAppendResult对象
                  return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
          }
          //如果这个队列为空,即不存在任何一个ProducerBatch,返回null
          return null;
      }

    再来看一下ProducerBatch如何尝试append消息。

      
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    //检查是否有足够的空间用来缓存该消息,如果没有返回null
          if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
              return null;
          } else {//如果有则进行缓存
          //追加消息
              Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
              //计算最大消息记录
              this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                      recordsBuilder.compressionType(), key, value, headers));
              //计算最后一个添加消息的时间
              this.lastAppendTime = now;
              //构建一个FutureRecordMetadata对象
              FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                      timestamp, checksum,
                                                                      key == null ? -1 : key.length,
                                                                      value == null ? -1 : value.length,
                                                                      Time.SYSTEM);
              //添加thunck对象
              thunks.add(new Thunk(callback, future));
              //记录加1
              this.recordCount++;
              return future;
          }
      }

    (4)若上述尝试append消息失败,即返回null,此时需要向BufferPool申请空间用于创建新的ProducerBatch对象,并将消息append到新创建的ProducerBatch中,最后返回处理结果。

      
    public RecordAppendResult append(TopicPartition tp,
                                        long timestamp,
                                        byte[] key,
                                        byte[] value,
                                        Header[] headers,
                                        Callback callback,
                                        long maxTimeToBlock) throws InterruptedException {
          //记录当前正在进行append消息的线程数,方便当客户端调用 KafkaProducer.close()强制关闭发送消息操作时放弃未处理完的请求,释放资源
          appendsInProgress.incrementAndGet();
          ByteBuffer buffer = null;
          if (headers == null) headers = Record.EMPTY_HEADERS;
          try {
              // 获得或创建主题分区对应的ProducerBatch的双端队列
              Deque<ProducerBatch> dq = getOrCreateDeque(tp);
              synchronized (dq) {
                  if (closed)
                      throw new KafkaException("Producer closed while send in progress");
                  //尝试将消息append到双端队列
                  RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                  if (appendResult != null)
                      return appendResult;
              }

               
              byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
              int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
              log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
              // 4.向BufferPool申请空间用于创建新的ProducerBatch对象
              buffer = free.allocate(size, maxTimeToBlock);
              synchronized (dq) {
                  // Need to check if producer is closed again after grabbing the dequeue lock.
                  if (closed)
                      throw new KafkaException("Producer closed while send in progress");
    //将消息append到新创建的ProducerBatch中
                  RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                  if (appendResult != null) {
                      // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                      return appendResult;
                  }

                  MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                  ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                  FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                  dq.addLast(batch);
                  incomplete.add(batch);

                  // Don't deallocate this buffer in the finally block as it's being used in the record batch
                  buffer = null;
                  return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
              }
          } finally {
              if (buffer != null)
                  free.deallocate(buffer);
              appendsInProgress.decrementAndGet();
          }
      }

    2.2 Sender发送消息至Kafka集群

    ​ Sender从RecordAccumulator中获取获取缓存的消息后,会进一步将<TopicPartition, Deque<ProducerBatch>>封装成<Node, List<ProducerBatch>>,还会进一步封装为<Node,Request>的形式。Sender发送到kafka之前还会保存到保存到InFlightRequest中,InFlightRequest保存对象的具体格式为Map<NodeId, Deque<Request>>,主要作用是缓存了已经发出去的Request。其中可通过一个参数max.in.flight.requests.per.connection(默认为5)设置客户端与每个Node之间缓存的Request的最大值。超过这份最大值,就不能再向这个连接发送请求了。因此可以通过 Deque<Request>的size来判断对用的Node中是否堆积了很多未处理的消息,如果真是如此,说明Node节点的网络负载较大或者连接有问题。

    2.3 元数据的更新

    所谓的元数据是指的Kafka集群的元数据,包括集群中的主题、分区、Leader、Follower等,当客户端不存在需要使用的元数据信息或者超过metadata.max.age.ms[默认5分钟],会引起元数据的更新。当元数据需要更新时,会首先挑选出负载最小的node,向他发送MetaDataRequest请求,这个更新操作由send线程发起,同样会存入InFlightRequest中。由于主线程也需要元数据,因此需要通过synchronize和final关键字保证。

    2.4 生产者客户端的重要参数

    • acks

      取值有0,1,-1,用于指定分区中至少有多少副本收到这个现象,之后生产者才会认为该消息被写入。

    • max.request.size

      限制生产者客户端发送消息的最大值

    • reties 生产者发送出现异常时的重试次数

    • retry.backoff.ms 每次重试的时间间隔

    • compression.type 生产者端消息的压缩方式

    • connections.max.idles.ms 连接限制关闭时间

    • linger.ms 用于配置ProducerBatch等待加入ProducerRecord的时间

    • receive.buffer.bytes Socket接收消息缓冲区

    • send.buffer.bytes Socket发送器的缓冲区

    • request.timeout.ms 生产者等待请求响应的最长时间,请求超时可以进行重试。这个参数大于broker端的replia.lag.time.max.ms

    • buffer.memory 生产者客户端用于缓存消息的缓冲区大小

    • batch.size 指定ProducerBatch可以复用缓冲区的大小

    • max.block.ms 生产者send方法和paritionFor方法的阻塞时间

    • max.in.flight.requests.per.connection 限制每个链接最多缓存的请求数量

    • metadata.max.age.ms 更新元数据的时间

  • 相关阅读:
    LIBSVM使用介绍
    Symbian开发平台的搭建之VC++6.0&&Carbide C++ 2.0
    traits:Traits技术初探
    SDK与IDE的选择(附上设置默认SDK)
    浅析COM的思想及原理
    Windows Live Writer 支持的博客
    JQuery笔记(四) 通用选择的尝试
    JQuery笔记(一)
    JQuery笔记(二) animate支持的属性
    在DW绿化版或者精简版中使用扩展管理
  • 原文地址:https://www.cnblogs.com/gdy1993/p/11084056.html
Copyright © 2011-2022 走看看