zoukankan      html  css  js  c++  java
  • redis 队列及ACK代码实现

    概述

    使用PHP+Redis简单实现一下队列以及ACK,确保服务的可靠性。

    流程图

    流程图

    代码

        

    RedisQueueAbstract
    <?php

    namespace RedisQueue;

    use RedisLibsRedisClient;
    use RedisQueueQueueTrait;

    abstract class RedisQueueAbstract
    {
    use QueueTrait;

    // redis
    protected $redis;

    // 队列
    protected $queue;

    // 队列值
    protected $queueVal;

    // 自动 Ack
    protected $autoAck = true;

    // 队列阻塞时间
    protected $queueTimeout = 20;

    // 无数据休眠时间
    protected $sleepTimeout = 5;

    // 处理超时时间,最小10秒
    protected $execTimeout = 60;

    public function __construct()
    {
    $this->redis = (new RedisClient)->connect();
    }

    /**
    * 处理取出的队列数据
    *
    * @param array $data
    * @return mixed
    */
    abstract protected function handle(array $data);

    /**
    * 设置队列
    *
    * @param string $queue
    * @return self
    */
    public function setQueue(string $queue): self
    {
    $this->queue = $queue;
    return $this;
    }

    /**
    * 加入队列
    *
    * @param array $data
    * @return integer
    */
    public function dispatch(array $data): bool
    {
    return (bool) $this->redis->lPush($this->queue, $this->handleData($data));
    }

    /**
    * 启动消费者
    *
    * @return void
    */
    public function run(): void
    {
    $this->start();
    }

    /**
    * ACK操作,删除ack队列的消息
    *
    * @return void
    */
    public function ack(): void
    {
    $this->redis->lRem($this->getACKQueue(), $this->queueVal, 1);
    }

    /**
    * 开始
    *
    * @return void
    */
    private function start(): void
    {
    while (true) {
    try {
    $queueVal = $this->redis->bRPopLPush($this->queue, $this->getACKQueue(), $this->queueTimeout);
    if (! $queueVal) {
    echo sprintf("sleep %d秒" . PHP_EOL, $this->sleepTimeout);
    sleep($this->sleepTimeout);
    continue;
    }
    $this->queueVal = $queueVal;
    $this->alterAckQueueVal();
    $this->handle($this->decodeData($this->queueVal)['data']);
    if ($this->autoAck) {
    $this->ack();
    }
    } catch (Exception $e) {
    echo $e->getMessage();
    continue;
    }
    }
    }

    /**
    * 处理加入队列的数据
    *
    * @param array $data
    * @return string
    */
    private function handleData(array $data): string
    {
    return $this->encodeData([
    'queue' => $this->queue,
    'data' => $data,
    'join_date' => date('Y-m-d H:i:s'),
    'timeout' => $this->execTimeout
    ]);
    }

    /**
    * 修改刚加入ack队列的值
    *
    * @return void
    */
    private function alterAckQueueVal(): void
    {
    $val = $this->decodeData($this->queueVal);
    $val['pop_date'] = date('Y-m-d H:i:s');
    $val['timeout_date'] = date('Y-m-d H:i:s', (time() + min($this->execTimeout, 10)));
    $val['timeout'] = $this->execTimeout;
    $newQueueVal = $this->encodeData($val);
    $this->redis->multi()
    ->lPush($this->getACKQueue(), $newQueueVal)
    ->lRem($this->getACKQueue(), $this->queueVal, 1)
    ->exec();
    $this->queueVal = $newQueueVal;
    }
    }


    AckQueue
    <?php

    namespace RedisQueue;

    use RedisLibsRedisClient;
    use RedisQueueQueueTrait;

    class AckQueue
    {

    use QueueTrait;

    protected $redis;

    protected $ackQueue = 'list:ack';

    protected $queueValList = [];

    public function __construct()
    {
    $this->redis = (new RedisClient)->connect();
    }

    public function run(): void
    {
    try{
    if (($len = $this->redis->lLen($this->ackQueue)) == 0) {
    echo '无数据,跳过';
    return;
    }
    for ($i=0; $i < $len; $i++) {
    $queueVal = $this->redis->lIndex($this->getACKQueue(), $i);
    if (! $queueVal) {
    break;
    }
    $queueValData = $this->decodeData($queueVal);
    $originQueue = $queueValData['queue'];
    if (! array_key_exists('timeout_date', $queueValData)) {
    // 防止误处理刚加入队列的数据
    if ((strtotime($queueValData['join_date']) + $queueValData['timeout']) > time()) {
    echo '保险起见,暂不处理';
    continue;
    }
    if (in_array($queueVal, $this->queueValList)) {
    echo '第二次处理';
    $this->makeValToOriginQueue($originQueue, $queueVal);
    } else {
    $this->queueValList[] = $queueVal;
    echo '第一次不处理';
    sleep(5);
    }
    $i--;
    continue;
    }
    if (strtotime($queueValData['timeout_date']) < time()) {
    $this->makeValToOriginQueue($originQueue, $queueVal);
    $i--;
    continue;
    }
    }
    } catch(Exception $e) {
    echo $e->getMessage();
    }
    }

    /**
    * 还原队列数据到原队列
    *
    * @param string $originQueue
    * @param string $queueVal
    * @return void
    */
    public function makeValToOriginQueue(string $originQueue, string $queueVal): void
    {
    $queueValData = $this->decodeData($queueVal);
    $queueValData['timeout_date'] = date('Y-m-d H:i:s', (time() + min($queueValData['timeout'], 10)));
    $nowQueueVal = $this->encodeData($queueValData);
    $this->redis->multi()
    ->rPush($originQueue, $nowQueueVal)
    ->lRem($this->getACKQueue(), $queueVal, 1)
    ->exec();
    }
    }


    QueueTrait
    <?php

    namespace RedisQueue;

    trait QueueTrait
    {

    /**
    * 数据编码
    *
    * @param array $data
    * @return string
    */
    private function encodeData(array $data): string
    {
    return json_encode($data);
    return base64_encode(json_encode($data));
    }

    /**
    * 数据解码
    *
    * @return array
    */
    private function decodeData(string $data): array
    {
    return json_decode($data, true);
    return json_decode(base64_decode($data), true);
    }

    /**
    * 获取ACK队列
    *
    * @return string
    */
    private function getACKQueue(): string
    {
    return 'list:ack';
    }
    }

    使用
    消费者
    <?php

    namespace Example;

    use RedisQueueRedisQueueAbstract;;

    class ExampleQueue extends RedisQueueAbstract
    {

    protected $queue;

    protected $autoAck = false;

    protected $execTimeout = 30;


    protected function handle(array $data)
    {
    print_r($data);
    }
    }

    <?php

    namespace Example;

    require_once __DIR__ . '/../vendor/autoload.php';

    use ExampleExampleQueue;

    (new ExampleQueue())->run();


    ACK
    定时任务执行

    <?php

    namespace Example;

    require_once __DIR__ . '/../vendor/autoload.php';

    use RedisQueueAckQueue;

    (new AckQueue())->run();

  • 相关阅读:
    iOS中block实现的探究
    [ES6] 19. for ... of
    [ES6] 18. Map
    [ES6] 17. Set
    [ES6] 16. Object Enhancements
    [MEAN Stack] First API -- 5. Using $resource to setup REST app
    [AngularJS] Provider
    [MEAN Stack] First API -- 4. Organize app structure
    [AngularJS] ngCloak
    [Angular-Scaled Web] 9. Control your promises with $q
  • 原文地址:https://www.cnblogs.com/liliuguang/p/13219145.html
Copyright © 2011-2022 走看看