zoukankan      html  css  js  c++  java
  • python --kafka(二): confluent-kafka 模块生产数据消费数据

    文章目录
    前言
    一、confluent-kafka 是什么?
    二、使用步骤
    1.引入库
    2.消费数据
    2.1 初始化consumer对象
    2.2 消费数据偏移量
    3. 生产数据
    总结
    前言
    kafka是一个开源的流处理平台,一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

    一、confluent-kafka 是什么?
    confluent-kafka 模块 confluent-kafka是Python模块,推荐使用,性能优于kafka-python
    参考文档:https://docs.confluent.io/current/clients/confluent-kafka-python/index.html

    二、使用步骤
    1.引入库
    安装 : pip install confluent-kafka

    代码示例:

    # 引入生产者、消费者
    from confluent_kafka import Consumer, Producer
    # 引入指针和kafka异常数据机制
    from confluent_kafka import TopicPartition, KafkaError
    1
    2
    3
    4
    2.消费数据
    2.1 初始化consumer对象
    创建consumer对象,其中参数bootstrap.servers 为kafka的地址:接口, group.id为分组号,default.topic.config为topic的配置设置,其中auto.offset.reset为消费方式:‘smallest’ 每次从最小的offset位置消费,‘latest’ 从最新的offset位置消费数据

    代码如下:

    conf = {'bootstrap.servers': "host1:9092", # 地址接口host1:9092
    'group.id': "foo", # 分组号
    'enable.auto.commit': True, # 是否自动提交偏移量
    'default.topic.config': {'auto.offset.reset': 'smallest'} # 默认设置topic的消费的方式
    # 'default.topic.config': {'auto.offset.reset': 'latest'}
    }
    subscriber = Consumer(conf)
    1
    2
    3
    4
    5
    6
    7
    2.2 消费数据偏移量
    手动设置开始消费的位置:

    topic: 消费主题
    topic_part : 一次消费数据的批次
    init_offset:最新offset的位置
    kafka_consumer_batch2 : kafka_consumer_batch为数值,即指针向前拨回init_offset - kafka_consumer_batch2 位置处。

    topic_part = TopicPartition(topic, 0)
    topic_part_out = subscriber.committed([topic_part])
    init_offset = topic_part_out[0].offset
    # 将0位置设置为当前的位置
    topic_part_new = TopicPartition(topic, 0, init_offset - (kafka_consumer_batch*2))
    # 提交offset
    subscriber.commit(offsets=[topic_part_new])
    1
    2
    3
    4
    5
    6
    7
    根据时间戳消费数据,begin_time需要为int类型的数据,
    part_new = TopicPartition(topic, 0, begin_time)
    begin_offset = consumer.offsets_for_times(part_new)
    consumer.commit(offsets=[begin_offset])
    # 将0 位置设置成当前需要的位置
    par_p = consumer.committed([TopicPartition(begin_offset.topic, 0)])
    print('####################', par_p[0].offset)
    1
    2
    3
    4
    5
    6
    消费数据
    # 批量消费 consumer_topic_list 为list列表,其中存放的为topic的名称
    subscriber.subscribe(consumer_topic_list)
    while True:
    # kafka_consumer_batch 消费的batch size 单位为整数, kafka_consumer_timeout设置超时时间,单位为s
    msg = subscriber.consume(kafka_consumer_batch,kafka_consumer_timeout)
    if msg is None:
    continue
    else:
    if not msg.error() is None:
    print msg.error()
    else:
    message = msg.value()
    print(msg.partition(), msg.offset())
    subscriber.close()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    3. 生产数据
    import socket

    conf = {'bootstrap.servers': "host1:9092",
    'client.id': socket.gethostname()}
    # 初始化producer对象
    producer = Producer(conf)
    # 推送数据 topic为主题,json_data 为传送的数据
    # 生产数据异常处理机制
    def __publish_delivery_report(err, msg) -> None:
    """
    发布消息记录
    :param err: 消息的错误信息
    :param msg: 消息
    :return:
    """
    try:
    if err is not None:
    logger_all.error('Message delivery failed: {}'.format(err))
    else:
    logger_all.debug('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    except Exception as e:
    logger_all.error(str(e))
    logger_all.error(traceback.format_exc())
    # json数据格式转化
    data = json.dumps(need_push)
    # 推送数据 publisher_topic推送主题, data 数据, callback 召回处理机制
    publisher.produce(publisher_topic, data, callback=__publish_delivery_report)
    publisher.poll(kafka_producer_timeout) # kafka_producer_timeout 为超时时间
    publisher.flush()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    总结
    kafka既可以用于python开发和可以用于Java开发,使用场景相对较广泛。 同时,python对应的包也存在很多,除了confluent-kafka 还有kafka-python包,应用场景较为广泛,开发相对简洁。
    ————————————————
    版权声明:本文为CSDN博主「高高兴兴5788」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_33624802/article/details/110424769

  • 相关阅读:
    std thread
    windows更新包发布地址
    How to set up logging level for Spark application in IntelliJ IDEA?
    spark 错误 How to set heap size in spark within the Eclipse environment?
    hadoop 常用命令
    windows 安装hadoop 3.2.1
    windows JAVA_HOME 路径有空格,执行软连接
    day01MyBatisPlus条件构造器(04)
    day01MyBatisPlus的CRUD 接口(03)
    day01MyBatisPlus入门(02)
  • 原文地址:https://www.cnblogs.com/ExMan/p/14951662.html
Copyright © 2011-2022 走看看