安装kafka-python
pip install kafka-python
生产者
from kafka import KafkaProducer # 有时候导入包会报错,使用pip uninstall kafka-python,卸载后重装可以解决
import json
# 创建producer对象
producer = KafkaProducer(
value_serializer=lambda v: json.dumps(v).encode('utf-8'), # 对发送的数据进行序列化处理
bootstrap_servers=['192.168.0.189:9092','192.168.0.190:9092','192.168.0.191:9092'] # 安装了kafka的集群
)
for i in range(10):
# 创建 data
data={
"name":"李四",
"age":23,
"gender":"男",
"id":i
}
# 将data发送到kafka,主题'test_topic'(自定义)
producer.send('test_topic', data)
producer.close()
消费者
from kafka import KafkaConsumer
import json
# 建立消费者对象
consumer = KafkaConsumer('test_topic', # 与消费者中发送消息的 topic对应
bootstrap_servers=['192.168.0.189:9092','192.168.0.190:9092','192.168.0.191:9092'],
value_deserializer=json.loads # 反序列化数据
)
# 生产者中send()一次数据,消费者中就会接收到一次数据,所以需要遍历
for message in consumer:
print(message.value) # 通过.value方法获取到值
consumer.close()
注:有时候建立 生产者 或消费者 对象时会报错,反复多试几次就可以建立成功,具体什么原因还得多研究,后续补充
参考链接