zoukankan      html  css  js  c++  java
  • Kafka-php-使用 PHP 编写的 Kafka 客户端

    Kafka-php 使用纯粹的PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Kafka PHP v0.1.x Document , 不过建议切换到 v0.2.x 上。v0.2.x 使用 PHP 异步执行的方式来和kafka broker 交互,较 v0.1.x 更加稳定高效, 由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本

    安装环境要求

    • PHP 版本大于 5.5
    • Kafka Server 版本大于 0.8.0
    • 消费模块 Kafka Server 版本需要大于 0.9.0

    使用 Composer 安装

    添加 composer 依赖 nmred/kafka-php 到项目的 composer.json 文件中即可,如:

    {
    	"require": {
    		"nmred/kafka-php": "0.2.*"
    	}
    }
    Produce

    <?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use MonologLogger; use MonologHandlerStdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); // 设置生产相关配置,具体配置参数见 [Configuration](Configuration.md) $config = KafkaProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('0.9.0.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new KafkaProducer(function() { return array( array( 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ), ); }); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode, $context) { var_dump($errorCode); }); $producer->send();

    Consumer

    <?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use MonologLogger; use MonologHandlerStdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = KafkaConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('0.9.0.1'); $config->setTopics(array('test')); //$config->setOffsetReset('earliest'); $consumer = new KafkaConsumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });

                                                               

  • 相关阅读:
    June. 26th 2018, Week 26th. Tuesday
    June. 25th 2018, Week 26th. Monday
    June. 24th 2018, Week 26th. Sunday
    June. 23rd 2018, Week 25th. Saturday
    June. 22 2018, Week 25th. Friday
    June. 21 2018, Week 25th. Thursday
    June. 20 2018, Week 25th. Wednesday
    【2018.10.11 C与C++基础】C Preprocessor的功能及缺陷(草稿)
    June.19 2018, Week 25th Tuesday
    June 18. 2018, Week 25th. Monday
  • 原文地址:https://www.cnblogs.com/LXJ416/p/8245386.html
Copyright © 2011-2022 走看看