zoukankan      html  css  js  c++  java
  • Kafka 生产者 工作流程和Demo示例

    生产者运转流程:

    生产者Demo演示

    maven构建项目需要引入kafka客户端依赖:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>

    生产者代码

    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.errors.RetriableException;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    /**
     *
     * 可重试异常
     * 1. 分区副本不可用
     * 2. Controller当前不可用
     * 3. 网络瞬时故障
     *
     * 可自行恢复,超过重试次数也需要自行处理
     *
     *
     * 不可重试异常
     * 1. 发送消息尺寸过大
     * 2. 序列化失败异常
     * 3. 其他类型异常
     *
     *
     */
    
    public class KafkaProducerDemo {
    
        public static void main(String[] args) {
            Properties properties = new Properties();
    
            //最好使用主机名,Kafka内部使用FQDN,如果集群数量过多,指定部分机器即可。
            properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
            //发送到broker的消息必须是字节数据,因此必须做序列化,key.serializer必须配置,无默认值
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //控制消息有几个副本写入成功才算发送成功,有0,all(-1),1(默认)
            //0 不关心消息是否发送成功
            //-1 ISR至少有一个副本处于存活
            // 1 broker消息写入本地日志,不需等待副本同步成功(折中方案)
            properties.put("acks", "-1");
            System.out.println(ProducerConfig.ACKS_CONFIG);
            //重试次数,可能导致消息重复
            properties.put("retries", "3");
            //调优吞吐量和延时 默认16KB
            properties.put("batch.size", 1048576);
            //batch没有被填满,等待多久发送
            properties.put("linger.ms", 10);
            //32MB 缓存消息的缓冲区大小
            properties.put("buffer.memory", "33554432");
            System.out.println(ProducerConfig.BUFFER_MEMORY_CONFIG);
    
            //压缩算法
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
            properties.put("max.block.ms", "3000");
    
            Producer<String, String> producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 10; i++) {
                // 最好不要自行指定时间戳
                // 最好使用有回调函数的,用于确认发送消息是否成功
                // 底层是异步化发送
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("消息发送成功");
                        } else {
                            System.out.println("消息发送失败");
                            if(exception instanceof RetriableException){
                                System.out.println("处理可重试异常");
                            }else {
                                System.out.println("处理不可重试异常");
                            }
                        }
                    }
                });
    
                System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>.");
    
    
                //同步发送
                ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("test", Integer.toString(i));
    
                try {
                    RecordMetadata recordMetadata = producer.send(producerRecord).get();
                    System.out.println("同步发送的返回:" + recordMetadata.toString() + "," 
                    + recordMetadata.topic() + ", " + recordMetadata.partition()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } //谨慎使用带超时参数的close方法 producer.close(); } }
  • 相关阅读:
    HTTP协议详解
    【VC++开发实战】迅雷晒密及批量查询流量程序
    C/C++中指针和引用之相关问题研究
    C++类中拷贝构造函数详解
    构造函数为什么不能是虚函数
    High一下!
    文件搜索神器everything 你不知道的技巧总结
    不要被C++“自动生成”所蒙骗
    对象的传值与返回
    深入浅出Node.js (3)
  • 原文地址:https://www.cnblogs.com/fubinhnust/p/11967861.html
Copyright © 2011-2022 走看看