zoukankan      html  css  js  c++  java
  • Memcache 中实现消息队列

        Memcache 一般用于缓存服务。但是很多时候,比如一个消息广播系统,需要一个消息队列。直接从数据库取消息,负载往往不行。如果将整个消息队列用一个key缓存到memcache里面,

    对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费。下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的。

        如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。下面是代码:

    class Memcache_Queue
    {
        
    private $memcache;

        
    private $name;

        
    private $prefix;

        
    function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__")
        {
            
    if ($memcache == null) {
                
    throw new Exception("memcache object is null, new the object first.");
            }
            
    $this->memcache = $memcache;
            
    $this->name = $name;
            
    $this->prefix = $prefix;

            
    $this->maxSize = $maxSize;
            
    $this->front = 0;
            
    $this->real = 0;
            
    $this->size = 0;
        }

        
    function __get($name)
        {
            
    return $this->get($name);
        }

        
    function __set($name, $value
        {
            
    $this->add($name, $value);
            
    return $this;
        }

        
    function isEmpty() 
        {
            
    return $this->size == 0;
        }

        
    function isFull()
        {
            
    return $this->size == $this->maxSize;
        }

        
    function enQueue($data)
        {
            
    if ($this->isFull()) {
                
    throw new Exception("Queue is Full");
            }
            
    $this->increment("size");
            
    $this->set($this->real, $data);
            
    $this->set("real", ($this->real + 1% $this->maxSize);
            
    return $this;
        }

        
    function deQueue()
        {
            
    if ($this->isEmpty()) {
                
    throw new Exception("Queue is Empty");
            }
            
    $this->decrement("size");
            
    $this->delete($this->front);
            
    $this->set("front", ($this->front + 1% $this->maxSize);
            
    return $this;
        }

        
    function getTop()
        {
            
    return $this->get($this->front);
        }

        
    function getAll()
        {
            
    return $this->getPage();
        }

        
    function getPage($offset = 0, $limit = 0)
        {
            
    if ($this->isEmpty() || $this->size < $offset) {
                
    return null;
            }
            
    $keys[] = $this->getKeyByPos(($this->front + $offset% $this->maxSize);
            
    $num = 1;
            
    for ($pos = ($this->front + $offset + 1% $this->maxSize; $pos != $this->real; $pos = ($pos + 1% $this->maxSize) 
            {
                 
    $keys[] = $this->getKeyByPos($pos);
                 
    $num++;
                 
    if ($limit > 0 && $limit == $num) {
                     
    break;
                 }
            }
            
    return array_values($this->memcache->get($keys));
        }

        
    function makeEmpty()
        {
            
    $keys = $this->getAllKeys();
            
    foreach ($keys as $value) {
                
    $this->delete($value);
            }
            
    $this->delete("real");
            
    $this->delete("front");
            
    $this->delete("size");
            
    $this->delete("maxSize");
        }

        
    private function getAllKeys()
        {
            
    if ($this->isEmpty()) 
            {
                
    return array();
            }
            
    $keys[] = $this->getKeyByPos($this->front);
            
    for ($pos = ($this->front + 1% $this->maxSize; $pos != $this->real; $pos = ($pos + 1% $this->maxSize) 
            {
                
    $keys[] = $this->getKeyByPos($pos);
            }
            
    return $keys;
        }

        
    private function add($pos, $data
        {
            
    $this->memcache->add($this->getKeyByPos($pos), $data);
            
    return $this;
        }

        
    private function increment($pos)
        {
            
    return $this->memcache->increment($this->getKeyByPos($pos));
        }

        
    private function decrement($pos
        {
            
    $this->memcache->decrement($this->getKeyByPos($pos));
        }

        
    private function set($pos, $data
        {
            
    $this->memcache->set($this->getKeyByPos($pos), $data);
            
    return $this;
        }

        
    private function get($pos)
        {
            
    return $this->memcache->get($this->getKeyByPos($pos));
        }

        
    private function delete($pos)
        {
            
    return $this->memcache->delete($this->getKeyByPos($pos));
        }

        
    private function getKeyByPos($pos)
        {
            
    return $this->prefix . $this->name . $pos;
        }
    }
  • 相关阅读:
    创建网络数据集
    [虚拟机]Virtual Box的使用--共享文件夹
    MapControl图层删除或添加触发监听事件
    Dev控件GridView单元格绑定控件
    Log4net中换行符
    没有为 COM 互操作注册程序集 请使用 regasm.exe /tlb 注册该程序集——解决办法
    加载dll过程中assembly失败
    Spring Boot 自带缓存及结合 Redis 使用
    Spring Boot + Redis 初体验
    在 Windows 中使用 C# 启动其他程序
  • 原文地址:https://www.cnblogs.com/niniwzw/p/1604677.html
Copyright © 2011-2022 走看看