zoukankan      html  css  js  c++  java
  • php使用kafka代码

    生产者

    producer.php文件

    <?php
    /**
     * Created by PhpStorm.
     * User: shiyibo
     * Date: 2019/2/24
     * Time: 12:57 PM
     */
    
    /**
     * 消息生产者
     *
     * 实现的例子来源于:
     *
     * https://github.com/arnaud-lb/php-rdkafka#examples
     */
    
    $objRdKafka = new RdKafkaProducer();
    $objRdKafka->setLogLevel(LOG_DEBUG);
    $objRdKafka->addBrokers("localhost:9092");
    
    $oObjTopic = $objRdKafka->newTopic("haha");
    
    // 从终端接收输入 
    $oInputHandler = fopen('php://stdin', 'r');
    
    while (true) {
        echo "
    Enter  messages:
    ";
        $sMsg = trim(fgets($oInputHandler));
    
       // 空消息意味着退出
        if (empty($sMsg)) {
            break;
        }
    
        // 发送消息
        $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
    }
    
    echo "done
    ";
    

    消费者

    consumer.php文件

    <?php
    
    /**
     * 消费者消费消息
     *
     * 实现的例子来源于:
     *
     * https://github.com/arnaud-lb/php-rdkafka#examples
     */
    
    $objRdKafka = new RdKafkaConsumer();
    $objRdKafka->setLogLevel(LOG_DEBUG);
    $objRdKafka->addBrokers("localhost:9092");
    
    $oObjTopic = $objRdKafka->newTopic("haha");
    
    /**
     * consumeStart
     *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
     *   第二个参数标识从什么位置开始拉取消息,可选值为
     *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
     *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
     *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
     */
    $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);
    
    while (true) {
        // 第一个参数是分区,第二个参数是超时时间
        $oMsg = $oObjTopic->consume(0, 1000);
    
        // 没拉取到消息时,返回NULL
        if (!$oMsg) {
            usleep(10000);
            continue;
        }
    
        if ($oMsg->err) {
            echo $msg->errstr(), "
    ";
            break;
        } else {
            echo $oMsg->payload, "
    ";
        }
    }
    

    测试

    用命令行运行

    php consumer.php
    php producer.php
    

    相关文章:

    1. Kafka在window上安装部署
      https://www.cnblogs.com/wangzhaobo/p/14345281.html
    2. php在windows上安装kafka扩展
      https://www.cnblogs.com/wangzhaobo/p/14345760.html
  • 相关阅读:
    python 使用else代替状态变量
    python 实现线程安全的单例模式
    sql语句的执行顺序
    python 实现int函数
    python实现时间o(1)的最小栈
    python实现简单的负载均衡
    python实现求最长回文子串长度
    python lambda表达式
    sql针对某一字段去重,并且保留其他字段
    基本认识
  • 原文地址:https://www.cnblogs.com/wangzhaobo/p/14346329.html
Copyright © 2011-2022 走看看