python代码使用 ,先 pip3 install kafka-python
生产者
from kafka import KafkaProducer
# 生产端
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
for i in range(3):
future = producer.send(
'test', # 设置topic,消费者根据topic拿数据
key=b'qwedef', # 必须是bytes数据类型,同一个key值,会被送至同一个分区
value=b'dewcdef', # 必须是bytes数据类型, 发送的数据
partition=0 # partition设置分区
)
result = future.get(timeout=10) # 监控是否发送成功
# print(result)
消费者
from kafka import KafkaConsumer
# 消费端
consumer = KafkaConsumer(
'test', # 设置topic,从topic里面拿数据
group_id='group1', # 如果不设置这个参数,则消费者A消费后,消费者B会继续消费;如果和其他消费者设置同样的group_id, 则消费者A消费这条数据后,不会被消费者B消费
bootstrap_servers=['localhost:9092']
)
for msg in consumer:
print(msg.key.decode())
print(msg.value.decode())