zoukankan      html  css  js  c++  java
  • Python往kafka生产消费数据

    安装 kafka:  pip install kafka-python

    生产数据

     1 from kafka import KafkaProducer
     2 import json
     3  
     4 '''
     5     生产者demo
     6     向test_lyl2主题中循环写入10条json数据
     7     注意事项:要写入json数据需加上value_serializer参数,如下代码
     8 '''
     9 producer = KafkaProducer(
    10                             value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    11                             bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667']
    12                          )
    13 for i in range(10):
    14     data={
    15         "name":"李四",
    16         "age":23,
    17         "gender":"",
    18         "id":i
    19     }
    20     producer.send('test_lyl2', data)
    21 producer.close()

    消费数据

     1 from kafka import KafkaConsumer
     2 import json
     3  
     4 '''
     5     消费者demo
     6     消费test_lyl2主题中的数据
     7     注意事项:如需以json格式读取数据需加上value_deserializer参数
     8 '''
     9  
    10  
    11 consumer = KafkaConsumer('test_lyl2',group_id="lyl-gid1",
    12                          bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'],
    13                          auto_offset_reset='earliest',value_deserializer=json.loads
    14                          )
    15 for message in consumer:
    16     print(message.value)
  • 相关阅读:
    python_异常处理
    python_类与对象
    函数
    字符串(查找,替换,分割)
    容器类型的数据
    条件语句
    关于WinSock编程的多线程控制
    利用Delphi编写Socket通信程序
    SQL Server数据库开发的二十一条军规
    SQL Server中的日期格式化
  • 原文地址:https://www.cnblogs.com/longsongpong/p/11010195.html
Copyright © 2011-2022 走看看