zoukankan      html  css  js  c++  java
  • Kafka kafka.common.OffsetOutOfRangeException 问题处理

    最近公司的zk的down掉了,  storm job 重启的时候报出 kafka.common.OffsetOutOfRangeException 异常

    网上查询了一些朋友的做法, 自己也看了一下代码, 最终还是解决了  

    原因:

      zk挂掉的这几天, kafka中之前的数据已经被清掉了, 但是zk中保存的offset还是几天之前的, 导致KafkaSpout要获取的offset超过了当前kafka的offset, 就像ArrayIndexOutOfRangeException一样

    解决方案:

     KafkaSpout 配置项中可以选择读取的方式, 共有三种, 如果Topology启动的时候未进行配置, 则默认是从Zk中读取, 所以导致了异常

    -2: 从最老的开始读

    -1: 从最近的开始读

    0: 从Zk中读

      

    相关代码如下, storm.kafka.PartitionManager, 

    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, GlobalPartitionId id) {
            _partition = id;
            _connections = connections;
            _spoutConfig = spoutConfig;
            _topologyInstanceId = topologyInstanceId;
            _consumer = connections.register(id.host, id.partition);
        _state = state;
            _stormConf = stormConf;

            String jsonTopologyId = null;
            Long jsonOffset = null;
            try {
                Map<Object, Object> json = _state.readJSON(committedPath());
                if(json != null) {
                    jsonTopologyId = (String)((Map<Object,Object>)json.get("topology")).get("id");
                    jsonOffset = (Long)json.get("offset");
                }
            }
            catch(Throwable e) {
                LOG.warn("Error reading and/or parsing at ZkNode: " + committedPath(), e);
            }

            if(!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
                _committedTo = _consumer.getOffsetsBefore(spoutConfig.topic, id.partition, spoutConfig.startOffsetTime, 1)[0];
            LOG.info("Using startOffsetTime to choose last commit offset.");
            } else if(jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
                _committedTo = _consumer.getOffsetsBefore(spoutConfig.topic, id.partition, -1, 1)[0];
            LOG.info("Setting last commit offset to HEAD.");
            } else {
                _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo);
            }

            LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
            _emittedToOffset = _committedTo;

        } 

     重点关注红色代码, spoutConfig.forceFromStart 为true的时候, 才会真正去读取自己设置的offset, 否则将会使用Zk中的offset

      

    那么问题来了, 如何设置呢,  SpoutConfig很贴心的给我们提供了一个方法

    public void forceStartOffsetTime(long millis) {
            startOffsetTime = millis;
            forceFromStart = true;

        } 

    所以我们只需要在我们的Topology中添加如下代码即可

    /* -2=最老 -1=最新, 0=zk offset*/
            if (args != null && args[1] != null && Integer.valueOf(args[1]) != 0) {
                if (Integer.valueOf(args[1]) == -2) {
                    spoutConfig.forceStartOffsetTime(-2); //从kafka最老的记录读取
                } else if (Integer.valueOf(args[1]) == -1) {
                    spoutConfig.forceStartOffsetTime(-1); //从kafka最新的记录读取
                }//其他情况则默认从zk的offset读取

            } 

    发布Topology的时候, 如果需要从最新记录读取, 则像这样  storm jar com.abc.StormTopology stormTopology -1 

    其他链接:  http://blog.csdn.net/baiyangfu_love/article/details/8919699

  • 相关阅读:
    intellij idea 热部署失效,需要手动编译类
    mac brew 安装包下载失败解决
    DataTemplate 之 ContentTemplate 的使用
    dataTemplate 之 ContentTemplate 的使用
    dataTemplate 的使用之listView
    dataTemplate 使用
    wpf中UserControl的几种绑定方式
    WPF教程(四)RelativeSource属性
    堆的概念
    哈夫曼(huffman)树和哈夫曼编码
  • 原文地址:https://www.cnblogs.com/zhwbqd/p/4040000.html
Copyright © 2011-2022 走看看