Consumer is not subscribed to any topics or assigned any partitions

Versions:

    scala:2.11.8
    spark:2.11
    hbase:1.2.0-cdh5.14.0

Error message:

    java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions

    Root cause:

    To fetch data from a topic or partition, the consumer must be subscribed to at least one topic or explicitly assigned partitions before calling poll; polling without any subscription or assignment is not allowed. On each poll, the consumer tries to use the offset of the last consumed record as the start offset for the next fetch; that offset can also be set explicitly with seek(TopicPartition, long) or managed automatically.
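    As a minimal illustration with the plain Kafka consumer API (this is only a sketch, not the Spark code from this post; the broker address, group id, topic name and the offset 42 are made-up example values), the consumer must call subscribe(), or assign() plus seek(TopicPartition, long), before its first poll():

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import scala.collection.JavaConverters._

    object ConsumerStartupSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")   // assumed broker address
        props.put("group.id", "demo-group")                // assumed consumer group
        props.put("key.deserializer", classOf[StringDeserializer].getName)
        props.put("value.deserializer", classOf[StringDeserializer].getName)

        val consumer = new KafkaConsumer[String, String](props)

        // Option 1: subscribe to a topic; the group coordinator assigns partitions and tracks offsets.
        consumer.subscribe(Collections.singletonList("demo-topic"))

        // Option 2 (instead of subscribe): assign a partition manually and seek to a stored offset.
        // val tp = new TopicPartition("demo-topic", 0)
        // consumer.assign(Collections.singletonList(tp))
        // consumer.seek(tp, 42L)

        // Without subscribe() or assign(), this poll() would throw the IllegalStateException above.
        val records = consumer.poll(1000L)
        for (r <- records.asScala) {
          println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}")
        }
        consumer.close()
      }
    }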
    This check can be found in the KafkaConsumer.poll() source:
    public ConsumerRecords<K, V> poll(long timeout) {
        acquire();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");
            // Throw if the consumer has no subscription and no manual partition assignment
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
    
            // Keep polling for new data until the timeout expires
            long start = time.milliseconds();
            // Time remaining before the timeout
            long remaining = timeout;
            do {
                // Fetch records; auto-commit offsets if enabled, and reset offsets if an offset reset was requested
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // Before returning the results, send the next round of fetch requests so the next poll does not block waiting
                    fetcher.sendFetches();
                    client.pollNoWakeup();
                    // If no interceptors are configured, return the records directly; otherwise pass them through the interceptor chain first
                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
    
                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);
    
            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }
    
    Therefore, the consumer has to subscribe to (or be assigned) the topic before it can consume. The API I was using before was Assign, which only works for a topic that is not new, i.e. one that has already been consumed and for which offsets are known:
    val inputDStream1 = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Assign[String, String](fromOffsets.keys, kafkaParams, fromOffsets)
    )
    Fix: for a brand-new topic that no consumer has read yet (so there are no stored offsets), use Subscribe instead:
    val inputDStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
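    As a rough sketch of how the two strategies can be combined (the helper loadStoredOffsets and the variable storedOffsets are hypothetical and not from the original post; ssc, topics and kafkaParams are the same names used above): use Subscribe when nothing has been consumed yet, and Assign to resume from previously saved offsets.

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, Subscribe}

    // Hypothetical: offsets persisted by a previous run (for example in HBase or ZooKeeper);
    // returns an empty map on the very first run.
    val storedOffsets: Map[TopicPartition, Long] = loadStoredOffsets(topics)

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      if (storedOffsets.isEmpty)
        Subscribe[String, String](topics, kafkaParams)      // fresh topic: no saved offsets yet
      else
        Assign[String, String](storedOffsets.keys, kafkaParams, storedOffsets)  // resume from saved offsets
    )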
    

      

Original post: https://www.cnblogs.com/niutao/p/10547499.html