zoukankan      html  css  js  c++  java
  • PHP 实践kafka 生产消息

    首先安装kafka扩展链接附上:https://www.cnblogs.com/dalaowang/p/13469004.html

    其次安装kafka扩展包,用composer安装:

    composer require "nmred/kafka-php"

    安装完成后会在vendor目录先生成这个文件:

    把kafka封装成单例模式进行调用:

    <?php
    
    namespace apputils;
    
    class Kafka
    {
        private static $instance;
        private  $client;
        private static $result;
        private  $db;
        private static $collectionName;
    
        private $broker_list = '127.0.0.1';  //现在是一个,也可以多个,用逗号隔开
        private $topic = 'test';                      //定义topic
        private $partition = 0;                         //定义topic的物理分组,这里是0
    
        private $producer = null;                  //存储producer对象
    
        //构造器私有化:禁止从类外部实例化
        private function __construct()
        {
            if (empty($this->broker_list))
            {
                echo 'broker not config';
            }
            $rk = new RdKafkaProducer();  //实例化对象
    
            if (empty($rk)) {
                echo 'producer error1';
            }
            $rk->setLogLevel(LOG_DEBUG);  //设置错误级别
            if(!$rk->addBrokers($this->broker_list)) {//设置Broker地址
                echo 'producer error2';
            }
            $this->producer = $rk;
    
        }
    
        //克隆方法私有化:禁止从外部克隆对象
        private function __clone()
        {
    
        }
    
        //生成当前类的唯一实例
        public static function get_instance()
        {
            if (!self::$instance instanceof self) {
                self::$instance = new self();
            }
            return self::$instance;
        }
    
        //生产者的方法(生产者把日志向消息队列发送)
        public function send($message = [])
        {
            $topic = $this->producer->newTopic($this->topic);  //创建topic
    
            $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode([$message]));//生产
        }
    
    }

    上面的代码基本不用改,还是需要根据实际情况而定

    控制器调用方式:

    public function kafkaTest(){
            apputilsKafka::get_instance()->send(['string1']);
            echo 'OK~';
        }
  • 相关阅读:
    session_id 生成原理
    压缩后的数据 要经过 base64_encode 后才能在网络上传送
    MySQL ANALYZE TABLE
    mysql 优化2
    mysql 查询优化
    第归调用
    『GoLang』函数
    『GoLang』控制结构
    『GoLang』语法基础
    『Python』装饰器
  • 原文地址:https://www.cnblogs.com/dalaowang/p/13491987.html
Copyright © 2011-2022 走看看