zoukankan      html  css  js  c++  java
  • Kafka 核心 API ==> Producer 生产者

    一、发送消息的模式

      在上文中介绍了AdminClient API 的使用,现在我们已经知道如何在应用中通过API去管理Kafka了。但在大多应用开发中,我们最常面临的场景就是发送消息到Kafka,或者从Kafka中消费消息,也就是典型的生产/消费模式。而本文将要演示的就是如何使用 Producer API 将消息发送至Kafka中,使应用成为一个生产者。
    Producer API具有以下几种发送模式:
    • 异步发送
    • 异步阻塞发送
    • 异步回调发送

    二、异步发送

      首先,我们需要创建一个Producer实例,并且必须配置三个参数,分别是Kafka服务的ip地址及端口号,以及消息key和value的序列化器(消息体以key-value结构形式存在)。在本例中,消息的key和value均为String类型,所以使用StringSerializer这个字符串类型的序列化器。代码示例:

    /**
     * 创建Producer实例
     */
    public static Producer<String, String> createProducer() {
        Properties prop = new Properties();
        // 指定Kafka服务的ip地址及端口号
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.182.128:9092");
        // 指定消息key的序列化器
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 指定消息value的序列化器
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        return new KafkaProducer<>(prop);
    }

    new KafkaProducer 时,构造器里做了什么:

    • 读取 Properties 里的配置项,初始化 ProducerConfig
    • 基于 ProducerConfig 初始化一些配置字段
    • 初始化 MetricConfig 监控度量指标配置以及 MetricsReporter 报告器列表和 Metrics 存储库
    • 从配置中加载 partitioner 负载均衡器,当有多个 partition 时就是通过这个负载均衡器去将消息均匀的分发到不同的 partition
    • 从配置中加载消息 key 和 value 的序列化器(Serializer)
    • 初始化 RecordAccumulator,一个类似于计数器的东西,用于计算消息批次的。因为 Producer 并不是接收到一条消息就发送到一条消息,而是达到一定批量后按批次发送的,所以需要有一个计数器来存储和计算批次。
    • 初始化用于发送消息的 Sender,然后会为其创建一个守护线程,并启动

    Tips:

    • 如果细看了KafkaProducer构造器的源码,就会发现其所有的属性都是final的,并且均在构造器中完成了初始化,不存在不安全的发布或共享变量,这也就变相说明了KafkaProducer是线程安全的。
    然后调用 Producer 中的 send 方法即可实现异步发送。代码示例:
    /**
     * 异步发送单条消息
     */
    public static void sendSingleMsgAsync(String topicName, String key, String msg) {
    
        // 构建 KafkaPorducer 实体
        Producer<String, String> producer = createProducer();
        // 构造消息对象
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, msg);
        // 发送消息,不做结果处理的话就是异步模式
        producer.send(record);
        // 关闭资源
        producer.close();
    }
    
    /**
     * 异步发送多条消息
     */
    public static void sendMultipleMsgAsync(String topicName, String key, String msg) {
    
        // 构建 KafkaPorducer 实体
        Producer<String, String> producer = createProducer();
    
        for (int i = 0; i < 100; i++) {
          // 构造消息对象
          ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key + i, msg + i);
          // 发送消息,不做结果处理的话就是异步模式
          producer.send(record);
        }
        // 关闭资源
        producer.close();
    }

    producer.send(record) 里主要做了以下事情:

    • 使用序列化器去序列化消息的key和value
    • 计算分区,即计算消息具体进入哪一个partition,也就是一个负载均衡的过程
    • 计算批次,判断是否需要创建新的批次,然后都需要调用accumulator.append向批次中追加消息
    • 当批次满了,调用sender.wakeup在守护线程中去发送消息

    大致时序图如下:

    发送消息的具体流程图如下:

    三、异步阻塞发送

      send 方法会有一个 Future 类型的返回值,当我们调用 Future get 方法时,就会阻塞当前线程,此时就达到了异步阻塞发送消息的效果,即发送消息是异步的,获取结果是阻塞的。我们可以通过这种方式去获取 Future 里存储的元数据信息。代码示例:
    /**
     * 异步阻塞式发送单条消息
     * 效果等同于同步发送
     */
    public static void sendSingleMsgSync(String topicName, String key, String msg) throws Exception {
        // 构建 KafkaPorducer 实体
        Producer<String, String> producer = createProducer();
        // 构造消息对象
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, msg);
        // 发送消息,不做结果处理的话就是异步模式
        Future<RecordMetadata> future = producer.send(record);
        // 调用get时会阻塞当前线程,就能实现异步阻塞式地发送
        // 发送完就马上get已经同等于同步的效果了
        RecordMetadata metadata = future.get();
    
        System.out.println(String.format(
                "send msg to topic = %s, partition = %d, offset = %s 
    ",
                metadata.topic(),
                metadata.partition(),
                metadata.offset()));
    
        // 关闭资源
        producer.close();
    }
    
    /**
     * 异步阻塞式发送多条消息
     * 效果等同于同步发送
     */
    public static void sendMultipleMsgSync(String topicName, String key, String msg) throws Exception {
        // 构建 KafkaPorducer 实体
        Producer<String, String> producer = createProducer();
    
        for (int i = 0; i < 100; i++) {
          // 构造消息对象
          ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key + i, msg + i);
          // 发送消息,不做结果处理的话就是异步模式
          Future<RecordMetadata> future = producer.send(record);
          // 调用get时会阻塞当前线程,就能实现异步阻塞式地发送
          // 发送完就马上get已经同等于同步的效果了
          RecordMetadata metadata = future.get();
    
          System.out.println(String.format(
                  "send msg to topic = %s, partition = %d, offset = %s 
    ",
                  metadata.topic(), metadata.partition(), metadata.offset()));
        }
        // 关闭资源
        producer.close();
    }

    四、异步回调发送

      如果想要在发送完消息后获取结果,比起直接调用 Future get 方法更好的方式是使用异步回调的消息发送形式。在send方法中支持传入一个回调函数,当消息发送完毕后,会调用回调函数并将结果当作参数传入,此时我们就可以在回调函数中对结果进行处理。代码示例:

    /**
     * 异步回调发送多条消息
     */
    public static void sendMultipleMsgWithCallback(String topicName, String key, String msg) {
        // 构建 KafkaPorducer 实体
        Producer<String, String> producer = createProducer();
    
        for (int i = 0; i < 100; i++) {
          // 构造消息对象
          ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key + i, msg + i);
          // 异步回调发送
          producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
              System.out.printf("send msg to topic = %s, partition = %d, offset = %s 
    ",
                      metadata.topic(), metadata.partition(), metadata.offset());
            }
          });
        }
        // 关闭资源
        producer.close();
    }

    五、自定义负载均衡器

      在某些特殊的业务场景下我们经常会有自定义负载均衡算法的需求,在Kafka中可以通过实现Partitioner接口来自定义Partition负载均衡器。本例中所实现的负载均衡算法比较简单,就是使用keyhashcode去对 partition 的数量进行取余得出 partition 的索引,发送消息时候会根据计算结果往对应的分区发送,代码示例:

    /**
     * 自定义Partition负载均衡器
     */
    public class SimplePartitioner implements Partitioner {
    
      @Override
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取topic的分区数
        int partitionsNum = cluster.partitionsForTopic(topic).size();
        // 获取消息key的hashCode
        int hashCode = key.hashCode();
        // hashCode如果是负数则需要转换为正数
        hashCode = hashCode < 0 ? Math.abs(hashCode) : hashCode;
    
        // 用key的hashCode和topic的分区数取模,使消息均匀的发送到不同的分区上
        return hashCode % partitionsNum;
        // 也可以指定发送到哪个分区上,一半根据条件判断发送到具体的分区上
        // return 1;
      }
    
      @Override
      public void close() {
      }
    
      @Override
      public void configure(Map<String, ?> configs) {
      }
    }

    然后在创建 Producer 实例时,增加一条配置,指定 SimplePartitioner 类的包名路径即可。代码示例:

    /**
     * 创建Producer实例
     */
    public static Producer<String, String> createProducer() {
        Properties prop = new Properties();
        // 指定Kafka服务的ip地址及端口号
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.182.128:9092");
        // 指定消息key的序列化器
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 指定消息value的序列化器
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 指定自定义的Partition负载均衡器
        prop.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, SimplePartitioner.class.getName());
    
        return new KafkaProducer<>(prop);
    }

    六、Kafka 的消息传递保障

    我们首先要了解一下消息的传递语义,一般存在三种类型语义:

    • At most once(最多一次):消息传递过程中有可能丢失,丢失的消息也不会重新传递,其实就是保证消息不会重复发送或者重复消费
    • At least once(至少一次):消息在传递的过程中不可能会丢失,丢失的消息会重新传递,其实就是保证消息不会丢失,但是消息有可能重复发送或者重复被消费
    • Exactly once(正好一次):这个是大多数场景需要的语义,其实就是保证消息不会丢失,也不会重复被消费,消息只传递一次
      在 Kafka 中主要通过消息重发和ACK机制来保障消息的传递,消息重发机制主要是提高消息发送的成功率,并不能保证消息一定能发送成功。我们可以通过在创建Producer实例时,设置retries配置项来开启或关闭消息重发机制,代码示例:
    // 设置的值为0表示关闭,大于0则表示重发的次数
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");

    ACK机制

    另一个消息传递保障机制就是ACK机制,Kafka中的ACK机制有三种模式,需要通过配置去指定。这三种配置的含义如下:

    • acks=0:
      • Producer发送消息到发送端的buffer中就直接返回了,至于这个消息有没有真的发送到Broker Server,Producer不关心,即使消息发送失败,上面说的消息重发机制也不起作用,所以在这种场景下,可能就会丢失消息了(这就有点类似于UDP,只管发,不管对方有没有接收到消息)
    • acks=1:
      • Producer发送的消息一定要存储到对应的分区的Leader副本日志文件中才算消息发送成功,要是失败的话,则会尝试retry。在这种模式下,只有当消息已经存储在Leader副本中,但是消息还没有被Follower副本同步的时候,如果Leader副本所在的broker server挂了,消息才会丢失
    • acks=all:
      • Producer发送的消息一定要存储到对应的分区的所有的在ISR列表中的副本日志文件中才算消息发送成功,要是失败的话,则会尝试retry。这种场景下消息就很难丢失了,除非所有的副本所在的Broker Server都挂了
    同样的该配置项可以在创建Producer实例时进行设置,代码示例:
    /**
     * 此配置是 Producer 在确认一个请求发送完成之前需要收到的反馈信息的数量。 这个参数是为了保证发送请求的 "可靠性"
     *
     * acks = all  最严格的一种策略,就是必须收到                     必须一条
     * acks = 0    最不严格的策略,消息可能被收到0条,也可能收到1条     最多一条
     * acks = 1    相对严格, 消息可以被收到1到多条                  至少一条
     */
    prop.setProperty(ProducerConfig.ACKS_CONFIG, "all");

    上面的三种取值可以根据实际的业务场景来进行设置,消息的可靠性越强的,性能肯定就会越差。这三种取值就是在消息的可靠性以及性能两个方面做一个权衡:

    • 性能要求高,但可靠性要求低的,可以选择acks=0
    • 性能和可靠性都希望能够兼顾的,就选择acks=1
    • 若允许牺牲性能来保证高可靠的场景,则选择acks=all

    注:本问参考自:https://www.jianshu.com/p/7819302e459d 

  • 相关阅读:
    SpringMVC中静态获取request对象 Spring中获取 HttpServletRequest对象【转载】
    springcloud 的loadbalancer 轮询算法切换方法 2021.4.3
    springboot项目启动增加图标
    rabbitmq 端口作用以及修改方法
    centos8 安装rabbitmq
    springcloud config client Value获取不到信息的问题的处理方法
    springcloud config配置git作为数据源然后启动报错 If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
    Sublime Text的列模式如何操作
    centos8 安装redis
    jQuery简单的Ajax调用
  • 原文地址:https://www.cnblogs.com/L-Test/p/13443178.html
Copyright © 2011-2022 走看看