  • php-kafka

    1. Requirements

    The extension supports both PHP 5 and PHP 7.
    It requires librdkafka >= 0.8 for basic
    functionality, and >= 0.9 for the high-level consumer.
     

    2. Install librdkafka

    git clone https://github.com/edenhill/librdkafka.git
    cd librdkafka
    ./configure
    make
    sudo make install
     

    3. Install php-rdkafka

    sudo pecl install rdkafka
    # Add the following line to your php.ini file:
    extension=rdkafka.so
    Restart the web server and check phpinfo(); if rdkafka is listed there, the extension is installed.
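
    The phpinfo check can also be scripted. The following is a minimal sketch, not part of the original post; the helper name rdkafkaStatus is hypothetical, and it only relies on PHP's built-in extension_loaded() and phpversion().

```php
<?php
// Hypothetical helper: report whether the rdkafka extension is loaded.
function rdkafkaStatus()
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka extension is NOT loaded; check the extension= line in php.ini';
    }
    // phpversion() with an extension name returns that extension's version
    return 'rdkafka extension loaded, version ' . phpversion('rdkafka');
}

echo rdkafkaStatus(), PHP_EOL;
```

    The CLI and the web server may read different php.ini files, so it is worth running the check in both environments. On the command line, php --ri rdkafka prints the same information.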
     

    4. Usage

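    Producer:

    <?php
    $rk = new RdKafka\Producer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("127.0.0.1");
    $topic = $rk->newTopic("test");
    for ($i = 0; $i < 10; $i++) {
        // RD_KAFKA_PARTITION_UA (unassigned) lets librdkafka pick the partition
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
        $rk->poll(0);
    }
    // Keep polling until the outgoing queue is empty, so no message is lost on exit
    while ($rk->getOutQLen() > 0) {
        $rk->poll(50);
    }
    ?>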
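
    The producer listing does not check whether each message actually reached the broker. One way to find out is a delivery report callback registered with RdKafka\Conf::setDrMsgCb(). The following is a sketch under assumptions: the function name and its parameters are hypothetical, and it needs the rdkafka extension and a reachable broker to do anything useful.

```php
<?php
// Sketch: produce messages and count failed deliveries.
// Function name and parameters are hypothetical; calling it requires the rdkafka extension.
function produceWithDeliveryReports($brokers, $topicName, array $payloads)
{
    $failed = 0;
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', $brokers);
    // Invoked from poll(), once per message, after delivery succeeded or failed
    $conf->setDrMsgCb(function ($kafka, $message) use (&$failed) {
        if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            $failed++;
            error_log('Delivery failed: ' . $message->errstr());
        }
    });

    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic($topicName);
    foreach ($payloads as $payload) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
        $producer->poll(0);
    }
    // Drain the queue; the callback fires during these poll() calls
    while ($producer->getOutQLen() > 0) {
        $producer->poll(50);
    }
    return $failed;
}
```

    A call such as produceWithDeliveryReports('127.0.0.1', 'test', ['a', 'b']) would return the number of messages that failed. If the broker is unreachable, the drain loop can block until librdkafka gives up on the queued messages.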

    Low-level consumer:

    <?php
    $conf = new RdKafka\Conf();
    // Set the group id. This is required when storing offsets on the broker
    $conf->set('group.id', 'myConsumerGroup');
    $rk = new RdKafka\Consumer($conf);
    $rk->addBrokers("127.0.0.1");
    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('auto.commit.interval.ms', '100');
    // Set the offset store method to 'file'
    $topicConf->set('offset.store.method', 'file');
    $topicConf->set('offset.store.path', sys_get_temp_dir());
    // Alternatively, set the offset store method to 'broker'
    // $topicConf->set('offset.store.method', 'broker');
    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    $topicConf->set('auto.offset.reset', 'smallest');
    $topic = $rk->newTopic("test", $topicConf);
    // Start consuming partition 0
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    while (true) {
        // Arguments: partition, timeout in milliseconds (120*10000 ms = 20 minutes)
        $message = $topic->consume(0, 120*10000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }
    ?>

    High-level consumer:

    <?php
    $conf = new RdKafka\Conf();
    // Set a rebalance callback to log partition assignments (optional)
    $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;
            default:
                throw new \Exception($err);
        }
    });
    // Configure the group.id. All consumers with the same group.id will consume
    // different partitions.
    $conf->set('group.id', 'myConsumerGroup');
    // Initial list of Kafka brokers
    $conf->set('metadata.broker.list', '127.0.0.1');
    $topicConf = new RdKafka\TopicConf();
    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    $topicConf->set('auto.offset.reset', 'smallest');
    // Set the configuration to use for subscribed/assigned topics
    $conf->setDefaultTopicConf($topicConf);
    $consumer = new RdKafka\KafkaConsumer($conf);
    // Subscribe to topic 'test'
    $consumer->subscribe(['test']);
    echo "Waiting for partition assignment... (may take some time when\n";
    echo "quickly re-joining the group after leaving it.)\n";
    while (true) {
        // Timeout in milliseconds (120*1000 ms = 2 minutes)
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }
    ?>
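
    Both consumer loops dispatch on $message->err in the same way, so that decision can be factored into one small function. The sketch below is an illustration, not part of the original post: classifyMessage and its return values are hypothetical, and the fallback constant definitions exist only so that the snippet also loads on a machine without the extension.

```php
<?php
// Fallback definitions so the sketch loads without the extension;
// the values match librdkafka's error codes.
defined('RD_KAFKA_RESP_ERR_NO_ERROR') || define('RD_KAFKA_RESP_ERR_NO_ERROR', 0);
defined('RD_KAFKA_RESP_ERR__PARTITION_EOF') || define('RD_KAFKA_RESP_ERR__PARTITION_EOF', -191);
defined('RD_KAFKA_RESP_ERR__TIMED_OUT') || define('RD_KAFKA_RESP_ERR__TIMED_OUT', -185);

// Hypothetical helper: map a message's err code to 'payload', 'idle' or 'error'.
function classifyMessage($message)
{
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            return 'payload';   // a real message arrived
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            return 'idle';      // nothing to process yet, keep looping
        default:
            return 'error';     // caller should inspect errstr() and fail
    }
}

// Stand-in objects carrying only an err property, for demonstration
$ok = new stdClass();
$ok->err = RD_KAFKA_RESP_ERR_NO_ERROR;
$eof = new stdClass();
$eof->err = RD_KAFKA_RESP_ERR__PARTITION_EOF;
echo classifyMessage($ok), ' ', classifyMessage($eof), PHP_EOL; // prints "payload idle"
```

    In a real loop the argument would be the RdKafka\Message returned by consume(), and the 'error' branch would throw as the listings above do.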
  • Original post: https://www.cnblogs.com/leedaily/p/8458851.html