<?php
# Producer: publishes ten numbered test messages to the "test3" topic
# through the broker at 127.0.0.1.
$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");
$topic = $rk->newTopic("test3");
for ($i = 0; $i < 10; $i++) {
    // RD_KAFKA_PARTITION_UA = "unassigned": no explicit partition is chosen here.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
}
// produce() only places messages on the client's outgoing queue. Keep polling
// until that queue is empty so nothing is dropped when the script exits.
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
# Consumer: reads partition 0 of the "test" topic from the earliest available
# offset and prints every payload until the first error is reported.
# NOTE(review): the producer section of this file writes to "test3", not
# "test" -- confirm which topic name is intended.
$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");
$topic = $rk->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
    // Wait up to 1000 ms for the next message on partition 0.
    $msg = $topic->consume(0, 1000);
    if (null === $msg) {
        // Nothing arrived within the timeout; try again.
        continue;
    } elseif ($msg->err) {
        // Any reported error ends the loop after printing its description.
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}