zoukankan      html  css  js  c++  java
  • Kafka:生产者

    Kafka java客户端数据生产流程解析

    image-20210205141137316

    ProducerRecord

    ProducerRecord 含义: 发送给Kafka Broker的key/value 值对

    //ProducerRecord的成员变量
    public class ProducerRecord<K, V> {
    
        private final String topic;//主题
        private final Integer partition;//分区号
        private final Headers headers;//消息头
        private final K key;//键
        private final V value;//值
        private final Long timestamp;//消息时间戳
    

    headers:可以设定一些与应用相关的信息

    KafkaProducer

    KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将它实例进行池化来统一管理。

    KafkaProducer的参数众多,bootstrap.servers,retries,key.serializer等,开发人员很难记住所有的配置,也很容易写错,所以可以用org.apache.kafka.clients.producer.ProducerConfig类预防:

            Properties properties = new Properties();
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.RETRIES_CONFIG, "10");
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.51:9092");
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    

    消息的发送

    发后即忘

    producer.send(record);
    

    上述这种方式就是发后即忘,它只往kafka中发送消息并不关心消息是否正确到达,可能会造成消息丢失,发送性能最高,但是可靠性最差。

    同步

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            return send(record, null);
        }
    

    因为直接send返回值是Future,我们知道Future.get()会阻塞线程直至线程运行结果返回,通过此方法即可达到同步目的。

                try {
                    producer.send(record).get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
    

    Future中可以获取一个RecordMetadata对象,包含了消息大的一些元数据信息,比如主题、分区号、分区中的偏移量(offset)、时间戳等。

    此外Future.get(long timeout, TimeUnit unit)可以实现可超时的阻塞,

    异步

    producer.send(record, (metadata, exception) -> {
        System.out.println(metadata.topic());
    });
    

    当kafka有响应时,会触发回调方法

    序列化器

    生产者需要用序列化器把对象转换成字节数组才能通过网络发送给kafka。而在消费者需要用反序列化器把字节数组转成相应对象。

    序列化器需要实现Serializer接口。客户端实现了多种序列化器供我们开发使用。

    public interface Serializer<T> extends Closeable {
    
        /**
         * Configure this class.
         * @param configs configs in key/value pairs
         * @param isKey whether is for key or value
         */
        //用来配置当前类
        void configure(Map<String, ?> configs, boolean isKey);
    
        /**
         * Convert {@code data} into a byte array.
         *
         * @param topic topic associated with data
         * @param data typed data
         * @return serialized bytes
         */
        //序列化操作
        byte[] serialize(String topic, T data);
    
        /**
         * Close this serializer.
         *
         * This method must be idempotent as it may be called multiple times.
         */
        @Override
        //关闭当前序列化器
        void close();
    }
    

    生产者和消费者使用的序列化器是需要一一对应的。例如生产者shencghan端使用了StringSerializer,那么消费者端需要使用StringDeserializer。

    分区器

    消息在通过send()方法发往broker过程中,有可能经过拦截器、序列化器、分区器的一系列作用后才能真正发送到broker。拦截器一般不是必需的,而序列化器时必需的。消息经过序列化后就需要确定它发往的分区。

    如果ProducerRecord 中指定了partition字段,那么就不需要分区器的作用。如果没有指定,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。

    kafka默认分区器是DefaultPartitioner,实现了Partitioner接口

    public interface Partitioner extends Configurable, Closeable {
    
        /**
         * Compute the partition for the given record.
         *
         * @param topic The topic name 主题名
         * @param key The key to partition on (or null if no key) 键
         * @param keyBytes The serialized key to partition on( or null if no key) 序列化后的键
         * @param value The value to partition on or null 值
         * @param valueBytes The serialized value to partition on or null 序列化后的值
         * @param cluster The current cluster metadata 集群元数据信息
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    
        /**
         * This is called when partitioner is closed.
         */
        public void close();
    
    }
    

    Partitioner的父接口Configurable

    public interface Configurable {
    
        /**
         * Configure this class with the given key-value pairs
         */
        //该方法主要用来获取配置信息以及配置初始化数据
        void configure(Map<String, ?> configs);
    
    }
    

    在默认DefaultPartitioner中的分区计算方法:

    image-20210205160014692

    我们也可以自定义分区器,然后配置在properties:

    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"....");
    

    生产者拦截器

    实现生产者拦截器只需要实现ProducerInterceptor接口

    public interface ProducerInterceptor<K, V> extends Configurable {
    
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    
    
        public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    
    
        public void close();
    }
    

    onSend方法调用时机:KafkaProducer在将消息序列化和计算分区之前会调用拦截器的onSend()方法来对消息进行相应的定制化操作,一般来说不要修改ProducerRecord 的topic、key、partition信息。因为可能会影响分区的计算,同样会影响broker端日志压缩。

    onAcknowledgement方法调用时机:该方法会在消息被应答之前或消息发送失败时调用,优先于用户设定的Callback之前执行。

    close:主要用于在关闭拦截器时执行一些资源的清理工作

    自定义生产者拦截器:

    public class MyProducerInterceptor implements ProducerInterceptor {
        @Override
        public ProducerRecord onSend(ProducerRecord record) {
            System.out.println("onSend");
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            System.out.println("onAcknowledgement");
        }
    
        @Override
        public void close() {
            System.out.println("close");
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    //设置properties参数
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
    

    image-20210205161251160

    此外可以指定多个拦截器,形成拦截器链,拦截器链会按照配置的拦截器顺序来一一执行(各个拦截器之间用逗号隔开)

    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName()+","+MyProducerInterceptor2.class.getName());
    

    原理分析

    image-20210205162308652

    整个生产者客户端由两个线程协调运行,分别为主线程和Sender线程(发送线程)

    RecordAccumulator:消息累加器,主要用来缓存消息以便Sender线程可以批量发送。RecordAccumulator缓存大小可以通过生产者客户端参数buffer.memory配置,默认32MB.如果生产者发送消息速度超过发送到服务器速度,会导致生产者空间不足,这时候,send方法会要么被阻塞要么抛出异常,取决于max.block.ms配置,默认60000,即60s。

    主线程发送过来的消息会被追加到RecordAccumulator的某个双端队列中Deque,即Deque<ProducerBatch>。ProducerBatch中包含一个至多个ProducerRecord。将较小的ProducerRecord拼凑成较大的ProducerBatch,可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch的大小跟batch.size有关。

    Sender从RecordAccumulator中获取到缓存的消息之后,会将原本<分区,Deque<ProducerBatch>>的保存形式转换为<Node,List<ProducerBatch>>,Node表示Kafka的broker节点。

    转换为<Node,List<ProducerBatch>>之后,会进一步<Node,Request>的形式,这样就可以将Request发往各个Node了。Request表示Kafka的各种协议请求。

    请求在发往kafka之前还会保存到InFlightRequests中,它的具体形式为Map<NodeId,Deque<Request>>,它的作用是缓存了已经发出去但还没有收到相应的请求。配置参数:max.in.flight.requests.per.connection,默认值是5,即每个连接最多只能缓存5个未响应的请求,超过该数目就不能向这个连接发送更多请求了。

    重要的生产者参数

    以下仅介绍部分参数,还有一些高级功能(事务、幂等)这里不做介绍。

    acks

    这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者者才会认为这条消息是成功写入的。

    acks的值都是字符串类型的。

    acks=1,默认值,生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。acks设置为1,是消息可靠性和吞吐量之间的这种方案。

    acks=0,生产者发送消息之后不需要等待任何服务端的响应。

    acks=-1或acks=all,生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够来自服务端的成功响应。acks=-1可以达到最强的可靠性。

    max.request.size

    这个参数用来限制生产者客户端能发送的消息的最大值,默认1MB。这个参数在对kafka整体脉络没有把控的时候,不建议修改,因为这个参数还涉及到其他参数的修改。比如broker端的message.max.bytes参数。

    retries和retry.backoff.ms

    retries参数用来配置生产者重试的次数,默认值是0

    retry.backoff.ms用来设置两次重试之间的时间间隔,避免无效的频繁重试,默认值是100ms。

    connections.max.ide.ms

    这个参数用来指定在多久之后关闭闲置的连接,默认是9分钟

    linger.mx

    指定生产者发送ProducerBatch之前等待更多消息加入ProducerBatch的时间,默认为0.

    request.timeout.ms

    用来配置Producer等待请求响应的最长时间,默认时间3000ms。请求超时后可以选择重试。

    receive.buffer.bytes

    设置socket接收消息缓冲区(SO_RECBUF)的大小,默认值32768B,即32KB,如果设置为-1,则为操作系统默认值。

    send.buffer.bytes

    设置socket发送消息缓冲区(SO_RECBUF)的大小,默认值131072B,即128KB,如果设置为-1,则为操作系统默认值。

  • 相关阅读:
    spring的@Transactional注解详细用法
    centos7安装Docker详细步骤(无坑版教程)
    FastDFS安装
    免费下载 Ksuite 2.80 for KESS V2 V5.017
    GODIAG GD201 VS Foxwell NT680 PRO
    GODIAG V600 BM 使用 BMW ICOM 软件进行 BMW FEM/BDC 模块诊断
    Launch X431 TSGUN TPMS诊断工具测试报告
    2021 Nissan Altima 为 Autel IM608 添加智能钥匙
    怎么解决2M2 Magic Tank自动更新后无法校准?
    SpringBoot之定时任务详解
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/14379069.html
Copyright © 2011-2022 走看看