<?php
/**
 * Fixed-capacity FIFO queue (ring buffer) stored in memcached.
 *
 * Layout in memcached, all keys prefixed with the queue name:
 *   - <name>_Fox_MQ_HEAD_    index of the oldest element
 *   - <name>_Fox_MQ_TAIL_    index of the next slot to write
 *   - <name>_Fox_MQ_LENGTH_  number of stored elements
 *   - <name>_Fox_MQ_VAL_<i>  payload stored in slot i (0 <= i < capacity)
 *   - <name>_Fox_MQ_LOCK_    mutex key created with memcache_add()
 *
 * push(), pop() and clear() take the mutex, reload the counters from
 * memcached, do their work and release the mutex. isFull(), isEmpty() and
 * getLen() only look at the counters cached by the most recent of those calls
 * (or by the constructor); they do not query memcached themselves.
 *
 * The connection is kept in a public static property, so every MQueue instance
 * in the process uses the connection opened by the most recently constructed
 * one.
 */
class MQueue
{
    /** Shared memcache connection, or false when the last connect attempt failed. */
    public static $client;

    /** TTL in seconds used for every key this class writes (0 = no expiry). */
    private $expire;

    /** Microseconds to sleep between attempts to take the lock. */
    private $sleepTime;

    /** Prefix of every memcached key belonging to this queue. */
    private $queueName;

    /** Number of failed lock attempts tolerated before lock() gives up. */
    private $retryNum;

    /** Capacity of the ring buffer (always an integer >= 1). */
    private $MAXNUM;

    /** When true, pushing onto a full queue overwrites the oldest element. */
    private $canRewrite;

    /** Locally cached head index. */
    private $HEAD;

    /** Locally cached tail index. */
    private $TAIL;

    /** Locally cached element count. */
    private $LEN;

    /** True while this instance holds the queue lock. */
    private $access = false;

    const LOCK_KEY = '_Fox_MQ_LOCK_';
    const LENGTH_KEY = '_Fox_MQ_LENGTH_';
    const VALU_KEY = '_Fox_MQ_VAL_';
    const HEAD_KEY = '_Fox_MQ_HEAD_';
    const TAIL_KEY = '_Fox_MQ_TAIL_';

    /**
     * Connects to memcached and loads the current queue counters.
     *
     * If the connection cannot be established the object is still created, but
     * push(), pop() and clear() will return false.
     *
     * On a successful connection this also calls ignore_user_abort(true) and
     * set_time_limit(0) for the running script.
     *
     * @param string       $queueName  Key prefix identifying the queue.
     * @param int          $maxqueue   Capacity; values below 1 are treated as 1.
     * @param bool         $canRewrite Overwrite the oldest element instead of
     *                                 rejecting a push when the queue is full.
     * @param int          $expire     TTL in seconds for stored keys; 0 or empty
     *                                 means no expiry. A non-zero value is stored
     *                                 as $expire + 1.
     * @param array|string $config     array('host' => ..., 'port' => ...), a
     *                                 "host:port" string, or empty for
     *                                 127.0.0.1:11211.
     */
    public function __construct($queueName = '', $maxqueue = 1, $canRewrite = false, $expire = 0, $config = '')
    {
        // Initialise all local state before touching the network so the object
        // is consistent even when the connection attempt fails.
        $this->access = false;
        $this->sleepTime = 1000;
        $this->retryNum = 20000;
        // NOTE(review): the extra second on a non-zero TTL is inherited
        // behaviour; its purpose is not evident from this file.
        $this->expire = empty($expire) ? 0 : (int) $expire + 1;
        $this->queueName = $queueName;
        $this->MAXNUM = max(1, (int) $maxqueue);
        $this->canRewrite = $canRewrite;
        $this->HEAD = 0;
        $this->TAIL = 0;
        $this->LEN = 0;

        $host = '127.0.0.1';
        $port = 11211;
        if (is_array($config)) {
            if (!empty($config['host'])) {
                $host = $config['host'];
            }
            if (!empty($config['port'])) {
                $port = $config['port'];
            }
        } elseif (is_string($config) && $config !== '') {
            $parts = explode(':', $config);
            if (isset($parts[0]) && $parts[0] !== '') {
                $host = $parts[0];
            }
            if (isset($parts[1]) && $parts[1] !== '') {
                $port = $parts[1];
            }
        }

        self::$client = memcache_pconnect($host, (int) $port);
        if (!self::$client) {
            return;
        }

        ignore_user_abort(true);
        set_time_limit(0);
        $this->getHeadAndTail();
    }

