zoukankan      html  css  js  c++  java
  • 【kafka】生产者API 回调 同步

    普通实现

    
    public class MyProducer {
        public static void main(String[] args) {
            /**
             * 创建Kafka生产者配置信息:ProducerConfig类中记录了Kafka需要的所有参数信息
             * 1.指定连接的Kafka集群
             * 2.ack应答级别
             * 3.发送失败的重试次数
             * 4.批次大小(一次发送多少大小数据)
             * 5.等待时间
             * 6.RecordAccumulator缓冲区大小
             * 7.指定key,value序列化类
             */
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("acks", "all");
            properties.put("retries", 1);
            properties.put("batch.size", 16384);
            properties.put("liner.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            /**
             *  通过配置文件创建生产者对象
             */
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 10; i++) {
            // 创建记录ProducerRecord("Topic","partition","key","value")
                ProducerRecord<String, String> message =
                        new ProducerRecord<String, String>("test", 0,"MyProducer","hello" + i);
                // send:异步方法,发送之后,立即返回,并不是说调用了,就真的发送成功了;
                kafkaProducer.send(message);
            }
            // 关闭连接:会清空内存
            kafkaProducer.close();
        }
    }
    
    

    同步实现

    public class MyProducerFuture {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("acks", "all");
            properties.put("retries", 1);
            properties.put("batch.size", 16384);
            properties.put("liner.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> message = new ProducerRecord<String, String>("test", "hello" + i);
                /**
                 * 同步发送,send返回 Future对象
                 * 调用get()
                 * 返回RecordMetadata元数据记录,记录了发送的topic,partition,offset
                 */
                RecordMetadata metadata = kafkaProducer.send(message).get();
                String topic = metadata.topic();
                int partition = metadata.partition();
                long offset = metadata.offset();
                System.out.println("Topic=>"+topic+" partition=>"+partition + " offset=>" +offset);
            }
            // 关闭连接:会清空内存
            kafkaProducer.close();
        }
    }
    
    
    

    回调实现

    public class MyProducerCallback {
        public static void main(String[] args) {
    
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("acks", "all");
            properties.put("retries", 1);
            properties.put("batch.size", 16384);
            properties.put("liner.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> message = new ProducerRecord<String, String>("test", "messuesein--" + i);
                /**
                 * 发送消息:带回调
                 * 传入CallBack函数接口,参数:
                 * 1. RecordMetadata:成功返回元数据记录
                 * 2. Exception:失败返回异常
                 */
                kafkaProducer.send(message, (metadata, exception) -> {
                    // exception==null,即成功
                    if (exception == null) {
                        /**
                         * metadata记录元数据信息
                         */
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();
                        System.out.println("Topic=>"+topic+" partition=>"+partition + " offset=>" +offset);
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
            // 关闭连接:会清空内存
            kafkaProducer.close();
        }
    }
    
    
  • 相关阅读:
    NTC温度检测程序(转)
    基于GPS的经纬度、方位角、距离、航向计算及基于单目视觉的距离计算!
    GPS速度和航向计算(转)
    NOR、 NAND、Raw Flash和 Managed Flash的区别(转)
    FreeModbus V1.6 主机使用说明(转)
    只要单片机具有真正唯一ID,就可以让加密坚不可摧(转)
    Java 给Word添加数字签名
    Java 获取Word中指定图片的坐标位置
    C#/VB.NET 自定义PPT动画路径
    在线编辑Excel——插入图表
  • 原文地址:https://www.cnblogs.com/mussessein/p/12187626.html
Copyright © 2011-2022 走看看