zoukankan      html  css  js  c++  java
  • kafka-python基本使用

    1. kafka-python的安装

      pip3 install kafka-python

    2.kafka-python的基本使用

    • 最简单使用实例

    1.消费端

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    for msg in consumer:
        print(msg)
    • 第1个参数为 topic的名称
    • group_id : 指定此消费者实例属于的组名,可以不指定
    • bootstrap_servers : 指定kafka服务器

    2.生产端

    from kafka import KafkaProducer
    
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    future = producer.send('my_topic' , key= b'my_key', value= b'my_value', partition= 0)
    result = future.get(timeout= 10)
    print(result)

    producer.send函数为发送消息

    • 第1个参数为 topic名称,必须指定
    • key : 键,必须是字节字符串,可以不指定(但key和value必须指定1个),默认为None
    • value : 值,必须是字节字符串,可以不指定(但key和value必须指定1个),默认为None
    • partition : 指定发送的partition,由于kafka默认配置1个partition,固为0

    future.get函数等待单条消息发送完成或超时,经测试,必须有这个函数,不然发送不出去,或用time.sleep代替

     

    3.发送或接收消息解析

    消费者端接收消息如下:

    ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)
    • topic
    • partition
    • offset : 这条消息的偏移量
    • timestamp : 时间戳
    • timestamp_type : 时间戳类型
    • key : key值,字节类型
    • value : value值,字节类型
    • checksum : 消息的校验和
    • serialized_key_size : 序列化key的大小
    • serialized_value_size : 序列化value的大小,可以看到value=None时,大小为-1

     

    KafkaConsumer

      • 手动分配partition
    from kafka import KafkaConsumer
    from kafka import TopicPartition
    
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
    consumer.assign([TopicPartition(topic= 'my_topic', partition= 0)])
    for msg in consumer:
        print(msg)
      • 超时处理
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'], consumer_timeout_ms=1000)
    for msg in consumer:
        print(msg)

    若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待

    consumer_timeout_ms : 毫秒数

     

      • 订阅多个topic
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
    consumer.subscribe(topics= ['my_topic', 'topic_1'])
    for msg in consumer:
        print(msg)

    可同时接收多个topic消息

    也可用正则订阅一类topic

    from kafka import KafkaConsumer
    import json
    
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
    consumer.subscribe(pattern= '^my.*')
    for msg in consumer:
        print(msg)
    • 解码json数据

    编码(生产者):value_serializer

    解码(消费者):value_deserializer

    1.先看producer发送的json数据

    from kafka import KafkaProducer
    import json
    
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
    future = producer.send('my_topic' ,  value= {'value_1' : 'value_2'}, partition= 0)
    future.get(timeout= 10)

    2.consumer没有解码收到的数据

    ConsumerRecord(topic='my_topic', partition=0, offset=22, timestamp=1529575016310, timestamp_type=0, key=None, value=b'{"value_1": "value_2"}', checksum=None, serialized_key_size=-1, serialized_value_size=22)

    可以看到value为原始的json字节数据,接下来可以再做一步解码操作

    3.consumer自动解码

    from kafka import KafkaConsumer
    import json
    
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
    consumer.subscribe(topics= ['my_topic', 'topic_1'])
    for msg in consumer:
        print(msg)

    接收结果:

    ConsumerRecord(topic='my_topic', partition=0, offset=23, timestamp=1529575235994, timestamp_type=0, key=None, value={'value_1': 'value_2'}, checksum=None, serialized_key_size=-1, serialized_value_size=22)
    • 可以看到接收结果中,value已经自动解码,并为字符串类型
    • 不仅value可以json,key也可以,只需指定 key_deserializer

     

    KafkaProducer

      • 发送字符串类型的key和value
    from kafka import KafkaProducer
    
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer= str.encode, value_serializer= str.encode)
    future = producer.send('my_topic' ,  key= 'key_3', value= 'value_3', partition= 0)
    future.get(timeout= 10)

    指定 key_serializer 和 value_serializer 为 str.encode,但消费者收到的还是字节字符串

    若想要消费者收到的为字符串类型,就需要解码操作,key_deserializer= bytes.decode

     
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], key_deserializer= bytes.decode, value_deserializer= bytes.decode)
    consumer.subscribe(pattern= '^my.*')
    for msg in consumer:
        print(msg)
    • 可压缩消息发送

    compression_type='gzip'

    若消息过大,还可压缩消息发送,可选值为 ‘gzip’, ‘snappy’, ‘lz4’, or None

     
    from kafka import KafkaProducer
    
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
    future = producer.send('my_topic' ,  key= b'key_3', value= b'value_3', partition= 0)
    future.get(timeout= 10)
    • 发送msgpack消息

    msgpack为MessagePack的简称,是高效二进制序列化类库,比json高效

    producer = KafkaProducer(value_serializer=msgpack.dumps)
    producer.send('msgpack-topic', {'key': 'value'})

    end

     
     
  • 相关阅读:
    随机取数
    张江在线APP演示
    同步异步GET和POST请求
    IOS内存管理retain,assign,copy,strong,weak
    win7 扩展双屏后 双屏同时显示任务栏
    虚拟机 Linux 操作系统与自己window系统之间共享文件
    Eclipse 中关于 “Program "sh" not found in PATH” 错误
    svn... coult not connect server
    Android应用资源系列之属性(Attribute)资源
    Java break [flag]; 与continue [flag]; 区别
  • 原文地址:https://www.cnblogs.com/xiaozengzeng/p/13621045.html
Copyright © 2011-2022 走看看