<?php $conf = new RdKafkaConf(); //捕获错误的回调 $conf->setErrorCb(function ($kafka, $err, $reason) { Log::error('kafkaError', ['errorStr' => rd_kafka_err2str($err), 'err' => $err, 'reason' => $reason, 'content' => $kafka]); }); $rk = new RdKafkaProducer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers($this->url); $topic = $rk->newTopic($this->topic); //写kafka $topic->produce(RD_KAFKA_PARTITION_UA, 0, $str); $rk->poll(0);