文章目录
一、kafka是什么?
二、使用步骤
1. 安装
1.引入库
2.消费端:读取数据
3.发送端:发送数据
总结
一、kafka是什么?
kafka 是一个分布式流式计算平台。而在大部分企业开发人员中,都是把 kafka 当成消息系统使用,即它是一个分布式消息队列,很少会使用 kafka 的流式计算。它有四个关键概念:
topic :kafka 把收到的消息按 topic 进行分类,因此可以理解为 topic 是一种类别
producer :往kafka 发送消息的用户
consumer :接收 kafka 消息的用户
二、使用步骤
1. 安装
安装 kafka: pip install kafka-python
1
1.引入库
代码如下(示例):
from kafka import KafkaConsumer
from kafka import KafkaProducer
1
2
2.消费端:读取数据
发送和接受消息解析
# 客户端接受消息如下
ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392,
timestamp_type=0, key=b'my_value', value=None, checksum=None,
serialized_key_size=8,
serialized_value_size=-1)
# 解释
topic
partition
offset : 这条消息的偏移量
timestamp : 时间戳
timestamp_type : 时间戳类型
key : key值,字节类型
value : value值,字节类型
checksum : 消息的校验和
serialized_key_size : 序列化key的大小
serialized_value_size : 序列化value的大小,可以看到value=None时,大小为-1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
消费:
# my_topic 为Topic的名称
# group_id 指定次消费者实例属于的组名,可以不指定
# bootstrap_servers kafka地址
consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'])
for msg in consumer:
print(msg) # 消费到的数据
1
2
3
4
5
6
解码json(接收json)
from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'],
# value 自动解码,为字符串型
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
# key 自动解码,为字符串型
key_deserializer=lambda m: json.loads(m.decode('utf-8')))
1
2
3
4
5
6
手动设置消费位置和超时时间
from kafka import TopicPartition
consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
""" 手动配置partition"""
consumer.assign([TopicPartition(topic= 'my_topic', partition= 0)])
for msg in consumer:
print(msg)
"""超时处理"""
# 若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
# consumer_timeout_ms : 毫秒数
consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'], consumer_timeout_ms=1000)
for msg in consumer:
print(msg)
1
2
3
4
5
6
7
8
9
10
11
12
13
订阅多个topic
consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
# 订阅2个Topic
consumer.subscribe(topics= ['my_topic', 'topic_1'])
for msg in consumer:
print(msg)
# 正则化订阅一类Topic
consumer = KafkaConsumer(group_id='group2',bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(pattern= '^my.*')
for msg in consumer:
print(msg)
"""消费者接受字符串"""
consumer = KafkaConsumer(group_id='group2',bootstrap_servers=['localhost:9092'],
value_deserializer=bytes.decode,
key_deserializer =bytes.decode)
consumer.subscribe(pattern= '^my.*')
for msg in consumer:
print(msg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
3.发送端:发送数据
循环发送重复消息
# 导入kfkaProducer
from kafka import KafkaProducer
# 在本地locahost,端口为9092上创建Broker的producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 发送消息:循环向world这个Topic发送100个消息,消息的内容为some_message_bytes,没有指定Partition,默认
# 平均在5个Partition上
for _ in range(100):
producer.send('world',b'some_message_bytes')
1
2
3
4
5
6
7
8
发送一条消息
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# my_topic 为Topic,必须指定
# key :键,必须为字符串,可以不指定(key,values)必须有一个
# values:值,必须为字符串,可以不指定(key,values)必须有一个
# Partition:设置发送的Partition.默认为1个,对于默认的情况需要重确定
future = producer.send('my_topic' , key= b'my_key', value= b'my_value', partition= 0)
# 函数等待单条消息发送完成和超时的时间,这个必须设置需重新确定。
result = future.get(timeout= 10)
# 等同result = time.sleep(10)
print(result)
"""发送字符串类型的key和value"""
# 消费者受到的仍为字节字符串
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer= str.encode, value_serializer= str.encode)
future = producer.send('my_topic' , key= 'key_3', value= 'value_3', partition= 0)
future.get(timeout= 10)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
发送json文件
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
# value 为发送的具体内容
future = producer.send('my_topic' , value= {'value_1' : 'value_2'}, partition= 0)
future.get(timeout= 10)
1
2
3
4
5
6
7
发送msgpack消息
msgpack为MessagePack的简称,是高效二进制序列化类库,比json高效
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
1
2
参考1:https://www.jianshu.com/p/c89997867d48
参考2:https://blog.csdn.net/luanpeng825485697/article/details/81036028
总结
kafka模块生产和消费过程一些基础的整理,和上节的内容类似,都是kafka在python环境下的开发使用,选择根据个人喜好,建议选择confluent-kafka模块。
————————————————
版权声明:本文为CSDN博主「高高兴兴5788」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_33624802/article/details/110436222