生产者
producer.php文件
<?php
/**
* Created by PhpStorm.
* User: shiyibo
* Date: 2019/2/24
* Time: 12:57 PM
*/
/**
* 消息生产者
*
* 实现的例子来源于:
*
* https://github.com/arnaud-lb/php-rdkafka#examples
*/
$objRdKafka = new RdKafkaProducer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("haha");
// 从终端接收输入
$oInputHandler = fopen('php://stdin', 'r');
while (true) {
echo "
Enter messages:
";
$sMsg = trim(fgets($oInputHandler));
// 空消息意味着退出
if (empty($sMsg)) {
break;
}
// 发送消息
$oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}
echo "done
";
消费者
consumer.php文件
<?php
/**
* 消费者消费消息
*
* 实现的例子来源于:
*
* https://github.com/arnaud-lb/php-rdkafka#examples
*/
$objRdKafka = new RdKafkaConsumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("haha");
/**
* consumeStart
* 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
* 第二个参数标识从什么位置开始拉取消息,可选值为
* RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
* RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
* RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
*/
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);
while (true) {
// 第一个参数是分区,第二个参数是超时时间
$oMsg = $oObjTopic->consume(0, 1000);
// 没拉取到消息时,返回NULL
if (!$oMsg) {
usleep(10000);
continue;
}
if ($oMsg->err) {
echo $msg->errstr(), "
";
break;
} else {
echo $oMsg->payload, "
";
}
}
测试
用命令行运行
php consumer.php
php producer.php
相关文章:
- Kafka在window上安装部署
https://www.cnblogs.com/wangzhaobo/p/14345281.html - php在windows上安装kafka扩展
https://www.cnblogs.com/wangzhaobo/p/14345760.html