  • Kafka producer and consumer

    publisher.php

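    <?php

    // Create a producer and point it at the broker.
    $rk = new RdKafka\Producer();
    $rk->addBrokers("192.168.33.50");

    $topic = $rk->newTopic("two");

    for ($i = 0; $i < 2; $i++) {
        // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Sending message: $i");
        // Serve queued events without blocking.
        $rk->poll(0);
    }

    // Keep polling until every queued message has been handed off.
    while ($rk->getOutQLen() > 0) {
        $rk->poll(50);
    }

    ?>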
    

      

    consumer.php

    <?php
    $conf = new RdKafka\Conf();

    // Log partition assignments and revocations during group rebalances.
    $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;

            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;

            default:
                throw new \Exception($err);
        }
    });

    // Consumers that share a group.id split the partitions between them.
    $conf->set('group.id', 'myConsumerGroup');

    // Initial list of brokers.
    $conf->set('metadata.broker.list', '192.168.33.50');

    $topicConf = new RdKafka\TopicConf();

    // With no stored offset, start from the beginning of the topic.
    $topicConf->set('auto.offset.reset', 'smallest');

    // Apply the topic configuration to subscribed topics.
    // Without this call $topicConf is never used.
    $conf->setDefaultTopicConf($topicConf);

    $consumer = new RdKafka\KafkaConsumer($conf);

    $consumer->subscribe(['two']);

    while (true) {
        // Wait up to 120 seconds for the next message.
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }
    

      

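    Create the topic. For reasons I have not figured out, the producer cannot produce to a topic that was created from code, so create it on the command line: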

    /usr/local/kafka/bin/kafka-topics.sh --create --topic two  --zookeeper 192.168.33.50:2181 --partitions 1 --replication-factor 1

    List the topics

    /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.33.50:2181
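
    Before running the producer it can help to confirm that the topic really exists and has a leader. The two commands above can be wrapped in small shell functions. This is only a sketch: the function names create_topic and describe_topic are made up for illustration, the install path and ZooKeeper address are copied from the commands above, and it assumes a Kafka version whose kafka-topics.sh still accepts --zookeeper along with the --if-not-exists and --describe options.

```shell
#!/bin/sh
# Assumed locations, taken from the commands in this post; override via the environment.
KAFKA_BIN="${KAFKA_BIN:-/usr/local/kafka/bin}"
ZOOKEEPER="${ZOOKEEPER:-192.168.33.50:2181}"

# Hypothetical helper: create a topic with one partition and one replica,
# skipping creation if the topic already exists.
create_topic() {
    "$KAFKA_BIN/kafka-topics.sh" --create --if-not-exists --topic "$1" \
        --zookeeper "$ZOOKEEPER" --partitions 1 --replication-factor 1
}

# Hypothetical helper: print the partition, leader and replica layout of a topic.
describe_topic() {
    "$KAFKA_BIN/kafka-topics.sh" --describe --topic "$1" --zookeeper "$ZOOKEEPER"
}
```

    After sourcing the file, `create_topic two && describe_topic two` creates the topic if needed and then shows its layout. If the describe output lists no leader for the partition, the producer is unlikely to be able to deliver to it.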

  • Original article: https://www.cnblogs.com/brady-wang/p/13593758.html