zoukankan      html  css  js  c++  java
  • RdKafka使用

    RdKafka使用

    标签(空格分隔): 未分类

    生产消息(Producer)

    // Publish $message, JSON-encoded, to the Kafka topic named by $topic.
    // Yields true when the producer queue was flushed without error,
    // false when encoding fails or the flush does not complete in time.
    //
    // php-rdkafka classes live in the RdKafka namespace; the leading
    // backslash keeps the names resolvable from inside any namespace.
    $config = new \RdKafka\Conf();
    $config->set('metadata.broker.list', $this->brokerList);
    $producer = new \RdKafka\Producer($config);

    // Keep the topic handle in its own variable instead of overwriting
    // the incoming topic name.
    $producerTopic = $producer->newTopic($topic);

    // json_encode() returns false for unencodable input (e.g. invalid
    // UTF-8); do not hand that to produce() as if it were a payload.
    $payload = json_encode($message);
    if (false === $payload) {
        return false;
    }

    // RD_KAFKA_PARTITION_UA leaves partition selection to the
    // configured partitioner; 0 means no message flags.
    $producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);

    // Non-blocking poll to service any queued producer events.
    $producer->poll(0);

    // Wait up to 10 seconds for outstanding messages to be delivered.
    return RD_KAFKA_RESP_ERR_NO_ERROR === $producer->flush(10000);
    

    消费消息(Consumer)

    // Low-level consumer: reads partition 0 of $this->topicName as a
    // member of group $this->groupName and dumps every payload received.
    // Runs forever; errors are printed and the loop carries on.
    //
    // php-rdkafka classes live in the RdKafka namespace; the leading
    // backslash keeps the names resolvable from inside any namespace.
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', $this->groupName);

    $consumer = new \RdKafka\Consumer($conf);
    $consumer->addBrokers($this->brokerList);

    $topicConf = new \RdKafka\TopicConf();
    // Configuration values are passed as strings, which also keeps the
    // call valid under declare(strict_types=1).
    $topicConf->set('auto.commit.interval.ms', '100');
    $topicConf->set('offset.store.method', 'broker');
    // With no stored offset for the group, start from the oldest message.
    $topicConf->set('auto.offset.reset', 'smallest');

    $topic = $consumer->newTopic($this->topicName, $topicConf);
    // Resume partition 0 from the last stored offset.
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    while (true) {
        // NOTE(review): 120*10000 ms is 20 minutes; if a 2-minute wait
        // was intended this should be 120*1000 - confirm with the author.
        $message = $topic->consume(0, 120*10000);

        // consume() may hand back null when nothing arrived before the
        // timeout; there is nothing to inspect, so poll again.
        if (null === $message) {
            continue;
        }

        if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            print("err: " . $message->err);
            print("errstr: " . $message->errstr());
        } else {
            var_dump($message->payload);
        }
    }
  • 相关阅读:
    第五章4
    第五章3
    第五章2
    第五章1
    第四章14
    第四章13
    第四章12
    第四章11
    第五章例5-6
    第五章例5-4
  • 原文地址:https://www.cnblogs.com/yanweifeng/p/14611823.html
Copyright © 2011-2022 走看看