  • High CPU usage and no messages consumed, caused by the kafka-python client

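    Today I ran into a case where kafka-python 1.3.3 was used to read from a Kafka 1.0.1 broker. After a rebalance the client was assigned partitions, but its CPU usage was very high and it could not consume any messages.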

    I first ruled out connectivity problems and bugs in our own code. When both checked out, I turned my attention to the Kafka client library itself.

    Searching for related problems, the first hit was kafka-python issue #1033:

    When no module exists to handle Snappy decompression, the KafkaConsumer returns no messages, rather than reporting the problem. This differs from the legacy Consumer API which provides a much more useful error message.

    Background

    I was attempting to fetch some data from a Kafka topic which was using snappy compression. No data was ever returned even though I knew data was being landed in the topic (confirmed with the Kafka CLI tools). This had me very confused.

    >>> consumer = kafka.KafkaConsumer("test", bootstrap_servers=["svr:9092"])
    >>> consumer.poll(5000)
    {}
    

    I then attempted to use the legacy consumer API which pointed me to the exact problem.

    >>> client = kafka.SimpleClient("svr:9092")
    >>> consumer.close()
    >>> consumer = kafka.SimpleConsumer(client, "group", "test")
    >>> for message in consumer:
    ...     print(message)
    ...
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 353, in __iter__
        message = self.get_message(True, timeout)
      File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 305, in get_message
        return self._get_message(block, timeout, get_partition_info)
      File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 320, in _get_message
        self._fetch()
      File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 379, in _fetch
        fail_on_error=False
      File "/usr/lib/python2.7/site-packages/kafka/client.py", line 665, in send_fetch_request
        KafkaProtocol.decode_fetch_response)
      File "/usr/lib/python2.7/site-packages/kafka/client.py", line 295, in _send_broker_aware_request
        for payload_response in decoder_fn(future.value):
      File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 212, in decode_fetch_response
        for partition, error, highwater_offset, messages in partitions
      File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 219, in decode_message_set
        inner_messages = message.decompress()
      File "/usr/lib/python2.7/site-packages/kafka/protocol/message.py", line 121, in decompress
        assert has_snappy(), 'Snappy decompression unsupported'
    AssertionError: Snappy decompression unsupported
    

    All I needed to do was install the python-snappy module to handle the decompression.

    pip install python-snappy

    This closely matched what I was seeing.

    Sure enough, python-snappy was not listed in our requirements, and the producer writing to the topic does compress its messages with snappy.
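    Whether a consumer needs a snappy decoder is decided by the producer: in the Kafka message format, the compression codec is stored in the lowest three bits of each message's (or record batch's) attributes field, with 0 = none, 1 = gzip, 2 = snappy, 3 = lz4 (4 = zstd on newer brokers). A small sketch of that lookup; `codec_from_attributes` is a hypothetical helper, not part of kafka-python:

```python
# Compression codec IDs as stored in bits 0-2 of the Kafka attributes field.
CODECS = {0: "none", 1: "gzip", 2: "snappy", 3: "lz4", 4: "zstd"}
CODEC_MASK = 0x07  # only the lowest three bits encode the codec


def codec_from_attributes(attributes: int) -> str:
    """Map a message/record-batch attributes value to its codec name."""
    codec_id = attributes & CODEC_MASK
    return CODECS.get(codec_id, f"unknown({codec_id})")


print(codec_from_attributes(0x02))  # snappy
print(codec_from_attributes(0x0A))  # snappy: bit 3 (timestamp type) is masked off
```

    If this returns "snappy" for the data on the topic, every consumer of that topic needs a working snappy decoder.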

    Newer releases of kafka-python fix this: if python-snappy is not installed, the error is raised instead of leaving you puzzled by an empty result.
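    The difference between the two behaviours comes down to what the decoding layer does when it meets a codec it cannot handle. The sketch below illustrates the fail-loudly approach; the names `DECODERS`, `decompress` and `UnsupportedCodecError` are hypothetical and are not kafka-python's actual internals. It assumes `snappy` is the import name installed by python-snappy, and it ignores the xerial framing that real Kafka clients must also handle for snappy data:

```python
import gzip


class UnsupportedCodecError(Exception):
    """Raised when a message uses a codec we have no decoder for."""


# Codec ID -> decompress function. gzip ships with the standard library.
DECODERS = {1: gzip.decompress}

try:
    import snappy  # import name of the python-snappy package (assumption)
    DECODERS[2] = snappy.decompress  # raw snappy only; xerial framing not handled
except (ImportError, AttributeError):
    pass  # snappy stays unregistered, so decompress() raises for codec 2


def decompress(codec_id: int, payload: bytes) -> bytes:
    """Decompress a payload, failing loudly on codecs with no decoder."""
    if codec_id == 0:
        return payload  # uncompressed
    decoder = DECODERS.get(codec_id)
    if decoder is None:
        # Raising here is what turns "poll() returns {}" into a clear error.
        raise UnsupportedCodecError(f"no decoder installed for codec id {codec_id}")
    return decoder(payload)
```

    A silent implementation would instead skip the message set when the decoder is missing, which from the outside looks exactly like a topic with no data.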

    So I upgraded kafka-python, installed python-snappy, and the consumer ran happily!
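    To catch a missing codec at startup rather than at the first silent `poll()`, a service could check that the compression modules it needs are importable before creating the consumer. A minimal sketch using only the standard library; `missing_modules` and `require_modules` are hypothetical helpers, and `snappy` is assumed to be the import name installed by python-snappy:

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def require_modules(names):
    """Raise RuntimeError listing any required module that is missing."""
    missing = missing_modules(names)
    if missing:
        raise RuntimeError(
            "missing compression modules: " + ", ".join(missing)
            + " (for snappy: pip install python-snappy)"
        )


# Example: call require_modules(["snappy"]) before constructing the KafkaConsumer.
```

    `importlib.util.find_spec` only locates the module without importing it, so the check is cheap; it does not prove that the native snappy library underneath loads correctly.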

    References:

    https://github.com/dpkp/kafka-python/issues/1033  KafkaConsumer Fails to Report Problem with Compression

    https://github.com/dpkp/kafka-python/issues/1315  High CPU usage in KafkaConsumer.poll() when subscribed to many topics with no new messages (possibly SSL related)

  • Original post: https://www.cnblogs.com/piperck/p/10265706.html