zoukankan      html  css  js  c++  java
  • php使用rdkafka进行消费

    • 如仅作为消费者或生产者,直接使用下面消费者或生产者的代码,并安装扩展即可。

    • PHP要安装rdkafka扩展,而rdkafka又依赖librdkafka,因此你需要安装rdkafka和librdkafka,之后就可以与kafka服务器交互了。

    • 如搭建kafka服务,需要jdk环境和zookeeper,以及kafka远程访问的配置,请参考

    消费者

    <?php
    /**
     * 代码中的输出注释都可以打开供调试使用
     * 对 中台生产的  用户信息 进行消费
     * Date: 2019/7/31
     */
    // 设置将要消费消息的主题
    $topic = 'alikafka-jl-yz-zt-updata-test';
    $host = '172.168.50.233';
    $group_id = 'CID_alikafka_jl_lz';
    
    $conf = new RdKafkaConf();
    // 当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发
    $conf->setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
    //            echo "Assign: "; 
    //            var_dump($partitions);
                $kafka->assign($partitions);
                break;
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
    //            echo "Revoke: ";
    //            var_dump($partitions);
                $kafka->assign(NULL);
                break;
            default:
                throw new Exception($err);
        }
    });
    // 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,
    // 所以同一个组内的消费者数量如果订阅了一个topic,
    // 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
    $conf->set('group.id', $group_id);
    
    // 添加 kafka集群服务器地址
    $conf->set('metadata.broker.list', $host); //'localhost:9092,localhost:9093,localhost:9094,localhost:9095'
    
    // 针对低延迟进行了优化的配置。这允许PHP进程/请求尽快发送消息并快速终止
    $conf->set('socket.timeout.ms', 50);
    //多进程和信号
    if (function_exists('pcntl_sigprocmask')) {
        pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
        $conf->set('internal.termination.signal', SIGIO);
    } else {
        $conf->set('queue.buffering.max.ms', 1);
    }
    
    $topicConf = new RdKafkaTopicConf();
    // 在interval.ms的时间内自动提交确认、建议不要启动, 1是启动,0是未启动
    $topicConf->set('auto.commit.enable', 1);
    $topicConf->set('auto.commit.interval.ms', 100);
    //smallest:简单理解为从头开始消费,largest:简单理解为从最新的开始消费
    $topicConf->set('auto.offset.reset', 'smallest');
    // 设置offset的存储为broker
    //$topicConf->set('offset.store.method', 'broker');
    // 设置offset的存储为file
    //$topicConf->set('offset.store.method', 'file');
    // 设置offset的存储路径
    $topicConf->set('offset.store.path', 'kafka_offset.log');
    //$topicConf->set('offset.store.path', __DIR__);
    
    $conf->setDefaultTopicConf($topicConf);
    
    $consumer = new RdKafkaKafkaConsumer($conf);
    
    // 更新订阅集(自动分配partitions )
    $consumer->subscribe([$topic]);
    
    //        指定topic分配partitions使用那个分区
    //        $consumer->assign([
    //            new RdKafkaTopicPartition("zzy8", 0),
    //            new RdKafkaTopicPartition("zzy8", 1),
    //            ]);
    
    while (true) {
    //            设置120s为超时
        $message = $consumer->consume(3 * 1000);
        if (!empty($message)) {
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    var_dump('New message received :', $message);  // 打印消息
    //              拆解对象为数组,并根据业务需求处理数据
                    $payload = json_decode($message->payload,true);
                    $key = $message->key;
    //              根据kafka中不同key,调用对应方法传递处理数据*(如果有必要的话)
                    //对该条message进行处理,比如用户数据同步, 记录日志。
    //                var_dump("asasasasasasasasasasasas");
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more
    ";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out
    ";
                    var_dump("##################");
                    break;
                default:
                    var_dump("nothing");
                    throw new Exception($message->errstr(), $message->err);
                    break;
            }
    
        } else {
            var_dump('this is empty obj!!!');
        }
    }
    

    生产者

    <?php
    /**
     * Date: 2019/8/1
     */
    $conf = new RdKafkaConf();
    
    $conf->setDrmSgCb(function ($kafka, $message){
        file_put_contents("d:/dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
    });
    $conf->setErrorCb(function ($kafka, $err, $reason){
        file_put_contents("d:/err_cb.log",sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
    });
    $rk = new RdKafkaProducer($conf);
    $rk->setLogLevel(LOG_DEBUG);
    
    $rk->addBrokers("127.0.0.1");
    
    $cf = new RdKafkaTopicConf();
    // -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
    // 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
    $cf->set('request.required.acks', 0);
    $topic = $rk->newTopic("test", $cf);
    
    $option = 'qkl';
    for ($i = 0; $i < 10; $i++) {
        //RD_KAFKA_PARTITION_UA自动选择分区
        //$option可选
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message . $i", $option);
    }
    
    $len = $rk->getOutQLen();
    while ($len > 0) {
        $len = $rk->getOutQLen();
    //            var_dump($len);
        $rk->poll(10);
    }
    var_dump("finish");exit;
    
    • 列如:将消费者保存为consumer.php文件后,使用php命令行运行
  • 相关阅读:
    BUUCTF-[强网杯 2019]随便注
    Oracle 存储过程
    java.lang.OutOfMemoryError: Java heap space
    Oracle 约束
    Docker 学习1 容器技术基础入门
    Kubernetes 学习1 Devops 核心要点和k8s架构概述
    mysql Sql语句
    Shell 编程详解
    git 学习
    Linux awk学习
  • 原文地址:https://www.cnblogs.com/lz0925/p/11280654.html
Copyright © 2011-2022 走看看