zoukankan      html  css  js  c++  java
  • kafka消费者基本操作

    1.消费消息

    消费者以pull的方式获取消息,

    每个消费者属于某一个消费组,在创建时不指定消费者的groupId,则该消费者属于默认消费组test-consumer-group ,在配置文件./consumer.properties中设置

    同一消费组下的各个消费者在消费消息是是互斥的,也即是说,同一条消息,只能被同一个消费组下的某个消费者消费,但能被其它组的消费者消费

    kafka-console-consumer.sh脚本模拟终端消费者消费消息

    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-action --consumer-property group.id=old-consumer-test --consumer-property consumer.id=old-consumer-cl --from-beginning --delete-consumer-offsets
    

    consumer-property 参数以键值对的形式指定消费者级别的配置。

    from-beginning 设置消息起始位置开始消费。默认是从新位置 latest开始消费

    delete-consumer-offsets 删除在zookeeper中记录已消费的偏移量

     

    旧版消费者默认将消费偏移量保存到zookeeper中,可以通过offsets.storage=kafka设置

    offsets.storage=kaka,则保存到kafka主题中,

    offsets.storage=zookeeper, 则保存到zookeeper中

    ids 记录该消费组下正在运行的消费者列表

    owners  记录该消费组消费的主题列表

    offsets  记录该消费组下每个消费者所消费主题的各个分区的偏移量

    2.新版本消费者

    新版本消费者去掉了对zookeeper的依赖,当启动一个消费者时不再向zookeeper注册,而是

    由消费组协调器统一管理,已消费的消息偏移量提交后会保存到名为“__consumer_offsets”

    内部主题中

    启动一个新的消费者

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --consumer-property group.id=new-consumer-test --consumer-property client.id=new-consumer-cl --topic kafka-action
    

     老版本启动用的参数是zookeeper ,新版本的参数是bootstrap-server

    通过命令查看消费组名称信息

    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer
    
  • 相关阅读:
    一行JS代码实现的滑动门
    一款JS+CSS打造绝对经典的资讯网站滑动门特效
    【荐】JS封装的一个无限级的下拉树形菜单
    JS+CSS实现漂亮实用的红色导航菜单
    JS+CSS仿网易的选项卡TAB标签样式
    JS+CSS实现的不规则TAB选项卡效果
    jQuery仿FLASH响应鼠标滚动的动感菜单
    纯CSS仿迅雷首页的菜单导航代码
    JS+CSS仿雅虎首页网站快捷入口的切换效果
    【荐】Jquery+CSS打造的泡沫弹出框式的侧边蓝色导航菜单
  • 原文地址:https://www.cnblogs.com/sunshine-long/p/9857107.html
Copyright © 2011-2022 走看看