首先安装kafka扩展链接附上:https://www.cnblogs.com/dalaowang/p/13469004.html
其次安装kafka扩展包,用composer安装:
composer require "nmred/kafka-php"
安装完成后会在vendor目录先生成这个文件:
把kafka封装成单例模式进行调用:
<?php namespace apputils; class Kafka { private static $instance; private $client; private static $result; private $db; private static $collectionName; private $broker_list = '127.0.0.1'; //现在是一个,也可以多个,用逗号隔开 private $topic = 'test'; //定义topic private $partition = 0; //定义topic的物理分组,这里是0 private $producer = null; //存储producer对象 //构造器私有化:禁止从类外部实例化 private function __construct() { if (empty($this->broker_list)) { echo 'broker not config'; } $rk = new RdKafkaProducer(); //实例化对象 if (empty($rk)) { echo 'producer error1'; } $rk->setLogLevel(LOG_DEBUG); //设置错误级别 if(!$rk->addBrokers($this->broker_list)) {//设置Broker地址 echo 'producer error2'; } $this->producer = $rk; } //克隆方法私有化:禁止从外部克隆对象 private function __clone() { } //生成当前类的唯一实例 public static function get_instance() { if (!self::$instance instanceof self) { self::$instance = new self(); } return self::$instance; } //生产者的方法(生产者把日志向消息队列发送) public function send($message = []) { $topic = $this->producer->newTopic($this->topic); //创建topic $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode([$message]));//生产 } }
上面的代码基本不用改,还是需要根据实际情况而定
控制器调用方式:
public function kafkaTest(){ apputilsKafka::get_instance()->send(['string1']); echo 'OK~'; }