zoukankan      html  css  js  c++  java
  • python --kakfa(三):kafka模块生产和消费数据

    文章目录
    一、kafka是什么?
    二、使用步骤
    1. 安装
    1.引入库
    2.消费端:读取数据
    3.发送端:发送数据
    总结
    一、kafka是什么?
    kafka 是一个分布式流式计算平台。而在大部分企业开发人员中,都是把 kafka 当成消息系统使用,即它是一个分布式消息队列,很少会使用 kafka 的流式计算。它有四个关键概念:

    topic :kafka 把收到的消息按 topic 进行分类,因此可以理解为 topic 是一种类别
    producer :往kafka 发送消息的用户
    consumer :接收 kafka 消息的用户
    二、使用步骤
    1. 安装
    安装 kafka: pip install kafka-python
    1
    1.引入库
    代码如下(示例):

    from kafka import KafkaConsumer
    from kafka import KafkaProducer
    1
    2
    2.消费端:读取数据
    发送和接受消息解析

    # 客户端接受消息如下
    ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392,
    timestamp_type=0, key=b'my_value', value=None, checksum=None,
    serialized_key_size=8,
    serialized_value_size=-1)
    # 解释
    topic
    partition
    offset : 这条消息的偏移量
    timestamp : 时间戳
    timestamp_type : 时间戳类型
    key : key值,字节类型
    value : value值,字节类型
    checksum : 消息的校验和
    serialized_key_size : 序列化key的大小
    serialized_value_size : 序列化value的大小,可以看到value=None时,大小为-1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    消费:
    # my_topic 为Topic的名称
    # group_id 指定次消费者实例属于的组名,可以不指定
    # bootstrap_servers kafka地址
    consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'])
    for msg in consumer:
    print(msg) # 消费到的数据
    1
    2
    3
    4
    5
    6
    解码json(接收json)
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'],
    # value 自动解码,为字符串型
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    # key 自动解码,为字符串型
    key_deserializer=lambda m: json.loads(m.decode('utf-8')))
    1
    2
    3
    4
    5
    6
    手动设置消费位置和超时时间
    from kafka import TopicPartition
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
    """ 手动配置partition"""
    consumer.assign([TopicPartition(topic= 'my_topic', partition= 0)])
    for msg in consumer:
    print(msg)

    """超时处理"""
    # 若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
    # consumer_timeout_ms : 毫秒数
    consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'], consumer_timeout_ms=1000)
    for msg in consumer:
    print(msg)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    订阅多个topic
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
    # 订阅2个Topic
    consumer.subscribe(topics= ['my_topic', 'topic_1'])
    for msg in consumer:
    print(msg)

    # 正则化订阅一类Topic
    consumer = KafkaConsumer(group_id='group2',bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    consumer.subscribe(pattern= '^my.*')

    for msg in consumer:
    print(msg)
    """消费者接受字符串"""
    consumer = KafkaConsumer(group_id='group2',bootstrap_servers=['localhost:9092'],
    value_deserializer=bytes.decode,
    key_deserializer =bytes.decode)
    consumer.subscribe(pattern= '^my.*')

    for msg in consumer:
    print(msg)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    3.发送端:发送数据
    循环发送重复消息
    # 导入kfkaProducer
    from kafka import KafkaProducer
    # 在本地locahost,端口为9092上创建Broker的producer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    # 发送消息:循环向world这个Topic发送100个消息,消息的内容为some_message_bytes,没有指定Partition,默认
    # 平均在5个Partition上
    for _ in range(100):
    producer.send('world',b'some_message_bytes')
    1
    2
    3
    4
    5
    6
    7
    8
    发送一条消息
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    # my_topic 为Topic,必须指定
    # key :键,必须为字符串,可以不指定(key,values)必须有一个
    # values:值,必须为字符串,可以不指定(key,values)必须有一个
    # Partition:设置发送的Partition.默认为1个,对于默认的情况需要重确定
    future = producer.send('my_topic' , key= b'my_key', value= b'my_value', partition= 0)
    # 函数等待单条消息发送完成和超时的时间,这个必须设置需重新确定。
    result = future.get(timeout= 10)
    # 等同result = time.sleep(10)
    print(result)

    """发送字符串类型的key和value"""
    # 消费者受到的仍为字节字符串
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer= str.encode, value_serializer= str.encode)
    future = producer.send('my_topic' , key= 'key_3', value= 'value_3', partition= 0)
    future.get(timeout= 10)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    发送json文件
    from kafka import KafkaProducer
    import json
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
    value_serializer=lambda m: json.dumps(m).encode('utf-8'))
    # value 为发送的具体内容
    future = producer.send('my_topic' , value= {'value_1' : 'value_2'}, partition= 0)
    future.get(timeout= 10)
    1
    2
    3
    4
    5
    6
    7
    发送msgpack消息
    msgpack为MessagePack的简称,是高效二进制序列化类库,比json高效

    producer = KafkaProducer(value_serializer=msgpack.dumps)
    producer.send('msgpack-topic', {'key': 'value'})
    1
    2
    参考1:https://www.jianshu.com/p/c89997867d48
    参考2:https://blog.csdn.net/luanpeng825485697/article/details/81036028

    总结
    kafka模块生产和消费过程一些基础的整理,和上节的内容类似,都是kafka在python环境下的开发使用,选择根据个人喜好,建议选择confluent-kafka模块。
    ————————————————
    版权声明:本文为CSDN博主「高高兴兴5788」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_33624802/article/details/110436222

  • 相关阅读:
    Oracle常用命令大全(很有用,做笔记)
    表格驱动编程在代码中的应用
    mac 利用svn下载远程代码出现Agreeing to the Xcode/iOS license requires admin privileges, please re-run as root via sudo.
    FAILURE: Build failed with an exception.
    There is an internal error in the React performance measurement code.Did not expect componentDidMount timer to start while render timer is still in progress for another instance
    react native TypeError network request failed
    Android向系统相册中插入图片,相册中会出现两张 一样的图片(只是图片大小不一致)
    react-native Unrecognized font family ‘Lonicons’;
    react-native SyntaxError xxxxx/xx.js:Unexpected token (23:24)
    Application MyTest has not been registered. This is either due to a require() error during initialization or failure to call AppRegistry.registerComponent.
  • 原文地址:https://www.cnblogs.com/ExMan/p/14951677.html
Copyright © 2011-2022 走看看