zoukankan      html  css  js  c++  java
  • Python3操作Kafka

    前言

    操作Kafka之前,先启动Kafka:

    方式一:进入Kafka安装目录,常规模式启动:
    bin/kafka-server-start.sh config/server.properties
    
    方式二:进入Kafka安装目录,进程守护模式启动kafka:
    nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 & 
    

    另外,Kafka的关闭命令:

    进入Kafka安装目录,执行:
    bin/kafka-server-stop.sh
    

    Python操作Kafka发送字符串

    编写Kafka生产者Producer:

    # kafka_producer.py
    
    from kafka import KafkaProducer
    from time import sleep
    
    def start_producer():
        producer = KafkaProducer(bootstrap_servers='192.168.0.157:9092')
        for i in range(0,100000):
            msg = 'msg is ' + str(i)
            producer.send('topic_test', msg.encode('utf-8'))
            sleep(3)
    
    if __name__ == '__main__':
        start_producer()
    

    编写Kafka消费者KafkaConsumer:

    # kafka_consumer.py
    
    from kafka import KafkaConsumer
    import time
    
    def start_consumer():
        consumer = KafkaConsumer('topic_test', bootstrap_servers = '192.168.0.157:9092')
        for msg in consumer:
            print(msg)
            print("topic = %s" % msg.topic) # topic default is string
            print("partition = %d" % msg.offset)
            print("value = %s" % msg.value.decode()) # bytes to string
            print("timestamp = %d" % msg.timestamp)
            print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime( msg.timestamp/1000 )) )
    
    if __name__ == '__main__':
        start_consumer()
    

    以上两个代码,先运行消费者,后运行生产者,就看到消费者在监听生产者发信息。

    发送结构化json待续。。

    以上。

  • 相关阅读:
    洛谷1968美元汇率 dp
    洛谷luogu2782
    题解 AT2243 【正方形のチップ】
    [HAOI2006]聪明的猴子 题解
    D:苏卿念发红包
    c++小游戏——扫雷
    c++小游戏——拯救公主
    c++小游戏——三国杀
    C++小游戏——井字棋
    c++小游戏——杀手
  • 原文地址:https://www.cnblogs.com/lovebkj/p/14868166.html
Copyright © 2011-2022 走看看