zoukankan      html  css  js  c++  java
  • php调用kafka消息队列

    2020年10月27日16:49:33

    环境php 7.3 laravel 8

    kafka版本  kafka_2.13-2.6.0.tgz

    扩展https://github.com/arnaud-lb/php-rdkafka

    其他的php扩展不是很久没更新就是扩展关系乱七八糟,建议使用rdkafka

    http://pecl.php.net/package/rdkafka 有编译好的dll扩展

    官方文档 https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html  需要翻墙

    kafka中文文档

    https://kafka.apachecn.org/documentation.html

    https://www.orchome.com/472

    相关概念理解

    https://www.cnblogs.com/rickiyang/category/1487719.html

    注意: 将其中php_rdkafka.dll放入php目录下的ext文件夹内,librdkafka.dll放入php根目录下,然后修改php.ini,添加:

    extension=php_rdkafka.dll

    php -m和phpinfo()都需要验证是否安装成功

    代码:

    KafkaProducerService

    <?php
    
    namespace AppService;
    
    use RdKafkaConf;
    use RdKafkaProducer;
    
    class KafkaProducerService {
    
        public static function doTask() {
    
            $conf = new Conf();
    //        $conf->set('log_level', (string) LOG_DEBUG);
    //        $conf->set('debug', 'all');
            $rk = new Producer($conf);
            $rk->addBrokers('172.18.0.105');
            $topic = $rk->newTopic('zx');
    
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message 222222222222");
    
            $rk->poll(1);
        }
    
    }
    View Code

    KafkaConsumerService low消费者模式

    <?php
    
    namespace AppService;
    
    use RdKafkaConf;
    use RdKafkaConsumer;
    
    class KafkaConsumerService {
    
        public static function doTask() {
    
            $conf = new Conf();
    //        $conf->set('log_level', (string) LOG_DEBUG);
    //        $conf->set('debug', 'all');
            $rk = new Consumer($conf);
            $rk->addBrokers('172.18.0.105');
    
            $topic = $rk->newTopic('zx');
    
            $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
    
            while (true) {
                $msg = $topic->consume(0, 5000);
                if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                    // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
                    continue;
                } elseif ($msg->err) {
                    echo $msg->errstr(), "
    ";
                    break;
                } else {
                    echo $msg->payload, "
    ";
                }
            }
        }
    
    }
    View Code

    high消费者模式

    <?php
    
    namespace AppService;
    
    use RdKafkaConf;
    use RdKafkaConsumer;
    use RdKafkaTopicConf;
    use RdKafkaKafkaConsumer;
    use AppServiceBaseKafkaService;
    
    class KafkaConsumerService extends BaseKafkaService {
    
        //low消费者
    //    public static function doTask() {
    //
    //        $conf = new Conf();
    //        $rk = new Consumer($conf);
    //        $rk->addBrokers(self::$server_ip);
    //        $topic = $rk->newTopic(self::$topic);
    //
    //        $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
    //
    //        while (true) {
    //            $msg = $topic->consume(0, 5000);
    //            if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
    //                // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
    //                continue;
    //            } elseif ($msg->err) {
    //                echo $msg->errstr(), "
    ";
    //                break;
    //            } else {
    //                echo $msg->payload, "
    ";
    //            }
    //        }
    //    }
        //high消费者
        public static function doTask() {
            $conf = new Conf();
            // Set a rebalance callback to log partition assignments (optional)
            $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
                switch ($err) {
                    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                        echo "Assign: ";
                        var_dump($partitions);
                        $kafka->assign($partitions);
                        break;
    
                    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                        echo "Revoke: ";
                        var_dump($partitions);
                        $kafka->assign(NULL);
                        break;
    
                    default:
                        throw new Exception($err);
                }
            });
    
            // Configure the group.id. All consumer with the same group.id will consume
            $conf->set('group.id', self::$group_id);
            $conf->set('metadata.broker.list', self::$server_ip);
    
            $conf->set('auto.offset.reset', 'earliest');
    
            $consumer = new KafkaConsumer($conf);
            
            $topicConf = new TopicConf();
    //        $topicConf->set("auto.commit.interval.ms", 1e3);
            $topicConf->set("enable.auto.commit", 0);
    
            $topic = $consumer->newTopic(self::$topic, $topicConf);
            
    //        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
            // Subscribe to topic 'test'
            $consumer->subscribe([self::$topic]);
    
            while (true) {
                $message = $consumer->consume(1 * 1000);
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
    //                    var_dump($message);
                        var_dump($message->payload);
    
    //                    $consumer->commit($message);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                        echo "No more messages; will wait for more
    ";
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        echo "Timed out
    ";
                        break;
                    default:
                        throw new Exception($message->errstr(), $message->err);
                        break;
                }
            }
        }
    
    }
    View Code

    注意:

    1,你测试服务器上的kafka必须的先测试好,不然有些错误,在log里面没法只管体现

    2,$rk->poll(1);必须,不然消费端接受不到信息,因为是编译扩展,所以不清楚底层是怎么实现的

    3,注意消费者模式和参看文档

    一些测试命令

    启动查看启动错误
    bin/kafka-server-start.sh config/server.properties bin/zookeeper-server-start.sh config/zookeeper.properties
    172.18.0.105 本地测试服务Ip
    bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=3000
    //删除
    bin/kafka-topics.sh --delete --topic testTopic --zookeeper localhost:2181
    //列表
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    //创建
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zx
    //生产者客户端
    bin/kafka-console-producer.sh --broker-list 172.18.0.105:9092 --topic zx
    //消费者客户端
    bin/kafka-console-consumer.sh --bootstrap-server 172.18.0.105:9092 --topic zx --from-beginning

    jps查看是否启动
    9173 Kafka
    9462 Jps
    8589 QuorumPeerMain

    现在服务器启动生产者客户端,消费者客户端测试确保kafka跑起来完全没问题,这点一定要确保

    个人的一点感悟,kafka配置和测试有一定难度,不如rabbitmq上手快,而且kafka找错误真的不好找

    虽然kafka号称吞吐性能能够达到T级别,都有集群模式,kafka借助与zookeeper的集群,各自都有优点,如果是java做粘合剂就使用kafka,如果是多语言建议RabbitMQ

     建议java全家桶还是使用java做粘合剂会好一些

  • 相关阅读:
    01 Java基础第一天
    2019牛客暑期多校训练营(第七场)J A+B problem
    SDNU 1477.矩形面积交(思维)
    SDNU 1194.传纸条(DP)&& 1032.机器人
    SDNU 1280.就问你慌不慌(高精度)
    POJ 2528 Mayor's posters(线段树+离散化)
    HDU 1698 Just a Hook(线段树区间赋值)
    POJ 3468 A Simple Problem with Integers (区间加区间查找)
    HDU 1754 I Hate It(线段树单点更改、区间查找最大值)
    HDU 1166 敌兵布阵(线段树单点加区间查询)
  • 原文地址:https://www.cnblogs.com/zx-admin/p/13885953.html
Copyright © 2011-2022 走看看