概述
使用 PHP + Redis 实现一个带 ACK(消息确认)机制的简单队列,确保消息消费的可靠性。
流程图
代码
RedisQueueAbstract
<?php
namespace RedisQueue;
use RedisLibsRedisClient;
use RedisQueueQueueTrait;
/**
 * Base class for a Redis-list backed queue with an ACK (acknowledgement) list.
 *
 * Producers call dispatch() to push a payload onto the queue. A consumer calls
 * run(), which moves each message from the queue into the ACK list, stamps it
 * with a processing deadline, hands the payload to handle() and finally removes
 * it from the ACK list once acknowledged. Messages that are never acknowledged
 * stay in the ACK list.
 */
abstract class RedisQueueAbstract
{
    use QueueTrait;

    // Redis connection returned by RedisClient::connect().
    protected $redis;

    // Name of the Redis list holding pending messages.
    protected $queue;

    // Encoded value of the message currently being processed, exactly as it
    // is stored in the ACK list (needed to remove it again on ack()).
    protected $queueVal;

    // When true, ack() is called automatically after handle() returns.
    protected $autoAck = true;

    // Seconds the blocking pop waits for a message.
    protected $queueTimeout = 20;

    // Seconds to sleep when the blocking pop returned nothing.
    protected $sleepTimeout = 5;

    // Seconds allowed for processing one message; the effective deadline is
    // never shorter than 10 seconds.
    protected $execTimeout = 60;

    public function __construct()
    {
        $this->redis = (new RedisClient)->connect();
    }

    /**
     * Process the payload of one message taken from the queue.
     *
     * @param array $data the 'data' element that was passed to dispatch()
     * @return mixed
     */
    abstract protected function handle(array $data);

    /**
     * Set the name of the queue to produce to / consume from.
     *
     * @param string $queue
     * @return self
     */
    public function setQueue(string $queue): self
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Push a payload onto the queue.
     *
     * @param array $data
     * @return bool true when lPush reported a truthy result
     */
    public function dispatch(array $data): bool
    {
        return (bool) $this->redis->lPush($this->queue, $this->handleData($data));
    }

    /**
     * Start the consumer loop. Does not return.
     *
     * @return void
     */
    public function run(): void
    {
        $this->start();
    }

    /**
     * Acknowledge the current message by removing it from the ACK list.
     *
     * @return void
     */
    public function ack(): void
    {
        $this->redis->lRem($this->getACKQueue(), $this->queueVal, 1);
    }

    /**
     * Endless consume loop: pop a message into the ACK list, stamp it, handle
     * it, and acknowledge it when auto-ack is enabled.
     *
     * @return void
     */
    private function start(): void
    {
        while (true) {
            try {
                $queueVal = $this->redis->bRPopLPush($this->queue, $this->getACKQueue(), $this->queueTimeout);
                if (! $queueVal) {
                    echo sprintf("sleep %d秒" . PHP_EOL, $this->sleepTimeout);
                    sleep($this->sleepTimeout);
                    continue;
                }

                $this->queueVal = $queueVal;
                $this->alterAckQueueVal();
                $this->handle($this->decodeData($this->queueVal)['data']);

                if ($this->autoAck) {
                    $this->ack();
                }
            } catch (\Exception $e) {
                // Fully qualified on purpose: an unqualified "Exception" inside
                // this namespace names a class that does not exist, so nothing
                // would ever be caught. The failed message stays in the ACK list.
                echo $e->getMessage();
            }
        }
    }

    /**
     * Wrap a payload in the envelope that is stored in Redis.
     *
     * @param array $data
     * @return string encoded envelope
     */
    private function handleData(array $data): string
    {
        return $this->encodeData([
            'queue' => $this->queue,
            'data' => $data,
            'join_date' => date('Y-m-d H:i:s'),
            'timeout' => $this->execTimeout,
        ]);
    }

    /**
     * Replace the freshly popped ACK-list entry with a copy that carries the
     * pop time and the processing deadline.
     *
     * @return void
     */
    private function alterAckQueueVal(): void
    {
        $now = time();

        $val = $this->decodeData($this->queueVal);
        $val['pop_date'] = date('Y-m-d H:i:s', $now);
        // max(), not min(): 10 seconds is the lower bound of the deadline.
        $val['timeout_date'] = date('Y-m-d H:i:s', $now + max($this->execTimeout, 10));
        $val['timeout'] = $this->execTimeout;
        $newQueueVal = $this->encodeData($val);

        // Insert the stamped copy before removing the original so the message
        // is present in the ACK list at every point of the transaction.
        $this->redis->multi()
            ->lPush($this->getACKQueue(), $newQueueVal)
            ->lRem($this->getACKQueue(), $this->queueVal, 1)
            ->exec();

        $this->queueVal = $newQueueVal;
    }
}
AckQueue
<?php
namespace RedisQueue;
use RedisLibsRedisClient;
use RedisQueueQueueTrait;
/**
 * Sweeper for the ACK list: moves messages whose processing deadline has
 * passed (or that were never stamped by a consumer) back to the queue they
 * came from. Intended to be run periodically.
 */
class AckQueue
{
    use QueueTrait;

    // Redis connection returned by RedisClient::connect().
    protected $redis;

    // Kept for subclasses that may reference it; the sweep itself uses
    // getACKQueue() so it reads the same list the consumers write to.
    protected $ackQueue = 'list:ack';

    // Unstamped entries that have already been seen once during this run.
    protected $queueValList = [];

