zoukankan      html  css  js  c++  java
  • PHP处理kafka消息队列

    安装php-kafka 扩展后,就可以开始编写 php 消费消息的脚本了,php-rdkafka 扩展提供了几种消息处理的方式

    低级方式(Low level)

    这种方式没有消费组的概念

    <?php
    
    $rk = new RdKafkaConsumer();
    $rk->setLogLevel(LOG_DEBUG);
    // 指定 broker 地址,多个地址用"," 分割
    $rk->addBrokers("192.168.33.1:9092");
    
    
    $topic = $rk->newTopic("test");
    $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
    
    
    while (true) {
        // 第一个参数是分区号
        // 第二个参数是超时时间
        $msg = $topic->consume(0, 1000);
        if ($msg->err) {
            echo $msg->errstr(), "
    ";
            break;
        } else {
            echo $msg->payload, "
    ";
        }
    }

    高级方式 (High level)

    这种方式可以指定消费组,一个消费组内,一个consumer 进程只能读取一个分区,

    <?php
    
    $conf = new RdKafkaConf();
    
    // Set a rebalance callback to log partition assignments (optional)
    // 当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发
    $conf->setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;
    
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;
    
            default:
                throw new Exception($err);
        }
    });
    
    // 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
    $conf->set('group.id', 'myConsumerGroup1');
    
    //添加 kafka集群服务器地址
    $conf->set('metadata.broker.list', '192.168.33.1:9092');
    
    $topicConf = new RdKafkaTopicConf();
    
    
    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    //当没有初始偏移量时,从哪里开始读取
    $topicConf->set('auto.offset.reset', 'smallest');
    
    
    // Set the configuration to use for subscribed/assigned topics
    $conf->setDefaultTopicConf($topicConf);
    
    $consumer = new RdKafkaKafkaConsumer($conf);
    
    // 让消费者订阅log 主题
    $consumer->subscribe(['log']);
    
    
    while (true) {
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more
    ";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out
    ";
                break;
            default:
                throw new Exception($message->errstr(), $message->err);
                break;
        }
    }
    
    ?>
  • 相关阅读:
    【尺取法】Jessica's Reading Problem
    【状态压缩】关灯问题2
    【AC自动机】多模匹配算法
    【蔡勒公式 】根据给定的年月日求出对应星期几
    【线段树】结训赛— H
    【快速幂 && 素数筛 && 数论】Carmichael Numbers
    【线段树】浅析--线段树
    【KMP】数据结构实验之串三:KMP应用
    【线段树】3771->数组计算机
    【字典树】2828 -> 字典树
  • 原文地址:https://www.cnblogs.com/wenhainan/p/10932147.html
Copyright © 2011-2022 走看看