  • Kafka Knowledge Base: Producer Programming in Practice

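    This article walks through Kafka producer programming in practice, using kafka_2.11-0.10.1.0.

    Installation

    For installing a Linux cluster, see http://www.cnblogs.com/aidodoo/p/7151949.html
    The Windows installation steps are as follows:

    Download the ZooKeeper package (zookeeper-3.4.6), extract it to D:\Program\zookeeper, and set the environment variables

    • Add the system variable ZOOKEEPER_HOME=D:\Program\zookeeper and append %ZOOKEEPER_HOME%\bin to Path

    • Rename zoo_sample.cfg to zoo.cfg and edit it as follows: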

        tickTime=4000
        initLimit=10
        syncLimit=5
        dataDir=D:/Program/zookeeper/data
        clientPort=2181
        maxClientCnxns=60
        server.1=localhost:2888:3888
      
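    • In D:/Program/zookeeper/data, create a file named myid, open it in a text editor, and enter the number 1

    Download the Kafka package (kafka_2.11-0.10.1.0), extract it to D:\Program\kafka, and set the environment variables

    • Add the system variable KAFKA_HOME=D:\Program\kafka and append %KAFKA_HOME%\bin to Path

    • Edit the configuration file D:\Program\kafka\config\server.properties as follows: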

        broker.id=0
        advertised.listeners=PLAINTEXT://LAPTOP-2CBRDCI0:9092
        advertised.port=9092
        advertised.host.name=LAPTOP-2CBRDCI0
        log.dirs=D:/Program/kafka/data/kafka-logs
        zookeeper.connect=localhost:2181/kafka
        zookeeper.connection.timeout.ms=60000
      

    Start ZooKeeper
    Double-click the script D:\Program\zookeeper\bin\zkServer.cmd

    Start Kafka

    Run from the command line:
    D:\Program\kafka\bin\windows\kafka-server-start.bat D:/Program/kafka/config/server.properties
    
    Create a Kafka topic
    D:\Program\kafka\bin\windows\kafka-topics.bat --zookeeper LAPTOP-2CBRDCI0:2181/kafka --create --topic TEST1 --replication-factor 1 --partitions 3
    
    D:\Program\kafka\bin\windows\kafka-topics.bat --zookeeper LAPTOP-2CBRDCI0:2181/kafka --describe --topic TEST 
    

    Practice

    Dependencies

    Kafka 0.10.1.0 uses the KafkaProducer class to send messages to the Kafka broker cluster.
    Before writing any code, add the required dependencies:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>
    

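    Basic configuration and send flow

    KafkaProducer is thread-safe, so a single KafkaProducer instance can be shared across threads. We start with a single-threaded example to see how Kafka sends a message.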

    package com.molyeo.kafka;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * Created by zhangkh on 2018/7/11.
     */
    public class SinglekafkaProducerDemo {
        public static void  main(String[] args){
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
            producer.flush();
            producer.close();
        }
    }
    

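    We first build a props instance to hold the Kafka configuration.
    acks
    The acks setting controls how sends are acknowledged.
    acks=0: the producer does not wait for any acknowledgment from the server. The record is added to the socket buffer immediately and considered sent. Because the client cannot know whether the send actually succeeded, the retries setting has no effect (no retry happens), and the offset returned for every record is -1.
    acks=1: the send is considered successful once the partition leader has received the record, without waiting for the followers to confirm. If the leader's node fails right after receiving the record, before the followers have replicated it, the record is lost.
    acks=all (or acks=-1): the send is considered successful only after the leader and all in-sync followers have received the record. This is the strictest setting: the record is not lost as long as at least one in-sync replica stays alive. The min.insync.replicas setting gives the minimum number of in-sync replicas that must be available for such a write to be accepted.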
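    The three acks modes above can be sketched as a small configuration helper. This is only an illustration built on java.util.Properties: the class name AcksConfigSketch, the broker address localhost:9092, and the retry count of 3 are assumptions, not values from the original article.

```java
import java.util.Properties;

public class AcksConfigSketch {
    /** Builds producer properties for the given acks mode ("0", "1", "all" or "-1"). */
    public static Properties producerProps(String acks) {
        if (!acks.equals("0") && !acks.equals("1") && !acks.equals("all") && !acks.equals("-1")) {
            throw new IllegalArgumentException("unsupported acks value: " + acks);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("acks", acks);
        // With acks=0 the producer never learns about failures, so retries has no effect.
        props.put("retries", acks.equals("0") ? "0" : "3"); // 3 is an illustrative value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        for (String acks : new String[] {"0", "1", "all"}) {
            Properties p = producerProps(acks);
            System.out.println("acks=" + p.getProperty("acks") + " retries=" + p.getProperty("retries"));
        }
    }
}
```

    The returned Properties object could be passed to the KafkaProducer constructor in the same way as props in the demo above.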

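    retries
    If a send request fails because of a network problem or another error, the producer can retry it according to the retries setting.

    batch.size
    The producer keeps a buffer of unsent records for each partition. The size of each buffer is set by batch.size, which defaults to 16384 bytes (16 KB).

    linger.ms
    The linger time. The default is 0, which means a record can be sent immediately even if the buffer still has unused space.
    To reduce the load on the server, you can add a short delay so that records accumulate and are sent together as a larger batch.
    Put simply, the producer's sender thread sends a request as soon as either the batch.size condition or the linger.ms condition is met. For the details, study the class org.apache.kafka.clients.producer.internals.Sender.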
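    The "either condition" rule can be written down as a tiny predicate. This is a deliberately simplified sketch, not the logic of the Sender class itself, which takes further conditions into account. The class and method names are hypothetical.

```java
public class BatchReadySketch {
    /**
     * Simplified readiness rule: a batch may be sent once it is full
     * or once it has waited at least lingerMs.
     */
    public static boolean readyToSend(int batchBytes, int batchSize, long waitedMs, long lingerMs) {
        boolean full = batchBytes >= batchSize;
        boolean lingerExpired = waitedMs >= lingerMs;
        return full || lingerExpired;
    }

    public static void main(String[] args) {
        int batchSize = 16384; // default batch.size
        System.out.println(readyToSend(16384, batchSize, 0, 5)); // true: batch is full
        System.out.println(readyToSend(100, batchSize, 5, 5));   // true: linger time elapsed
        System.out.println(readyToSend(100, batchSize, 2, 5));   // false: neither condition met
        System.out.println(readyToSend(100, batchSize, 0, 0));   // true: linger.ms=0 sends at once
    }
}
```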

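    buffer.memory
    The total memory the producer can use to buffer records waiting to be sent. Once this space is exhausted, send blocks for up to max.block.ms and then throws an exception.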
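    To get a feel for the numbers used in the demo, the following sketch works out how many full batches of batch.size bytes fit into buffer.memory. It is a rough upper estimate only, since the producer does not use all of its memory for batching. The class and method names are hypothetical.

```java
public class BufferMemorySketch {
    /** Upper estimate of how many completely full batches fit into the buffer. */
    public static long fullBatchesThatFit(long bufferMemoryBytes, int batchSizeBytes) {
        return bufferMemoryBytes / batchSizeBytes;
    }

    public static void main(String[] args) {
        long bufferMemory = 33554432L; // buffer.memory from the demo
        int batchSize = 16384;         // batch.size from the demo
        System.out.println(bufferMemory / (1024 * 1024) + " MB");        // 32 MB
        System.out.println(fullBatchesThatFit(bufferMemory, batchSize)); // 2048
    }
}
```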

    For the other producer settings, see http://kafka.apache.org/0101/documentation.html#brokerconfigs

    The send methods of KafkaProducer deserve a closer look:

    public Future<RecordMetadata> send(ProducerRecord<K, V> record)
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
    

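    Both send methods are asynchronous: they return as soon as the record has been stored in the buffer of records waiting to be sent. This allows many records to be sent in parallel without blocking to wait for the response after each one.
    The second form, send(ProducerRecord<K, V> record, Callback callback), invokes the callback once the send has been acknowledged. The resulting RecordMetadata specifies the partition the record was sent to, the offset it was assigned, and the record's timestamp.
    To send synchronously, call get on the returned Future, which blocks until the request completes: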

     byte[] key = "key".getBytes();
     byte[] value = "value".getBytes();
     ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
     producer.send(record).get();
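    Calling get on a Future can throw checked exceptions, so real code needs to handle them. The sketch below shows one possible way to do that. So that it runs without a broker, it uses a CompletableFuture as a stand-in for the Future<RecordMetadata> that send returns. The class name, the helper await, and the timeout values are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingSendSketch {
    /** Waits for the result and rethrows any failure as a RuntimeException. */
    public static <T> T await(Future<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new RuntimeException("interrupted while waiting for the send result", e);
        } catch (ExecutionException e) {
            // The original failure is available as the cause.
            throw new RuntimeException("send failed: " + e.getCause().getMessage(), e.getCause());
        } catch (TimeoutException e) {
            throw new RuntimeException("no result within " + timeoutMs + " ms", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the Future<RecordMetadata> returned by producer.send(record).
        Future<String> ok = CompletableFuture.completedFuture("partition=0, offset=42");
        System.out.println(await(ok, 1000));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        try {
            await(failed, 1000);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // send failed: broker unavailable
        }
    }
}
```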
    

    For a fully non-blocking send, pass a Callback that is invoked when the request completes:

     ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
     producer.send(record,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e != null) {
                              e.printStackTrace();
                           } else {
                              System.out.println("The offset of the record we just sent is: " + metadata.offset());
                           }
                       }
                   });
    

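    Concurrent sending from multiple threads

    To take full advantage of Kafka's high throughput, the producer side can send messages from multiple threads at once. As noted earlier, KafkaProducer is thread-safe, so a single KafkaProducer instance can be shared across threads.
    The Kafka configuration class KafkaCommonConfig: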

    package com.molyeo.kafka;
    
    /**
     * Created by zhangkh on 2018/7/10.
     */
    public class KafkaCommonConfig {
        public static String BOOTSTRAP_SERVERS="LAPTOP-2CBRDCI0:9092";
        public static String ACKS="all";
        public static int RETRIES=0;
        public static int BATCH_SIZE=16384;
        public static int LINGER_MS=1;
        public static int BUFFER_MEMORY=33554432;
        public static String KEY_SERIALIZER_CLASS="org.apache.kafka.common.serialization.StringSerializer";
        public static String VALUE_SERIALIZER_CLASS= "org.apache.kafka.common.serialization.StringSerializer";
    }
    

    Sending messages

    package com.molyeo.kafka;
    
    import org.apache.kafka.clients.producer.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Created by zhangkh on 2018/7/5.
     */
    public class MultiKafkaProducerDemo {
        private static final int PRODUCER_THREAD_NUM = 5;
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_THREAD_NUM);
            Producer<String, String> producer = new KafkaProducer<String, String>(getProducerConfig());
            String topic = "TEST";
            try {
                for (int i = 0; i < 20; i++) {
                    Thread.sleep(20);
                    String key = Integer.toString(i);
                    String value = Long.toString(System.currentTimeMillis());
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic,i%3, key, value);
                    executorService.submit(new CommonProducerThread<>(producer, record));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                //Block for a while
                Thread.sleep(60 * 1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                producer.flush();
                producer.close();
                executorService.shutdown();
            }
        }
    
        public static Properties getProducerConfig() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommonConfig.BOOTSTRAP_SERVERS);
            props.put(ProducerConfig.ACKS_CONFIG, KafkaCommonConfig.ACKS);
            props.put(ProducerConfig.RETRIES_CONFIG, KafkaCommonConfig.RETRIES);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, KafkaCommonConfig.BATCH_SIZE);
            props.put(ProducerConfig.LINGER_MS_CONFIG, KafkaCommonConfig.LINGER_MS);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, KafkaCommonConfig.BUFFER_MEMORY);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,KafkaCommonConfig.KEY_SERIALIZER_CLASS);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,KafkaCommonConfig.VALUE_SERIALIZER_CLASS);
            return props;
        }
    }
    
    
    class CommonProducerThread<K, V> implements Runnable {
        Logger logger = LoggerFactory.getLogger(CommonProducerThread.class.getSimpleName());
    
        // Typed references instead of raw types, so the compiler checks key and value types.
        private final Producer<K, V> producer;
        private final ProducerRecord<K, V> record;
    
        public CommonProducerThread(Producer<K, V> producer, ProducerRecord<K, V> record) {
            this.producer = producer;
            this.record = record;
        }
    
        @Override
        public void run() {
            logger.info("prepare to send msg:thread name={},key={},value={}", Thread.currentThread().getName(), record.key(), record.value());
            producer.send(record, new ProducerAckCallback<>(System.currentTimeMillis(), record.key(), record.value()));
        }
    }
    
    class ProducerAckCallback<K, V> implements Callback {
        Logger logger = LoggerFactory.getLogger(ProducerAckCallback.class.getSimpleName());
        private final long startTime;
        private final K key;
        private final V value;
    
        public ProducerAckCallback(long startTime, K key, V value) {
            this.startTime = startTime;
            this.key = key;
            this.value = value;
        }
    
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                logger.info("send success:key {},value {}, sent to partition {},offset {} in {} ms", key, value, metadata.partition(), metadata.offset(), elapsedTime);
            } else {
                exception.printStackTrace();
            }
        }
    }
    
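    The demo above waits a fixed 60 seconds before closing the producer. One alternative is to shut the executor down and wait for the submitted tasks to finish. The sketch below shows that pattern with plain java.util.concurrent classes. The counter stands in for producer.send, and the class name and the 10-second limit are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulShutdownSketch {
    /** Runs taskCount tasks on a fixed pool and returns how many completed. */
    public static int runAndAwait(int threads, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            // In the demo this task would call producer.send(record, callback).
            pool.submit(() -> { completed.incrementAndGet(); });
        }
        pool.shutdown(); // accept no new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up on tasks that did not finish in time
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        // In the demo, producer.flush() and producer.close() would follow here.
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(5, 20)); // 20
    }
}
```

    Closing the producer only after the pool has terminated avoids calling send on a producer that is already closed.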

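    The program uses 5 threads to send 20 messages, assigns each message's partition explicitly, and submits the messages 20 ms apart. The output is as follows: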

    18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=0,value=1531360688703
    18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-2,key=1,value=1531360688748
    18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=2,value=1531360688853
    18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=3,value=1531360688878
    18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-5,key=4,value=1531360688900
    18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=5,value=1531360688921
    18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=6,value=1531360688942
    18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 3,value 1531360688878, sent to partition 0,offset 59 in 91 ms
    18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 0,value 1531360688703, sent to partition 0,offset 60 in 241 ms
    18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 6,value 1531360688942, sent to partition 0,offset 61 in 17 ms
    18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 1,value 1531360688748, sent to partition 1,offset 68 in 121 ms
    18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 4,value 1531360688900, sent to partition 1,offset 69 in 71 ms
    18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 2,value 1531360688853, sent to partition 2,offset 33 in 95 ms
    18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 5,value 1531360688921, sent to partition 2,offset 34 in 19 ms
    18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=7,value=1531360688976
    18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 7,value 1531360688976, sent to partition 1,offset 70 in 5 ms
    18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=8,value=1531360688997
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 8,value 1531360688997, sent to partition 2,offset 35 in 4 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=9,value=1531360689022
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 9,value 1531360689022, sent to partition 0,offset 62 in 3 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-5,key=10,value=1531360689043
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 10,value 1531360689043, sent to partition 1,offset 71 in 4 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-2,key=11,value=1531360689065
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 11,value 1531360689065, sent to partition 2,offset 36 in 5 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=12,value=1531360689089
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 12,value 1531360689089, sent to partition 0,offset 63 in 5 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=13,value=1531360689118
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 13,value 1531360689118, sent to partition 1,offset 72 in 5 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=14,value=1531360689141
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 14,value 1531360689141, sent to partition 2,offset 37 in 7 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-5,key=15,value=1531360689174
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 15,value 1531360689174, sent to partition 0,offset 64 in 5 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-2,key=16,value=1531360689198
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 16,value 1531360689198, sent to partition 1,offset 73 in 4 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=17,value=1531360689220
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 17,value 1531360689220, sent to partition 2,offset 38 in 4 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=18,value=1531360689245
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 18,value 1531360689245, sent to partition 0,offset 65 in 4 ms
    18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=19,value=1531360689266
    18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 19,value 1531360689266, sent to partition 1,offset 74 in 6 ms
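    Because the demo sets the partition to i%3, the spread of the 20 messages over the 3 partitions can be computed ahead of time and compared with the offsets in the log. A small sketch follows. The class and method names are hypothetical.

```java
import java.util.Arrays;

public class PartitionSpreadSketch {
    /** Counts how many of the messages 0..messages-1 land on each partition when using i % partitions. */
    public static int[] countPerPartition(int messages, int partitions) {
        int[] counts = new int[partitions];
        for (int i = 0; i < messages; i++) {
            counts[i % partitions]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countPerPartition(20, 3))); // [7, 7, 6]
    }
}
```

    This matches the log: partition 0 received offsets 59 to 65 (7 records), partition 1 received offsets 68 to 74 (7 records), and partition 2 received offsets 33 to 38 (6 records).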
    

    This article summarized the commonly used Kafka producer settings and showed how to send messages from multiple threads.

    Reference:
    http://kafka.apache.org/0101/documentation.html#brokerconfigs



  • Original article: https://www.cnblogs.com/aidodoo/p/9298056.html