    /**
     * Reloads HEAD, TAIL and LEN from memcached into the local cache.
     *
     * A missing key makes memcache_get() return false, which the int cast
     * turns into 0.
     */
    private function getHeadAndTail()
    {
        $this->HEAD = (int) memcache_get(self::$client, $this->queueName . self::HEAD_KEY);
        $this->TAIL = (int) memcache_get(self::$client, $this->queueName . self::TAIL_KEY);
        $this->LEN = (int) memcache_get(self::$client, $this->queueName . self::LENGTH_KEY);
    }

    /**
     * Takes the queue mutex by adding the lock key, retrying with a short
     * sleep between attempts.
     *
     * @return bool True when the lock was taken; false when there is no
     *              connection, this instance already holds the lock, or the
     *              retry budget ($retryNum attempts, $sleepTime microseconds
     *              apart) ran out.
     */
    private function lock()
    {
        if (!self::$client || $this->access !== false) {
            return false;
        }

        // NOTE(review): the lock key uses the same TTL as the data; with a TTL
        // of 0 a holder that dies without unlocking leaves the key in place.
        $attempts = 0;
        while (!memcache_add(self::$client, $this->queueName . self::LOCK_KEY, 1, 0, $this->expire)) {
            usleep($this->sleepTime);
            if (++$attempts > $this->retryNum) {
                return false;
            }
        }

        $this->access = true;
        return true;
    }

    /**
     * Advances HEAD by one slot (wrapping at capacity), decrements LEN without
     * going below zero, and writes both counters back to memcached.
     */
    private function incrHead()
    {
        $this->HEAD++;
        if ($this->HEAD >= $this->MAXNUM) {
            $this->HEAD = 0;
        }

        $this->LEN--;
        if ($this->LEN < 0) {
            $this->LEN = 0;
        }

        memcache_set(self::$client, $this->queueName . self::HEAD_KEY, $this->HEAD, 0, $this->expire);
        memcache_set(self::$client, $this->queueName . self::LENGTH_KEY, $this->LEN, 0, $this->expire);
    }

    /**
     * Advances TAIL by one slot (wrapping at capacity), increments LEN without
     * exceeding capacity, and writes both counters back to memcached.
     */
    private function incrTail()
    {
        $this->TAIL++;
        if ($this->TAIL >= $this->MAXNUM) {
            $this->TAIL = 0;
        }

        $this->LEN++;
        if ($this->LEN >= $this->MAXNUM) {
            $this->LEN = $this->MAXNUM;
        }

        memcache_set(self::$client, $this->queueName . self::TAIL_KEY, $this->TAIL, 0, $this->expire);
        memcache_set(self::$client, $this->queueName . self::LENGTH_KEY, $this->LEN, 0, $this->expire);
    }

    /**
     * Releases the queue mutex by deleting the lock key.
     */
    private function unLock()
    {
        memcache_delete(self::$client, $this->queueName . self::LOCK_KEY);
        $this->access = false;
    }

    /**
     * Tells whether a push would be rejected, based on the cached counters.
     *
     * @return bool Always false for a rewritable queue; otherwise true when
     *              the cached length has reached the capacity.
     */
    public function isFull()
    {
        if ($this->canRewrite) {
            return false;
        }
        return $this->LEN >= $this->MAXNUM;
    }

    /**
     * Tells whether the cached length is zero.
     *
     * @return bool
     */
    public function isEmpty()
    {
        return $this->LEN == 0;
    }

    /**
     * Returns the cached number of elements.
     *
     * @return int
     */
    public function getLen()
    {
        return $this->LEN;
    }

    /**
     * Appends one element to the queue.
     *
     * Values for which empty() is true ('', 0, '0', null, false, array()) are
     * rejected. On a full rewritable queue the oldest element is overwritten.
     *
     * @param mixed $data Payload to store.
     * @return bool True when stored; false when the value is empty, the lock
     *              could not be taken, the queue is full, or the write failed.
     */
    public function push($data = '')
    {
        if (empty($data)) {
            return false;
        }
        if (!$this->lock()) {
            return false;
        }

        $this->getHeadAndTail();
        if ($this->isFull()) {
            $this->unLock();
            return false;
        }

        $stored = false;
        $slotKey = $this->queueName . self::VALU_KEY . $this->TAIL;
        if (memcache_set(self::$client, $slotKey, $data, MEMCACHE_COMPRESSED, $this->expire)) {
            // TAIL has caught up with HEAD on a non-empty queue: the slot just
            // written held the oldest element, so HEAD must move past it.
            if ($this->TAIL == $this->HEAD && $this->LEN >= 1) {
                $this->incrHead();
            }
            $this->incrTail();
            $stored = true;
        }

        $this->unLock();
        return $stored;
    }

