zoukankan      html  css  js  c++  java
  • Kafka的API实战案例

                  Kafka的API实战案例

                                       作者:尹正杰

    版权声明:原创作品,谢绝转载!否则将追究法律责任。 

    一.Producer API

    1>.消息发送流程

      Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。
    
      main线程将消息发送给RecordAccumulator,
    
      Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
    
      相关参数:
        batch.size:
          只有数据积累到batch.size之后,sender才会发送数据。
        linger.ms:
          如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。

    2>.异步发送数据-不带回调函数的API案例

    package com.yinzhengjie.kafka.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class CustomProducer {
    
        public static void main(String[] args){
    
            /**
             *  需要用到的类:
             *      KafkaProducer:
             *          需要创建一个生产者对象,用来发送数据
             *      ProducerConfig:
             *           获取所需的一系列配置参数
             *      ProducerRecord:
             *          每条数据都要封装成一个ProducerRecord对象
             */
    
            //创建Properties对象,用于配置kafka集群的信息
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG,"all");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
            props.put(ProducerConfig.LINGER_MS_CONFIG,1);
    
            //创建生产者对象
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
            //调用生产者send方法发送数据
            for (int i = 1;i<=10000;i++){
                producer.send(new ProducerRecord<String, String>("yinzhengjie-kafka",i + "","message-" + i));
            }
    
            //关闭生产者
            producer.close();
        }
    }
    案例代码
    [root@kafka201.yinzhengjie.com ~]# kafka-console-consumer.sh --bootstrap-server kafka201.yinzhengjie.com:9092 --topic yinzhengjie-kafka
    ......
    message-9402
    message-9412
    message-9447
    message-9453
    message-9462
    message-9475
    message-9477
    message-9486
    message-9493
    message-9528
    message-9545
    message-9548
    message-9613
    message-9616
    message-9622
    message-9644
    message-9646
    message-9655
    message-9662
    message-9689
    message-9700
    message-9727
    message-9746
    message-9780
    message-9783
    message-9784
    message-9791
    message-9806
    message-9812
    message-9829
    message-9854
    message-9864
    message-9898
    message-9901
    message-9951
    message-9994
    [root@kafka201.yinzhengjie.com ~]# kafka-console-consumer.sh --bootstrap-server kafka201.yinzhengjie.com:9092 --topic yinzhengjie-kafka      #运行上面的生产者代码时建议先启动一个消费者可以立即看到效果

    3>.异步发送数据-带回调函数的API

    package com.yinzhengjie.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class ProducerCallback {
        public static void main(String[] args){
    
            /**
             *  需要用到的类:
             *      KafkaProducer:
             *          需要创建一个生产者对象,用来发送数据
             *      ProducerConfig:
             *           获取所需的一系列配置参数
             *      ProducerRecord:
             *          每条数据都要封装成一个ProducerRecord对象
             */
    
            //创建Properties对象,用于配置kafka集群的信息
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG,"all");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
            props.put(ProducerConfig.LINGER_MS_CONFIG,1);
    
            //创建生产者对象
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
            //调用生产者send方法发送数据
            for (int i = 100;i<=200;i++){
                producer.send(new ProducerRecord<String, String>("yinzhengjie-kafka", Integer.toString(i), "Message-callback-" + Integer.toString(i)),(recordMetadata, exception) -> {
                    /**
                     *      回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
                     *      温馨提示:
                     *          消息发送失败会自动重试,不需要我们在回调函数中手动重试。
                     */
                    if (exception == null){
                        System.out.println("message send successful!");
                    }else {
                        exception.printStackTrace();
                    }
                });
            }
    
            //关闭生产者
            producer.close();
        }
    }
    案例代码
    [root@kafka201.yinzhengjie.com ~]# kafka-console-consumer.sh --bootstrap-server kafka201.yinzhengjie.com:9092 --topic yinzhengjie-kafka
    Message-callback-107
    Message-callback-114
    Message-callback-127
    Message-callback-141
    Message-callback-153
    Message-callback-174
    Message-callback-180
    Message-callback-191
    Message-callback-104
    Message-callback-133
    Message-callback-168
    Message-callback-109
    Message-callback-120
    Message-callback-121
    Message-callback-124
    Message-callback-135
    Message-callback-144
    Message-callback-145
    Message-callback-156
    Message-callback-181
    Message-callback-111
    Message-callback-147
    Message-callback-161
    Message-callback-165
    Message-callback-185
    Message-callback-189
    Message-callback-129
    Message-callback-148
    Message-callback-151
    Message-callback-152
    Message-callback-175
    Message-callback-192
    Message-callback-134
    Message-callback-154
    Message-callback-186
    Message-callback-105
    Message-callback-142
    Message-callback-187
    Message-callback-194
    Message-callback-137
    Message-callback-140
    Message-callback-150
    Message-callback-102
    Message-callback-115
    Message-callback-123
    Message-callback-143
    Message-callback-163
    Message-callback-197
    Message-callback-106
    Message-callback-118
    Message-callback-139
    Message-callback-146
    Message-callback-162
    Message-callback-167
    Message-callback-171
    Message-callback-176
    Message-callback-116
    Message-callback-130
    Message-callback-131
    Message-callback-136
    Message-callback-182
    Message-callback-195
    Message-callback-112
    Message-callback-119
    Message-callback-126
    Message-callback-172
    Message-callback-184
    Message-callback-113
    Message-callback-138
    Message-callback-149
    Message-callback-158
    Message-callback-169
    Message-callback-198
    Message-callback-103
    Message-callback-122
    Message-callback-125
    Message-callback-190
    Message-callback-196
    Message-callback-199
    Message-callback-108
    Message-callback-159
    Message-callback-166
    Message-callback-177
    Message-callback-193
    Message-callback-101
    Message-callback-110
    Message-callback-200
    Message-callback-157
    Message-callback-160
    Message-callback-173
    Message-callback-178
    Message-callback-188
    Message-callback-100
    Message-callback-117
    Message-callback-128
    Message-callback-132
    Message-callback-155
    Message-callback-164
    Message-callback-170
    Message-callback-179
    Message-callback-183
    [root@kafka201.yinzhengjie.com ~]# kafka-console-consumer.sh --bootstrap-server kafka201.yinzhengjie.com:9092 --topic yinzhengjie-kafka

     

    4>.Future测试案例

    package com.yinzhengjie.kafka.producer;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class TestFuture {
    
        public static void main(String[] args) throws Exception{
            //创建一个线程池
            ExecutorService executor = Executors.newCachedThreadPool();
    
            //提交一个线程
            Future<?> future = executor.submit(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        System.out.println("i = " + i);
                    }
                }
            });
    
            //调用下面的代码后会阻塞当前线程
            future.get();
    
            System.out.println("=================");
    
            //停止线程池
            executor.shutdown();
        }
    }
    案例代码

    5>.同步发送数据

    package com.yinzhengjie.kafka.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class SyncProducer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            /**
             *  需要用到的类:
             *      KafkaProducer:
             *          需要创建一个生产者对象,用来发送数据
             *      ProducerConfig:
             *           获取所需的一系列配置参数
             *      ProducerRecord:
             *          每条数据都要封装成一个ProducerRecord对象
             */
    
            //创建Properties对象,用于配置kafka集群的信息
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG,"all");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
            props.put(ProducerConfig.LINGER_MS_CONFIG,1000);    //设置发送数据的间隔时间为1秒,单位默认是毫秒
    
            //创建生产者对象
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
            //调用生产者send方法发送数据
            for (int i = 1;i<=10;i++){
                /**
                 * 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
                 * 由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。
                 */
                RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("yinzhengjie-kafka", Integer.toString(i), "message-" + i)).get();
                System.out.println("offset = " + metadata.offset());
            }
    
            //关闭生产者
            producer.close();
        }
    }
    案例代码

    二.Consumer API

      Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。
    
      由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
      所以offset的维护是Consumer消费数据是必须考虑的问题。

    1>.手动提交offset

    package com.yinzhengjie.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class CustomConsumer {
        public static void main(String[] args){
            /**
             *  需要用到的类:
             *      KafkaConsumer:
             *          需要创建一个消费者对象,用来消费数据
             *      ConsumerConfig:
             *          获取所需的一系列配置参数
             *      ConsuemrRecord:
             *          每条数据都要封装成一个ConsumerRecord对象
             */
    
            //创建Properties对象,用于配置kafka集群的信息
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG,"yinzhengjie2020");        //指定消费者组,只要group.id相同,就属于同一个消费者组
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");       //关闭自动提交offset,默认就是自动提交的,即默认值是true.
    
            //创建消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            //订阅topic
            consumer.subscribe(Arrays.asList("yinzhengjie-kafka"));
    
            //调用pull
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Topic = " + record.topic() +",Offset = " + record.offset() + ",Value = " + record.value());
                }
    
                //手动提交offset,若不手动提交(上面我们已经禁用了自动提交offset功能)当Consumer进程结束后,再次启动时你会发现有重复数据出现哟
                /**
                 * 同步提交offset,该方法有重试机制,一直到提交成功为止。
                 */
                consumer.commitSync();
                /**
                 *  异步提交offset,仅提交一次,并没有失败重试的机制,生产环境中建议推荐使用这种方法,效率较高。
                 *
                 *  温馨提示:
                 *      如果本次提交失败没有关系,当消费下一批数据是会再次触发异步提交,只要下一次提交成功了尽管上一次提交失败也没有任何影响;
                 *      但是异步提交一直失败的话,可能会导致数据重复消费的问题哟~
                 *
                 */
                consumer.commitAsync();
            }
    
        }
    }
    案例代码
    [root@kafka201.yinzhengjie.com ~]# kafka-console-producer.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --topic yinzhengjie-kafka
    >hello
    >world
    >https://www.cnblogs.com/yinzhengjie/
    >
    [root@kafka201.yinzhengjie.com ~]# kafka-console-producer.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --topic yinzhengjie-kafka      #启动生产者

    2>.自动提交offset

    package com.yinzhengjie.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class AutoConsumer {
        public static void main(String[] args){
            /**
             *  需要用到的类:
             *      KafkaConsumer:
             *          需要创建一个消费者对象,用来消费数据
             *      ConsumerConfig:
             *          获取所需的一系列配置参数
             *      ConsuemrRecord:
             *          每条数据都要封装成一个ConsumerRecord对象
             *
             *  为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
             *      自动提交offset的相关参数:
             *          enable.auto.commit:
             *              是否开启自动提交offset功能
             *          auto.commit.interval.ms:
             *              自动提交offset的时间间隔
             */
    
            //创建Properties对象,用于配置kafka集群的信息
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG,"yinzhengjie2020");        //指定消费者组,只要group.id相同,就属于同一个消费者组
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");       //开启自动提交offset,默认就是自动提交的,即默认值是true.
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");    //指定自动提交offset的时间间隔为1秒
    
            //创建消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            //订阅topic
            consumer.subscribe(Arrays.asList("yinzhengjie-kafka"));
    
            //调用pull
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic = %s, offset = %d, key = %s, value = %s%n", record.topic(),record.offset(), record.key(), record.value());
                }
            }
    
        }
    }
    案例代码
    [root@kafka201.yinzhengjie.com ~]# kafka-console-producer.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --topic yinzhengjie-kafka
    >https://www.cnblogs.com/yinzhengjie/
    >https://home.cnblogs.com/u/yinzhengjie2020
    >hello
    >world
    >
    [root@kafka201.yinzhengjie.com ~]# kafka-console-producer.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --topic yinzhengjie-kafka

    3>.自定义存储offset思路

    package com.yinzhengjie.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Properties;
    
    
    
    public class CustomOffsetConsumer {
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
            props.put("group.id", "yinzhengjie2020");//消费者组,只要group.id相同,就属于同一个消费者组
            props.put("enable.auto.commit", "false");//自动提交offset
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("yinzhengjie-kafka"), new ConsumerRebalanceListener() {
    
                //提交当前负责的分区的offset
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("===== 回收的分区 =====");
                    for (TopicPartition partition : partitions) {
                        System.out.printf("Partition = %s%n",partition);
                    }
    
                }
    
                //定位新分配的分区的offset
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("===== 重新分配的分区 =====");
                    for (TopicPartition partition : partitions) {
                        System.out.printf("Partition = %s%n",partition);
                        //下面是伪代码,需要自行实现
    //                    Long offset = getPartitionOffset(partition);
    //                    consumer.seek(partition,offset);
                    }
                }
            });
    
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic = %s, offset = %d,value = %s%n", record.topic(),record.offset(),record.value());
                    //下面是伪代码,需要自行实现
    //                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
    //                commitOffset(topicPartition,record.offset()+1);
                }
            }
        }
    
        //提交offset,根据你的业务场景自行实现功能
        private static void commitOffset(TopicPartition topicPartition, long l) {
    
        }
    
        //获取分区的offset,根据你的业务场景自行实现功能
        private static Long getPartitionOffset(TopicPartition partition) {
            return null;
        }
    
    }
    案例代码

    三.自定义Interceptor

    1>.拦截器原理

      Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
    
      对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。

      同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。

      Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
        configure(configs):       获取配置信息和初始化数据时调用。     onSend(ProducerRecord):       该方法封装进KafkaProducer.send方法中,即它运行在用户主线程(main)中。
          Producer确保在消息被序列化以及计算分区前调用该方法。
          用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
        onAcknowledgement(RecordMetadata, Exception):       该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用,并且通常都是在producer回调逻辑触发之前。
          onAcknowledgement运行在producer的IO线程(sender)中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
        close:       关闭interceptor,主要用于执行一些资源清理工作。
      温馨提示:
        如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。
        另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递,这在使用过程中要特别留意。

    2>.拦截器案例

    package com.yinzhengjie.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    
    
    public class CounterInterceptor implements ProducerInterceptor<String, String> {
    
        private long successNum = 0L;
        private long errorNum = 0L;
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return record;
        }
    
    
       //统计成功和失败的次数
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                successNum++;
            } else {
                errorNum++;
            }
        }
    
        @Override
        public void close() {
            System.out.println("successNum=" + successNum);
            System.out.println("errorNum=" + errorNum);
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    CounterInterceptor.java
    package com.yinzhengjie.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    public class TimeInterceptor implements ProducerInterceptor<String, String> {
    
        //给value增加时间戳功能
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + record.value(), record.headers());
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    
        }
    
        @Override
        public void close() {
            System.out.println("已为数据添加时间戳功能....");
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    TimeInterceptor.java
    package com.yinzhengjie.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomProducer {
    
        public static void main(String[] args){
    
            /**
             *  需要用到的类:
             *      KafkaProducer:
             *          需要创建一个生产者对象,用来发送数据
             *      ProducerConfig:
             *           获取所需的一系列配置参数
             *      ProducerRecord:
             *          每条数据都要封装成一个ProducerRecord对象
             */
    
            //创建Properties对象,用于配置kafka集群的信息
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            props.put(ProducerConfig.ACKS_CONFIG,"all");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
            props.put(ProducerConfig.LINGER_MS_CONFIG,1);
            //指定拦截器
            ArrayList<String> intertceptors = new ArrayList<>();
            intertceptors.add("com.yinzhengjie.kafka.interceptor.TimeInterceptor");
            intertceptors.add("com.yinzhengjie.kafka.interceptor.CounterInterceptor");
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,intertceptors);
    
            //创建生产者对象
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
            //调用生产者send方法发送数据
            for (int i = 3000;i<=6000;i++){
                producer.send(new ProducerRecord<String, String>("yinzhengjie-kafka",Integer.toString(i),"message-" + i));
            }
    
            //注意哈关闭生产者时会调用拦截器的close()方法哟~
            producer.close();
    
            System.out.println("=====  生产者程序已运行完毕 =====");
        }
    }
    CustomProducer.java

  • 相关阅读:
    Druid 使用 Kafka 将数据载入到 Kafka
    Druid 使用 Kafka 数据加载教程——下载和启动 Kafka
    Druid 集群方式部署 —— 启动服务
    Druid 集群方式部署 —— 端口调整
    Druid 集群方式部署 —— 配置调整
    Druid 集群方式部署 —— 配置 Zookeeper 连接
    Druid 集群方式部署 —— 元数据和深度存储
    Druid 集群方式部署 —— 从独立服务器部署上合并到集群的硬件配置
    Druid 集群方式部署 —— 选择硬件
    Druid 独立服务器方式部署文档
  • 原文地址:https://www.cnblogs.com/yinzhengjie2020/p/13057627.html
Copyright © 2011-2022 走看看