zoukankan      html  css  js  c++  java
  • 【kafka学习笔记】PHP接入kafka

    安装扩展

    # 先安装rdkfka库文件
    git clone https://github.com/edenhill/librdkafka.git
    
    或者:
    
    wget https://gitee.com/wanhedigital_1553363148/librdkafka/attach_files/619880/download/librdkafka-1.6.0.tar.gz -O librdkafka.tar.gz
    
    
    cd librdkafka/
    ./configure 
    make
    sudo make install
    
    pecl install   rdkafka
    
    vim [php]/php.ini
    extension=rdkafka.so
    

    生产者

    <?php
    $conf = new RdKafka\Conf();
    $conf->setDrMsgCb(function ($kafka, $message) {
        file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
    });
    $conf->setErrorCb(function ($kafka, $err, $reason) {
        file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
    });
    
    $rk = new RdKafka\Producer($conf);
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("127.0.0.1");
    
    $cf = new RdKafka\TopicConf();
    // -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
    // 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
    $cf->set('request.required.acks', 0);
    $topic = $rk->newTopic("test", $cf);
    
    $option = 'qkl';
    for ($i = 0; $i < 20; $i++) {
        //RD_KAFKA_PARTITION_UA自动选择分区
        //$option可选
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
    }
    
    
    $len = $rk->getOutQLen();
    while ($len > 0) {
        $len = $rk->getOutQLen();
        var_dump($len);
        $rk->poll(50);
    }
    

    Low Level 消费者

    <?php
    $conf = new RdKafka\Conf();
    $conf->setDrMsgCb(function ($kafka, $message) {
        file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
    });
    $conf->setErrorCb(function ($kafka, $err, $reason) {
        file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
    });
    
    //设置消费组
    $conf->set('group.id', 'myConsumerGroup');
    
    $rk = new RdKafka\Consumer($conf);
    $rk->addBrokers("127.0.0.1");
    
    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('request.required.acks', 1);
    //在interval.ms的时间内自动提交确认、建议不要启动
    //$topicConf->set('auto.commit.enable', 1);
    $topicConf->set('auto.commit.enable', 0);
    $topicConf->set('auto.commit.interval.ms', 100);
    
    // 设置offset的存储为file
    //$topicConf->set('offset.store.method', 'file');
    // 设置offset的存储为broker
     $topicConf->set('offset.store.method', 'broker');
    //$topicConf->set('offset.store.path', __DIR__);
    
    //smallest:简单理解为从头开始消费,其实等价于上面的 earliest
    //largest:简单理解为从最新的开始消费,其实等价于上面的 latest
    //$topicConf->set('auto.offset.reset', 'smallest');
    
    $topic = $rk->newTopic("test", $topicConf);
    
    // 参数1消费分区0
    // RD_KAFKA_OFFSET_BEGINNING 重头开始消费
    // RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
    // RD_KAFKA_OFFSET_END 最后一条消费
    $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
    //$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
    //$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    
    while (true) {
        //参数1表示消费分区,这里是分区0
        //参数2表示同步阻塞多久
        $message = $topic->consume(0, 12 * 1000);
        if (is_null($message)) {
            sleep(1);
            echo "No more messages\n";
            continue;
        }
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
    

    High LEVEL消费者

    <?php
    /**
     * Created by PhpStorm.
     * User: qkl
     * Date: 2018/8/22
     * Time: 17:58
     */
    $conf = new \RdKafka\Conf();
    
    function rebalance(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        global $offset;
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign();
    //            $kafka->assign([new RdKafka\TopicPartition("qkl01", 0, 0)]);
                break;
    
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;
    
            default:
                throw new \Exception($err);
        }
    }
    
    // Set a rebalance callback to log partition assignments (optional)
    $conf->setRebalanceCb(function(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        rebalance($kafka, $err, $partitions);
    });
    
    // Configure the group.id. All consumer with the same group.id will consume
    // different partitions.
    $conf->set('group.id', 'test-110-g100');
    
    // Initial list of Kafka brokers
    $conf->set('metadata.broker.list', '192.168.216.122');
    
    $topicConf = new \RdKafka\TopicConf();
    
    $topicConf->set('request.required.acks', -1);
    //在interval.ms的时间内自动提交确认、建议不要启动
    $topicConf->set('auto.commit.enable', 0);
    //$topicConf->set('auto.commit.enable', 0);
    $topicConf->set('auto.commit.interval.ms', 100);
    
    // 设置offset的存储为file
    $topicConf->set('offset.store.method', 'file');
    $topicConf->set('offset.store.path', __DIR__);
    // 设置offset的存储为broker
    // $topicConf->set('offset.store.method', 'broker');
    
    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    $topicConf->set('auto.offset.reset', 'smallest');
    
    // Set the configuration to use for subscribed/assigned topics
    $conf->setDefaultTopicConf($topicConf);
    
    $consumer = new \RdKafka\KafkaConsumer($conf);
    
    //$KafkaConsumerTopic = $consumer->newTopic('qkl01', $topicConf);
    
    // Subscribe to topic 'test'
    $consumer->subscribe(['qkl01']);
    
    echo "Waiting for partition assignment... (make take some time when\n";
    echo "quickly re-joining the group after leaving it.)\n";
    
    while (true) {
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
    //            $consumer->commit($message);
    //            $KafkaConsumerTopic->offsetStore(0, 20);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
    
    消费组特别说明

    特别注意,High LEVEL消费者设置的消费组,kafka服务器才会记录, Low Level消费者设置的消费组,服务器不会记录

    常用脚本实践:https://segmentfault.com/a/1190000016106045

    分享一个打包好的php-rdkafka的类库

    https://github.com/qkl9527/php-rdkafka-class

    当你停下来休息的时候,不要忘记别人还在奔跑!
  • 相关阅读:
    JavaScript 各种遍历方式详解
    使用TortoiseGit操作分支的创建与合并
    前端实现图片压缩上传
    改变input的value值,同时在HTML中将value进行改变
    HTML中字体单位px pt em之间的转换
    解决PHP无法接收post超过1000个字段的问题
    解决微信不能扫描下载的问题
    实现类似于百度实时搜索将结果在下拉框中显示的功能
    Python导入路径含特殊字符的包
    大二秋学期记录
  • 原文地址:https://www.cnblogs.com/HappyTeemo/p/15739784.html
Copyright © 2011-2022 走看看