接触swoole有一年了,一年前上singwa老师的课,用swoole结合redis实现了一个消息队列,但那个时候是使用TP来做的,TP对swoole的支持并不是特别的友好,在一年后的现在,使用kafka结合easyswoole异步定时任务已经多进程来实现一个高性能的消息队列服务,主要用来实现飞飞物联的设备逻辑(规则引擎),比如根据传感器的数据发短信等等。
首先连接kafka,这里的kaka我使用的百度云提供的kafka服务,自己部署太麻烦而且难以维护,连接的参考例子在这里https://github.com/BCEBIGDATA/kafka-sample-php ,其实最想使用的是微博的那个不使用扩展来连接kafka的库,但是一直没有解决使用ssl文件连接的问题,因此就是用了rdkafka扩展,首先按照样例中的说明安装librdkafka
1
2
3
sh setup-librdkafka.sh
pecl install rdkafka
echo "extension=rdkafka.so" >> /etc/php.ini //根据实际位置
这样就安装好了librdkafka和php扩展,要注意的是版本号必须要新一些的,否则使用ssl的会报没有该设置项的异常,排查这个异常花了一晚上的时间。
接下来在easyswoole创建一个连接kafka的基类,在飞飞物联的项目中只会使用到consumer,因为producer的数据是来自天工的设备数据
kafka.php – 连接kafka的基类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
namespace AppLibKafka;
use RdKafkaConf;
use RdKafkaKafkaConsumer;
use SwooleException;
class Kafka
{
private $topic = '';
private $config = [
'broker' => 'xxxxxxxxx:9092',
'security_protocol' => 'ssl',
'client_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.pem',
'client_key' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.key',
'ca_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/ca.pem',
'group_id' => 'kafka-feifei-swoole-consumer'
];
public function __construct($topic)
{
if(!extension_loaded(rdkafka)){
throw new Exception('rdkafka.so扩展必须开启');
}
if(!isset($topic) || empty($topic)){
throw new Exception('kafak实例化必须设置topic');
}
$this->topic = $topic;
}
public function subscribe(){
$conf = new RdKafkaConf();
$conf->set('metadata.broker.list', $this->config['broker']);
$conf->set('group.id', $this->config['group_id'].rand(0,10));
$conf->set('security.protocol', $this->config['security_protocol']);
$conf->set('ssl.certificate.location', $this->config['client_pem']);
$conf->set('ssl.key.location', $this->config['client_key']);
$conf->set('ssl.ca.location', $this->config['ca_pem']);
$consumer = new RdKafkaKafkaConsumer($conf);
$consumer->subscribe([$this->topic]);
return $consumer;
}
}
这里需要特别注意的是PHPstorm的代码检查器好像找不到rdkafka这个扩展,但是没有关系,我没可以在初始化这个类的时候判断一下扩展是否存在。这里只实现了消费者,要使用消费者需要实例化的时候传入消费者的topic,然后调用subscribe方法,接下来实际在easyswoole的mainServiceCreate中创建三个进程来处理kafka的订阅事件
1
2
3
4
5
6
7
8
9
public static function mainServerCreate(EventRegister $register)
{
// TODO: Implement mainServerCreate() method.
// 注册Kafka消费事件, 开三个进程来处理
$allNum = 3;
for($i = 0; $i < $allNum; $i++){
ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess());
}
}
这里new的Consumer就是处理消费的进程
Consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
<?php
/**
* Created by bingxiong.
* Date: 4/8/19
* Time: 10:22 PM
* Description:
*/
namespace AppLibKafka;
use EasySwooleComponentProcessAbstractProcess;
class Consumer extends AbstractProcess
{
private $isRun = false;
public function run($arg)
{
// 在这里处理kafka连接
// TODO: Implement run() method.
$this->addTick(500,function (){
if(!$this->isRun){
$this->isRun = true;
// 连接kafka并订阅TOPIC
$kafka = new Kafka('xxxxxxxxxxxx');//topic
$consumer = $kafka->subscribe();
while(true){
try{
$message = $consumer->consume(120*1000);
if($message){
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo 'process name is'.$this->getProcessName().'
';
echo "partition:", $message->partition,", offset:", $message->offset,", ", $message->payload, "
";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more
";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out
";
break;
default:
throw new Exception($message->errstr(), $message->err);
break;
}
}else{
break;
}
}catch (Throwable $throwable){
break;
}
}
$this->isRun = false;
}
var_dump($this->getProcessName().' task run check');
});
}
public function onShutDown()
{
// TODO: Implement onShutDown() method.
}
public function onReceive(string $str, ...$args)
{
// TODO: Implement onReceive() method.
}
}
这里使用了一个异步任务addTick,如果长期没有消息的话也会每500秒去检查一下有没有新的消息。这里还是用了一个死循环,在这里死循环中持续处理消息过来之后的逻辑
swoole结合kafka实现超高性能消息队列插图
现在已经使用swoole+kafka拿到设备的数据了,接下来就是使用异步任务或者异步redis之类的去执行相应的业务逻辑了。