zoukankan      html  css  js  c++  java
  • Kakfa生产者——API开发

    客户端开发

    步骤:

    • 配置生产者参数
    • 构建待发送的消息
    • 发送消息
    • 关闭生产者实例
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
    
            KafkaProducer producer = new KafkaProducer(properties);
            ProducerRecord producerRecord = new ProducerRecord("topic1","key","value");
    
            try {
                producer.send(producerRecord);
            }catch (Exception e){
                e.printStackTrace();
            }
    

    必要的配置参数

    • bootstrap.servers kafka集群地址,不需要所有的集群地址,生产者会从给定的broker中查找到其他的broker信息。建议至少设置两个以上的broker地址,在初始连接中,如果其中一个宕机可以选择另一个进行连接。
    • key.serializer and value.serializer broker端接受消息必须以字节数组的形式存在(网络传输)。这里表示的序列化方式,除了示例的字符串型的序列化器,还有其他如integer、object类型的序列化器,但是字符串型的序列化器能够满足大部分的需求需要。
    • client.id 设定producer对应的客户端id,一般不用配置。

    消息的构建

    ProducerRecord是要发送的消息的构建类,除了上面指出的topic和key、value,还有其他属性供我们去选择。

        private final String topic;
        private final Integer partition;
        private final Headers headers;
        private final K key;
        private final V value;
        private final Long timestamp;
    
    • topic 主题
    • partition 分区号
    • key 指定消息的键,不仅是消息的附加信息,还可以用来计算分区号进而可以让信息发送到指定的分区,也就是说,可以通过key值得设置指定消息向同一个分区发送。
    • value 消息体,也就是要发送的消息。
    • timestamp 消息时间戳。

    也就是说除了示例的构建方法外,还有其他的重载构建方法,只不过需要提供的参数不同罢了。

         public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
            if (topic == null) {
                throw new IllegalArgumentException("Topic cannot be null.");
            } else if (timestamp != null && timestamp < 0L) {
                throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
            } else if (partition != null && partition < 0) {
                throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
            } else {
                this.topic = topic;
                this.partition = partition;
                this.key = key;
                this.value = value;
                this.timestamp = timestamp;
                this.headers = new RecordHeaders(headers);
            }
        }
        public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
            this(topic, partition, timestamp, key, value, (Iterable)null);
        }
    
        public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
            this(topic, partition, (Long)null, key, value, headers);
        }
    
        public ProducerRecord(String topic, Integer partition, K key, V value) {
            this(topic, partition, (Long)null, key, value, (Iterable)null);
        }
    
        public ProducerRecord(String topic, K key, V value) {
            this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
        }
    
        public ProducerRecord(String topic, V value) {
            this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
        }
    

    消息的发送

    消息的发送方式有三种模式:

    • 发送即忘
    • 同步发送
    • 异步发送

    示例的发送方式就是发送即忘的发送方式,在发送消息后并不关心消息是否正确到达。这种方法方式的性能最高,可靠性最差。

    send()方法的返回值并不是void类型,而是Future类型,,send()方法有两个重载方法:

        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            return this.send(record, (Callback)null);
        }
    
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
            return this.doSend(interceptedRecord, callback);
        }
    

    同步发送

    而要实现同步的发送方式,可以利用返回的Fature对象实现。

            try {
                Future<RecordMetadata> future = (Future<RecordMetadata>) producer.send(producerRecord).get();
                RecordMetadata recordMetadata = future.get();
                recordMetadata.topic();
                recordMetadata.partition();
                recordMetadata.offset();
                recordMetadata.timestamp();
                recordMetadata.serializedKeySize();
                recordMetadata.serializedValueSize();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    

    send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。也就是通过调用get方法阻塞等待kafka的响应,知道消息发送成功,或者发生异常,如果发生异常,就可以捕获异常进行逻辑处理。

    RecordMetadata对象包好了消息的 元数据信息,比如当前消息的主题、分区、偏移量等。

    Fature表示一个任务的生命周期,并提供了相应的方法来判断任务是否已经完成或取消,以及获取任务的结果或取消任务。Fature的get(long timeout,TimeUnit unit)方法实现可超时的阻塞。

    KafkaProducer中一般会发生两种类型的异常,可重试异常和不可重试异常。

    常见的可重试异常有:NetworkException(网络异常,可能由于网络的瞬时故障导致的异常,可通过重试解决)、LeaderNotAvailableException(Leader不存在异常,通常发生在leader副本下线而没有选出新的leader之前)、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException等。

    不可重试异常:RecordTooLargeException异常,发送消息太大,此时生产者不会进行任何重试,直接抛出异常。

    对于可重试的异常,如果配置了retries参数,那么只要在规定的重试次数内自行恢复,就不会抛出异常。

    同步发送可靠性高,消息发送要么成功,要么发生异常。如果发生异常,可以捕获相应异常进行相应处理。但是性能会差很多,需要阻塞等待一条消息发送完之后,才能发送下一条。

    异步发送

    异步发送一般是在sned()方法里指定一个CallBack的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。

             producer.send(producerRecord, new Callback() {
                 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                     if(e == null){
                         System.out.println("send success");
                         recordMetadata.topic().....
                     }else{
                         System.out.println("send fail"+e.getMessage());
                     }
                 }
             });
    

    通过添加回调函数,重写方法,对异常进行判空处理,就可以对发生异常和发送成功进行不同的处理,这里只做了简单的输出处理,在日常开发中,可以将异常信息记录,亦可以进行消息重发。

    close

    close()方法会等待之前所有的发送请求完成后再关闭KakfaProducer,与此同时,Kafka还提供了一个带超时时间的close()方法:

     public void close(long timeout,TimeUnit timeUnit)
    

    如果调用了带超时时间的timeout的close方法,在规定时间内没有完成发送,就会强行退出,在实际应用中,一般使用无参的close方法。

  • 相关阅读:
    返回一个整数数组中最大子数组的和
    VMware安装CentOS7的详细过程
    Spark Streaming实时数据分析
    Spark SQL快速离线数据分析
    Spark-HBase集成错误之 java.lang.NoClassDefFoundError: org/htrace/Trace
    Spark2.X集群运行模式
    Spark on Yarn运行错误:Yarn application has already ended! It might have been killed or unable to launch application master
    基于IDEA环境下的Spark2.X程序开发
    Spark2.X环境准备、编译部署及运行
    Cloudera HUE大数据可视化分析
  • 原文地址:https://www.cnblogs.com/luckyhui28/p/12001662.html
Copyright © 2011-2022 走看看