zoukankan      html  css  js  c++  java
  • Kafka笔记整理(二):Kafka Java API使用

    下面的测试代码使用的都是下面的topic:

    $ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
    Topic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:
            Topic: hadoop   Partition: 0    Leader: 103     Replicas: 103,101,102   Isr: 103,101,102
            Topic: hadoop   Partition: 1    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103
            Topic: hadoop   Partition: 2    Leader: 102     Replicas: 102,103,101   Isr: 102,103,101

    Kafka Java API之producer

    关于producer API的使用说明,可以查看org.apache.kafka.clients.producer.KafkaProducer这个类的代码注释,有非常详细的说明,下面就直接给出程序代码及测试。

    程序代码

    KafkaProducerOps.java
    package com.uplooking.bigdata.kafka.producer;
    
    import com.uplooking.bigdata.kafka.constants.Constants;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    import java.util.Random;
    
    /**
     * 通过这个KafkaProducerOps向Kafka topic中生产相关的数据
     * <p>
     * Producer
     */
    public class KafkaProducerOps {
        public static void main(String[] args) throws IOException {
            /**
             * 专门加载配置文件
             * 配置文件的格式:
             * key=value
             *
             * 在代码中要尽量减少硬编码
             *  不要将代码写死,要可配置化
             */
            Properties properties = new Properties();
            InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
            properties.load(in);
            /**
             * 两个泛型参数
             * 第一个泛型参数:指的就是kafka中一条记录key的类型
             * 第二个泛型参数:指的就是kafka中一条记录value的类型
             */
            String[] girls = new String[]{"姚慧莹", "刘向前", "周  新", "杨柳"};
            Producer<String, String> producer = new KafkaProducer<String, String>(properties);
            String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
            String key = "1";
            String value = "今天的姑娘们很美";
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>(topic, key, value);
            producer.send(producerRecord);
            producer.close();
        }
    }
    Constants.java
    package com.uplooking.bigdata.kafka.constants;
    
    public interface Constants {
        /**
         * 生产的key对应的常量
         */
        String KAFKA_PRODUCER_TOPIC = "producer.topic";
    }
    producer.properties
    ############################# Producer Basics #############################
    
    # list of brokers used for bootstrapping knowledge about the rest of the cluster
    # format: host1:port1,host2:port2 ...
    bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
    
    # specify the compression codec for all data generated: none, gzip, snappy, lz4
    compression.type=none
    
    # name of the partitioner class for partitioning events; default partition spreads data randomly
    # partitioner.class=
    
    # the maximum amount of time the client will wait for the response of a request
    #request.timeout.ms=
    
    # how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
    #max.block.ms=
    
    # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
    #linger.ms=
    
    # the maximum size of a request in bytes
    #max.request.size=
    
    # the default batch size in bytes when batching multiple records sent to a partition
    #batch.size=
    
    # the total bytes of memory the producer can use to buffer records waiting to be sent to the server
    #buffer.memory=
    
    #####设置自定义的topic
    producer.topic=hadoop
    
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer

    其实这个配置文件就是kafka conf目录下的配置文件,只是这里要做相应的修改,关于每个字段的含义,可以查看

    org.apache.kafka.clients.producer.KafkaProducer

    这个类的代码注释。

    测试

    在终端中启动消费者监听topic的消息:

    [uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181

    然后执行生产者程序,再查看终端输出:

    [uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181 
    今天的姑娘们很美

    Kafka Java API之consumer

    程序代码

    KafkaConsumerOps.java
    package com.uplooking.bigdata.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Properties;
    
    public class KafkaConsumerOps {
        public static void main(String[] args) throws IOException {
            Properties properties = new Properties();
            InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties");
            properties.load(in);
            Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            Collection<String> topics = Arrays.asList("hadoop");
            // 消费者订阅topic
            consumer.subscribe(topics);
            ConsumerRecords<String, String> consumerRecords = null;
            while (true) {
                // 接下来就要从topic中拉取数据
                consumerRecords = consumer.poll(1000);
                // 遍历每一条记录
                for (ConsumerRecord consumerRecord : consumerRecords) {
                    long offset = consumerRecord.offset();
                    int partition = consumerRecord.partition();
                    Object key = consumerRecord.key();
                    Object value = consumerRecord.value();
                    System.out.format("%d	%d	%s	%s
    ", offset, partition, key, value);
                }
    
            }
        }
    }
    consumer.properties
    # Zookeeper connection string
    # comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
    zookeeper.connect= uplooking01:2181,uplooking02:2181,uplooking03:2181
    
    bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
    
    # timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    #consumer group id
    group.id=test-consumer-group
    
    #consumer timeout
    #consumer.timeout.ms=5000
    
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

    测试

    先执行消费者的代码,然后再执行生产者的代码,在消费者终端可以看到如下输出:

    2   0   1   今天的姑娘们很美
    (分别是:offset partition key value)

    Kafka Java API之partition

    可以通过自定义partitioner来决定我们的消息应该存到哪个partition上,只需要在我们的代码上实现Partitioner接口即可。

    程序代码

    MyKafkaPartitioner.java
    package com.uplooking.bigdata.kafka.partitioner;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    import java.util.Random;
    
    /**
     * 创建自定义的分区,根据数据的key来进行划分
     * <p>
     * 可以根据key或者value的hashCode
     * 还可以根据自己业务上的定义将数据分散在不同的分区中
     * 需求:
     * 根据用户输入的key的hashCode值和partition个数求模
     */
    public class MyKafkaPartitioner implements Partitioner {
    
        public void configure(Map<String, ?> configs) {
    
        }
    
        /**
         * 根据给定的数据设置相关的分区
         *
         * @param topic      主题名称
         * @param key        key
         * @param keyBytes   序列化之后的key
         * @param value      value
         * @param valueBytes 序列化之后的value
         * @param cluster    当前集群的元数据信息
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            Integer partitionNums = cluster.partitionCountForTopic(topic);
            int targetPartition = -1;
            if (key == null || keyBytes == null) {
                targetPartition = new Random().nextInt(10000) % partitionNums;
            } else {
                int hashCode = key.hashCode();
                targetPartition = hashCode % partitionNums;
                System.out.println("key: " + key + ", value: " + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
            }
            return targetPartition;
        }
    
        public void close() {
        }
    }
    KafkaProducerOps.java
    package com.uplooking.bigdata.kafka.producer;
    
    import com.uplooking.bigdata.kafka.constants.Constants;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    import java.util.Random;
    
    /**
     * 通过这个KafkaProducerOps向Kafka topic中生产相关的数据
     * <p>
     * Producer
     */
    public class KafkaProducerOps {
        public static void main(String[] args) throws IOException {
            /**
             * 专门加载配置文件
             * 配置文件的格式:
             * key=value
             *
             * 在代码中要尽量减少硬编码
             *  不要将代码写死,要可配置化
             */
            Properties properties = new Properties();
            InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
            properties.load(in);
            /**
             * 两个泛型参数
             * 第一个泛型参数:指的就是kafka中一条记录key的类型
             * 第二个泛型参数:指的就是kafka中一条记录value的类型
             */
            String[] girls = new String[]{"姚慧莹", "刘向前", "周  新", "杨柳"};
            Producer<String, String> producer = new KafkaProducer<String, String>(properties);
            Random random = new Random();
            int start = 1;
            for (int i = start; i <= start + 9; i++) {
                String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
                String key = i + "";
                String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~";
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<String, String>(topic, key, value);
                producer.send(producerRecord);
            }
            producer.close();
        }
    }

    继续使用前面的消费者的代码,同时需要在producer.properties中指定我们定义的partitioner,如下:

    partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner

    测试

    先执行消费者代码,然后再执行生产者代码,查看终端输出。

    生产者终端输出(主要是自定义partitioner中的输出):

    key: 1, value: 今天的<--刘向前-->很美很美哦~, hashCode: 49, partition: 1
    key: 2, value: 今天的<--杨柳-->很美很美哦~, hashCode: 50, partition: 2
    key: 3, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 51, partition: 0
    key: 4, value: 今天的<--周  新-->很美很美哦~, hashCode: 52, partition: 1
    key: 5, value: 今天的<--刘向前-->很美很美哦~, hashCode: 53, partition: 2
    key: 6, value: 今天的<--周  新-->很美很美哦~, hashCode: 54, partition: 0
    key: 7, value: 今天的<--周  新-->很美很美哦~, hashCode: 55, partition: 1
    key: 8, value: 今天的<--刘向前-->很美很美哦~, hashCode: 56, partition: 2
    key: 9, value: 今天的<--杨柳-->很美很美哦~, hashCode: 57, partition: 0
    key: 10, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1567, partition: 1

    消费者终端输出:

    3   0   3   今天的<--姚慧莹-->很美很美哦~
    4   0   6   今天的<--周  新-->很美很美哦~
    5   0   9   今天的<--杨柳-->很美很美哦~
    0   2   2   今天的<--杨柳-->很美很美哦~
    1   2   5   今天的<--刘向前-->很美很美哦~
    2   2   8   今天的<--刘向前-->很美很美哦~
    1   1   1   今天的<--刘向前-->很美很美哦~
    2   1   4   今天的<--周  新-->很美很美哦~
    3   1   7   今天的<--周  新-->很美很美哦~
    4   1   10  今天的<--姚慧莹-->很美很美哦~
    (分别是:offset partition key value)
  • 相关阅读:
    HTML+CSS知识点总结
    消灭textarea中的神秘空格
    OAuth2.0
    C# task和timer实现定时操作
    C# 多线程task
    EF的使用
    支付宝支付开发
    Basic Auth
    C#中匿名函数、委托delegate和Action、Func、Expression、还有Lambda的关系和区别
    [转]CodeSite使用小结
  • 原文地址:https://www.cnblogs.com/itboys/p/8862114.html
Copyright © 2011-2022 走看看