zoukankan      html  css  js  c++  java
  • swoole结合kafka实现超高性能消息队列

    接触swoole有一年了,一年前上singwa老师的课,用swoole结合redis实现了一个消息队列,但那个时候是使用TP来做的,TP对swoole的支持并不是特别的友好,在一年后的现在,使用kafka结合easyswoole异步定时任务已经多进程来实现一个高性能的消息队列服务,主要用来实现飞飞物联的设备逻辑(规则引擎),比如根据传感器的数据发短信等等。

    首先连接kafka,这里的kaka我使用的百度云提供的kafka服务,自己部署太麻烦而且难以维护,连接的参考例子在这里https://github.com/BCEBIGDATA/kafka-sample-php ,其实最想使用的是微博的那个不使用扩展来连接kafka的库,但是一直没有解决使用ssl文件连接的问题,因此就是用了rdkafka扩展,首先按照样例中的说明安装librdkafka

    1
    2
    3
    sh setup-librdkafka.sh
    pecl install rdkafka
    echo "extension=rdkafka.so" >> /etc/php.ini //根据实际位置
    这样就安装好了librdkafka和php扩展,要注意的是版本号必须要新一些的,否则使用ssl的会报没有该设置项的异常,排查这个异常花了一晚上的时间。

    接下来在easyswoole创建一个连接kafka的基类,在飞飞物联的项目中只会使用到consumer,因为producer的数据是来自天工的设备数据

    kafka.php – 连接kafka的基类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    namespace AppLibKafka;

    use RdKafkaConf;
    use RdKafkaKafkaConsumer;
    use SwooleException;


    class Kafka
    {
    private $topic = '';

    private $config = [
    'broker' => 'xxxxxxxxx:9092',
    'security_protocol' => 'ssl',
    'client_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.pem',
    'client_key' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.key',
    'ca_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/ca.pem',
    'group_id' => 'kafka-feifei-swoole-consumer'
    ];

    public function __construct($topic)
    {
    if(!extension_loaded(rdkafka)){
    throw new Exception('rdkafka.so扩展必须开启');
    }

    if(!isset($topic) || empty($topic)){
    throw new Exception('kafak实例化必须设置topic');
    }
    $this->topic = $topic;
    }

    public function subscribe(){
    $conf = new RdKafkaConf();
    $conf->set('metadata.broker.list', $this->config['broker']);
    $conf->set('group.id', $this->config['group_id'].rand(0,10));
    $conf->set('security.protocol', $this->config['security_protocol']);
    $conf->set('ssl.certificate.location', $this->config['client_pem']);
    $conf->set('ssl.key.location', $this->config['client_key']);
    $conf->set('ssl.ca.location', $this->config['ca_pem']);
    $consumer = new RdKafkaKafkaConsumer($conf);
    $consumer->subscribe([$this->topic]);
    return $consumer;
    }

    }
    这里需要特别注意的是PHPstorm的代码检查器好像找不到rdkafka这个扩展,但是没有关系,我没可以在初始化这个类的时候判断一下扩展是否存在。这里只实现了消费者,要使用消费者需要实例化的时候传入消费者的topic,然后调用subscribe方法,接下来实际在easyswoole的mainServiceCreate中创建三个进程来处理kafka的订阅事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static function mainServerCreate(EventRegister $register)
    {
    // TODO: Implement mainServerCreate() method.
    // 注册Kafka消费事件, 开三个进程来处理
    $allNum = 3;
    for($i = 0; $i < $allNum; $i++){
    ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess());
    }
    }
    这里new的Consumer就是处理消费的进程

    Consumer.php

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    <?php
    /**
    * Created by bingxiong.
    * Date: 4/8/19
    * Time: 10:22 PM
    * Description:
    */

    namespace AppLibKafka;
    use EasySwooleComponentProcessAbstractProcess;

    class Consumer extends AbstractProcess
    {
    private $isRun = false;
    public function run($arg)
    {

    // 在这里处理kafka连接
    // TODO: Implement run() method.
    $this->addTick(500,function (){
    if(!$this->isRun){
    $this->isRun = true;
    // 连接kafka并订阅TOPIC
    $kafka = new Kafka('xxxxxxxxxxxx');//topic
    $consumer = $kafka->subscribe();
    while(true){
    try{
    $message = $consumer->consume(120*1000);
    if($message){
    switch ($message->err) {
    case RD_KAFKA_RESP_ERR_NO_ERROR:
    echo 'process name is'.$this->getProcessName().' ';
    echo "partition:", $message->partition,", offset:", $message->offset,", ", $message->payload, " ";
    break;
    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more ";
    break;
    case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out ";
    break;
    default:
    throw new Exception($message->errstr(), $message->err);
    break;
    }
    }else{
    break;
    }
    }catch (Throwable $throwable){
    break;
    }
    }

    $this->isRun = false;
    }
    var_dump($this->getProcessName().' task run check');
    });
    }

    public function onShutDown()
    {
    // TODO: Implement onShutDown() method.
    }

    public function onReceive(string $str, ...$args)
    {
    // TODO: Implement onReceive() method.
    }
    }
    这里使用了一个异步任务addTick,如果长期没有消息的话也会每500秒去检查一下有没有新的消息。这里还是用了一个死循环,在这里死循环中持续处理消息过来之后的逻辑

    swoole结合kafka实现超高性能消息队列插图

    现在已经使用swoole+kafka拿到设备的数据了,接下来就是使用异步任务或者异步redis之类的去执行相应的业务逻辑了。

  • 相关阅读:
    vagrant up报错 Warning: Authentication failure. Retrying...解决方案
    node读写Excel操作
    批量转换word为pdf
    分享7个shell脚本实例--shell脚本练习必备
    shell脚本实例,通向shell脚本大师的必经之路
    前端优化DNS预解析
    如何选择开源协议
    深入理解document.referrer的用法
    使用 WebRTC 构建简单的前端视频通讯
    深入理解WebRTC
  • 原文地址:https://www.cnblogs.com/xiami2046/p/13277085.html
Copyright © 2011-2022 走看看