zoukankan      html  css  js  c++  java
  • PHP Kafka 消息队列使用

     

    转载自:https://learnku.com/articles/44442

    1. 安装 Kafka 服务

    直接到 kafka 官网 , 下载最新的

      wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz

    解压,进入目录

      tar -zxvf kafka_2.13-2.5.0.tgz
      cd kafka_2.13-2.5.0

    开始使用

    a. 启动 Kafka 服务

    使用安装包中的脚本启动单节点 Zookeeper 实例

      bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

    使用 kafka-server-start.sh 启动 kafka 服务

      bin/kafka-server-start.sh config/server.properties

    创建 topic

      bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    查看 topic 列表,检查是否创建成功

      bin/kafka-topics.sh --list --zookeeper localhost:2181
    
      $ test

    生产者,发送消息

      bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

    服务方面到这里就差不多了,接下来就是 php 的事了。

     

    2. 安装 PHP 扩展#

    rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka

      git clone https://github.com/edenhill/librdkafka.git
      cd librdkafka
      ./configure
      make && make install

    安装 php-rdkafka 扩展

      git clone https://github.com/arnaud-lb/php-rdkafka.git
      cd php-rdkafka
      phpize
      ./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config  ## 这里根据自己的情况填写路径
      make && make install

    php-ini 加上

      extension=rdkafka.so

    重启,php-fpm,就应该可以看到该扩展。

     

    3. 使用#

    创建一个生产者类

    <?php
    class KafkaProducer
    {
        public static $brokerList = '127.0.0.1:9092';
    
        public static function send($message, $topic)
        {
            self::producer($message, $topic);
        }
    
        public static function producer($message, $topic = 'test')
        {
            $conf = new RdKafkaConf();
    
            $conf->set('metadata.broker.list', self::$brokerList);
    
            $producer = new RdKafkaProducer($conf);
    
            $topic = $producer->newTopic($topic);
    
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
    
            $producer->poll(0);
    
            $result = $producer->flush(10000);
    
            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
                throw new RuntimeException('Was unable to flush, messages might be lost!');
            }
        }
    }

    创建一个消费类

    <?php
    class KafkaConsumer
    {
        public static $brokerList = '127.0.0.1:9092';
    
          public static function consumer()
        {
            $conf = new RdKafkaConf();
    
            $conf->set('group.id', 'test');
    
            $rk = new RdKafkaConsumer($conf);
    
            $rk->addBrokers("127.0.0.1");
    
            $topicConf = new RdKafkaTopicConf();
    
            $topicConf->set('auto.commit.interval.ms', 100);
    
            $topicConf->set('offset.store.method', 'broker');
    
            $topicConf->set('auto.offset.reset', 'smallest');
    
            $topic = $rk->newTopic('test', $topicConf);
    
            $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    
            while (true) {
                $message = $topic->consume(0, 120*10000);
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        var_dump($message);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                        echo "No more messages; will wait for more
    ";
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        echo "Timed out
    ";
                        break;
                    default:
                        throw new Exception($message->errstr(), $message->err);
                        break;
                }
            }
        }
    }
     

    问题汇总#

    a. No Java runtime present, requesting install

    因为 kafka 需要 java 环境支持,所以安装 java 环境。可以到 javase-jdk14-downloads 选择自己的版本进行下载安装

    b. 创建 topic 出现:Replication factor: 1 larger than available brokers: 0

    意思是至少有一个 brokers. 也就是说并没有有效的 brokers 可以用。你要确保你的 kafka 已经启动了

    参考

    www.jianshu.com/p/964bdfeb9465



     
     
  • 相关阅读:
    周志华 机器学习
    王亮 中国科学院自动化研究所
    殷明 合肥工业大学
    批处理命令行 for循环
    CalFrechetDist
    等高线简化线方法对比(多尺度评价方法)
    周成虎
    MFC 使用控制台打印程序信息
    C++ 获得本地磁盘盘符的容量信息
    VS2012+CUDA6.0配置方法
  • 原文地址:https://www.cnblogs.com/myJuly/p/14046659.html
Copyright © 2011-2022 走看看