zoukankan      html  css  js  c++  java
  • python --kakfa(三):kafka模块生产和消费数据

    文章目录
    一、kafka是什么?
    二、使用步骤
    1. 安装
    1.引入库
    2.消费端:读取数据
    3.发送端:发送数据
    总结
    一、kafka是什么?
    kafka 是一个分布式流式计算平台。而在大部分企业开发人员中,都是把 kafka 当成消息系统使用,即它是一个分布式消息队列,很少会使用 kafka 的流式计算。它有四个关键概念:

    topic :kafka 把收到的消息按 topic 进行分类,因此可以理解为 topic 是一种类别
    producer :往kafka 发送消息的用户
    consumer :接收 kafka 消息的用户
    二、使用步骤
    1. 安装
    安装 kafka: pip install kafka-python
    1
    1.引入库
    代码如下(示例):

    from kafka import KafkaConsumer
    from kafka import KafkaProducer
    1
    2
    2.消费端:读取数据
    发送和接受消息解析

    # 客户端接受消息如下
    ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392,
    timestamp_type=0, key=b'my_value', value=None, checksum=None,
    serialized_key_size=8,
    serialized_value_size=-1)
    # 解释
    topic
    partition
    offset : 这条消息的偏移量
    timestamp : 时间戳
    timestamp_type : 时间戳类型
    key : key值,字节类型
    value : value值,字节类型
    checksum : 消息的校验和
    serialized_key_size : 序列化key的大小
    serialized_value_size : 序列化value的大小,可以看到value=None时,大小为-1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    消费:
    # my_topic 为Topic的名称
    # group_id 指定次消费者实例属于的组名,可以不指定
    # bootstrap_servers kafka地址
    consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'])
    for msg in consumer:
    print(msg) # 消费到的数据
    1
    2
    3
    4
    5
    6
    解码json(接收json)
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'],
    # value 自动解码,为字符串型
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    # key 自动解码,为字符串型
    key_deserializer=lambda m: json.loads(m.decode('utf-8')))
    1
    2
    3
    4
    5
    6
    手动设置消费位置和超时时间
    from kafka import TopicPartition
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
    """ 手动配置partition"""
    consumer.assign([TopicPartition(topic= 'my_topic', partition= 0)])
    for msg in consumer:
    print(msg)

    """超时处理"""
    # 若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
    # consumer_timeout_ms : 毫秒数
    consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'], consumer_timeout_ms=1000)
    for msg in consumer:
    print(msg)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    订阅多个topic
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
    # 订阅2个Topic
    consumer.subscribe(topics= ['my_topic', 'topic_1'])
    for msg in consumer:
    print(msg)

    # 正则化订阅一类Topic
    consumer = KafkaConsumer(group_id='group2',bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    consumer.subscribe(pattern= '^my.*')

    for msg in consumer:
    print(msg)
    """消费者接受字符串"""
    consumer = KafkaConsumer(group_id='group2',bootstrap_servers=['localhost:9092'],
    value_deserializer=bytes.decode,
    key_deserializer =bytes.decode)
    consumer.subscribe(pattern= '^my.*')

    for msg in consumer:
    print(msg)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    3.发送端:发送数据
    循环发送重复消息
    # 导入kfkaProducer
    from kafka import KafkaProducer
    # 在本地locahost,端口为9092上创建Broker的producer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    # 发送消息:循环向world这个Topic发送100个消息,消息的内容为some_message_bytes,没有指定Partition,默认
    # 平均在5个Partition上
    for _ in range(100):
    producer.send('world',b'some_message_bytes')
    1
    2
    3
    4
    5
    6
    7
    8
    发送一条消息
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    # my_topic 为Topic,必须指定
    # key :键,必须为字符串,可以不指定(key,values)必须有一个
    # values:值,必须为字符串,可以不指定(key,values)必须有一个
    # Partition:设置发送的Partition.默认为1个,对于默认的情况需要重确定
    future = producer.send('my_topic' , key= b'my_key', value= b'my_value', partition= 0)
    # 函数等待单条消息发送完成和超时的时间,这个必须设置需重新确定。
    result = future.get(timeout= 10)
    # 等同result = time.sleep(10)
    print(result)

    """发送字符串类型的key和value"""
    # 消费者受到的仍为字节字符串
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer= str.encode, value_serializer= str.encode)
    future = producer.send('my_topic' , key= 'key_3', value= 'value_3', partition= 0)
    future.get(timeout= 10)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    发送json文件
    from kafka import KafkaProducer
    import json
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
    value_serializer=lambda m: json.dumps(m).encode('utf-8'))
    # value 为发送的具体内容
    future = producer.send('my_topic' , value= {'value_1' : 'value_2'}, partition= 0)
    future.get(timeout= 10)
    1
    2
    3
    4
    5
    6
    7
    发送msgpack消息
    msgpack为MessagePack的简称,是高效二进制序列化类库,比json高效

    producer = KafkaProducer(value_serializer=msgpack.dumps)
    producer.send('msgpack-topic', {'key': 'value'})
    1
    2
    参考1:https://www.jianshu.com/p/c89997867d48
    参考2:https://blog.csdn.net/luanpeng825485697/article/details/81036028

    总结
    kafka模块生产和消费过程一些基础的整理,和上节的内容类似,都是kafka在python环境下的开发使用,选择根据个人喜好,建议选择confluent-kafka模块。
    ————————————————
    版权声明:本文为CSDN博主「高高兴兴5788」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_33624802/article/details/110436222

  • 相关阅读:
    PullToRefreshListView 应用讲解
    websql的使用/phonegap操作数据库 sqlite
    HTML5本地存储——Web SQL Database
    PhoneGap 数据库操作
    adb shell命令行
    实现调用Android手机的拍照功能
    Android_照相机Camera_调用系统照相机返回data为空
    Android--数据持久化之内部存储、Sdcard存储
    Android之项目推荐使用的第三方库
    面试常考问题大全
  • 原文地址:https://www.cnblogs.com/ExMan/p/14951677.html
Copyright © 2011-2022 走看看