zoukankan      html  css  js  c++  java
  • kafka入门demo

    1.引入jar(Maven依赖)

    <!-- Kafka Java client library; supplies the org.apache.kafka.clients.producer
         and org.apache.kafka.clients.consumer classes imported by the demo code. -->
    <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.0.0</version>
    </dependency>
    

    2.kafka producer

    package com.xq.kafka;
    
    /**
     * @author duanxiaoqiu
     * @Date 2019-07-04 09:55
     **/
    
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    
    /**
     * Minimal Kafka producer demo.
     *
     * <p>Sends one {@code "hello world"} record to the {@code education-info} topic on the
     * broker at {@code localhost:9092}, blocks until the send completes, prints the result
     * and exits.
     */
    public class Producer
    {
        private static final String TOPIC="education-info";
        private static final String BROKER_LIST="localhost:9092";

        /**
         * Builds the producer configuration: broker address, {@code acks=all}, and
         * {@link StringSerializer} for both record keys and values.
         *
         * @return the populated configuration properties
         */
        private static Properties initConfig(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
            properties.put(ProducerConfig.ACKS_CONFIG,"all");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            return properties;
        }

        /**
         * Entry point: creates the producer, sends a single record synchronously and
         * closes the producer.
         *
         * @param args unused
         */
        public static void main(String[] args){
            // The producer is created inside try-with-resources (instead of a static
            // initializer) so that a construction failure is handled by the catch blocks
            // below and the producer is always closed.
            try (KafkaProducer<String,String> producer = new KafkaProducer<>(initConfig())) {
                String message = "hello world";
                ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC,message);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            // The failure is rethrown by get() below and reported there;
                            // do not print metadata for a failed send.
                            return;
                        }
                        System.out.println("perfect!");
                        if (metadata != null) {
                            System.out.println("offset:"+metadata.offset()+";partition:"+metadata.partition());
                        }
                    }
                }).get(); // block until the send has completed
            } catch (InterruptedException e) {
                // Restore the interrupt flag that was cleared when the exception was thrown.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    3.kafka consumer

    package com.xq.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * @author duanxiaoqiu
     * @Date 2019-07-04 09:56
     **/
    
    /**
     * Minimal Kafka consumer demo.
     *
     * <p>Subscribes to the {@code education-info} topic on the broker at
     * {@code localhost:9092} as group {@code test} and prints the value of every record
     * it receives. The poll loop has no exit condition; it ends only when an exception
     * is thrown or the process is terminated.
     */
    public class Consumer {

        private static final String TOPIC="education-info";
        private static final String BROKER_LIST="localhost:9092";
        /** Maximum time a single poll call waits for records. */
        private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

        /**
         * Builds the consumer configuration: broker address, group id and client id
         * (both {@code "test"}), and {@link StringDeserializer} for keys and values.
         *
         * @return the populated configuration properties
         */
        private static Properties initConfig(){
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
            properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"test");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            return properties;
        }

        /**
         * Entry point: creates the consumer, subscribes to the topic and polls forever,
         * printing each record value to standard output.
         *
         * @param args unused
         */
        public static void main(String[] args){
            // The consumer is created inside try-with-resources (instead of a static
            // initializer) so that construction/subscription failures are handled by the
            // catch block below and the consumer is always closed.
            try (KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(initConfig())) {
                kafkaConsumer.subscribe(Arrays.asList(TOPIC));
                while(true){
                    // poll(Duration) replaces poll(long), which is deprecated in
                    // kafka-clients 2.0.0.
                    ConsumerRecords<String,String> records = kafkaConsumer.poll(POLL_TIMEOUT);
                    for(ConsumerRecord<String,String> record:records){
                        try{
                            System.out.println(record.value());
                        }catch(Exception e){
                            // A failure while handling one record must not stop the loop.
                            e.printStackTrace();
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }

    }
  • 相关阅读:
    web移动开发最佳实践之js篇
    ubuntu升级到12.10
    C语言生成随机数
    终于签约了
    这个2012不寻常
    awk练习(实战)
    数据恢复的教训
    职业发展的一些随想
    diy谷蜂Y5刷机包基于官方0207稳定版
    web移动开发最佳实践之html篇
  • 原文地址:https://www.cnblogs.com/zgzf/p/11130918.html
Copyright © 2011-2022 走看看