zoukankan      html  css  js  c++  java
  • Kafka消息重新发送

    Kafka消息重新发送

     

    1、  使用kafka消息队列做消息的发布、订阅,如果consumer端消费出问题,导致数据并没有消费,此时不需要担心,数据并不会立刻丢失,kafka会把数据在服务器的磁盘上默认存储7天,或者自己指定有两种方式:1)指定时间,log.retention.hours=1682)指定大小,log.segment.bytes=1073741824。此时就可以通过重置某个topicoffset来是消息重新发送,进行消费

     

    2、        查看topic的offset的范围

     

    1)使用下面的命令可以查看topicuserlogbrokerspark:9092offset的最小值:

    #./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -2

     

    2)offset的最大值:

    #./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -1

     

     

    3、        设置consumer group的offset

    1)启动zookeeper,如果使用的是kafka内置的zookeeper,直接启动bin目录下的zookeeper-shell.sh,进行登录:

    #./zookeeper-shell.sh  localhost:2181

    通过如下命令,来重置offset值,比如topic:userlog,group:userlogs,partition:0 ,offset:2181

     

    set /consumers/userlogs/offsets/userlog/0 1288

     

    如果有多个partition都执行上输的命令,并将0换为相对应的分区号就可以了。

    ###如果创建分区的时候设置了zk的根目录,如localhost:2181/kafka

    则重置的命令为:

    set /kafka/consumers/userlogs/offsets/userlog/0 1288

     

    2)如果使用单独安装的zookeeper,直接使用bin目录下的ZKCli.sh登录后,执行上述命令即可。

     

    4、 设置完成后需要重启相关的服务,就可以从设置offset的地方开始消费。

    重启服务:consumer服务。

  • 相关阅读:
    springBoot jpa 分页
    springBoot jpa 表单关联查询
    springBoot 登录拦截器
    SpringBoot 封装返回类以及session 添加获取
    SpringBoot 数据库操作 增删改查
    IDEA SpringBoot +thymeleaf配置
    IDEA Spring Boot 项目创建
    php判断手机段登录,以及phpcms手机PC双模板调用
    简单爬虫,查博客浏览量
    [51nod1357]密码锁 暨 GDOI2018d1t2
  • 原文地址:https://www.cnblogs.com/cuishuai/p/7788039.html
Copyright © 2011-2022 走看看