zoukankan      html  css  js  c++  java
  • kafka生产者 消费者

    publisher.php

    <?php
    
    $rk = new RdKafkaProducer();
    $rk->addBrokers("192.168.33.50");
    
    $topic = $rk->newTopic("two");
    
    for ($i = 0; $i < 2; $i++) {
    	$topic->produce(RD_KAFKA_PARTITION_UA, 0, "发送信息: $i");
    	$rk->poll(0);
    }
    
    while ($rk->getOutQLen() > 0) {
    	$rk->poll(50);
    }
    
    ?>
    

      

    consumer.ph

    <?php
    $conf = new RdKafkaConf();
    
    $conf->setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
    	switch ($err) {
    		case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
    			echo "Assign: ";
    			var_dump($partitions);
    			$kafka->assign($partitions);
    			break;
    
    		case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
    			echo "Revoke: ";
    			var_dump($partitions);
    			$kafka->assign(NULL);
    			break;
    
    		default:
    			throw new Exception($err);
    	}
    });
    
    $conf->set('group.id', 'myConsumerGroup');
    
    $conf->set('metadata.broker.list', '192.168.33.50');
    
    $topicConf = new RdKafkaTopicConf();
    
    $topicConf->set('auto.offset.reset', 'smallest');
    
    
    $consumer = new RdKafkaKafkaConsumer($conf);
    
    $consumer->subscribe(['two']);
    
    while (true) {
    	$message = $consumer->consume(120*1000);
    	switch ($message->err) {
    		case RD_KAFKA_RESP_ERR_NO_ERROR:
    			var_dump($message);
    			break;
    		case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    			echo "No more messages; will wait for more
    ";
    			break;
    		case RD_KAFKA_RESP_ERR__TIMED_OUT:
    			echo "Timed out
    ";
    			break;
    		default:
    			throw new Exception($message->errstr(), $message->err);
    			break;
    	}
    }
    

      

    创建topic 不知道如何 用代码创建的topic 生产者就是无法生产

    /usr/local/kafka/bin/kafka-topics.sh --create --topic two  --zookeeper 192.168.33.50:2181 --partitions 1 --replication-factor 1

    查看topic

    /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.33.50:2181 --partitions 1 --replication-factor 1

  • 相关阅读:
    Spring AOP 随记
    Java设计模式系列 — 构造器模式
    【Java线程安全】 — 常用数据结构及原理(未完结)
    【最佳实践】好用的Quartz管理器类
    Timer和时间调度
    Java9之HashMap与ConcurrentHashMap
    记一次maven的包冲突经历
    hbase高可用集群部署(cdh)
    HBase 1.2.6 完全分布式集群安装部署详细过程
    hadoop-2.7.3完全分布式部署
  • 原文地址:https://www.cnblogs.com/brady-wang/p/13593758.html
Copyright © 2011-2022 走看看