zoukankan      html  css  js  c++  java
  • Tp5使用Kafka:封装生产者、消费者操作类

    【相关文章】PHP操作Kafka:php-rdkafka扩展的安装

    1、config.php中配置:

    //kafka连接配置
    'kafka_server' => [
        'host' => '127.0.0.1:9092',
        'topic' => 'topic1',
    ],

    2、创建一个生产者 KafkaProducer.php :

    _config = config('kafka_server');
            $this->_rk = new \RdKafka\Producer();
            $this->_rk->setLogLevel(LOG_DEBUG);
            $this->_rk->addBrokers($this->_config['host']);
            $this->_topic = $this->_rk->newTopic($this->_config['topic']);
        }
    
        public function add($data){
            $this->_topic->produce(RD_KAFKA_PARTITION_UA, 0, $data);
            $this->_rk->poll(0);
            while ($this->_rk->getOutQLen() > 0) {
                $this->_rk->poll(50);
            }
        }
    }

    3、创建一个消费者 KafkaConsumer.php :

    _partition = $partition;
            $this->_config = config('kafka_server');
    
            $conf = new \RdKafka\Conf();
            $conf->set('group.id', $groupId);
    
            $rk = new \RdKafka\Consumer($conf);
            $rk->addBrokers($this->_config['host']);
    
            $topicConf = new \RdKafka\TopicConf();
            $topicConf->set('auto.commit.interval.ms', 100);
    
            $topicConf->set('offset.store.method', 'file');
            $topicConf->set('offset.store.path', sys_get_temp_dir());
            $topicConf->set('auto.offset.reset', 'smallest');
            $this->_topic = $rk->newTopic($this->_config['topic'], $topicConf);
            $this->_topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);
        }
    
        public function run(){
            while (true) {
                $message = $this->_topic->consume($this->_partition, 120*10000);
                Logger('KafkaConsumer::run::1', [$message]);
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        $this->exceTask($message->payload);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF: //等待接收信息
                        error_log("No more messages; will wait for more\n");
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT: //超时
                        error_log("Timed out\n");
                        break;
                    default:
                        throw new \Exception($message->errstr(), $message->err);
                        break;
                }
            }
        }
    
        private function exceTask($jsonData){
            error_log($jsonData);
            $paramsArr = json_decode($jsonData,true);
            if(!isset($paramsArr['className']) || !isset($paramsArr['funcName'])){
                error_log("Param error\n");
            }
            $objUrl = '\\app\\index\\service\\' . $paramsArr['className'];
            try {
                $obj = new $objUrl();
                return call_user_func([$obj, $paramsArr['funcName']], $jsonData);
            } catch (\Exception $e) {
                error_log("Func not found\n");
            }
        }
    
    }

    4、测试:

      调用生产者生产 Test.php:

     'TestService',
              'funcName' => 'index',
              'user_id' => 1,
            ];
            KafkaProducer::getInstance()->add(json_encode($data));
        }
    }

      消费者消费回调 TestService.php : 

    启动消费者 KafkaConsumer -> run() 后,调用生产者生产 Test.php,此时在消费者消费回调 TestService.php 中可看到如下日志记录,则消费者消费成功:

    IT成长中的那些事儿
  • 相关阅读:
    弹性盒布局(Flexbox布局)
    CSS子元素在父元素中水平垂直居中的几种方法
    Vue中watch用法详解
    深入理解vue中的slot与slot-scope
    Spring 源码学习 03:创建 IoC 容器的几种方式
    Spring 源码学习 02:关于 Spring IoC 和 Bean 的概念
    Spring 源码阅读环境的搭建
    DocView 现在支持自定义 Markdown 模版了!
    Dubbo 接口,导出 Markdown ,这些功能 DocView 现在都有了!
    线程池 ThreadPoolExecutor 原理及源码笔记
  • 原文地址:https://www.cnblogs.com/life_lt/p/15672990.html
Copyright © 2011-2022 走看看