Kafka Notes (2): Using the Kafka Java API

    All of the test code below uses the following topic:

    $ kafka-topics.sh --describe --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
    Topic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:
            Topic: hadoop   Partition: 0    Leader: 103     Replicas: 103,101,102   Isr: 103,101,102
            Topic: hadoop   Partition: 1    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103
            Topic: hadoop   Partition: 2    Leader: 102     Replicas: 102,103,101   Isr: 102,103,101
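
    For reference, a topic with this layout can be created with the same CLI before running the tests (a sketch; adjust the ZooKeeper quorum, partition count, and replication factor to your cluster):

    $ kafka-topics.sh --create --topic hadoop --partitions 3 --replication-factor 3 --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181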

    Kafka Java API: Producer

    For details on using the producer API, see the code comments of the org.apache.kafka.clients.producer.KafkaProducer class, which are very thorough. The program code and test are given directly below.

    Program code

    KafkaProducerOps.java
    package com.uplooking.bigdata.kafka.producer;
    
    import com.uplooking.bigdata.kafka.constants.Constants;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    /**
     * KafkaProducerOps produces records into a Kafka topic.
     * <p>
     * Producer
     */
    public class KafkaProducerOps {
        public static void main(String[] args) throws IOException {
            /*
             * Load the configuration file; its format is:
             * key=value
             *
             * Avoid hard-coding in the code; keep values configurable.
             */
            Properties properties = new Properties();
            InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
            properties.load(in);
            /*
             * Two type parameters:
             * the first is the type of a record's key,
             * the second is the type of a record's value.
             */
            Producer<String, String> producer = new KafkaProducer<String, String>(properties);
            String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
            String key = "1";
            String value = "今天的姑娘们很美";
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>(topic, key, value);
            producer.send(producerRecord);
            producer.close();
        }
    }
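
    Note that send() as used above is fire-and-forget: the call is asynchronous and returns a Future<RecordMetadata>. If you need to confirm where a record landed, a minimal sketch follows (same producer and record as above; it additionally imports org.apache.kafka.clients.producer.Callback, org.apache.kafka.clients.producer.RecordMetadata and java.util.concurrent.ExecutionException):

    // Synchronous variant: block until the broker acknowledges the write.
    try {
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

    // Asynchronous variant: the callback fires once the send completes or fails.
    producer.send(producerRecord, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
            }
        }
    });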
    Constants.java
    package com.uplooking.bigdata.kafka.constants;
    
    public interface Constants {
        /**
     * Property key for the producer topic
         */
        String KAFKA_PRODUCER_TOPIC = "producer.topic";
    }
    producer.properties
    ############################# Producer Basics #############################
    
    # list of brokers used for bootstrapping knowledge about the rest of the cluster
    # format: host1:port1,host2:port2 ...
    bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
    
    # specify the compression codec for all data generated: none, gzip, snappy, lz4
    compression.type=none
    
    # name of the partitioner class for partitioning events; default partition spreads data randomly
    # partitioner.class=
    
    # the maximum amount of time the client will wait for the response of a request
    #request.timeout.ms=
    
    # how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
    #max.block.ms=
    
    # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
    #linger.ms=
    
    # the maximum size of a request in bytes
    #max.request.size=
    
    # the default batch size in bytes when batching multiple records sent to a partition
    #batch.size=
    
    # the total bytes of memory the producer can use to buffer records waiting to be sent to the server
    #buffer.memory=
    
    ##### custom setting: the topic to produce to
    producer.topic=hadoop
    
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer

    This configuration file is essentially the producer.properties shipped in Kafka's conf directory, modified as shown above. The meaning of every field is documented in the code comments of the org.apache.kafka.clients.producer.KafkaProducer class.
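
    The same settings can also be applied in code rather than through a properties file; a minimal sketch using the ProducerConfig constants (the acks value here is an assumption, chosen for durability):

    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "uplooking01:9092,uplooking02:9092,uplooking03:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas to acknowledge each write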

    Test

    Start a console consumer in a terminal to listen for messages on the topic:

    [uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181

    Then run the producer program and check the terminal output:

    [uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181 
    今天的姑娘们很美

    Kafka Java API: Consumer

    Program code

    KafkaConsumerOps.java
    package com.uplooking.bigdata.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Properties;
    
    public class KafkaConsumerOps {
        public static void main(String[] args) throws IOException {
            Properties properties = new Properties();
            InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties");
            properties.load(in);
            Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            Collection<String> topics = Arrays.asList("hadoop");
            // subscribe the consumer to the topic
            consumer.subscribe(topics);
            ConsumerRecords<String, String> consumerRecords = null;
            while (true) {
                // pull a batch of records from the topic
                consumerRecords = consumer.poll(1000);
                // iterate over every record in the batch
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    long offset = consumerRecord.offset();
                    int partition = consumerRecord.partition();
                    String key = consumerRecord.key();
                    String value = consumerRecord.value();
                    System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value);
                }
            }
        }
    }
    consumer.properties
    # Zookeeper connection string
    # comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
    zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181
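    # Note: the new Java consumer (KafkaConsumer) connects through bootstrap.servers
    # and ignores the zookeeper.* settings above; they are kept only because this
    # file is copied from Kafka's conf directory.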
    
    bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
    
    # timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    #consumer group id
    group.id=test-consumer-group
    
    #consumer timeout
    #consumer.timeout.ms=5000
    
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
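
    The consumer above relies on automatic offset commits. To commit only after a batch has actually been processed, a minimal sketch of the poll loop follows (it assumes enable.auto.commit=false is added to consumer.properties; everything else is the setup from KafkaConsumerOps):

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.format("%d\t%d\t%s\t%s\n",
                    record.offset(), record.partition(), record.key(), record.value());
        }
        // Mark the batch as consumed only now; if the process crashes before this
        // call, the records are re-delivered instead of being silently skipped.
        consumer.commitSync();
    }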

    Test

    Run the consumer code first, then the producer code; the consumer terminal shows output like the following:

    2   0   1   今天的姑娘们很美
    (columns: offset, partition, key, value)

    Kafka Java API: Partitioner

    A custom partitioner lets us decide which partition each message is stored in; all we need to do is implement the Partitioner interface in our code.

    Program code

    MyKafkaPartitioner.java
    package com.uplooking.bigdata.kafka.partitioner;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    import java.util.Random;
    
    /**
     * A custom partitioner that assigns partitions based on the record key.
     * <p>
     * The partition can be derived from the hashCode of the key or the value,
     * or from any business-specific rule that spreads data across partitions.
     * Requirement here:
     * take the hashCode of the user-supplied key modulo the number of partitions.
     */
    public class MyKafkaPartitioner implements Partitioner {
    
        public void configure(Map<String, ?> configs) {
    
        }
    
        /**
         * Pick the partition for the given record.
         *
         * @param topic      topic name
         * @param key        record key
         * @param keyBytes   serialized key
         * @param value      record value
         * @param valueBytes serialized value
         * @param cluster    current cluster metadata
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            Integer partitionNums = cluster.partitionCountForTopic(topic);
            int targetPartition = -1;
            if (key == null || keyBytes == null) {
                // no key: spread the record randomly across partitions
                targetPartition = new Random().nextInt(10000) % partitionNums;
            } else {
                // mask the sign bit so a negative hashCode cannot produce a negative partition
                int hashCode = key.hashCode() & Integer.MAX_VALUE;
                targetPartition = hashCode % partitionNums;
                System.out.println("key: " + key + ", value: " + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
            }
            return targetPartition;
        }
    
        public void close() {
        }
    }
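
    For comparison, when no partitioner.class is configured, Kafka's built-in DefaultPartitioner (in the versions this tutorial targets) hashes the serialized key bytes with murmur2 modulo the partition count, and distributes records with null keys across partitions in round-robin fashion.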
    KafkaProducerOps.java
    package com.uplooking.bigdata.kafka.producer;
    
    import com.uplooking.bigdata.kafka.constants.Constants;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    import java.util.Random;
    
    /**
     * KafkaProducerOps produces records into a Kafka topic.
     * <p>
     * Producer
     */
    public class KafkaProducerOps {
        public static void main(String[] args) throws IOException {
            /*
             * Load the configuration file; its format is:
             * key=value
             *
             * Avoid hard-coding in the code; keep values configurable.
             */
            Properties properties = new Properties();
            InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
            properties.load(in);
            /*
             * Two type parameters:
             * the first is the type of a record's key,
             * the second is the type of a record's value.
             */
            String[] girls = new String[]{"姚慧莹", "刘向前", "周  新", "杨柳"};
            Producer<String, String> producer = new KafkaProducer<String, String>(properties);
            Random random = new Random();
            int start = 1;
            for (int i = start; i <= start + 9; i++) {
                String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
                String key = i + "";
                String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~";
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<String, String>(topic, key, value);
                producer.send(producerRecord);
            }
            producer.close();
        }
    }

    Keep using the consumer code from earlier; in addition, specify our partitioner in producer.properties as follows:

    partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner
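
    Alternatively, the partitioner can be registered in code instead of the properties file; a sketch using the same ProducerConfig class as before:

    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
            "com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner");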

    Test

    Run the consumer code first, then the producer code, and inspect the terminal output.

    Producer terminal output (mainly the println from the custom partitioner):

    key: 1, value: 今天的<--刘向前-->很美很美哦~, hashCode: 49, partition: 1
    key: 2, value: 今天的<--杨柳-->很美很美哦~, hashCode: 50, partition: 2
    key: 3, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 51, partition: 0
    key: 4, value: 今天的<--周  新-->很美很美哦~, hashCode: 52, partition: 1
    key: 5, value: 今天的<--刘向前-->很美很美哦~, hashCode: 53, partition: 2
    key: 6, value: 今天的<--周  新-->很美很美哦~, hashCode: 54, partition: 0
    key: 7, value: 今天的<--周  新-->很美很美哦~, hashCode: 55, partition: 1
    key: 8, value: 今天的<--刘向前-->很美很美哦~, hashCode: 56, partition: 2
    key: 9, value: 今天的<--杨柳-->很美很美哦~, hashCode: 57, partition: 0
    key: 10, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1567, partition: 1

    Consumer terminal output:

    3   0   3   今天的<--姚慧莹-->很美很美哦~
    4   0   6   今天的<--周  新-->很美很美哦~
    5   0   9   今天的<--杨柳-->很美很美哦~
    0   2   2   今天的<--杨柳-->很美很美哦~
    1   2   5   今天的<--刘向前-->很美很美哦~
    2   2   8   今天的<--刘向前-->很美很美哦~
    1   1   1   今天的<--刘向前-->很美很美哦~
    2   1   4   今天的<--周  新-->很美很美哦~
    3   1   7   今天的<--周  新-->很美很美哦~
    4   1   10  今天的<--姚慧莹-->很美很美哦~
    (columns: offset, partition, key, value)
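
    Note that offsets increase independently within each partition; Kafka guarantees ordering only within a single partition, which is why the consumer output interleaves records from partitions 0, 1 and 2.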