    public function __construct()
    {
        $this->redis = (new RedisClient)->connect();
    }

    /**
     * Walk the ACK list once and restore every overdue message.
     *
     * @return void
     */
    public function run(): void
    {
        try {
            $ackQueue = $this->getACKQueue();
            $len = $this->redis->lLen($ackQueue);
            if (! $len) {
                echo '无数据,跳过';
                return;
            }

            // The index only advances when the entry at that position is left
            // in place; after a removal the next entry slides into the same
            // position and is examined on the following pass.
            $index = 0;
            while ($index < $len) {
                $queueVal = $this->redis->lIndex($ackQueue, $index);
                if (! $queueVal) {
                    break;
                }

                $queueValData = $this->decodeData($queueVal);
                $originQueue = $queueValData['queue'];

                if (! array_key_exists('timeout_date', $queueValData)) {
                    // Entry was popped but not yet stamped by a consumer.
                    // Leave it alone while it is still within its timeout
                    // counted from the time it joined the queue.
                    if ((strtotime($queueValData['join_date']) + $queueValData['timeout']) > time()) {
                        echo '保险起见,暂不处理';
                        $index++;
                        continue;
                    }

                    if (in_array($queueVal, $this->queueValList, true)) {
                        echo '第二次处理';
                        $this->makeValToOriginQueue($originQueue, $queueVal);
                    } else {
                        // First sighting: remember it, wait, then look at the
                        // same position again before restoring it.
                        $this->queueValList[] = $queueVal;
                        echo '第一次不处理';
                        sleep(5);
                    }
                    continue;
                }

                if (strtotime($queueValData['timeout_date']) < time()) {
                    $this->makeValToOriginQueue($originQueue, $queueVal);
                    continue;
                }

                $index++;
            }
        } catch (\Exception $e) {
            // Fully qualified on purpose: an unqualified "Exception" inside
            // this namespace names a class that does not exist.
            echo $e->getMessage();
        }
    }

    /**
     * Move one entry from the ACK list back to its original queue.
     *
     * @param string $originQueue queue name recorded in the message envelope
     * @param string $queueVal    encoded entry exactly as stored in the ACK list
     * @return void
     */
    public function makeValToOriginQueue(string $originQueue, string $queueVal): void
    {
        $queueValData = $this->decodeData($queueVal);
        // max(), not min(): 10 seconds is the lower bound of the deadline.
        $queueValData['timeout_date'] = date('Y-m-d H:i:s', (time() + max($queueValData['timeout'], 10)));
        $nowQueueVal = $this->encodeData($queueValData);

        $this->redis->multi()
            ->rPush($originQueue, $nowQueueVal)
            ->lRem($this->getACKQueue(), $queueVal, 1)
            ->exec();
    }
}
QueueTrait
<?php
namespace RedisQueue;
/**
 * Helpers shared by the queue consumer and the ACK sweeper: payload
 * (de)serialisation and the name of the ACK list.
 */
trait QueueTrait
{
    /**
     * Encode an envelope as a JSON string.
     *
     * @param array $data
     * @return string
     * @throws \UnexpectedValueException when the data cannot be represented as JSON
     */
    private function encodeData(array $data): string
    {
        $encoded = json_encode($data);
        if ($encoded === false) {
            // Without this check the string return type would turn the
            // failure into an empty string that ends up stored in Redis.
            throw new \UnexpectedValueException('Unable to encode queue data: ' . json_last_error_msg());
        }

        return $encoded;
    }

    /**
     * Decode a JSON string produced by encodeData().
     *
     * @param string $data
     * @return array
     * @throws \UnexpectedValueException when the string is not a JSON object or array
     */
    private function decodeData(string $data): array
    {
        $decoded = json_decode($data, true);
        if (! is_array($decoded)) {
            throw new \UnexpectedValueException('Unable to decode queue data: expected a JSON object or array');
        }

        return $decoded;
    }

    /**
     * Name of the Redis list that holds messages awaiting acknowledgement.
     *
     * @return string
     */
    private function getACKQueue(): string
    {
        return 'list:ack';
    }
}
使用
消费者
<?php
namespace Example;
use RedisQueueRedisQueueAbstract;;
/**
 * Example consumer that prints every payload it receives.
 */
class ExampleQueue extends RedisQueueAbstract
{
    // Default queue name so the consumer works without a prior setQueue()
    // call; producers must dispatch to the same name. The name itself is an
    // example value - change it or override it via setQueue().
    protected $queue = 'list:example';

    // Auto-ack is disabled: unless handle() calls $this->ack(), the message
    // stays in the ACK list after processing.
    protected $autoAck = false;

    // Seconds allowed for processing one message.
    protected $execTimeout = 30;

    /**
     * Print the payload of one message.
     *
     * @param array $data
     * @return void
     */
    protected function handle(array $data)
    {
        print_r($data);
    }
}
<?php
namespace Example;
// Load the vendor autoloader, located one directory above this script.
require_once __DIR__ . '/../vendor/autoload.php';
use ExampleExampleQueue;
// Create the example consumer and start consuming.
(new ExampleQueue())->run();
ACK
定时任务执行
<?php
namespace Example;
// Load the vendor autoloader, located one directory above this script.
require_once __DIR__ . '/../vendor/autoload.php';
use RedisQueueAckQueue;
// Perform a single pass over the ACK list; meant to be invoked by a scheduled task.
(new AckQueue())->run();