zoukankan      html  css  js  c++  java
  • kafka rest api service的使用

    转载:https://www.cnblogs.com/xnchll/p/9618432.html

     kafka集群需要安装confluent https://www.cnblogs.com/xnchll/p/9404705.html

    转载 https://blog.51cto.com/9291927/2504495

    Kafka快速入门(十二)——Python客户端

    一、confluent-kafka

    1、confluent-kafka简介

    confluent-kafka是Python模块,是对librdkafka的轻量级封装,支持Kafka 0.8以上版本。本文基于confluent-kafka 1.3.0编写。
    GitHub地址:
    https://github.com/confluentinc/confluent-kafka-python

    2、confluent-kafka特性

    (1)可靠。confluent-kafka是对广泛应用于各种生产环境的librdkafka的封装,使用Java客户端相同的测试集进行测试,由Confluent进行支持。
    (2)性能。性能是一个关键的设计考虑因素,对于较大的消息,最大吞吐量与Java客户机相当(Python解释器的开销影响较小),延迟与Java客户端相当。
    (3)未来支持。Coufluent由Kafka创始人创建,致力于构建以Apache Kafka为核心的流处理平台。确保核心Apache Kafka和Coufluent平台组件保持同步是当务之急。

    3、confluent-kafka安装

    创建confluent源:
    进入/etc/yum.repos.d目录创建confluent.repo文件:

    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.4/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.4
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
    enabled=1

    安装:

    sudo yum clean all &&  sudo yum install confluent-community-2.12
    sudo yum install librdkafka-devel python-devel
    pip install confluent-kafka

    安装AvroProducer、AvroConsumer:
    pip install "confluent-kafka[avro]"

    二、coufluent-kafka客户端API

    1、confluent_kafka.Consumer

    Consumer(config)
    使用指定的配置dict创建Consumer实例。
    Consumer.assign(partitions)
    由指定TopicPartition列表设置Consumer的分区分配策略,启动消费。如果对关闭的Consumer调用本函数会抛出RuntimeError。
    Consumer.assignment()
    返回当前分区分配策略,返回list(TopicPartition)
    Consumer.close()
    关闭和终止Consumer实例,关闭Consumer实例会执行以下操作:停止消费;提交位移(如果enable.auto.commit设置为False会抛出异常)、离开Consumer Group。
    Consumer.commit([message=None][, offsets=None][, asynchronous=True])
    提交一条消息或位移列表,message和offsets是互斥参数,如果没有指定参数,会使用当前分区分配策略的offsets。
    message:提交消息的位移加1
    offsets:要提交的TopicPartition列表
    asynchronous:是否异步提交。异步提交会立即返回None。如果设置为False,会阻塞直到提交成功或失败,如果提交成功,会返回提交的offsets。注意:提交成功,需要对返回的TopicPartition列表的每个TopicPartition的err字段进行检查,TopicPartition可能会提交失败。
    Consumer.committed(partitions[, timeout=None])
    获取已提交的分区的offsets。
    partitions:TopicPartition列表
    timeout:请求超时,单位秒。
    返回TopicPartition列表或错误集
    Consumer.consume([num_messages=1][, timeout=-1])
    消费消息,调用回调函数,返回消息列表,如果超时,返回空。
    应用程序必须检查返回Message的error方法,正常Message的error返回None。
    num_messages:返回的最大消息数量,默认为1
    timeout:阻塞等待消息、事件、回调函数的最大时间
    Connsumer.get_watermark_offsets(partition[, timeout=None][, cached=False])
    获取分区的低水位和高水位
    partition:TopicPartition对象
    Timeout:请求超时,
    Cached:是否替换正在查询的Broker使用的缓存信息。
    成功返回低水位和高水位的元组,超时返回None。
    Consumer.list_topics([topic=None][, timeout=-1])
    请求集群的元数据信息。
    topic:字符串类,如果指定,只请求本Topic的信息,否则返回集群的所有Topic。
    timeout:超时前的最大响应时间,-1表示永不超时。
    返回ClusterMetadata类型
    Consumer.offsets_for_times(partitions[, timeout=None])
    对指定的分区列表根据时间戳查询offsets。
    返回每个分区的offsets大于等于指定分区列表的时间戳的位移。
    partitions:TopicPartition列表
    timeout:请求超时时间。
    Consumer.pause(partitions)
    暂停指定分区列表的分区的消费
    Consumer.poll([timeout=None])
    消费消息,调用回调函数,返回事件。
    应用程序必须检查返回的Message对象的error()方法,如果是正常消息,返回None。
    返回Message对象回None。
    Consumer.position(partitions)
    获取指定分区列表分区的位移
    partitions:分区列表
    返回带位移的TopicPartition列表,当前位移是最新消费消息的位移加1。
    Consumer.resume(partitions)
    恢复指定分区列表的分区的消费
    partitions:要恢复的TopicPartitio列表
    Consumer.seek(partition)
    定位分区的消费位移到offset。offset可以是绝对值,也可以是逻辑位移OFFSET_BEGINNING。本函数只用于活跃消费分区更新消费位移,要设置分区的起始位移可以使用assign函数。
    Consumer.store_offsets([message=None][, offsets=None])
    存储一条消息的位移或位移列表。
    message和offsets是互斥参数。
    被存储的位移会根据auto.commit.interval.m参数值被提交,使用本函数时enable.auto.offset.store参数必须被设置为False。
    message:存储message的位移加1。
    offsets:要存储位移的TopicPartition列表
    Consumer.subscribe(topics[, on_assign=None][, on_revoke=None])
    设置要订阅的Topic列表,会替代此前订阅的Topic。
    订阅的Topic名称支持正则表达式,使用”^”作为Topic名称前缀。
    topics:Topic名称列表
    on_assign:完成分区再分配的回调函数
    on_revoke:再平衡操作的

    on_assign(consumer, partitions)
    on_revoke(consumer, partitions)

    Consumer.unassign()
    删除当前分区分配策略和停止消费
    Consumer.unsubscribe()
    删除当前订阅Topic

    2、confluent_kafka.Producer

    confluent_kafka.Producer是异步Kafka生产者。
    Producer.Producer(config)
    使用config字典创建Producer实例。
    config:配置字典对象,至少应该设置bootstrap.servers属性
    Producer.len()
    返回要传递到Broker的消息数量
    Producer.flush([timeout])
    等待Producer队列中要传递的所有消息。
    timeout:阻塞的最大时间,要求librdkafka版本大于0.9.4。
    返回Producer消息队列中仍然存在的消息的数量。
    Producer.list_topics([topic=None][, timeout=-1])
    请求集群的元数据。
    topic:如果指定Topic,只返回Topic的相应元数据,否则返回所有Topic的元数据。
    timeout:超时前的最大响应时间,-1表示永不超时。
    返回ClusterMetadata类型。
    Producer.poll([timeout])
    Poll生产者事件,调用相应回调函数。
    timeout:阻塞等待事件的最大时间。
    返回处理事件的数量。
    Producer.produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
    生产消息到指定Topic,异步操作。
    topic:要生产消息到的指定Topic。
    value:str或bytes类型,消息数据。
    key:str或bytes类型,消息Key。
    partition:要生产消息到指定分区,否则使用内置分区分配策略。
    on_delivery:投递报告回调函数
    timestamp:消息事件戳,要求librdkafka v0.9.4以上版本,api.version.request=true, Kafka 0.10.0.0以上版本。
    headers:消息头,字典类型,消息头的key必须是字符串,value必须是二进制数据,unicode或None。要求librdkafka v0.11.4以上版本和Kafka 0.11.0.0以上版本。

    3、confluent_kafka.admin.AdminClient

    AdminClient提供对Kafka Broker、Topic、Group、Broker支持的其它资源进行管理操作。
    AdminClient.alter_configs(resources, **kwargs)
    更新指定resource的配置值。
    AdminClient.create_partitions(new_partitions, **kwargs)
    创建指定Topic的分区
    AdminClient.create_topics(new_topics, **kwargs)
    创建Topic
    AdminClient.delete_topics(topics, **kwargs)
    删除Topic
    AdminClient.describe_configs(resources, **kwargs)
    获取指定resource的配置

    4、confluent_kafka.admin.BrokerMetadata

    BrokerMetadata定义了Kafka Broker的信息,是非实例化类,BrokerMetadata包含的属性如下:
    id:整型,Broker ID
    host:字符串类型,Broker 主机名
    port:整型,Broker端口

    5、confluent_kafka.admin.ClusterMetadata

    ClusterMetadata定义了Kafka集群、Broker、Topic等信息,是非实例化类,ClusterMetadata包含如下属性:
    cluster_id :字符串,Kafka集群ID字符串。
    controller_id:id类型,当前Controller Broker ID
    brokers:字典,key为整型Broker ID,值为BrokerMetadata对象。
    topics :字典,key为字符串Topic名称, 值为TopicMetadata对象。
    orig_broker_id:整型,数据源于Broker的ID。
    orig_broker_name:字符串,数据源于Broker的名称或地址。

    6、confluent_kafka.admin.ConfigEntry

    ConfigEntry由describe_configs()返回指定资源的配置实体,是非实例化类,ConfigEntry包含如下属性:
    name:字符串,配置属性名称。
    value:字符串,配置值。
    source :ConfigSource类型,配置源。
    is_read_only:bool类型,指明配置属性是否只读。
    is_default:bool类型,指明配置属性是否使用默认值。
    is_sensitive:bool类型,指明配置属性值是否包含敏感信息,如安全配置。
    is_synonym:bool类型,指明配置属性是否是赋配置实体的别名。
    synonyms :list类型,配置属性的备用源的配置实体列表。

    7、confluent_kafka.admin.ConfigResource

    ConfigResource(restype, name, set_config=None, described_configs=None, error=None)
    restype:resource类型
    name:resource名称
    set_config:胚珠属性值设置方法
    described_configs:
    error:错误信息
    Kafka资源ConfigResource.Type如下:
    ANY= 1,任何资源
    BROKER= 4,Broker资源,资源名称是Broker ID
    GROUP= 3,Group资源,资源名称是group.id
    TOPIC= 2,Topic资源,资源名称是Topic名称。
    UNKNOWN= 0,未知类型,未设置类型。
    set_config(name, value, overwrite=True)
    设置、覆写配置实体
    name:配置属性名称。
    value:配置属性值。
    overwrite:是否覆写。

    8、confluent_kafka.admin.ConfigSource

    ConfigSource是由 describe_configs()返回的配置实体的配置源。
    DEFAULT_CONFIG= 5
    DYNAMIC_BROKER_CONFIG= 2
    DYNAMIC_DEFAULT_BROKER_CONFIG= 3
    DYNAMIC_TOPIC_CONFIG= 1
    STATIC_BROKER_CONFIG= 4
    UNKNOWN_CONFIG= 0

    9、confluent_kafka.admin.PartitionMetadata

    PartitionsMetadata包含Kafka分区的元数据,是非实例化类,PartitionsMetadata包含的属性如下:
    id :整型,分区编号。
    leader :整型,分区的当前Leader,或是-1。
    replicas: 整型列表,分区的副本的Broker ID的列表。
    Isrs: 整型列表,分区的ISR Broker ID列表。
    error:KafkaError类型,分区错误。

    10、confluent_kafka.admin.TopicMetadata

    TopicMetadata包含Kafka Topic相关的元数据,是非实例化类,TopicMetadata包含的属性如下:
    topic:字符串类型,Topic名称
    partitions:字典,key是分区编号,值是PartitionMetadata对象。
    error:KafkaError类型,Topic错误。

    三、Avro序列化组件

    1、confluent_kafka.avro.AvroConsumer

    AvroConsumer(config, schema_registry=None, reader_key_schema=None, reader_value_schema=None)
    Kafka Consumer客户端,对消息进行avro模式解码,处理消息反序列化。
    config:字典,配置参数,包含schema.registry.url和bootstrap.servers参数。
    reader_key_schema:schema类型,消息key的读取器。
    reader_value_schema:(schema类型,消息值的读取器。
    AvroConsumer.poll(timeout=None)
    confluent_kafka.Consumer类的poll方法的覆写,使用avro scema处理消息的反序列化。

    2、confluent_kafka.avro.AvroProducer

    AvroProducer(config, default_key_schema=None, default_value_schema=None, schema_registry=None)
    对消息进行avro schema编码的Kafka Producer客户端,处理结构注册、消息序列化。
    config:字典,配置参数,包含schema.registry.url和bootstrap.servers参数。
    default_key_schema:字符串,可选,key的默认avro schema。
    default_value_schema:字符串,可选,value的默认avro schema。
    AvroProducer.produce(**kwargs)
    异步发送消息到Kafka,使用指定编码,默认使用avro schema。
    topic:字符串,Topic名称。
    value: object类型,要序列化的对象。
    value_schema:字符串,值的Avro schema。
    key:object类型,要序列化的对象。
    key_schema:字符串,键Avro schema。

    四、coufluent-kafka主要类API

    1、confluent_kafka.Message

    Message对象表示一条消费或生产的消息,或是一个事件。
    应用程序必须使用Message.error()方法检查Message对象是否是一个正常的Message还是一个错误事件。
    Message类不是用户可实例化的类,
    Message.len()
    返回消息数据的长度
    Message.error()
    Message对象用于传播错误或事件,应用程序必须检查error()方法以确定Message对象是否是一个正常的消息(返回None),错误或是事件(返回KafkaError对象)。
    Message.headers()
    获取消息的头。消息头是键值对集合,消息头的键是有序的,可以重重复。
    返回消息头键值对的列表。
    Message.key()
    获取消息键。
    Message.offset()
    消息位移
    Message.partition()
    分区编号
    Message.set_headers(value)
    使用新值设置Message.key字段的值。
    Message.set_value()
    使用新值设置Message.value字段的值
    Message.timestamp()
    获取消息的时间戳类型和时间戳。时间戳类型如下:
    TIMESTAMP_NOT_AVAILABLE:Broker不支持的时间戳
    TIMESTAMP_CREATE_TIME:生产者时间戳
    TIMESTAMP_LOG_APPEND_TIME:Broker接收时间
    返回时间戳类型和时间戳的元组。
    如果返回的时间戳类型是TIMESTAMP_NOT_AVAILABLE,返回的时间戳应该被忽略。时间戳要求Kafka 0.10.0.0以上版本,客户端配置的api.version.request属性值为True。
    Message.topic()
    返回消息的Topic
    Message.Value()
    返回消息数据。

    2、confluent_kafka.TopicPartition

    TopicPartition是一种泛类型,用户保存单个分区及其相关的各种信息。通常用于为不同的操作提供TopicPartition列表。
    TopicPartition(topic[, partition][, offset])
    实例化TopicPartition对象。
    topic:Topic名称
    partition:分区编号
    offset:位移
    TopicPartition.error:属性值,使用KafkaError表示一个错误。
    TopicPartition.offset:属性值,位移
    TopicPartition.partition:属性值,分区编号
    TopicPartition.topic:属性值,Topic名称

    3、confluent_kafka.KafkaError

    KafkaError表示Kafka错误和事件对象,不是用户实例化类,用于事件传播、错误传播、异常。
    KafkaError.code()
    返回错误或事件的代码
    KafkaError.name()
    返回错误或事件的枚举名称
    KafkaError.str()
    返回事件或错误的可读字符串描述

    4、confluent_kafka.KafkaException

    KafkaException是对KafkaError类的封装,使用exception.args[0]可以提取KafkaError对象。

    5、Offset

    OFFSET_BEGINNING:从分区开始
    OFFSET_END:分区结束位置
    OFFSET_STORED:使用存储提交位移
    OFFSET_INVALID:非法/默认位移。

    6、confluent_kafka.ThrottleEvent

    ThrottleEvent包含限制请求的相关数据,是非实例化类,包含如下属性:
    broker_name:字符串,限制请求的Broker的主机名称。 
    broker_id:整型,Broker ID。
    throttle_time:float类型,Broker限制请求的时间,单为秒。

    7、Configuration

    生产者和消费者实例的配置。

    conf = {'bootstrap.servers': 'mybroker.com',
            'group.id': 'mygroup', 'session.timeout.ms': 6000,
            'on_commit': my_commit_callback,
            'default.topic.config': {'auto.offset.reset': 'smallest'}}
    consumer = confluent_kafka.Consumer(conf)

    Python绑定提供了其它的配置属性:
    default.topic.config:属性值是顶层配置属性字典。

    五、coufluent-kafka示例

    1、Producer

    from confluent_kafka import Producer
    
    class KafkaProducer:
        def __init__(self, brokers):
            self.producer = Producer({'bootstrap.servers': brokers})
    
        def sendMessage(self, topic, payloads):
            for payload in payloads:
                # Trigger any available delivery report callbacks from previous produce() calls
                self.producer.poll(0)
                # Asynchronously produce a message, the delivery report callback
                # will be triggered from poll() above, or flush() below, when the message has
                # been successfully delivered or failed permanently.
                self.producer.produce(topic, payload.encode('utf-8'), callback=self.delivery_report)
    
            # Wait for any outstanding messages to be delivered and delivery report
            # callbacks to be triggered.
            self.producer.flush()
    
        @staticmethod
        def delivery_report(err, msg):
            """ Called once for each message produced to indicate delivery result.
                Triggered by poll() or flush(). """
            if err is not None:
                print('Message delivery failed: {}'.format(err))
            else:
                print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    if __name__ == "__main__":
        producer = KafkaProducer('192.168.0.105:9092')
        source_data = list()
        for x in range(1000):
            source_data.append("Hello kafka{}".format(x))
        producer.sendMessage('test2', source_data)

    进入kafka容器:
    docker exec -it kafka-test /bin/bash
    查看Topic的消息:
    kafka-console-consumer.sh --bootstrap-server kafka-test:9092 --topic test2 --from-beginning

    2、Consumer

    from confluent_kafka import Consumer
    
    class KafkaConsumer:
        def __init__(self, brokers, group):
            config = dict()
            config['bootstrap.servers'] = brokers
            config['group.id'] = group
            config['auto.offset.reset'] = 'earliest'
            self.consumer = Consumer(config)
    
        def subscribe(self, topics):
            self.consumer.subscribe(topics=topics)
    
        def pull(self):
            while True:
                msg = self.consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    print("Consumer error: {}".format(msg.error()))
                    continue
                print('Received message: {}'.format(msg.value().decode('utf-8')))
    
        def close(self):
            self.consumer.close()
    
    if __name__ == "__main__":
        consumer = KafkaConsumer("192.168.0.105:9092", "test_group")
        consumer.subscribe(["test1", "test2"])
        consumer.pull()
        consumer.close()

    3、AvroProducer

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    class Message:
        """
        Message struct
        """
        def __init__(self, key, value):
            self.key = {"name": "{}".format(key)}
            self.value = {"name": "{}".format(value)}
    
    class KafkaAvroProducer:
        """
        Kafka Avro Producer Wrapper class
        """
        value_schema = ""
        key_schema = ""
    
        def __init__(self, brokers, schema_registry_url):
            config = dict()
            config['bootstrap.servers'] = brokers
            config['on_delivery'] = KafkaAvroProducer.delivery_report
            config['schema.registry.url'] = schema_registry_url
            self.avro_producer = AvroProducer(config=config,
                                              default_key_schema=KafkaAvroProducer.key_schema,
                                              default_value_schema=KafkaAvroProducer.value_schema)
    
        @classmethod
        def register_value_schema(cls, schema):
            cls.key_schema = avro.loads(schema)
    
        @classmethod
        def register_key_schema(cls, schema):
            cls.value_schema = avro.loads(schema)
    
        def send_message(self, topic, messages):
            for message in messages:
                self.avro_producer.produce(topic='test1', value=message.value, key=message.key)
    
            self.avro_producer.flush()
    
        @staticmethod
        def delivery_report(err, msg):
            """ Called once for each message produced to indicate delivery result.
                Triggered by poll() or flush(). """
            if err is not None:
                print('Message delivery failed: {}'.format(err))
            else:
                print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    if __name__ == "__main__":
        value_schema_str = """
        {
           "namespace": "kafka.test",
           "name": "value",
           "type": "record",
           "fields" : [
             {
               "name" : "name",
               "type" : "string"
             }
           ]
        }
        """
        key_schema_str = """
        {
           "namespace": "kafka.test",
           "name": "key",
           "type": "record",
           "fields" : [
             {
               "name" : "name",
               "type" : "string"
             }
           ]
        }
        """
        messages = list()
        for i in range(1000):
            messages.append(Message("key{}".format(i), "Hello Confluent Kafka{}".format(i)))
        KafkaAvroProducer.register_key_schema(key_schema_str)
        KafkaAvroProducer.register_value_schema(value_schema_str)
        schema_registry_url = 'http://127.0.0.1:8081'
        avroProducer = KafkaAvroProducer("192.168.0.105:9092", schema_registry_url)
        avroProducer.send_message("test1", messages=messages)

    4、AvroConsumer

    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError
    
    class KafkaAvroConsumer:
        def __init__(self, brokers, group, schema_registry_url):
            self.avro_consumer = AvroConsumer({
                'bootstrap.servers': brokers,
                'group.id': group,
                'auto.offset.reset': 'earliest',
                'schema.registry.url': schema_registry_url})
    
        def subscribe(self, topics):
            self.avro_consumer.subscribe(topics=topics)
    
        def pull_message(self):
            while True:
                try:
                    msg = self.avro_consumer.poll(2)
                except SerializerError as e:
                    print("Message deserialization failed for {}: {}".format(msg, e))
                    break
                if msg is None:
                    continue
                if msg.error():
                    print("AvroConsumer error: {}".format(msg.error()))
                    continue
                print(msg.key(), ": ", msg.value())
    
        def close(self):
            self.avro_consumer.close()
    
    if __name__ == "__main__":
        schema_registry_url = "http://127.0.0.1:8081"
        avroConsumer = KafkaAvroConsumer("192.168.0.105:9092", "test_group", schema_registry_url)
        avroConsumer.subscribe(["test1", "test2"])
        avroConsumer.pull_message()
        avroConsumer.close()

    5、AdminClient

    from confluent_kafka.admin import NewTopic, AdminClient
    
    class KafkaManager:
        def __init__(self, broker):
            self.admin_client = AdminClient({'bootstrap.servers': broker})
    
        def create_topics(self, topics, num_partition):
            new_topics = [NewTopic(topic, num_partitions=num_partition, replication_factor=1) for topic in topics]
            fs = self.admin_client.create_topics(new_topics)
            # Wait for each operation to finish.
            for topic, f in fs.items():
                try:
                    f.result()  # The result itself is None
                    print("Topic {} created".format(topic))
                except Exception as e:
                    print("Failed to create topic {}: {}".format(topic, e))
    
        def delete_topics(self, topics):
            fs = self.admin_client.delete_topics(topics=topics)
            for topic, f in fs.items():
                try:
                    f.result()  # The result itself is None
                    print("Topic {} deleted".format(topic))
                except Exception as e:
                    print("Failed to delete topic {}: {}".format(topic, e))
    
    if __name__ == "__main__":
        import time
        manager = KafkaManager("192.168.0.105:9092")
        manager.create_topics(["test3", "test4"], 1)
        time.sleep(3)
        manager.delete_topics(["test3", "test4"])
  • 相关阅读:
    学习Java书籍推荐和面试网站推荐
    Java 多线程学习扩展
    Java Excel 导入导出(二)
    Java Excel 导入导出(一)
    Matplotlib库(二)
    Matplotlib库(一)
    【转】MATLAB导出精美的论文插图
    图像的手绘效果
    Numpy库的使用(二)
    Numpy库的使用(一)
  • 原文地址:https://www.cnblogs.com/to-here/p/14345851.html
Copyright © 2011-2022 走看看