zoukankan      html  css  js  c++  java
  • Kafka-生产者发消息流程、使用及常用配置

    Kafka-生产者

    生产者发送消息流程

     

    1.新建ProducerRecord对象,包含目标主题和要发送的内容。也可以指定键或分区

    2.发送ProducerRecord对象时,生产者要把键和值对象序列化成字节数组,这样它们才能在网络上传输

    3.数据被传给分区器。

    如果ProducerRecord对象中指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。

    如果没有指定分区,那么分区器会根据ProducerRecord对象的键来选择一个分区。

    选择好分区后,生产者就知道该往哪个主题和分区发送这条记录了。

    4.这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。

    有一个独立的线程负责把这些记录批次发送到相应的broker上。

    5.服务器在收到这些消息时会返回一个相应。

    如果消息成功写入kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。

    如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

    创建生产者

    kafka生产者有3个必选的属性

    bootstrap.servers

    改属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其它broker的信息。不过建议至少要提供两个broker的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

    key.serializer

    broker希望接收到的消息的键和值都是字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。

    kafka客户端默认提供了ByteArraySerializerStringSerializerIntegerSerializer,因此,如果只使用常见的Java对象类型,就没有必要实现自己的序列化器。

    value.serializer

    key.serializer一样,value.serializer指定的类会将值序列化。如果键和值都是字符串,可以使用与key.serializer一样的序列化器。

    发送消息主要有三种方式

    1.发送并忘记(fire-and-forget)

    把消息发送给服务器,但并不关心它是否正常到达。

    2.同步发送

    使用send()方法发送消息,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

    3.异步发送

    调用send()方法,并指定一个回调函数,服务器在返回响应时调用该函数

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    
    /**
     * @Author FengZhen
     * @Date 2020-03-29 12:21
     * @Description kafka生产者使用
     */
    public class KafkaProducerTest {
    
        private static Properties kafkaProps = new Properties();
        static {
            kafkaProps.put("bootstrap.servers", "localhost:9092");
            kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        }
    
        public static void main(String[] args) {
            KafkaProducer<String, String> producer = new KafkaProducer(kafkaProps);
            ProducerRecord<String, String> record = new ProducerRecord<>("test","message_key","message_value");
    //        simpleSend(producer, record);
    //        sync(producer, record);
            aync(producer, record);
        }
    
        /**
         * 最简单的方式发送,不管消息是否正常到达
         * @param producer
         */
        public static void simpleSend(KafkaProducer producer, ProducerRecord record){
            try {
                producer.send(record);
            } catch(Exception e){
                e.printStackTrace();
            }
        }
    
        /**
         * 同步发送
         * @param producer
         * @param record
         */
        public static void sync(KafkaProducer producer, ProducerRecord record){
            try {
                RecordMetadata recordMetadata = (RecordMetadata) producer.send(record).get();
                System.out.println("topic:" + recordMetadata.topic());
                System.out.println("partition:" + recordMetadata.partition());
                System.out.println("offset:" + recordMetadata.offset());
                System.out.println("metaData:" + recordMetadata.toString());
            } catch(Exception e){
                e.printStackTrace();
            }
        }
    
        /**
         * 异步发送
         * @param producer
         * @param record
         */
        public static void aync(KafkaProducer producer, ProducerRecord record){
            try {
                producer.send(record, new DemonProducerCallback());
                while (true){
                    Thread.sleep(10 * 1000);
                }
            } catch(Exception e){
                e.printStackTrace();
            }
        }
    
        private static class DemonProducerCallback implements Callback {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (null != e){
                    e.printStackTrace();
                }else{
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                    System.out.println("metaData:" + recordMetadata.toString());
                }
    
            }
        }
    }

    输出如下

    topic:test
    partition:0
    offset:2
    metaData:test-0@2

    生产者的配置

    1.acks

    acks参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数消息丢失的可能性有重要影响。

    如果acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

    如果acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果发送客户端等待服务器的相应,显然会增加延迟。如果客户端是使用异步回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)

    如果acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟会更高,因为需要等待不止一个服务器节点接收消息。

    2.buffer.memory

    该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设置max.block.ms参数,此参数设置抛出异常之前可以阻塞的一段时间

    3.compression.type

    默认情况下,消息发送时不会被压缩。该参数可以设置为snappygziplz4,它指定了消息被发送给broker之前使用哪一种压缩算法进行压缩。snappy压缩算法由Google发明,它占用较少的CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip压缩算法一般会占用较多的CPU,但会提供更高的压缩比,如果网络带宽有限,可以使用这种算法。

    使用压缩可以降低网络传输开销和存储开销,这也是kafka的瓶颈所在。

    4.retries

    生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,不过可以通过retry.backoff.ms参数来改变这个时间间隔。

    5.batch.size

    当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置的很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置的很小,生产者会更频繁的发送消息,会增加一些额外的开销。

    6.linger.ms

    该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,就算批次里只有一个消息,生产者也会把消息发送出去。把此值设置成比0大的数,让生产者在发送批次之前等待一会,使更多的消息加入这个批次。虽然这样会增加延迟,但也会提升吞吐量。

    7.client.id

    可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。

    8.max.in.flight.requests.per.connection

    该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

    9.request.timeout.ms

    指定了生产者在发送数据时等待服务器返回响应的时间

    10.metadata.fetch.timeout.ms

    指定了生产者在获取元数据时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误。

    11.timeout.ms

    指定了broker等待同步副本返回消息确认的时间,与asks的配置相匹配--如果在指定时间内没有收到同步副本的确认,那么broker就会返回一个错误。

    12.max.block.ms

    指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到此值时,生产者会抛出超时异常。

    13.max.request.size

    用于设置生产者发送的请求大小。可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。

    broker对可接收的消息最大值也有自己的限制,message.max.bytes,两边的配置最好可以匹配,避免生产者发送的消息被broker拒绝

    14.receive.buffer.bytessend.buffer.bytes

    分别指定了TCP socket接收和发送数据包的缓冲区大小。如果被设置为-1,就是用操作系统默认值。如果生产者或消费者与broker处于不同的数据中细腻,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

    顺序保证

    kafka可以保证同一个分区里的消息是有序的。生产者按照一定的顺序发送消息,broker就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。

    如果应用场景要求消息是有序的,可以把max.in.flight.requests.per.connection设为1,这样在生产者尝试发送第一批消息时,就不会有其它的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

  • 相关阅读:
    jQuery 选择器
    使用JQuery获取对象的几种方式
    多层架构+MVC+EF+AUTOFAC+AUTOMAPPER
    ASP.NET 2.0服务器控件开发的基本概念(转载)
    系统构架设计应考虑的因素
    超级面试题
    架构的点滴
    程序员的职业素养---转载
    imovie的快速入门
    实用的设计模式【二】——类的组织
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/12593406.html
Copyright © 2011-2022 走看看