zoukankan      html  css  js  c++  java
  • Kafka单线程Consumer及参数详解

    请使用0.9以后的版本:

    示例代码

     Properties props = new Properties();
    	    props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
    	    props.put("group.id", "test");
    	    props.put("enable.auto.commit", "true");
    	    props.put("auto.commit.interval.ms", "1000");
    	    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    	    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    	    
    	    props.put("auto.offset.reset","earliest");
    	    
    	    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    	    consumer.subscribe(Arrays.asList("foo", "bar"));
    	  try{  
            while (true) {
    	        ConsumerRecords<String, String> records = consumer.poll(1000);
    	        for (ConsumerRecord<String, String> record : records) {
    	        	System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    	        }
    	     }
            }finally{
              consumer.close();
            }
    

    1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必须指定);

    2、用这些Properties构建consumer对象(KafkaConsumer还有其他构造,可以把序列化传进去);

    3、subscribe订阅topic列表(可以用正则订阅Pattern.compile("kafka.*")

    使用正则必须指定一个listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重写这个接口来实现 分区变更时的逻辑。如果设置了enable.auto.commit = true 就不用理会这个逻辑。

    4、然后循环poll消息(这里的1000是超时设定,如果没有很多数据,也就等一秒);

    5、处理消息(打印了offset key value 这里写处理逻辑)。

    6、关闭KafkaConsumer(可以传一个timeout值 等待秒数 默认是30)。

    参数详解

    bootstrap.server(最好用主机名不用ip kafka内部用的主机名 除非自己配置了ip)

    deserializer 反序列化consumer从broker端获取的是字节数组,还原回对象类型。

    默认有十几种:StringDeserializer LongDeserializer DoubleDeserializer。。

    也可以自定义:定义serializer格式 创建自定义deserializer类实现Deserializer 接口 重写逻辑

    除了四个必传的 bootstrap.server group.id key.deserializer value.deserializer

    还有session.timeout.ms "coordinator检测失败的时间"

    是检测consumer挂掉的时间 为了可以及时的rebalance 默认是10秒 可以设置更小的值避免消息延迟。

    max.poll.interval.ms "consumer处理逻辑最大时间"

    处理逻辑比较复杂的时候 可以设置这个值 避免造成不必要的 rebalance ,因为两次poll时间超过了这个参数,kafka认为这个consumer已经跟不上了,会踢出组,而且不能提交offset,就会重复消费。默认是5分钟。

    auto.offset.reset "无位移或者位移越界时kafka的应对策略"

    所以如果启动了一个group从头消费 成功提交位移后 重启后还是接着消费 这个参数无效

    所以3个值的解释是:

    earliset 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最早的位移消费

    latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

    (注意kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中) 、

    我们这是说的是新版本:kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面))

    enable.auto.commit 是否自动提交位移

    true 自动提交 false需要用户手动提交 有只处理一次需要的 最近设置为false自己控制。

    fetch.max.bytes consumer单次获取最大字节数

    max.poll.records 单次poll返回的最大消息数

    默认500条 如果消费很轻量 可以适当提高这个值 增加消费速度。

    hearbeat.interval.ms consumer其他组员感知rabalance的时间

    该值必须小于 session.timeout.ms 如果检测到 consumer挂掉 也就根本无法感知rabalance了

    connections.max.idle.ms 定期关闭连接的时间

    默认是9分钟 可以设置为-1 永不关闭

    更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算

  • 相关阅读:
    过滤评论中的表情
    谈谈你对多进程,多线程,以及协程的理解
    什么是多线程竞争?
    解释以下什么是锁,有哪几种锁?
    .什么是死锁
    多线程交互访问数据,如果访问到了就不访问了?
    什么是线程安全,什么是互斥锁
    说说下面几个概念:同步,异步,阻塞,非阻塞?
    什么是僵尸进程和孤儿进程?怎么避免僵尸进程?
    python中进程与线程的使用场景
  • 原文地址:https://www.cnblogs.com/tree1123/p/11362252.html
Copyright © 2011-2022 走看看