    /**
     * Removes and returns up to $length elements from the front of the queue.
     *
     * @param int $length Number of elements to take; 0 (or any empty value)
     *                    means all of them. Larger requests are capped at the
     *                    current length.
     * @return array|false The removed payloads, oldest first; false when
     *                     $length is not numeric, the lock could not be taken,
     *                     or the queue is empty. A slot whose value is missing
     *                     in memcached contributes false to the array.
     */
    public function pop($length = 0)
    {
        if (!is_numeric($length)) {
            return false;
        }
        if (!$this->lock()) {
            return false;
        }

        $this->getHeadAndTail();
        if (empty($length)) {
            $length = $this->LEN;
        }
        if ($this->isEmpty()) {
            $this->unLock();
            return false;
        }
        if ($length > $this->LEN) {
            $length = $this->LEN;
        }

        $data = $this->popKeyArray($length);
        $this->unLock();
        return $data;
    }

    /**
     * Reads and deletes $length consecutive slots starting at HEAD, advancing
     * HEAD after each one. Must be called with the lock held.
     *
     * @param int $length Number of slots to consume.
     * @return array Payloads in queue order.
     */
    private function popKeyArray($length)
    {
        $result = array();
        if (empty($length)) {
            return $result;
        }

        for ($k = 0; $k < $length; $k++) {
            $slotKey = $this->queueName . self::VALU_KEY . $this->HEAD;
            $result[] = memcache_get(self::$client, $slotKey);
            memcache_delete(self::$client, $slotKey);

            // HEAD == TAIL with at most one element left: that element was
            // just consumed, so only the length needs to be zeroed.
            if ($this->TAIL == $this->HEAD && $this->LEN <= 1) {
                $this->LEN = 0;
                memcache_set(self::$client, $this->queueName . self::LENGTH_KEY, $this->LEN, 0, $this->expire);
                break;
            }
            $this->incrHead();
        }

        return $result;
    }

    /**
     * Zeroes the local counters and either deletes the counter keys from
     * memcached ($all = true) or rewrites them as 0 ($all = false).
     *
     * @param bool $all Delete the counter keys instead of resetting them.
     */
    private function reset($all = false)
    {
        $this->HEAD = 0;
        $this->TAIL = 0;
        $this->LEN = 0;

        $counterKeys = array(self::HEAD_KEY, self::TAIL_KEY, self::LENGTH_KEY);
        foreach ($counterKeys as $suffix) {
            if ($all) {
                memcache_delete(self::$client, $this->queueName . $suffix);
            } else {
                memcache_set(self::$client, $this->queueName . $suffix, 0, 0, $this->expire);
            }
        }
    }

    /**
     * Flushes the whole memcached server behind the shared connection, not
     * just this queue's keys. Does nothing when there is no connection.
     */
    public function memFlush()
    {
        if (!self::$client) {
            return;
        }
        memcache_flush(self::$client);
    }

    /**
     * Deletes every stored element and resets the queue counters.
     *
     * @param bool $all When true the counter keys are deleted from memcached;
     *                  otherwise they are rewritten as 0.
     * @return bool True on success, false when the lock could not be taken.
     */
    public function clear($all = false)
    {
        if (!$this->lock()) {
            return false;
        }

        $this->getHeadAndTail();
        // Occupied slots start at HEAD and wrap around the ring; never visit
        // more slots than the ring has.
        $count = min($this->LEN, $this->MAXNUM);
        for ($i = 0; $i < $count; $i++) {
            $slot = ($this->HEAD + $i) % $this->MAXNUM;
            memcache_delete(self::$client, $this->queueName . self::VALU_KEY . $slot);
        }

        // Reset the counters while the lock is still held so no other process
        // can observe stale HEAD/TAIL/LEN values pointing at deleted slots.
        $this->reset($all);
        $this->unLock();
        return true;
    }
}