zoukankan      html  css  js  c++  java
  • 搭建kafka高级消费 (high-consumer)php7

    说明:有很多同学在服务器上搭建好,kafka,在应用端使用kafka时候出现很多问题,这里提供下我的kafka生产和消费的php函数

    环境说明:

    1:首先php要有kafka扩展,在命令行中输入 php -m  看是否有rdkafka 

    没有的话需要安装配置下:

    --------------- kafka php客户端安装(php-rdkafka) --------------
    1.安装 librdkafka
    git clone https://github.com/edenhill/librdkafka
    cd librdkafka
    ./configure
    make
    sudo make install

    2.安装php-rdkafka
    git clone https://github.com/arnaud-lb/php-rdkafka.git
    cd php-rdkafka
    phpize
    ./configure
    make all -j 5
    sudo make install

    vi /usr/local/lib/php.ini
    加入 extension=rdkafka.so

    2:在kafka控制器中我直接贴出来我的生产和消费函数:

     /**
         * 生产单个消息
         * @param string $topic
         * @param null $post
         */
        function kafka_produce($key=null,$post=null)
        {
            $rk = new RdKafkaproducer();
            $rk->setLogLevel(LOG_DEBUG);
            $rk->addBrokers("localhost:9092,localhost:9093,localhost:9094,localhost:9095");
            $topics = $rk->newTopic('engine.com');
            $topics->produce(1, 0,$post,$key);
    
            echo 'kafka_produce success!!!';
        }
    /**
         * 高级消费模式
         * @param $topic
         * @return int
         * @throws Exception
         */
        function kafka_high_consume($topic='engine.com'){
            $conf = new RdKafkaConf();
            $conf->setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
                switch ($err) {
                    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                        echo "Assign: ";
                        var_dump($partitions);
                        $kafka->assign($partitions);
                        break;
    
                    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                        echo "Revoke: ";
                        var_dump($partitions);
                        $kafka->assign(NULL);
                        break;
    
                    default:
                        throw new Exception($err);
                }
            });
            $conf->set('group.id', '0');
            $conf->set('metadata.broker.list', 'localhost:9092,localhost:9093,localhost:9094,localhost:9095');
    //        针对低延迟进行了优化的配置。这允许PHP进程/请求尽快发送消息并快速终止
            $conf->set('socket.timeout.ms', 50);
            if (function_exists('pcntl_sigprocmask')) {
                pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
                $conf->set('internal.termination.signal', SIGIO);
            } else {
                $conf->set('queue.buffering.max.ms', 1);
            }
    
            $topicConf = new RdKafkaTopicConf();
            $topicConf->set('auto.commit.interval.ms', 100);
            $topicConf->set('auto.offset.reset', 'smallest');
            $topicConf->set('offset.store.path', 'kafka_offset.log');
            $conf->setDefaultTopicConf($topicConf);
    
            $consumer = new RdKafkaKafkaConsumer($conf);
    //        $topics->consumeStart(1, RD_KAFKA_OFFSET_STORED);
    
    //        更新订阅集(自动分配partitions )
            $consumer->subscribe([$topic]);
    
    //        指定topic分配partitions使用那个分区
    //        $consumer->assign([
    //            new RdKafkaTopicPartition("zzy8", 0),
    //            new RdKafkaTopicPartition("zzy8", 1),
    //            ]);
    
            while (true) {
    //            设置120s为超时
                $message = $consumer->consume(120 * 1000);
                if (!empty($message)) {
    
                    switch ($message->err) {
                        case RD_KAFKA_RESP_ERR_NO_ERROR:
                            info('New message received :', $message);
    //                        拆解对象为数组
                            $payload = json_decode($message->payload,true);
                            $Orders = new OrdersController();
                            $key = $message->key;
    //                        根据kafka中不同key,调用对应方法传递处理数据
                            ...
                   ...
                    ...
    break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more "; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out "; var_dump("##################"); break; default: var_dump("nothing"); throw new Exception($message->errstr(), $message->err); break; } } else { var_dump('this is empty obj!!!'); } } return 0; }

      在这里说明下,我放的是我项目中的使用函数,一些参数配置,大家可以根据我的提示自行注释和使用。

  • 相关阅读:
    java中eclipse控制台接受输入的方法
    java中Timer类的详细介绍(详解)
    java中Timer类的详细介绍(详解)
    java中Timer类的详细介绍(详解)
    java中Timer类的详细介绍(详解)
    java中ReentrantLock类的详细介绍(详解)
    java中ReentrantLock类的详细介绍(详解)
    java中ReentrantLock类的详细介绍(详解)
    java中ReentrantLock类的详细介绍(详解)
    Spring中WebApplicationInitializer的理解
  • 原文地址:https://www.cnblogs.com/darrenzzy/p/9355210.html
Copyright © 2011-2022 走看看