  • Working with Kafka in Python 3

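    # -*- coding: utf-8 -*-
    # Producer: send JSON-encoded test messages to the 'test' topic.
    import datetime
    import json
    import time
    from kafka import KafkaProducer
    
    producer = KafkaProducer(bootstrap_servers='192.168.10.10:9092')
    for i in range(111):
        # Build the payload; "cid" is a timestamp-based ID.
        payload = {"method": "get", "step": i, "type": "test", "testName": "kafka",
                   "cid": datetime.datetime.now().strftime('%Y%m%d%H%M%S'),
                   "info": "demo{}".format(1)}
        # send() is asynchronous and returns a future; the value must be bytes.
        future = producer.send('test', json.dumps(payload).encode())
        # Block until the broker acknowledges the record (up to 10 seconds).
        record_metadata = future.get(timeout=10)
        print(record_metadata, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        time.sleep(3)
    producer.close()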
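    
    # Consumer (typically run as a separate script): read the 'test' topic.
    from kafka import KafkaConsumer
    
    # auto_offset_reset='earliest' starts from the oldest available record
    # when there is no committed offset for this consumer.
    consumer = KafkaConsumer('test', bootstrap_servers=['192.168.10.10:9092'], auto_offset_reset='earliest')
    
    # Iterating over the consumer blocks and yields records as they arrive.
    for message in consumer:
        print(message)
        # message.value is raw bytes (the JSON sent by the producer).
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))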
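
The consumer above prints `message.value` as raw bytes, while the producer encodes each dict to JSON by hand. One possible way to keep the two sides symmetric is to pass a serializer and a deserializer to the clients. The sketch below is only an illustration: `encode_message`, `decode_message`, and `make_json_clients` are hypothetical names not found in the original post, and the broker address and topic are simply reused from the code above.

```python
import json


def encode_message(payload):
    """Serialize a dict to UTF-8 JSON bytes, as the producer does before send()."""
    return json.dumps(payload).encode("utf-8")


def decode_message(raw):
    """Turn the bytes found in message.value back into a dict."""
    return json.loads(raw.decode("utf-8"))


def make_json_clients(servers="192.168.10.10:9092", topic="test"):
    """Hypothetical helper: build a producer/consumer pair that handle JSON themselves."""
    # Imported lazily so the pure helpers above work without kafka-python installed.
    from kafka import KafkaConsumer, KafkaProducer

    # value_serializer / value_deserializer are applied to every record value.
    producer = KafkaProducer(bootstrap_servers=servers,
                             value_serializer=encode_message)
    consumer = KafkaConsumer(topic, bootstrap_servers=servers,
                             auto_offset_reset="earliest",
                             value_deserializer=decode_message)
    return producer, consumer


# Round-trip check that needs no broker.
sample = {"method": "get", "step": 0, "type": "test", "testName": "kafka"}
print(decode_message(encode_message(sample)) == sample)
```

With clients built this way, `producer.send('test', payload)` would accept the dict directly, and `message.value` on the consumer side would already be a dict, so `message.value["step"]` could be read without a manual `json.loads`.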
    

      

  • Original source: https://www.cnblogs.com/xiao-xue-di/p/11838254.html