zoukankan      html  css  js  c++  java
  • python之kafka消费

    使用python3第三方工具,实现kafka消费

     1 # -*- coding: utf-8 -*-
     2 
     3 import uuid
     4 import json
     5 from kafka import KafkaConsumer
     6 from xxxxxx import MessageToDict
     7 from xxx import ObjectInfo
     8 
     9 import sys
    10 import codecs
    11 
    12 sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
    13 
    14 
    15 class ReadKafkaContent(object):
    16     @staticmethod
    17     def deserialize(msg):
    18         """
    19         反序列化
    20         :param msg:
    21         :return:
    22         """
    23         pb_obj = ObjectInfo()
    24         pb_obj.Clear()
    25         pb_obj.ParseFromString(msg.value)
    26         return MessageToDict(pb_obj, including_default_value_fields=True, preserving_proto_field_name=True)
    27 
    28     def consume_msg(self, consumer_obj):
    29         """
    30         逐条消费,返回反序列化后的内容
    31         :param consumer_obj:
    32         :return:
    33         """
    34         try:
    35             while True:
    36                 msg = next(consumer_obj, None)
    37                 if not msg:
    38                     continue
    39                 content = self.deserialize(msg)
    40                 return content
    41         except Exception as ex:
    42             print(u"消费kafka错误,退出测试")
    43             return None
    44 
    45     def entry(self, topic, ip, count=10, log="log_read_kafka_content.json"):
    46         """
    47 
    48         :param topic:topic
    49         :param ip:ip
    50         :param count:查询kafka数据数量,默认10条
    51         :param log:内容保存地址,默认
    52         :return:
    53         """
    54         print(u"开始......")
    55         try:
    56             # 创建kafka消费对象
    57             print(u"创建kafka消费对象...")
    58             consumer = KafkaConsumer(topic, group_id=uuid.uuid4().hex,
    59                                      bootstrap_servers=[ip],
    60                                      auto_offset_reset="latest", consumer_timeout_ms=3 * 1000)
    61         except Exception as ex:
    62             print(u"连接kafka失败!")
    63             return False
    64         print(u"kafka消费对象创建成功.")
    65 
    66         with open(log, "w") as f:
    67             for i in range(count):
    68                 print(u"开始消费第%s条数据..." % str(i + 1))
    69                 content = self.consume_msg(consumer)
    70                 if not content:
    71                     return False
    72 
    73                 # dict转json保存数据内容
    74                 content_json = json.dumps(content, ensure_ascii=False, indent=4)
    75                 f.write(content_json)
    76                 f.write("
    
    ")
    77                 print(u"第%s条数据写入完成." % str(i + 1))
    78 
    79         print(u"完成.")
  • 相关阅读:
    unserialize() 反序列化报错,疑似乱码解决 阿星小栈
    js判断输入字符串是否为空、空格、null的方法总结 阿星小栈
    navicat中创建存储过程、触发器和使用游标的简单实例(图文) 阿星小栈
    mysql触发器new和old区别 阿星小栈
    div垂直居中
    javascript 原型详解
    对Web标准的理解。可用性和可访问性
    css兼容问题
    $(...).live is not function
    js判断两个对象是否相等
  • 原文地址:https://www.cnblogs.com/sunshine-blog/p/11929690.html
Copyright © 2011-2022 走看看