zoukankan      html  css  js  c++  java
  • Kafka2.0生产者客户端使用

    1 初始化配置

      Kafka 通过 KafkaProducer 构造器初始化生产者客户端的配置。
      常用的重要配置,详见官网

    • bootstrap.servers:Kafka 集群地址(host1:post,host2:post),Kafka 客户端初始化时会自动发现地址,所以可以不填写所有地址。
    • key.serializer:实现了 Kafka 序列化接口的类,用来序列化 key。
    • value.serializer:实现了 Kafka 序列化接口的类,用来序列化 value。
    • acks:leader 接收到的 follower 确认的数量需要满足 acks 的配置。
       0:生产者把消息发送出去就认为发送完成了。
       1:leader 接收到消息后,不用等 follower 的确认,就表示发送完成了。
       all/-1:leader 接收到消息后,需要所有在 ISR 集合的 follower 确认后,才表示完成了。
    • retries:消息发送失败后的重试次数。如果允许重试,而 max.in.flight.requests.per.connection>1,则可能导致消息乱序,因为如果把两批消息发送到同一个分区,第一批失败并重试,而第二批成功了,则第二批消息可能先生成了。
    • retry.backoff.ms:消息重试发送的间隔。
    • client.id:标识客户端的 id。
    • compression.type:压缩类型。可选:none、gzip、snappy、lz4。
    • buffer.memory:记录累加器可以使用的最大内存缓冲池大小。
    • batch.size:内存缓冲池的缓冲列表大小。当 batch 的大小超过 batch.size 或者时间达到 linger.ms 就会发送 batch。
    • transactional.id:事务 ID。
    // 基础配置
    Map<String, Object> configs = new HashMap<>();
    // Kafka broker 集群
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
    // key 序列化
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // value 序列化
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
    KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
    

    2 构造消息

      Kafka 提供了6种构造器来构造消息。

    • topic:消息主题,必填;
    • partition:分区号,非必填。如果为空,会计算 key 的 hash 值,再和该主题的分区总数取余得到分区号;如果 key 也为空,客户端会生成递增的随机整数,再和该主题的分区总数区域得到分区号。
    • timestamp:时间戳,非必填。如果为空,默认为 KafkaProducer 构造器初始化的时间。
    • key:消息 key,非必填。关系到分区分配,broker 会对带 key 的消息进行日志压缩。
    • value:消息内容,必填。
    • headers:消息头,非必填。
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers);
    public ProducerRecord(String topic, Integer partition, K key, V value);
    public ProducerRecord(String topic, K key, V value);
    public ProducerRecord(String topic, V value);
    

    3 发送消息

      支持同步发送和异步发送消息。

      同步发送

    producer.send(record).get();
    

      异步发送

    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // 回调处理流程
        }
    });
    
  • 相关阅读:
    ByteArrayOutputStream 与InputStream 互相转换
    Java生成和操作Excel文件
    导出excel通用模板(程序定义模板导出)
    使用Swagger2自动生成API接口文档
    The file cannot be validated as theho st "struts.apache.org" is currently unreachable.
    Ubuntu关闭显示器技巧
    Rational License Key Error 的解决办法
    Myeclipse For Blue/Spring/Professional 9.1 破解方法及注册机 JAVA开发相关软件
    没有找到suite objects.dll 因此这个应用程序未能启动
    myEclipse下如何配置Tomcat
  • 原文地址:https://www.cnblogs.com/bigshark/p/11182403.html
Copyright © 2011-2022 走看看