zoukankan      html  css  js  c++  java
  • kafka zookeeper

    import json

    from pykafka import KafkaClient
    from pykafka.simpleconsumer import SimpleConsumer
    import time

    # Name of the Kafka topic the demo message is published to.
    topic = 'qq'
    # Sample payload; the script entry point serializes it with json.dumps
    # before handing it to the producer.
    data = {"ddgd": "dgdfg", "ddd": "fdg"}


    class kafkaConfig:
        """Small wrapper around a pykafka ``KafkaClient``.

        It can list the client's topics and publish single messages either
        synchronously or asynchronously. Failures are reported with ``print``
        instead of being raised; a method whose operation failed returns
        ``None``.
        """

        # ZooKeeper ensemble used when the caller does not supply one.
        DEFAULT_ZOOKEEPER_HOSTS = '10.200.224.13:2181,10.200.224.9:2181,10.200.224.2:2181'

        def __init__(self, zookeeper_hosts=DEFAULT_ZOOKEEPER_HOSTS):
            """Create the Kafka client through ZooKeeper.

            Args:
                zookeeper_hosts: Comma-separated ``host:port`` list. If there
                    is a chroot path, put it after the port number
                    (e.g. ``/kfk1``).

            On failure the error is printed and ``self.client`` stays ``None``.
            """
            # Bind the attribute before connecting so the error handler below
            # (and every other method) can reference it even when the
            # connection attempt raises.
            self.client = None
            try:
                self.client = KafkaClient(zookeeper_hosts=zookeeper_hosts)
                print('[conn success!]')
            except Exception as e:
                print('[exception conn]:%s,[msg]:%s' % (e, self.client))

        @staticmethod
        def _encode(content):
            """Return *content* as ``bytes``, UTF-8 encoding it when it is a ``str``."""
            if isinstance(content, (bytes, bytearray)):
                return bytes(content)
            return bytes(content, encoding='utf-8')

        def topic_lists(self):
            """Print and return ``self.client.topics``.

            Returns:
                The client's topics, or ``None`` if they could not be read.
            """
            try:
                topics = self.client.topics
                # Wrap in a 1-tuple so a tuple-like value cannot be mistaken
                # for the format arguments themselves.
                print('[topic lists]:%s' % (topics,))
                return topics
            except Exception as e:
                print('[exception topic lists]:%s' % e)
                return None

        def msg_sync_send(self, topic, content):
            """Send a single message with the synchronous producer (the call waits).

            Args:
                topic: Key used to look the topic up in ``self.client.topics``.
                content: Message payload as ``str`` (UTF-8 encoded here) or
                    ``bytes``.

            Returns:
                ``"send sync success!"`` on success, ``None`` on failure.
            """
            try:
                target = self.client.topics[topic]
                with target.get_sync_producer() as producer:
                    # Timestamp is the current wall-clock time in milliseconds.
                    producer.produce(self._encode(content),
                                     timestamp=round(time.time() * 1000))
                print('[send sync success!]')
                return "send sync success!"
            except Exception as e:
                print('[exception sync send]:%s' % e)
                return None

        def msg_async_send(self, topic, content):
            """Send a single message with the asynchronous producer (high throughput).

            Args:
                topic: Key used to look the topic up in ``self.client.topics``.
                content: Message payload as ``str`` (UTF-8 encoded here) or
                    ``bytes``.

            Returns:
                ``None`` in every case; the outcome is only printed.
            """
            try:
                target = self.client.topics[topic]
                with target.get_producer() as producer:
                    # Timestamp is the current wall-clock time in milliseconds.
                    producer.produce(self._encode(content),
                                     timestamp=round(time.time() * 1000))
                print('[send async success!]')
            except Exception as e:
                print('[exception async send]:%s' % e)


    if __name__ == '__main__':
        # Demo entry point: build the client first, then serialize the sample
        # payload and publish it with the synchronous producer.
        producer_client = kafkaConfig()
        payload = json.dumps(data)
        producer_client.msg_sync_send(topic=topic, content=payload)
  • 相关阅读:
    第九次作业
    第八次作业
    第七次作业
    组合数学—递推关系与母函数
    组合数学—排列组合
    三角函数
    OpenCV初步
    计算机视觉如何入门
    GDB调试技巧:总结篇
    PyQt5之窗口类型
  • 原文地址:https://www.cnblogs.com/yaohu/p/12597000.html
Copyright © 2011-2022 走看看