kafka-python reference: https://blog.csdn.net/see_you_see_me/article/details/78468421
Basic operations of the kafka-python API: https://www.cppentry.com/bencandy.php?fid=120&id=207208
A fairly complete tour of the operations: https://blog.csdn.net/qq_41262248/article/details/80790918
Kafka configuration explained: https://blog.csdn.net/weixin_40596016/article/details/79562023
Dependency:
pip install kafka-python
Sending data:
1) Command line: plain send
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.120.11:9092')
for _ in range(100):
    producer.send('world', b'some_message_bytes')
producer.close()
What these lines do, in order:
import KafkaProducer;
create a Producer connected to the Broker at 192.168.120.11:9092;
send 100 messages to the Topic world, each with the value some_message_bytes. This form of send() specifies no Partition, so Kafka spreads the messages evenly across the topic's 5 Partitions.
More detailed documentation: https://kafka-python.readthedocs.io/en/master/index.html
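Since neither a key nor a partition is given, kafka-python picks the partition itself. As a rough mental model only (not kafka-python's actual partitioner, which hashes the key when one is present), keyless sends can be pictured as round-robin over the topic's partitions:

```python
# Toy illustration of round-robin partition assignment for keyless sends.
# NOT kafka-python's real partitioner, just the idea: with no key,
# consecutive messages cycle evenly across the topic's partitions.
NUM_PARTITIONS = 5  # matches the 5-partition topic assumed above

def toy_partition(message_index, num_partitions=NUM_PARTITIONS):
    """Assign the i-th keyless message to a partition, round-robin."""
    return message_index % num_partitions

# 100 messages spread evenly: each of the 5 partitions receives 20.
counts = {}
for i in range(100):
    p = toy_partition(i)
    counts[p] = counts.get(p, 0) + 1

print(counts)  # {0: 20, 1: 20, 2: 20, 3: 20, 4: 20}
```

To pin a message to a specific partition instead, send() accepts an explicit partition argument.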
2) Command line: sending a JSON string
JSON is a powerful and very widely used text format, and kafka-python supports sending JSON-formatted messages as well; the KafkaProducer section of https://kafka-python.readthedocs.io/en/master/index.html shows how.
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.120.11:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('world', {'key1': 'value1'})
producer.close()
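The value_serializer runs once per message and turns the Python object into the bytes that actually go on the wire. That step can be checked on its own, with no broker involved:

```python
import json

# The same serializer passed to KafkaProducer above, shown standalone:
# it turns any JSON-serializable Python object into UTF-8 bytes.
value_serializer = lambda v: json.dumps(v).encode('utf-8')

payload = value_serializer({'key1': 'value1'})
print(payload)        # b'{"key1": "value1"}'
print(type(payload))  # <class 'bytes'>

# The consumer side can mirror it with a value_deserializer:
value_deserializer = lambda b: json.loads(b.decode('utf-8'))
print(value_deserializer(payload))  # {'key1': 'value1'}
```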
4) Command line: sending a compressed string
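The compressed-send example itself did not survive in these notes. kafka-python enables compression via the producer's compression_type parameter (for example 'gzip') and compresses message batches transparently. A broker-free sketch of the effect, using the gzip module directly, with the assumed producer configuration shown only in a comment:

```python
import gzip

# With kafka-python, compression is chosen at producer construction, e.g.:
#   producer = KafkaProducer(bootstrap_servers='192.168.120.11:9092',
#                            compression_type='gzip')
# and batches are compressed/decompressed transparently. The effect on a
# repetitive payload, demonstrated with the gzip module directly:
payload = b'some_message_bytes ' * 100
compressed = gzip.compress(payload)

print(len(payload), len(compressed))  # the compressed form is much smaller
assert gzip.decompress(compressed) == payload  # lossless round-trip
```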
The snippets above exercised individual calls one at a time; next is a complete script whose job is to send the names of the files under a given directory to the Topic world.
file_monitor.py:
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import os
import time
from sys import argv

producer = KafkaProducer(bootstrap_servers='192.168.18.129:9092')

def log(msg):
    t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())
    print("[%s]%s" % (t, msg))

def list_file(path):
    for f in os.listdir(path):
        # send() accepts bytes, so encode the file name first
        producer.send('world', f.encode('utf-8'))
        producer.flush()
        log('send: %s' % f)

list_file(argv[1])
producer.close()
log('done')
To monitor the files under /opt/jdk1.8.0_91/lib/missioncontrol/features, run it like this:
python file_monitor.py /opt/jdk1.8.0_91/lib/missioncontrol/features
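Before pointing the script at a real broker, the payload step can be sanity-checked locally: send() needs bytes, so each file name must be encoded first. A broker-free check with a throwaway directory (the file names a.txt and b.txt are made up for the test):

```python
import os
import tempfile

# Create a throwaway directory with two files, then build the exact
# byte payloads file_monitor.py would hand to producer.send().
with tempfile.TemporaryDirectory() as path:
    for name in ('a.txt', 'b.txt'):
        open(os.path.join(path, name), 'w').close()

    payloads = [f.encode('utf-8') for f in sorted(os.listdir(path))]
    print(payloads)  # [b'a.txt', b'b.txt']
```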
2. Creating a Consumer
In practice you usually create several Topics, each with multiple Partitions, so a Consumer typically connects to a specific Broker and consumes messages from a specific Topic.
The complete Python script:
consumer.py
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
import time

def log(msg):
    t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())
    print("[%s]%s" % (t, msg))

log('start consumer')
# consume the Topic world on 192.168.120.11:9092, with consumer group consumer-20171017
consumer = KafkaConsumer('world',
                         group_id='consumer-20171017',
                         bootstrap_servers=['192.168.120.11:9092'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    log(recv)
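Each msg the loop yields is a ConsumerRecord carrying topic, partition, offset, key and value fields. The formatting line can be exercised without a broker by substituting a stand-in record; the namedtuple below is a hypothetical stand-in, not kafka-python's actual class:

```python
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord, carrying only
# the fields the consumer loop above actually reads.
FakeRecord = namedtuple('FakeRecord',
                        ['topic', 'partition', 'offset', 'key', 'value'])

msg = FakeRecord(topic='world', partition=0, offset=42,
                 key=None, value=b'some_message_bytes')

recv = "%s:%d:%d: key=%s value=%s" % (
    msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print(recv)  # world:0:42: key=None value=b'some_message_bytes'
```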
Manual commit:
# encoding=utf-8
from kafka import KafkaConsumer

'''
Auto-commit of offsets is set to False, auto_offset_reset makes a fresh
group start from the earliest offset, and a new group_id is used. This way
other applications' consumption of the data is unaffected, while this
consumer still sees all the latest data, which is the point of an early
warning setup (spotting problems before users do).
'''
print("============= start consumer ===========")
consumer = KafkaConsumer('json',
                         bootstrap_servers=['192.168.18.129:9092'],
                         group_id='test_group_json',
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)
for msg in consumer:
    print("offset: " + str(msg.offset))
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
    consumer.commit()
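Committing after every single message, as above, is safe but generates a lot of commit traffic; a common variation is to commit every N processed messages plus once at the end. A broker-free sketch using a hypothetical stub in place of the real KafkaConsumer (with the real client the call is still consumer.commit()):

```python
class StubConsumer:
    """Hypothetical stand-in that just counts commit() calls."""
    def __init__(self):
        self.commits = 0

    def commit(self):
        self.commits += 1

stub = StubConsumer()
COMMIT_EVERY = 10

processed = 0
for _offset in range(25):      # pretend 25 messages arrive
    processed += 1             # ... handle the message here ...
    if processed % COMMIT_EVERY == 0:
        stub.commit()          # batch commit every 10 messages
stub.commit()                  # final commit for the leftover tail

print(stub.commits)  # 3: commits after messages 10 and 20, plus the tail
```

Note the at-least-once trade-off: if the process dies between commits, up to N messages are redelivered on restart.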
5. Consumer (manually setting the offset)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test', bootstrap_servers=['172.21.10.136:9092'])
print(consumer.partitions_for_topic("test"))  # partition info for the test topic
print(consumer.topics())                      # list of topics on the cluster
print(consumer.subscription())                # topics this consumer is subscribed to
print(consumer.assignment())                  # TopicPartitions currently assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest available offsets for those partitions
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # reset the position: consume from offset 5
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
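seek() works because an offset is simply a position in a partition's append-only log: after seek(tp, 5), the next record consumed from that partition has offset 5. The offset model, simulated with a plain list (an illustration only, not kafka-python code):

```python
# A partition is an append-only log; an offset is an index into it.
log = ['msg-%d' % i for i in range(10)]  # records at offsets 0..9

# Equivalent in spirit to consumer.seek(TopicPartition('test', 0), 5):
position = 5
consumed = log[position:]  # everything from offset 5 onward

print(consumed[0])   # msg-5: consumption resumes exactly at offset 5
print(len(consumed)) # 5 records remain (offsets 5..9)
```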
6. Consumer (subscribing to multiple topics)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))  # subscribe to the topics to consume
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0)))  # current position (next offset to fetch) for the partition
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
7. Consumer (manually polling for messages)
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))
while True:
    msg = consumer.poll(timeout_ms=5)  # fetch whatever has arrived (may be an empty dict)
    print(msg)
    time.sleep(1)
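Unlike the iterator interface, poll() returns a batch: a dict mapping each TopicPartition to the list of records fetched from it in that call (an empty dict if nothing arrived within timeout_ms). A sketch of walking that shape, using hypothetical namedtuple stand-ins for kafka-python's types:

```python
from collections import namedtuple

# Hypothetical stand-ins mirroring the shape poll() returns:
# {TopicPartition: [ConsumerRecord, ...]}
TP = namedtuple('TP', ['topic', 'partition'])
Rec = namedtuple('Rec', ['topic', 'partition', 'offset', 'value'])

batch = {
    TP('test', 0):  [Rec('test', 0, 0, b'a'), Rec('test', 0, 1, b'b')],
    TP('test0', 1): [Rec('test0', 1, 7, b'c')],
}

# Flatten the batch the way a poll() loop body typically would.
flat = []
for tp, records in batch.items():
    for r in records:
        flat.append((r.topic, r.offset, r.value))

print(flat)  # three records across the two partitions
```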