zoukankan      html  css  js  c++  java
  • Kafka简单使用

    1.引入依赖

    <dependencies>
        <!-- Full Kafka artifact built for Scala 2.12. The producer example imports
             kafka.utils.Json, which lives in this artifact rather than in the
             lighter kafka-clients artifact. NOTE(review): kafka-clients is expected
             to arrive transitively through this dependency; confirm with
             `mvn dependency:tree`. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>

    2.producer代码

    import com.sakura.bean.User;
    import kafka.utils.Json;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaProducerDemo {
        //创建一个kafka生产者
        private final KafkaProducer<String, String> producer;
        //创建一个topic
        private final String topic;
    
    
        //对kafka进行初始化设置
        public KafkaProducerDemo(String topic) {
            Properties properties = new Properties();
            //broker的地址
            properties.put("bootstrap.servers", "192.168.204.139:9092");
            //clientId
            properties.put("client.id", "producer-demo");
            //序列化配置
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    
            this.producer = new KafkaProducer<String, String>(properties);
            this.topic = topic;
        }
    
    
        public void sendMsg(Object message, boolean syncSend) {
            if (syncSend) {
                producer.send(new ProducerRecord<String, String>(topic, Json.encodeAsString(message)));
            } else {
                producer.send(new ProducerRecord<String, String>(topic, Json.encodeAsString(message)), new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            System.err.println("unable to write to Kafka in KafkaProducerDemo[" + topic + "]exception:" + e);
                        }
                    }
                });
            }
        }
    
    
        public void close() {
            producer.close();
        }
    
    
        public static void main(String[] args) throws InterruptedException {
            String topic = "firstTopic";
            KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo(topic);
            //send message
            for (int i = 0; i < 6; i++) {
                User user = new User(i, "user" + i, (byte) i);
                kafkaProducerDemo.sendMsg(user, false);
            }
            kafkaProducerDemo.close();
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

    3.consumer代码

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * Minimal Kafka consumer demo: subscribes to one topic, prints every record
     * and commits offsets manually.
     */
    public class KafkaConsumerDemo {
        /** Underlying Kafka client; keys and values are both read as strings. */
        private final KafkaConsumer<String, String> consumer;


        /**
         * Configures the consumer and subscribes it to {@code topic}.
         *
         * @param topic name of the topic to consume
         */
        public KafkaConsumerDemo(String topic) {
            Properties properties = new Properties();
            // Broker address. KafkaConsumer talks to the brokers only, so no
            // "zookeeper.connect" entry is set: it is not a consumer configuration key.
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.139:9092");
            // Consumer group id.
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group-id");
            // Disable auto-commit; offsets are committed explicitly in receiveMsg.
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            // Policy when there is no committed offset or it no longer exists
            // (for example because the data was deleted):
            //   earliest      - start from the earliest offset
            //   latest        - start from the latest offset
            //   none          - throw to the consumer if the group has no previous offset
            //   anything else - throw to the consumer
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // Session timeout in milliseconds. Within a consumer group the consumer sends
            // heartbeats to the broker; if none arrives within this timeout the consumer is
            // considered dead, removed from the group, and the group is rebalanced.
            properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            // Keys and values are deserialized as plain strings.
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");


            consumer = new KafkaConsumer<String, String>(properties);
            // Subscribe to the topic to consume.
            consumer.subscribe(Collections.singletonList(topic));
        }


        /**
         * Polls forever, printing each record and committing offsets per partition.
         *
         * <p>This method only returns by throwing; the caller is responsible for closing
         * the consumer.
         *
         * @param consumer consumer to poll; must already be subscribed
         */
        public void receiveMsg(KafkaConsumer<String, String> consumer) {
            while (true) {
                // Pull the next batch of records, waiting up to one second.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // partitions() lists only partitions that have records in this batch,
                // so each per-partition list below is non-empty.
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecords.size();
                    System.out.println("topic:" + topic + ",分区:" + partition.partition() + ",消息总数:" + size);
                    // Process every record of this partition.
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println("value:" + record.value());
                    }
                    // Commit once per partition batch rather than once per record: each
                    // commitSync is a blocking round trip to the broker. The committed
                    // offset is the position of the NEXT record to read (last + 1).
                    // The no-arg consumer.commitSync() would instead commit the offsets
                    // returned by the last poll for all subscribed partitions.
                    long offSet = partitionRecords.get(size - 1).offset() + 1;
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offSet)));
                    System.out.println("同步成功,topic:" + topic + ",offSet:" + offSet);
                }
            }
        }


        /**
         * Consumes "firstTopic" until the process is stopped or polling fails.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            String topic = "firstTopic";
            KafkaConsumerDemo kafkaConsumerDemo = new KafkaConsumerDemo(topic);
            try {
                kafkaConsumerDemo.receiveMsg(kafkaConsumerDemo.consumer);
            } finally {
                // Reached only if polling throws; closing releases the client's resources.
                kafkaConsumerDemo.consumer.close();
            }
        }
    }
  • 相关阅读:
    CF div2 325 C
    CF div2 325 B
    CF div2 325 A
    CF div2 322 C
    CF div2 322 B
    CF div2 322 A
    Sudoku Solver POJ 2676 LightOJ 1397
    逆序数(归并排序 )
    RMQ算法
    Socket编程:listen()函数英文翻译
  • 原文地址:https://www.cnblogs.com/monument/p/12944677.html
Copyright © 2011-2022 走看看