Kafka Notes (3): Consumption Pattern Verification and Performance Testing

    Verifying Kafka Consumption Patterns

    The consumer consumption patterns were introduced in Kafka Notes (1); to recap:

    1. Each consumer belongs to a consumer group, identified by group.id.
    
    2. Consumption patterns:
       Within a group: the consumers of a group share one copy of the data; at any moment a given
       partition of a topic is consumed by only one consumer of the group, while one consumer may
       consume messages from several partitions. So for a topic, a single group should not run more
       consumers than the topic has partitions, otherwise some consumers will receive no messages.
       Across groups: each consumer group consumes the same data independently, without affecting the others.
    
    3. When a consumer process runs multiple threads, each thread acts as one consumer.
       For example: with 3 partitions, if one consumer process starts 3 consuming threads,
       another consumer that joins the group later has nothing to consume.
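The within-group rule can be sketched with a toy assignment. This is a simplified round-robin illustration, not Kafka's actual assignor, and the consumer names are made up: with 3 partitions and 4 consumers in one group, one consumer is left idle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    public static void main(String[] args) {
        int partitions = 3;
        // 4 consumers in the same group (illustrative names)
        List<String> consumers = Arrays.asList("c0", "c1", "c2", "c3");
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        // each partition is owned by exactly one consumer of the group
        for (int p = 0; p < partitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        System.out.println(assignment); // {c0=[0], c1=[1], c2=[2], c3=[]}
    }
}
```

With only 3 partitions there is nothing left for the fourth consumer, which is exactly what the experiment below demonstrates.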

    Let's verify these consumption patterns. Note that consumer code can set the consumer's group.id explicitly (below we set it in a configuration file).

    The Kafka shell tools can also take a configuration file that sets the consumer's group.id. If none is given, Kafka generates one at random (in kafka-console-consumer.sh, the kafka.tools.ConsoleConsumer class generates a random group.id when it is not specified).
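For example, the console consumer accepts a properties file through its `--consumer.config` option; pointing it at a file that sets group.id makes the shell consumer join that group instead of a random one. The file name here is illustrative:

```shell
# my-group.properties (illustrative) contains e.g.:
#   group.id=test-consumer-group
kafka-console-consumer.sh --topic hadoop \
  --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 \
  --consumer.config my-group.properties
```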

    In the code below, four consuming threads share one group.id (the topic we created has 3 partitions), and another consumer is started from a terminal via the Kafka shell, which lets us verify Kafka's consumption patterns.

    The topic used in the tests:

    $ kafka-topics.sh --describe --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
    Topic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:
            Topic: hadoop   Partition: 0    Leader: 103     Replicas: 103,101,102   Isr: 103,101,102
            Topic: hadoop   Partition: 1    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103
            Topic: hadoop   Partition: 2    Leader: 102     Replicas: 102,103,101   Isr: 102,103,101

    That is, 3 partitions with a replication factor of 3.

    Program Code

    KafkaProducerOps.java
    package com.uplooking.bigdata.kafka.producer;
    
    import com.uplooking.bigdata.kafka.constants.Constants;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    import java.util.Random;
    
    /**
     * KafkaProducerOps produces data into a Kafka topic
     * <p>
     * Producer
     */
    public class KafkaProducerOps {
        public static void main(String[] args) throws IOException {
            /**
             * Load the configuration file.
             * File format:
             * key=value
             *
             * Avoid hard-coding values in the code;
             * keep them configurable instead.
             */
            Properties properties = new Properties();
            InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
            properties.load(in);
            /**
             * Two type parameters:
             * the first is the type of a record's key in Kafka,
             * the second is the type of a record's value in Kafka.
             */
            String[] girls = new String[]{"姚慧莹", "刘向前", "周  新", "杨柳"};
            Producer<String, String> producer = new KafkaProducer<String, String>(properties);
            Random random = new Random();
            int start = 1;
            for (int i = start; i <= start + 20; i++) {
                String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
                String key = i + "";
                String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~";
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<String, String>(topic, key, value);
                producer.send(producerRecord);
            }
            producer.close();
        }
    }
    KafkaConsumerOps.java
    package com.uplooking.bigdata.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Properties;
    import java.util.concurrent.*;
    
    /**
     * Consumes data from a Kafka topic
     */
    public class KafkaConsumerOps {
        public static void main(String[] args) throws IOException {
            // thread pool: four scheduled tasks, each acting as one consumer in the same group
            ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
            System.out.println("外部开始时间:" + System.currentTimeMillis());
            for (int i = 0; i < 4; i++) {
                service.schedule(new ConsumerThread(), 5L, TimeUnit.SECONDS);
            }
        }
    }
    
    class ConsumerThread implements Runnable {
    
        public void run() {
            System.out.println("线程ID:" + Thread.currentThread().getId() + "线程开始时间:" + System.currentTimeMillis());
            /**
             * Two type parameters:
             * the first is the type of a record's key in Kafka,
             * the second is the type of a record's value in Kafka.
             */
            Properties properties = new Properties();
            try {
                properties.load(KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    
            Collection<String> topics = Arrays.asList("hadoop");
            // the consumer subscribes to the topics
            consumer.subscribe(topics);
            ConsumerRecords<String, String> consumerRecords = null;
            while (true) {
                // pull data from the topic
                consumerRecords = consumer.poll(1000);
                // iterate over each record
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    long offset = consumerRecord.offset();
                    Object key = consumerRecord.key();
                    Object value = consumerRecord.value();
                    int partition = consumerRecord.partition();
                    System.out.println("CurrentThreadID: " + Thread.currentThread().getId() + "	offset: " + offset + "	partition: " + partition + "	key: " + key + "	value: " + value);
                }
            }
        }
    }
    MyKafkaPartitioner.java
    package com.uplooking.bigdata.kafka.partitioner;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    import java.util.Random;
    
    /**
     * A custom partitioner that routes records by key.
     * <p>
     * You can partition by the hashCode of the key or the value,
     * or spread data across partitions by any business-specific rule.
     * Here: take the hashCode of the user-supplied key modulo the number of partitions.
     */
    public class MyKafkaPartitioner implements Partitioner {
    
        public void configure(Map<String, ?> configs) {
    
        }
    
        /**
         * Choose a partition for the given record.
         *
         * @param topic      topic name
         * @param key        record key
         * @param keyBytes   serialized key
         * @param value      record value
         * @param valueBytes serialized value
         * @param cluster    current cluster metadata
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            Integer partitionNums = cluster.partitionCountForTopic(topic);
            int targetPartition = -1;
            if (key == null || keyBytes == null) {
                targetPartition = new Random().nextInt(10000) % partitionNums;
            } else {
                int hashCode = key.hashCode();
                // hashCode % n can be negative in Java, so take the absolute value
                targetPartition = Math.abs(hashCode % partitionNums);
                System.out.println("key: " + key + ", value: " + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
            }
            return targetPartition;
        }
    
        public void close() {
        }
    }
    Constants.java
    package com.uplooking.bigdata.kafka.constants;
    
    public interface Constants {
        /**
         * Property key for the producer's target topic
         */
        String KAFKA_PRODUCER_TOPIC = "producer.topic";
    }
    producer.properties
    ############################# Producer Basics #############################
    
    # list of brokers used for bootstrapping knowledge about the rest of the cluster
    # format: host1:port1,host2:port2 ...
    bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
    
    # specify the compression codec for all data generated: none, gzip, snappy, lz4
    compression.type=none
    
    # name of the partitioner class for partitioning events; default partition spreads data randomly
    partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner
    
    # the maximum amount of time the client will wait for the response of a request
    #request.timeout.ms=
    
    # how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
    #max.block.ms=
    
    # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
    #linger.ms=
    
    # the maximum size of a request in bytes
    #max.request.size=
    
    # the default batch size in bytes when batching multiple records sent to a partition
    #batch.size=
    
    # the total bytes of memory the producer can use to buffer records waiting to be sent to the server
    #buffer.memory=
    
    ##### custom topic for the producer
    producer.topic=hadoop
    
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    consumer.properties
    # Zookeeper connection string
    # comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
    zookeeper.connect= uplooking01:2181,uplooking02:2181,uplooking03:2181
    
    bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
    
    # timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    #consumer group id
    group.id=test-consumer-group
    
    #consumer timeout
    #consumer.timeout.ms=5000
    
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    pom.xml

    The key dependency is kafka-clients:

    <dependencies>
      <!--kafka-->
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.1</version>
      </dependency>
    </dependencies>

    Testing

    First start a consumer in a terminal. Since no configuration file is specified, its group.id is randomly generated:

    $ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

    Then run the consumer code and the producer code, and watch the output in each terminal.

    Terminal output of the producer program:

    key: 1, value: 今天的<--刘向前-->很美很美哦~, hashCode: 49, partition: 1
    key: 2, value: 今天的<--刘向前-->很美很美哦~, hashCode: 50, partition: 2
    key: 3, value: 今天的<--刘向前-->很美很美哦~, hashCode: 51, partition: 0
    key: 4, value: 今天的<--杨柳-->很美很美哦~, hashCode: 52, partition: 1
    key: 5, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 53, partition: 2
    key: 6, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 54, partition: 0
    key: 7, value: 今天的<--杨柳-->很美很美哦~, hashCode: 55, partition: 1
    key: 8, value: 今天的<--刘向前-->很美很美哦~, hashCode: 56, partition: 2
    key: 9, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 57, partition: 0
    key: 10, value: 今天的<--杨柳-->很美很美哦~, hashCode: 1567, partition: 1
    key: 11, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1568, partition: 2
    key: 12, value: 今天的<--周  新-->很美很美哦~, hashCode: 1569, partition: 0
    key: 13, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1570, partition: 1
    key: 14, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1571, partition: 2
    key: 15, value: 今天的<--刘向前-->很美很美哦~, hashCode: 1572, partition: 0
    key: 16, value: 今天的<--刘向前-->很美很美哦~, hashCode: 1573, partition: 1
    key: 17, value: 今天的<--杨柳-->很美很美哦~, hashCode: 1574, partition: 2
    key: 18, value: 今天的<--刘向前-->很美很美哦~, hashCode: 1575, partition: 0
    key: 19, value: 今天的<--杨柳-->很美很美哦~, hashCode: 1576, partition: 1
    key: 20, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1598, partition: 2
    key: 21, value: 今天的<--杨柳-->很美很美哦~, hashCode: 1599, partition: 0

    Terminal output of the consumer program:

    外部开始时间:1521991118178
    线程ID:20线程开始时间:1521991123182
    线程ID:21线程开始时间:1521991123182
    线程ID:23线程开始时间:1521991123182
    线程ID:22线程开始时间:1521991123182
    CurrentThreadID: 22 offset: 78  partition: 1    key: 1  value: 今天的<--刘向前-->很美很美哦~
    CurrentThreadID: 22 offset: 79  partition: 1    key: 4  value: 今天的<--杨柳-->很美很美哦~
    CurrentThreadID: 22 offset: 80  partition: 1    key: 7  value: 今天的<--杨柳-->很美很美哦~
    CurrentThreadID: 22 offset: 81  partition: 1    key: 10 value: 今天的<--杨柳-->很美很美哦~
    CurrentThreadID: 22 offset: 82  partition: 1    key: 13 value: 今天的<--姚慧莹-->很美很美哦~
    CurrentThreadID: 23 offset: 81  partition: 0    key: 3  value: 今天的<--刘向前-->很美很美哦~
    CurrentThreadID: 23 offset: 82  partition: 0    key: 6  value: 今天的<--姚慧莹-->很美很美哦~
    CurrentThreadID: 23 offset: 83  partition: 0    key: 9  value: 今天的<--姚慧莹-->很美很美哦~
    CurrentThreadID: 23 offset: 84  partition: 0    key: 12 value: 今天的<--周  新-->很美很美哦~
    CurrentThreadID: 23 offset: 85  partition: 0    key: 15 value: 今天的<--刘向前-->很美很美哦~
    CurrentThreadID: 23 offset: 86  partition: 0    key: 18 value: 今天的<--刘向前-->很美很美哦~
    CurrentThreadID: 22 offset: 83  partition: 1    key: 16 value: 今天的<--刘向前-->很美很美哦~
    CurrentThreadID: 23 offset: 87  partition: 0    key: 21 value: 今天的<--杨柳-->很美很美哦~
    CurrentThreadID: 21 offset: 78  partition: 2    key: 2  value: 今天的<--刘向前-->很美很美哦~
    CurrentThreadID: 22 offset: 84  partition: 1    key: 19 value: 今天的<--杨柳-->很美很美哦~
    CurrentThreadID: 21 offset: 79  partition: 2    key: 5  value: 今天的<--姚慧莹-->很美很美哦~
    CurrentThreadID: 21 offset: 80  partition: 2    key: 8  value: 今天的<--刘向前-->很美很美哦~
    CurrentThreadID: 21 offset: 81  partition: 2    key: 11 value: 今天的<--姚慧莹-->很美很美哦~
    CurrentThreadID: 21 offset: 82  partition: 2    key: 14 value: 今天的<--姚慧莹-->很美很美哦~
    CurrentThreadID: 21 offset: 83  partition: 2    key: 17 value: 今天的<--杨柳-->很美很美哦~
    CurrentThreadID: 21 offset: 84  partition: 2    key: 20 value: 今天的<--姚慧莹-->很美很美哦~

    Terminal output of the shell consumer:

    $ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
    今天的<--刘向前-->很美很美哦~
    今天的<--姚慧莹-->很美很美哦~
    今天的<--刘向前-->很美很美哦~
    今天的<--姚慧莹-->很美很美哦~
    今天的<--姚慧莹-->很美很美哦~
    今天的<--杨柳-->很美很美哦~
    今天的<--姚慧莹-->很美很美哦~
    今天的<--刘向前-->很美很美哦~
    今天的<--姚慧莹-->很美很美哦~
    今天的<--姚慧莹-->很美很美哦~
    今天的<--周  新-->很美很美哦~
    今天的<--刘向前-->很美很美哦~
    今天的<--刘向前-->很美很美哦~
    今天的<--杨柳-->很美很美哦~
    今天的<--刘向前-->很美很美哦~
    今天的<--杨柳-->很美很美哦~
    今天的<--杨柳-->很美很美哦~
    今天的<--杨柳-->很美很美哦~
    今天的<--姚慧莹-->很美很美哦~
    今天的<--刘向前-->很美很美哦~
    今天的<--杨柳-->很美很美哦~

    Analysis

    Because the shell consumer's group.id is randomly generated, it is guaranteed to consume the messages of every partition of the topic — this is cross-group consumption.

    In the consumer program, all 4 threads use the same group.id (they all load consumer.properties). According to the theory above, since topic hadoop has only 3 partitions, at most 3 threads — i.e. 3 consumers — can consume messages. The output confirms this: judging by the thread IDs, exactly three threads consumed messages from the topic, which verifies Kafka's within-group consumption pattern.

    Kafka Performance Testing

    Reference: https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing

    Producer Throughput Test

    Kafka's installation directory ships a benchmarking tool, bin/kafka-producer-perf-test.sh. It reports four main metrics: total volume of messages sent (MB), send rate (MB/second), total number of messages sent, and send rate (records/second).

    Test run:

    [uplooking@uplooking01 ~]$ kafka-producer-perf-test.sh --topic flume-kafka --num-records 1000000 --producer-props bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092 --throughput 10000 --record-size 100
    49972 records sent, 9994.4 records/sec (0.95 MB/sec), 3.1 ms avg latency, 258.0 max latency.
    50200 records sent, 10040.0 records/sec (0.96 MB/sec), 2.4 ms avg latency, 141.0 max latency.
    50020 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
    50010 records sent, 10000.0 records/sec (0.95 MB/sec), 2.3 ms avg latency, 127.0 max latency.
    50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 24.0 max latency.
    50020 records sent, 10004.0 records/sec (0.95 MB/sec), 2.4 ms avg latency, 186.0 max latency.
    50010 records sent, 10002.0 records/sec (0.95 MB/sec), 15.1 ms avg latency, 466.0 max latency.
    50020 records sent, 10002.0 records/sec (0.95 MB/sec), 11.1 ms avg latency, 405.0 max latency.
    50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
    50030 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 20.0 max latency.
    50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 30.0 max latency.
    50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
    49990 records sent, 9998.0 records/sec (0.95 MB/sec), 1.4 ms avg latency, 49.0 max latency.
    50033 records sent, 10006.6 records/sec (0.95 MB/sec), 37.9 ms avg latency, 617.0 max latency.
    50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.5 ms avg latency, 74.0 max latency.
    50007 records sent, 10001.4 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
    50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.8 ms avg latency, 132.0 max latency.
    50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 15.0 max latency.
    50020 records sent, 10000.0 records/sec (0.95 MB/sec), 1.9 ms avg latency, 121.0 max latency.
    1000000 records sent, 9999.200064 records/sec (0.95 MB/sec), 4.96 ms avg latency, 617.00 ms max latency, 1 ms 50th, 3 ms 95th, 105 ms 99th, 541 ms 99.9th.

    Parameter descriptions:

    --num-records 1000000   total number of messages to produce
    --throughput 10000      target number of messages produced per second
    --record-size 100       size of each message, in bytes
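As a sanity check on the reported rate: 10,000 records/sec at 100 bytes each is 1,000,000 bytes/sec, which matches the ~0.95 MB/sec shown above (taking 1 MB = 1024 × 1024 bytes):

```java
public class ProducerPerfCheck {
    public static void main(String[] args) {
        int recordsPerSec = 10_000; // --throughput
        int recordSize = 100;       // --record-size, in bytes
        double mbPerSec = (double) recordsPerSec * recordSize / (1024 * 1024);
        System.out.printf("%.2f MB/sec%n", mbPerSec); // ~0.95 MB/sec
    }
}
```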

    Consumer Throughput Test

    [uplooking@uplooking01 ~]$ kafka-consumer-perf-test.sh --topic flume-kafka --messages 1000000 --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092 --threads 3 --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
    start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
    2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760

    The test above consumes one million messages. The output columns are explained below:

    start time, end time, total data consumed (MB), MB per second, total messages consumed, messages per second
    start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
    2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760
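The per-second columns follow from the elapsed time: the run spans 05:17:21:185 to 05:17:22:458, i.e. 1.273 seconds, and dividing the two totals by that interval reproduces MB.sec and nMsg.sec (up to rounding):

```java
public class ConsumerPerfCheck {
    public static void main(String[] args) {
        // milliseconds between the two timestamps in the output above
        double elapsedSec = (22_458 - 21_185) / 1000.0; // 1.273 s
        double mbConsumed = 97.3055;   // data.consumed.in.MB
        int msgConsumed = 1_020_325;   // data.consumed.in.nMsg
        System.out.printf("%.4f MB/sec%n",  mbConsumed / elapsedSec);  // ~76.4380
        System.out.printf("%.4f msg/sec%n", msgConsumed / elapsedSec); // ~801512.1760
    }
}
```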
    Original: https://www.cnblogs.com/itboys/p/8716418.html