zoukankan      html  css  js  c++  java
  • RdKafka文档翻译

    函数
    string rd_kafka_err2str ( integer $err ) 将rdkafka错误代码转换为字符串

    integer rd_kafka_errno2err ( integer $errnox ) 将系统errno转换为Kafka错误代码

    integer rd_kafka_errno ( void ) 返回系统errno

    integer rd_kafka_offset_tail ( integer $cnt ) 返回一个特殊的偏移量值,该值可用于在主题尾部之前开始使用cnt消息

    RdKafkaKafkaConsume类
    这是高水平消费者,支持自动分区/撤销(pecl rdkafka>=1.0.0,librdkafka>=0.9)

    1)public void RdKafkaKafkaConsumer::assign ([ array $topic_partitions = NULL ] )
    更新分配集到$topic_partitions,可以通过调用RdKafkaConf::setDefaultTopicConf()来更改主题的默认配置
    $kafkaConsumer->assign([
    new RdKafkaTopicPartition("logs", 0),
    new RdKafkaTopicPartition("logs", 1),
    ]);

    2)public void RdKafkaKafkaConsumer::commit ([ mixed $message_or_offsets = NULL ] )
    同步提交偏移,直到提交偏移或提交失败为止。
    如果注册了COMMIT_CB回调,那么它将被调用,并包含未来要使用的调用的提交详细信息。
    参数
    message_or_offsets
    When NULL, commit offsets for the current assignment.
    When a RdKafkaMessage, commit offset for a single topic+partition based on the message.
    When an array of RdKafkaTopicPartition, commit offsets for the provided list of partitions.
    异常
    Errors/Exceptions
    Throws RdKafkaException on errors.
    例子:
    // Commit offsets for the current assignment
    $kafkaConsumer->commit();

    // Commit offsets based on the message's topic, partition, and offset
    $kafkaConsumer->commit($message);

    // Commit offsets by providing a list of TopicPartition
    $kafkaConsumer->commit([
    new RdKafkaTopicPartition($topic, $partition, $offset),
    ]);

    3)public void RdKafkaKafkaConsumer::commitAsync ([ string $message_or_offsets = NULL ] )
    异步提交偏移
    4)public RdKafkaKafkaConsumer::__construct ( RdKafkaConf $conf )
    参数
    conf (RdKafkaConf)
    The conf object must have group.id set to the consumer group to join.
    conf对象必须将Group.id设置为要加入的消费者组。
    示例:
    $conf = new RdKafkaConf();
    $conf->set("group.id", "myGroupID");

    $kafkaConsumer = new RdKafkaKafkaConsumer($conf);
    5)public RdKafkaMessage RdKafkaKafkaConsumer::consume ( string $timeout_ms )
    使用消息或获取错误事件,触发回调
    将自动调用任何此类排队事件的已注册回调,包括rebalance_cb, event_cb, commit_cb, etc.
    参数
    timeout_ms (int) 超时时间(milliseconds)
    返回值
    Returns a RdKafkaMessage. On error or timeout, RdKafkaMessage::$err is != RD_KAFKA_ERR_NO_ERROR, and other properties should be ignored.
    注意:
    应用程序应确保定期调用consume (),即使没有预期的消息,为等待调用的排队回调提供服务,当rebalnce_cb已经注册时,这一点尤其重要,因为需要正确地调用和处理它,以同步内部使用者状态。


    while (true) {
    $message = $kafkaConsumer->consume(3600e3);
    switch ($message->err) {
    case RD_KAFKA_RESP_ERR_NO_ERROR:
    handle($message);
    break;
    case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timedout ";
    break;
    default:
    throw new Exception($message->errstr());
    break;
    }
    }

    6)public array RdKafkaKafkaConsumer::getAssignment ( void )
    返回由assign设置 或 再平衡的 当前分区分配集
    Returns the current partition assignment as set by RdKafkaKafkaConsumer::assign() or by rebalancing.
    返回值
    Returns an array of RdKafkaTopicPartition 返回RdKafkaTopic分区的数组
    Errors/Exceptions
    Throws RdKafkaException on errors.

    6)public RdKafkaMetadata RdKafkaKafkaConsumer::getMetadata ( bool $all_topics , RdKafkaKafkaConsumerTopic $only_topic = NULL , int $timeout_ms)
    向代理请求元数据

    参数
    all_topics (bool)
    When TRUE, request info about all topics in cluster. Else, only request info about locally known topics.如果为真,请求有关集群中所有主题的信息。否则,只请求有关本地已知主题的信息
    only_topic (RdKafkaKafkaConsumerTopic)
    When non-null, only request info about this topic当非空时,只请求有关此主题的信息。
    timeout_ms (int)
    Timeout (milliseconds) 超时

    返回值
    Returns a RdKafkaMetadata instance
    示例
    $all = $kafkaConsumer->metadata(true, NULL, 60e3);

    $local = $kafkaConsumer->metadata(false, NULL, 60e3);

    $topic = $kafkaConsumer->newTopic("myTopic");
    $one = $kafkaConsumer->metadata(true, $topic, 60e3);

    7)public array RdKafkaKafkaConsumer::getSubscription ( void )
    返回RdKafkaKafkaConsumer:subscribe()设置的当前订阅
    Return the current subscription as set by RdKafkaKafkaConsumer::subscribe()
    返回值
    Returns an array of topic names 返回主题名称数组

    8)public void RdKafkaKafkaConsumer::subscribe ( array $topics )
    将订阅集更新为主题。
    这将覆盖当前任务。任何先前的订阅都将首先被取消分配和取消订阅。
    订阅集表示要消费的所需主题.......
    可以通过调用RdKafkaConf::setDefaultTopicConf()更改订阅主题的默认配置。

    $kafkaConsumer->assign([
    "logs",
    "^myPfx[0-9]_.*",
    ]);

    9)public ReturnType RdKafkaKafkaConsumer::unsubscribe ( void )
    从当前订阅集取消订阅

    RdKafkaKafkaConsumerTopic类
    (PECL rdkafka >= 1.0.0, librdkafka >= 0.9)
    This class represents a topic when using the RdKafkaKafkaConsumer. It can not be instantiated directly, RdKafkaKafkaConsumer::newTopic() should be used instead.
    当想使用RdKafkaKafkaConsumer去表示一个主题的时候,不能直接实例化,应该使用RdKafkaKafkaConsumer::newTopic()替代

    1)public void RdKafkaKafkaConsumerTopic::offsetStore ( integer $partition , integer $offset )
    Store offset offset for topic partition partition. The offset will be commited (written) to the offset store according to auto.commit.interval.ms.
    auto.commit.interval.ms消费者offset提交到zookeeper的频率(以毫秒为单位)(0.9之后就默认存储再broke中了)
    auto.commit.enable must be set to false when using this API.使用此API时 auto.commit.enable必须设置为false,如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。
    auto.offset.reset largest 如果ZooKeeper中没有初始偏移量,或偏移值超出范围,
    该怎么办?
    最小:自动将偏移重置为最小偏移
    最大:自动将偏移重置为最大偏移
    * 其他任何事情:抛出异常消费者

    参数
    partition (integer)
    Partition ID
    offset (integer)
    Offset


    2)/* Inherited methods */
    public string RdKafkaTopic::getName ( void )

    RdKafka类
    (PECL rdkafka >= 0.9.1)
    This is the base class for low-level clients: RdKafkaConsumer, RdKafkaProducer. This class can not be instanciated directly, use one of the sub classes instead.
    这是低级消费者客户端的基类:RdKafkaConsumer,RdKafkaProducer。不能直接实例化这个类,而是使用其中一个子类。

    1)public integer RdKafka::addBrokers ( string $broker_list )
    将一个或多个代理添加到Kafka句柄的初始引导代理列表中。
    当rdkafka通过查询代理元数据连接到代理时,将自动发现其他代理。
    如果代理名称解析为多个地址(可能是地址家族),则所有代理名称都将以循环方式用于连接尝试。
    返回值
    Returns the number of brokers successfully added.成功添加的代理个数

    代理还可以使用metadata.broker.list或bootstrap.server配置属性(首选方法)进行定义。

    $kafka->addBrokers("broker1:10000,broker2");

    $kafka->addBrokers("SSL://broker3:9000,ssl://broker2");

    2)public RdKafkaMetadata RdKafka::getMetadata ( bool $all_topics , RdKafkaTopic $only_topic = NULL , int $timeout_ms )
    Request Metadata from broker 向代理请求元数据
    上面有一个

    3)public integer RdKafka::getOutQLen ( void )
    返回当前的输出队列长度。Out队列包含等待发送给代理的消息,或代理知道的消息。

    3)public RdKafkaQueue RdKafka::newQueue ( void )
    创建一个新的消息队列实例
    Return Values
    Returns a RdKafkaQueue.

    4)public RdKafkaTopic RdKafka::newTopic ( string $topic_name [, RdKafkaTopicConf $topic_conf = NULL ] )
    Creates a new topic instance for topic_name.为Topic_Name创建一个新的主题实例。
    Returns a RdKafkaTopic (more specifically, either a RdKafkaConsumerTopic or a RdKafkaProducerTopic).

    为具有不同配置的同一主题名称创建两个主题实例没有任何效果。每个主题实例都将使用第一个实例的配置。
    $conf = new RdKafkaTopicConf();
    $conf->set("...", "...");

    $topic = $kafka->newTopic("myTopic", $conf);


    4)public void RdKafka::poll ( integer $timeout_ms )
    对于事件的轮询,导致调用应用程序提供的回调
    使用rdKafka子类的应用程序应该确保定期调用poll(),以便为等待调用的任何排队回调服务。

    Events:
    Delivery report callbacks RdKafkaConf::setDrMsgCb() [producer]
    Error callbacks (RdKafkaConf::setErrorCb())
    Stats callbacks (RdKafkaConf::setStatsCb())
    Throttle callbacks (RdKafkaConf::setThrottleCb())

    Parameters
    timeout_ms (integer)
    Specifies the maximum amount of time (in milliseconds) that the call will block waiting for events. For non-blocking calls, provide 0 as timeout_ms. To wait indefinately for an event, provide -1.
    指定调用将阻止等待事件的最大时间(以毫秒为单位)。对于非阻塞调用,提供0作为timeout_ms。若要不确定地等待某个事件,请提供-1。

    Return Values
    Returns the number of events served.返回服务的事件数

    5)public void RdKafka::setLogLevel ( integer $level )
    指定内部Kafka日志记录和调试产生的最大日志记录级别。如果设置了“DEBUG”配置属性,该级别将自动调整为LOG_DEBUG。

    Parameters
    level (integer)
    Log level. Can take any LOG_* constant (see the syslog function).日志级别。可以接受任何log_*常量(请参阅syslog函数)。

    RdKafkaConsumer 类
    This is the low-level Kafka consumer. It can be used with Kafka >= 0.8.
    低级消费者
    1)public RdKafkaConsumer::__construct ([ RdKafkaConf $conf = NULL ] )
    Parameters
    conf (RdKafkaConf)
    An optional RdKafkaConf instance.

    此类只有继承(低级消费者基类RdKafka)的以下几个方法
    RdKafkaConsumer extends RdKafka {
    /* Methods */
    /* Inherited methods */
    public integer RdKafka::addBrokers ( string $broker_list )
    public RdKafkaMetadata RdKafka::getMetadata ( bool $all_topics , RdKafkaTopic $only_topic = NULL , int $timeout_ms )
    public integer RdKafka::getOutQLen ( void )
    public RdKafkaQueue RdKafka::newQueue ( void )
    public RdKafkaTopic RdKafka::newTopic ( string $topic_name [, RdKafkaTopicConf $topic_conf = NULL ] )
    public void RdKafka::poll ( integer $timeout_ms )
    public void RdKafka::setLogLevel ( integer $level )
    }
    RdKafkaProducer类
    (PECL rdkafka >= 0.9.1)

    1)public RdKafkaProducer::__construct ([ RdKafkaConf $conf = NULL ] )
    Parameters
    conf (RdKafkaConf)
    An optional RdKafkaConf instance.

    RdKafkaProducer extends RdKafka {
    /* Methods */
    /* Inherited methods */
    public integer RdKafka::addBrokers ( string $broker_list )
    public RdKafkaMetadata RdKafka::getMetadata ( bool $all_topics , RdKafkaTopic $only_topic = NULL , int $timeout_ms )
    public integer RdKafka::getOutQLen ( void )
    public RdKafkaQueue RdKafka::newQueue ( void )
    public RdKafkaTopic RdKafka::newTopic ( string $topic_name [, RdKafkaTopicConf $topic_conf = NULL ] )
    public void RdKafka::poll ( integer $timeout_ms )
    public void RdKafka::setLogLevel ( integer $level )
    }

    RdKafkaTopic类
    (PECL rdkafka >= 0.9.1)

    1)public string RdKafkaTopic::getName ( void )
    Returns the topic name.返回主题名称

    RdKafkaConsumerTopic 类
    (PECL rdkafka >= 0.9.1)

    当使用RdKafkaConsumer时,该类表示一个主题。不能直接实例化它,应该使用RdKafkaConsumer:newTopic()。

    1)public RdKafkaMessage RdKafkaConsumerTopic::consume ( integer $partition , integer $timeout_ms )
    消费-使用来自分区的单个消息
    消费者之前必须使用 RdKafkaConsumerTopic::consumeStart().
    必须检查返回消息的ERR属性是否存在错误。
    Err属性等于RD_Kafka_RESP_ERR_PARY_EOF,表示已到达分区的结束,通常不应将其视为错误。应用程序应该处理这种情况(例如,忽略)。
    Parameters
    partition (integer)
    The partition to consume
    timeout_ms
    The maximum amount of time to wait for a message to be received.

    Returns a RdKafkaMessage or NULL on timeout. 正常返回RdKafkaMessage,超时返回NULL。

    2)public void RdKafkaConsumerTopic::consumeQueueStart ( integer $partition , integer $offset , RdKafkaQueue $queue )
    与RdKafkaConsumerTopic::consumerTopic()相同,但将传入消息重新路由到提供的队列。应用程序必须使用一个RdKafkaQueue::consumer*()函数来接收获取的消息。
    参数
    partition (integer)
    Partition ID
    offset (integer)
    Offset
    queue (RdKafkaQueue)
    A RdKafkaQueue instance

    3)public void RdKafkaConsumerTopic::consumeStart ( integer $partition , integer $offset )
    开始在偏移量处使用分区的消息(请参阅参数中允许的值)。

    librdkafka将尝试通过反复从代理获取批消息,直到达到阈值,从而将queued.min.messages (config属性)消息保留在本地队列中。
    应用程序应该使用RdKafkaConsumerTopic::consumeStart()方法来使用本地队列中的消息,每个Kafka消息都表示为RdKafkaMessage对象。
    对于同一个主题和分区,不能多次调用RdKafkaConsumerTopic::consumeStart()。在没有停止消费的情况下,先使用RdKafkaConsumerTopic::consumeStop()停止消费后再开始消费。

    Parameters
    partition (integer)
    Partition ID
    offset (integer)
    Can be either a proper offset (0..N), or one the the special offset:
    可以是正常的偏移量(0.N),也可以是特殊的偏移量:
    RD_KAFKA_OFFSET_BEGINNING
    RD_KAFKA_OFFSET_END
    RD_KAFKA_OFFSET_STORED
    The return value of rd_kafka_offset_tail()
    示例:
    $partition = 123;

    // consume from the end
    $topic->consumeStart($partition, RD_KAFKA_OFFSET_END);

    // consume from the stored offset
    $topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

    // consume 200 messages from the end
    $topic->consumeStart($partition, rd_kafka_offset_tail(200));

    4)public void RdKafkaConsumerTopic::consumeStop ( integer $partition )
    Stop consuming messages from partition停止使用来自分区的消息
    停止使用分区消息,清除当前本地队列中的所有消息。

    5)public void RdKafkaConsumerTopic::offsetStore ( integer $partition , integer $offset )
    store offset存储offset
    Parameters
    partition (integer)
    Partition ID
    offset (integer)
    Offset

    RdKafkaProducerTopic类
    (PECL rdkafka >= 0.9.1)
    当使用RdKafkaProducer时,该类表示一个主题。不能直接实例化它,应该使用RdKafkaProducer::newTopic().

    RdKafkaProducerTopic extends RdKafkaTopic {
    /* Methods */
    public void produce ( integer $partition , integer $msgflags , string $payload [, string $key = NULL ] )
    /* Inherited methods */
    public string RdKafkaTopic::getName ( void )
    }

    1)public void RdKafkaProducerTopic::produce ( integer $partition , integer $msgflags , string $payload [, string $key = NULL ] )
    生成并向代理发送一条消息。这是一个异步和非阻塞的。
    Parameters
    partition (integer)
    Can be either RD_KAFKA_PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function (see RdKafkaTopicConf::setPartitioner(), or a fixed partition (0..N).
    msgflags (integer)
    可以是RD_Kafka_PARID_UA(未分配的),用于使用主题的分区函数(请参见RdKafkaTopicConf::setPartitioner(),也可以是固定的分区(0.N)。
    msgflags (integer)
    Must be 0
    payload (string)
    Payload string
    key (string)
    Optional message key, if non-NULL it will be passed to the topic partitioner as well as be sent with the message to the broker and passed on to the consumer.
    可选消息键,如果非空,则将其传递给主题分区程序,并与消息一起发送给代理并传递给使用者。

    $message = [
    'type' => 'account-created',
    'id' => $accountId,
    'date' => date(DATE_W3C),
    ];
    $payload = json_encode($message);
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $accountId);with the message to the broker and passed on to the consumer.

    RdKafkaQueuel类
    (PECL rdkafka >= 0.9.1)
    1)public RdKafkaMessage RdKafkaQueue::consume ( string $timeout_ms )
    使用一条消息
    Parameters
    timeout_ms
    The maximum amount of time to wait for a message to be received.
    Return Values
    Returns a RdKafkaMessage or NULL on timeout.

    RdKafkaMessage 类
    (PECL rdkafka >= 0.9.1)
    此对象表示单个已消费或生产的消息或事件(设置了$err)。
    This object represents either a single consumed or produced message, or an event ($err is set).
    An application must check RdKafkaMessage::err to see if the object is a proper message (error is RD_KAFKA_RESP_ERR_NO_ERROR) or an error event.

    RdKafkaMessage {
    /* Properties */
    public $err ; //Error code
    public $topic_name ;
    public $partition ;
    public $payload ;
    public $key ;
    public $offset ;
    /* Methods */
    public string errstr ( void )
    }

    1)public string RdKafkaMessage::errstr ( void )
    这是一种方便的方法,将错误作为字符串返回
    Return Values
    The error as a string
    if ($message->err) {
    echo $message->errstr(), " ";
    }


    RdKafkaConf 类
    (PECL rdkafka >= 0.9.1)
    This class holds configuration for consumers and producers.
    A list of available properties can be found on the »librdkafka repository. Note that available configuration properties and default values may change depending on the librdkafka version.
    该类包含使用者和生产者的配置
    请注意,可用的配置属性和默认值可能会根据librdkafka 版本而改变。

    RdKafkaConf {
    /* Methods */
    public void dump ( void )
    public void set ( string $name , string $value )
    public void setDefaultTopicConf ( RdKafkaTopicConf $topic_conf )
    public void setDrMsgCb ( callable $callback )
    public void setErrorCb ( callable $callback )
    public void setRebalanceCb ( callable $callback )
    }

    1)public void RdKafkaConf::dump ( void )
    Dumps the configuration properties and values to an array.
    转储配置属性和值到数组
    Return Values
    Returns an array with configuration properties as keys, and configuration values as values.

    2)public void RdKafkaConf::set ( string $name , string $value )
    Set configuration property name to value.
    设置配置属性 属性名=>属性值

    3)public void RdKafkaConf::setDefaultTopicConf ( RdKafkaTopicConf $topic_conf )
    设置用于自动订阅主题的默认主题配置。可以与RdKafkaKafkaConsumer::subscribe()或者RdKafkaKafkaConsumer::assign()一起使用
    Sets the default topic configuration to use for for automatically subscribed topics. This can be used along with RdKafkaKafkaConsumer::subscribe() or RdKafkaKafkaConsumer::assign().

    4)public void RdKafkaConf::setDrMsgCb ( callable $callback )
    设置传递报告回调,对于RdKafkaProducerTopic::Producer()接受的每条消息,将调用一次传递报告回调,并将ERR设置为指示生产请求的结果。

    当消息成功地生成时,或者如果librdkafka 遇到永久故障,或者临时错误的重试计数器已经耗尽,就会调用回调。

    应用程序必须定期调用rdKafka::poll(),以便为排队的传递报告回调服务。

    Parameters
    callback (callable)
    A callable with the following signature:
    <?php
    /**
    * @param RdKafkaKafka $kafka
    * @param RdKafkaMessage $message
    */
    function (RdKafkaKafka $kafka, RdKafkaMessage $message);

    $conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
    // message permanently failed to be delivered
    } else {
    // message successfully delivered
    }
    });


    5)public void RdKafkaConf::setErrorCb ( callable $callback )

    设置错误回调。librdkafka 使用错误回调将ciritcal错误信号发送回应用程序。

    Parameters
    callback (callable)
    A callable with the following signature:
    <?php
    /**
    * @param object $kafka
    * @param int $err
    * @param string $reason
    */
    function ($kafka, $err, $reason);

    <?php
    $conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s) ", rd_kafka_err2str($err), $reason);
    });
    ?>


    6)public void RdKafkaConf::setRebalanceCb ( callable $callback )

    Set rebalance callback for use with coordinated consumer group balancing.
    设置“再平衡回调”,以便与协调的消费者组 平衡一起使用。

    Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.
    注册一个reBalance_cb会关闭librdkafka的自动分区分配/撤销,而是将这一责任委托给应用程序的reBalance_cb。

    The rebalance callback is responsible for updating librdkafka's assignment set based on the two events RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONSand RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS but should also be able to handle arbitrary rebalancing failures where err is neither of those.
    重新平衡回调负责根据这两个事件RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS和RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS更新librdkafka的分配集,但也应该能够再平衡处理任意的不止这些的失败。

    In this latter case (arbitrary error), the application must $kafka->assign(NULL) to synchronize (同步)state.
    后一种情况 必须使用assign去同步状态

    在没有重新平衡回调的情况下,这是由librdkafka自动完成的,但是注册一个重新平衡回调会使应用程序在执行其他操作时具有灵活性,同时还可以执行排序/撤销操作(assinging/revocation)。例如从另一个位置获取偏移量(在赋值时)或手动提交偏移量(在REVOKE上)。

    Parameters
    callback (callable)
    A callable with the following signature:
    <?php
    /**
    * @param RdKafkaKafkaConsumer $kafka
    * @param int $err
    * @param array $partitions
    */
    function (RdKafkaKafkaConsumer $kafka, $err, $partitions);
    ERR参数被设置为RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS或RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS(或意外错误)。
    partitions参数是RdKafkaTopicPartition数组,表示分配或撤销的完整分区集。

    <?php
    $conf->setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
    // application may load offets from arbitrary external
    // storage here and update partitions
    $kafka->assign($partitions);
    break;

    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
    if ($manual_commits) {
    // Optional explicit manual commit
    $kafka->commit($partitions);
    }
    $kafka->assign(NULL);
    break;

    default:
    handle_unlikely_error($err);
    $kafka->assign(NULL); // sync state同步状态
    break;
    }
    }
    ?>

    RdKafkaTopicConf类
    (PECL rdkafka >= 0.9.1)

    该类保存主题topic实例的配置。
    A list of available properties can be found on the » librdkafka repository. Note that available configuration properties and default values may change depending on the librdkafka version.
    注意配置属性依赖版本 , 可以从librdkafka仓库中查看详细配置

    RdKafkaTopicConf {
    /* Methods */
    public void dump ( void )
    public void set ( void )
    public void setPartitioner ( integer $partitioner )
    }

    1)public void RdKafkaTopicConf::dump ( void )
    将配置属性和值转储到数组。
    返回一个数组,其中配置属性作为键,配置值作为值。

    2)public void RdKafkaTopicConf::set ( void )
    Set configuration property name to value.

    3)public void RdKafkaTopicConf::setPartitioner ( integer $partitioner )
    将分区器设置为根据keys将消息路由到分区。
    Parameters
    partitioner (integer)
    Must be one of the RD_KAFKA_MSG_PARTITIONER_* constants.
    必须是RD_Kafka_MSG_Partiator_*常量之一。

    RdKafkaException类
    (PECL rdkafka >= 0.9.1)

    RdKafka Exception.异常类
    RdKafkaException extends Exception {
    /* Inherited properties */
    protected string $message ;
    protected int $code ;
    protected string $file ;
    protected int $line ;
    /* Methods */
    /* Inherited methods */
    final public string Exception::getMessage ( void )
    final public Exception Exception::getPrevious ( void )
    final public mixed Exception::getCode ( void )
    final public string Exception::getFile ( void )
    final public int Exception::getLine ( void )
    final public array Exception::getTrace ( void )
    final public string Exception::getTraceAsString ( void )
    public string Exception::__toString ( void )
    final private void Exception::__clone ( void )
    }

    RdKafkaTopicPartition类
    (PECL rdkafka >= 1.0.0, librdkafka >= 0.9)

    Topic+Partition 主题加分区
    RdKafkaTopicPartition {
    /* Methods */
    public integer getOffset ( void )
    public integer getPartition ( void )
    public string getTopic ( void )
    public void setOffset ( string $offset )
    public ReturnType setPartition ( string $partition )
    public ReturnType setTopic ( string $topic_name )
    }

    1)public RdKafkaTopicPartition::__construct ( string $topic , integer $partition [, integer $offset = NULL ] )
    Parameters
    topic (string)
    Topic name
    partition (integer)
    Partition ID
    offset (integer)
    Offset
    <?php
    new RdKafkaTopicPartition("myTopic", 1);
    ?>

    2)public integer RdKafkaTopicPartition::getOffset ( void )
    获取偏移量

    3)public integer RdKafkaTopicPartition::getPartition ( void )
    Gets the partition ID. 得到分区id

    4)public string RdKafkaTopicPartition::getTopic ( void )
    Gets the topic name. 得到主题

    5)public void RdKafkaTopicPartition::setOffset ( string $offset )
    Sets the offset. 设置偏移量

    6)public ReturnType RdKafkaTopicPartition::setPartition ( string $partition )
    Sets the partition ID.

    7)public ReturnType RdKafkaTopicPartition::setTopic ( string $topic_name )
    Sets the topic name.

    RdKafkaMetadata类
    (PECL rdkafka >= 0.9.1)
    The Metadata class represents broker information. Metadata instances are returned by RdKafka::getMetadata() and RdKafkaKafkaConsumer::getMetadata().
    元数据类表示代理信息。元数据实例由RdKafka::getMetadata() 和RdKafkaKafkaConsumer::getMetadata()返回。

    RdKafkaMetadata {
    /* Methods */
    public RdKafkaMetadataCollection getBrokers ( void )
    public int getOrigBrokerId ( void )
    public string getOrigBrokerName ( void )
    public RdKafkaMetadataCollection getTopics ( void )
    }

    1)public RdKafkaMetadataCollection RdKafkaMetadata::getBrokers ( void )
    Get broker list
    Returns a RdKafkaMetadataCollection of RdKafkaMetadataBroker
    2)public int RdKafkaMetadata::getOrigBrokerId ( void )
    获取源自此元数据的代理id

    3)public string RdKafkaMetadata::getOrigBrokerName ( void )
    获取源自此元数据的代理名称。

    4)public RdKafkaMetadataCollection RdKafkaMetadata::getTopics ( void )
    获取主题列表。根据元数据的请求方式,这可能包含单个主题、本地已知主题列表或所有集群主题。
    Returns a RdKafkaMetadataCollection of RdKafkaMetadataTopic

    RdKafkaMetadataCollection类
    (PECL rdkafka >= 0.9.1)

    集合类用作元数据项的集合。它实现了 Countable and Iterable,因此它可以与count()和foreach一起使用

    RdKafkaMetadataCollection implements Countable , Iterator {
    /* Methods */
    public int count ( void )
    public mixed current ( void )
    public scalar key ( void )
    public void next ( void )
    public void rewind ( void )
    public boolean valid ( void )
    }

    1)public int RdKafkaMetadataCollection::count ( void )
    Returns the number of elements as integer 返回元素数量

    2)public mixed RdKafkaMetadataCollection::current ( void )
    Gets the current value. 获取到当前的值
    返回值:
    The current value if it is valid or NULL otherwise.

    3)public scalar RdKafkaMetadataCollection::key ( void )
    Get the current key.
    返回值:
    The current key if it is valid or NULL otherwise.

    4)public void RdKafkaMetadataCollection::next ( void )
    移到下一个元素。
    5)public void RdKafkaMetadataCollection::rewind ( void )
    将Iterator倒转到第一个元素

    6)public boolean RdKafkaMetadataCollection::valid ( void )
    Checks if current position is valid 检查当前位置是否有效
    Returns TRUE on success or FALSE on failure.

    Predefined Constants
    The constants below are defined by this extension, and will only be available when the extension has either been compiled into PHP or dynamically loaded at runtime.
    下面的常量是由这个扩展定义的,并且只有当扩展编译到PHP或在运行时动态加载时才可用。
    RD_KAFKA_CONSUMER (integer)
    RD_KAFKA_OFFSET_BEGINNING (integer)
    Start consuming from beginning of kafka partition queue: oldest msg.
    RD_KAFKA_OFFSET_END (integer)
    Start consuming from end of kafka partition queue: next msg.
    RD_KAFKA_OFFSET_STORED (integer)
    Start consuming from offset retrieved from offset store.
    RD_KAFKA_PARTITION_UA (integer)
    The unassigned partition is used by the producer API for messages that should be partitioned using the configured or default partitioner.
    RD_KAFKA_PRODUCER (integer)
    RD_KAFKA_VERSION (integer)
    RD_KAFKA_RESP_ERR__BEGIN (integer)
    RD_KAFKA_RESP_ERR__BAD_MSG (integer)
    Local: Bad message format
    RD_KAFKA_RESP_ERR__BAD_COMPRESSION (integer)
    Local: Invalid compressed data
    RD_KAFKA_RESP_ERR__DESTROY (integer)
    Local: Broker handle destroyed
    RD_KAFKA_RESP_ERR__FAIL (integer)
    Local: Communication failure with broker
    RD_KAFKA_RESP_ERR__TRANSPORT (integer)
    Local: Broker transport failure
    RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE (integer)
    Local: Critical system resource failure
    RD_KAFKA_RESP_ERR__RESOLVE (integer)
    Local: Host resolution failure
    RD_KAFKA_RESP_ERR__MSG_TIMED_OUT (integer)
    Local: Message timed out
    RD_KAFKA_RESP_ERR__PARTITION_EOF (integer)
    Broker: No more messages
    RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION (integer)
    Local: Unknown partition
    RD_KAFKA_RESP_ERR__FS (integer)
    Local: File or filesystem error
    RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC (integer)
    Local: Unknown topic
    RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN (integer)
    Local: All broker connections are down
    RD_KAFKA_RESP_ERR__INVALID_ARG (integer)
    Local: Invalid argument or configuration
    RD_KAFKA_RESP_ERR__TIMED_OUT (integer)
    Local: Timed out
    RD_KAFKA_RESP_ERR__QUEUE_FULL (integer)
    Local: Queue full
    RD_KAFKA_RESP_ERR__ISR_INSUFF (integer)
    Local: ISR count insufficient
    RD_KAFKA_RESP_ERR__NODE_UPDATE (integer)
    Local: Broker node update
    RD_KAFKA_RESP_ERR__SSL (integer)
    Local: SSL error
    RD_KAFKA_RESP_ERR__WAIT_COORD (integer)
    Local: Waiting for coordinator
    RD_KAFKA_RESP_ERR__UNKNOWN_GROUP (integer)
    Local: Unknown group
    RD_KAFKA_RESP_ERR__IN_PROGRESS (integer)
    Local: Operation in progress
    RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS (integer)
    Local: Previous operation in progress
    RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION (integer)
    Local: Existing subscription
    RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS (integer)
    Local: Assign partitions
    RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS (integer)
    Local: Revoke partitions
    RD_KAFKA_RESP_ERR__CONFLICT (integer)
    Local: Conflicting use
    RD_KAFKA_RESP_ERR__STATE (integer)
    Local: Erroneous state
    RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL (integer)
    Local: Unknown protocol
    RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED (integer)
    Local: Not implemented
    RD_KAFKA_RESP_ERR__AUTHENTICATION (integer)
    Local: Authentication failure
    RD_KAFKA_RESP_ERR__NO_OFFSET (integer)
    Local: No offset stored
    RD_KAFKA_RESP_ERR__END (integer)
    RD_KAFKA_RESP_ERR_UNKNOWN (integer)
    Unknown broker error
    RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE (integer)
    Broker: Offset out of range
    RD_KAFKA_RESP_ERR_INVALID_MSG (integer)
    Broker: Invalid message
    RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART (integer)
    Broker: Unknown topic or partition
    RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE (integer)
    Broker: Invalid message size
    RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE (integer)
    Broker: Leader not available
    RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION (integer)
    Broker: Not leader for partition
    RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT (integer)
    Broker: Request timed out
    RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE (integer)
    Broker: Broker not available
    RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE (integer)
    Broker: Replica not available
    RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE (integer)
    Broker: Message size too large
    RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH (integer)
    Broker: StaleControllerEpochCode
    RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE (integer)
    Broker: Offset metadata string too large
    RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION (integer)
    Broker: Broker disconnected before response received
    RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS (integer)
    Broker: Group coordinator load in progress
    RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE (integer)
    Broker: Group coordinator not available
    RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP (integer)
    Broker: Not coordinator for group
    RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION (integer)
    Broker: Invalid topic
    RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE (integer)
    Broker: Message batch larger than configured server segment size
    RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS (integer)
    Broker: Not enough in-sync replicas
    RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND (integer)
    Broker: Message(s) written to insufficient number of in-sync replicas
    RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS (integer)
    Broker: Invalid required acks value
    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION (integer)
    Broker: Specified group generation id is not valid
    RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL (integer)
    Broker: Inconsistent group protocol
    RD_KAFKA_RESP_ERR_INVALID_GROUP_ID (integer)
    Broker: Invalid group.id
    RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID (integer)
    Broker: Unknown member
    RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT (integer)
    Broker: Invalid session timeout
    RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS (integer)
    Broker: Group rebalance in progress
    RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE (integer)
    Broker: Commit offset data size is not valid
    RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED (integer)
    Broker: Topic authorization failed
    RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED (integer)
    Broker: Group authorization failed
    RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED (integer)
    Broker: Cluster authorization failed
    RD_KAFKA_CONF_UNKNOWN (integer)
    RD_KAFKA_CONF_INVALID (integer)
    RD_KAFKA_CONF_OK (integer)
    RD_KAFKA_MSG_PARTITIONER_RANDOM (integer)
    The random partitioner. This was the default partitioner in librdkafka 0.8. Assigns partition randomly.
    RD_KAFKA_MSG_PARTITIONER_CONSISTENT (integer)
    The consistent partitioner. Uses consistent hashing to map identical keys onto identical partitions. Uses CRC32 as hashing function. Messages with no key or empty key are always assigned to the same partition.
    RD_KAFKA_LOG_PRINT (integer)
    The print logger. Prints messages to stderr.
    RD_KAFKA_LOG_SYSLOG (integer)
    The syslog logger. Sends messages to syslog.
    RD_KAFKA_LOG_SYSLOG_PRINT (integer)
    The syslog-print partitioner. Sends messages to syslog and prints them to stderr.

  • 相关阅读:
    如何利用 JConsole观察分析Java程序的运行,进行排错调优
    【解决】网站运行一段时间后就无法访问,重启Tomcat才能恢复
    不允许一个用户使用一个以上用户名与一个服务器或共享
    SVN升级到1.8后 Upgrade working copy
    Windows Server 2012 R2 创建AD域
    JTA 深度历险
    svn merge error must be ancestrally related to,trunk merge branch报错
    OutputStream-InputStream-FileOutputStream-FileInputStream-BufferedOutputStream-BufferedInputStream-四种复制方式-单层文件夹复制
    SVN提交时响应很慢,我是这样解决的。
    docker学习6-docker-compose容器集群编排
  • 原文地址:https://www.cnblogs.com/WebLinuxStudy/p/11547428.html
Copyright © 2011-2022 走看看