zoukankan      html  css  js  c++  java
  • Kafka客户端编程入门介绍

    1.maven依赖

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>1.0.0</version>
            </dependency>

    2.生产者

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    
    /**
     * Created by zhangpeiran on 2018/9/30.
     */
    public class MyProducer {
        private static  class DemoProducerCallback implements Callback{
            public void onCompletion(RecordMetadata recordMetadata,Exception e){
                if(e != null)
                    e.printStackTrace();
                else
                    System.out.print("send message success");
            }
        }
    
        public static void main(String[] args){
            Properties kafkaProps = new Properties();
            kafkaProps.put("bootstrap.servers","ip1:9092,ip2:9092");
            kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaProducer producer = new KafkaProducer<String,String>(kafkaProps);
    
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("DemoTopic","DemoKey","DemoValue");
    
            //async
            producer.send(record,new DemoProducerCallback());
            producer.close();
        }
    }

    3.自定义分区

    
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.record.InvalidRecordException;
    import org.apache.kafka.common.utils.Utils;

    import java.util.Map;

    /**
    * Created by zhangpeiran on 2018/10/8.
    */
    public class MyPartitioner implements Partitioner {
    public void configure(Map<String,?> configs){}

    public void close(){}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if((keyBytes == null) || !(key instanceof String))
    throw new InvalidRecordException("Message should has a key");
    int numPartitions = cluster.partitionsForTopic(topic).size();
    return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
    }
    }

    需要在Producer中加入配置(若MyPartitioner位于某个包中,需填写完整的类名,例如 com.example.MyPartitioner):
    kafkaProps.put("partitioner.class","MyPartitioner");
     

     4.消费者

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * Minimal Kafka consumer example: joins the group {@code DemoConsumerGroup},
     * subscribes to {@code DemoTopic} and prints every record it polls.
     *
     * Created by zhangpeiran on 2018/10/9.
     */
    public class MyConsumer {

        /**
         * Polls {@code DemoTopic} in an endless loop and prints each record as
         * topic,partition,offset,key,value; the consumer is closed when the loop is left.
         *
         * @param args command-line arguments (ignored)
         */
        public static void main(String[] args) {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(buildConfig());

            consumer.subscribe(Collections.singletonList("DemoTopic"));

            try {
                for (;;) {
                    ConsumerRecords<String, String> batch = consumer.poll(100);
                    for (ConsumerRecord<String, String> rec : batch) {
                        String line = rec.topic() + "," + rec.partition() + "," + rec.offset()
                                + "," + rec.key() + "," + rec.value();
                        System.out.print(line);
                    }
                }
            } finally {
                consumer.close();
            }
        }

        /** Builds the consumer configuration used by {@link #main(String[])}. */
        private static Properties buildConfig() {
            Properties config = new Properties();
            config.put("bootstrap.servers", "ip1:9092,ip2:9092,ip3:9092");
            config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            config.put("group.id", "DemoConsumerGroup");

            // The default is "latest": when the partition being read has no committed offset, or the
            // offset is invalid, the consumer starts from the newest records. A consumer group
            // subscribing to a topic for the first time is in exactly that situation, so data
            // produced before the consumer started would not be read.
            // "earliest" makes the consumer read from the beginning of the partition instead.
            config.put("auto.offset.reset", "earliest");
            return config;
        }
    }
  • 相关阅读:
    创建类以及引用一个类
    修改hosts文件
    微信第三方登录接口开发
    Android定位
    Leetcode 102. Binary Tree Level Order Traversal
    Leetcode 725. Split Linked List in Parts
    Leetcode 445. Add Two Numbers II
    Leetcode 328. Odd Even Linked List
    Leetcode 237. Delete Node in a Linked List
    Leetcode 234. Palindrome Linked List
  • 原文地址:https://www.cnblogs.com/darange/p/9756895.html
Copyright © 2011-2022 走看看