zoukankan      html  css  js  c++  java
  • php使用kafka代码

    生产者

    producer.php文件

    <?php
    /**
     * Created by PhpStorm.
     * User: shiyibo
     * Date: 2019/2/24
     * Time: 12:57 PM
     */
    
    /**
     * 消息生产者
     *
     * 实现的例子来源于:
     *
     * https://github.com/arnaud-lb/php-rdkafka#examples
     */
    
    $objRdKafka = new RdKafkaProducer();
    $objRdKafka->setLogLevel(LOG_DEBUG);
    $objRdKafka->addBrokers("localhost:9092");
    
    $oObjTopic = $objRdKafka->newTopic("haha");
    
    // 从终端接收输入 
    $oInputHandler = fopen('php://stdin', 'r');
    
    while (true) {
        echo "
    Enter  messages:
    ";
        $sMsg = trim(fgets($oInputHandler));
    
       // 空消息意味着退出
        if (empty($sMsg)) {
            break;
        }
    
        // 发送消息
        $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
    }
    
    echo "done
    ";
    

    消费者

    consumer.php文件

    <?php
    
    /**
     * 消费者消费消息
     *
     * 实现的例子来源于:
     *
     * https://github.com/arnaud-lb/php-rdkafka#examples
     */
    
    $objRdKafka = new RdKafkaConsumer();
    $objRdKafka->setLogLevel(LOG_DEBUG);
    $objRdKafka->addBrokers("localhost:9092");
    
    $oObjTopic = $objRdKafka->newTopic("haha");
    
    /**
     * consumeStart
     *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
     *   第二个参数标识从什么位置开始拉取消息,可选值为
     *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
     *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
     *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
     */
    $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);
    
    while (true) {
        // 第一个参数是分区,第二个参数是超时时间
        $oMsg = $oObjTopic->consume(0, 1000);
    
        // 没拉取到消息时,返回NULL
        if (!$oMsg) {
            usleep(10000);
            continue;
        }
    
        if ($oMsg->err) {
            echo $msg->errstr(), "
    ";
            break;
        } else {
            echo $oMsg->payload, "
    ";
        }
    }
    

    测试

    用命令行运行

    php consumer.php
    php producer.php
    

    相关文章:

    1. Kafka在window上安装部署
      https://www.cnblogs.com/wangzhaobo/p/14345281.html
    2. php在windows上安装kafka扩展
      https://www.cnblogs.com/wangzhaobo/p/14345760.html
  • 相关阅读:
    PHP+MySQL
    Appstore排名前十的程序员应用软件
    架构师的平凡之路
    程序员,如何三十而立?
    不懂技术也可以轻松开发一款APP
    php语法学习:轻松看懂PHP语言
    你真的了解软件测试行业吗?
    十个程序员必备的网站推荐
    从更高点看软件开发的侧重点
    php如何实现文件下载
  • 原文地址:https://www.cnblogs.com/wangzhaobo/p/14346329.html
Copyright © 2011-2022 走看看