zoukankan      html  css  js  c++  java
  • Kafka 消费者 Demo

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.util.Arrays;
    import java.util.Properties;
    /**
     * 主要参数:
     * 1.bootstrap.servers,group.id,key.deserializer,value.deserializer
     * 2.session.timeout.ms coordinator检测失败的时间,设置为比较小的值
     * 3. max.poll.interval.ms consumer处理逻辑最大时间
     * 4. auto.offset.reset  [earliest,lastest,none]
     * 5. enable.auto.commit 是否自动提交位移,设置为false,由用户自行提交位移
     * 6. fetch.max.bytes 指定consumer单次获取数据的最大字节数
     * 7. max.poll.records 单次调用poll的最大返回消息数,默认500
     * 8. heartbeat.interval.ms  越小越好
     * 9. connections.max.idle.ms Kafka定期关闭空闲Socket的时间
     */
    public class KafkaConsumerDemo {
        public static void main(String[] args) {
    
            String topic = "test-topic";
            String groupId = "test-group";
    
            Properties properties = new Properties();
            properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
            //必须指定有业务意义的名字
            properties.put("group.id",groupId);
            properties.put("enable.auto.commit","true");
            properties.put("auto.commit.interval.ms","1000");
            //从最早的消息开始读取
            properties.put("auto.offset.reset","earliest");
            properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    
    
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
            //订阅主题,可以订阅多个主题,还可以使用正则表达式订阅主题
            //注意:多次订阅,会覆盖前面的
            consumer.subscribe(Arrays.asList(topic));
    
            try{
                while(true){
                    //1000是超时设定,如果有定时要求,可设置,否则建议设置个比较大的值
                    //通常consumer拿到足够多的数据,会立即返回,否则会阻塞
                    //poll返回则认为是成功消费了消息,如果发现消费慢需要分析是poll慢还是本身业务逻辑处理慢
                    ConsumerRecords<String,String> records = consumer.poll(1000);
                    for(ConsumerRecord<String,String> record : records){
                        System.out.printf("offset=%d, key=%s,value= %s%n",record.offset(),record.key(),record.value());
                    }
                }
            }finally {
                consumer.close();
            }
        }
    }
  • 相关阅读:
    STM32启动BOOT0 BOOT1设置方法
    端口映射
    端口映射
    静态路由配置
    静态路由配置
    NETGEAR路由器登录不上 重新获取ip
    NETGEAR路由器登录不上 重新获取ip
    GSM AT指令 SIM900A TC35
    GSM AT指令 SIM900A TC35
    TTP223 触摸按键
  • 原文地址:https://www.cnblogs.com/fubinhnust/p/11967913.html
Copyright © 2011-2022 走看看