zoukankan      html  css  js  c++  java
  • PHP 消息队列 Kafka 使用

    安装 Kafka 服务

    直接到 kafka 官网 , 下载最新的

    wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
    

      

    解压,进入目录

    tar -zxvf kafka_2.13-2.5.0.tgz
    cd kafka_2.13-2.5.0
    

      

    启动 Kafka 服务

    使用安装包中的脚本启动单节点 Zookeeper 实例

    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    

      

    使用 kafka-server-start.sh 启动 kafka 服务

     

    bin/kafka-server-start.sh config/server.properties
    

      

    创建 topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

      

    查看 topic 列表,检查是否创建成功

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    $ test
    

      

    生产者,发送消息

    bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
    

      

    服务方面到这里就差不多了,接下来就是 php 的事了。

    安装 PHP 扩展

    rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka

    git clone https://github.com/edenhill/librdkafka.git
    cd librdkafka
    ./configure
    make && make install
    

      

    安装 php-rdkafka 扩展

    git clone https://github.com/arnaud-lb/php-rdkafka.git
    cd php-rdkafka
    phpize
    ./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config  ## 这里根据自己的情况填写路径
    make && make install
    

      

    在 php-ini 加上

     

    extension=rdkafka.so
    

      

    重启,php-fpm,就应该可以看到该扩展。

    使用Kafka

    创建一个生产者类

    <?php
    class KafkaProducer
    {
        public static $brokerList = '127.0.0.1:9092';
        public static function send($message, $topic)
     {
            self::producer($message, $topic);
        }
        public static function producer($message, $topic = 'test')
        {
            $conf = new RdKafkaConf();
            $conf->set('metadata.broker.list', self::$brokerList);
            $producer = new RdKafkaProducer($conf);
            $topic = $producer->newTopic($topic);
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
            $producer->poll(0);
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
                throw new RuntimeException('Was unable to flush, messages might be lost!');
    
          }
        }
    }
    

      

    创建一个消费类

    <?php
    class KafkaConsumer
    {
        public static $brokerList = '127.0.0.1:9092';
          public static function consumer()
        {
            $conf = new RdKafkaConf();
            $conf->set('group.id', 'test');
            $rk = new RdKafkaConsumer($conf);
       $rk->addBrokers("127.0.0.1");
            $topicConf = new RdKafkaTopicConf();
            $topicConf->set('auto.commit.interval.ms', 100);
            $topicConf->set('offset.store.method', 'broker');
            $topicConf->set('auto.offset.reset', 'smallest');
            $topic = $rk->newTopic('test', $topicConf);
            $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
            while (true) {
                $message = $topic->consume(0, 120*10000);
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        var_dump($message);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
           echo "No more messages; will wait for more
    ";
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        echo "Timed out
    ";
                        break;
                    default:
                        throw new Exception($message->errstr(), $message->err);
                        break;
                }
            }
        }
    }
    

      

    问题汇总

    1、 No Java runtime present, requesting install

    因为 kafka 需要 java 环境支持,所以安装 java 环境。可以到 javase-jdk14-downloads 选择自己的版本进行下载安装

    2、创建 topic 出现:Replication factor: 1 larger than available brokers: 0

    意思是至少有一个 brokers. 也就是说并没有有效的 brokers 可以用。你要确保你的 kafka 已经启动了

  • 相关阅读:
    Debian 添加Apache2
    最全面试资源,题库
    vue中的坑
    javascript事件相关4
    javascript事件相关3
    javascript事件相关2
    javascript事件学习笔记
    javascript 点点滴滴 jquery
    javascript 点点滴滴 jquery
    三栏自适应布局解决方案
  • 原文地址:https://www.cnblogs.com/a609251438/p/12917757.html
Copyright © 2011-2022 走看看