过去几年中,我们一直在使用、构建和宣传消息队列,我们认为它们是很令人敬畏的,这也不是什么秘密。我们相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由: 1. 解耦 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 |
2. 冗余 有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。 3. 扩展性 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。 |
4. 灵活性 & 峰值处理能力 当你的应用上了Hacker News的首页,你将发现访问流量攀升到一个不同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。请查看我们关于峰值处理能力的博客文章了解更多此方面的信息。 5. 可恢复性 当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。 |
6. 送达保证 消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个"只送达一次"保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。 |
7.排序保证 在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。 8.缓冲 在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。 |
9. 理解数据流 在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。 10. 异步通信 很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。 |
我们相信上述十个原因,使得消息队列成为在进程或应用之间进行通信的最好形式。我们已经花费了一年时间来创建和学习IronMQ,我们的客户也通过消息队列完成了许多不可思议的事情。队列是创建强大的分布式应用的关键,它可以利用云技术所提供的所有强大能量。 如果现在你想要开始使用一个高效的、可靠的、托管的消息队列,下载IronMQ吧。如果你想联系我们的工程师,咨询如何把队列集成到你的应用中去,他们随时欢迎你的访问:get.iron.io/chat。 |
英文原文:Top 10 Uses For A Message Queue
最简单的用mysql实现消息队列:
http://www.jb51.net/article/30164.htm
php Memcache 中实现消息队列
Memcache 一般用于缓存服务。但是很多时候,比如一个消息广播系统,需要一个消息队列。直接从数据库取消息,负载往往不行。如果将整个消息队列用一个key缓存到memcache里面.
对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费。下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的。
如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。下面是代码:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
class Memcache_Queue { private $memcache; private $name; private $prefix; function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__") { if ($memcache == null) { throw new Exception("memcache object is null, new the object first."); } $this->memcache = $memcache; $this->name = $name; $this->prefix = $prefix; $this->maxSize = $maxSize; $this->front = 0; $this->real = 0; $this->size = 0; } function __get($name) { return $this->get($name); } function __set($name, $value) { $this->add($name, $value); return $this; } function isEmpty() { return $this->size == 0; } function isFull() { return $this->size == $this->maxSize; } function enQueue($data) { if ($this->isFull()) { throw new Exception("Queue is Full"); } $this->increment("size"); $this->set($this->real, $data); $this->set("real", ($this->real + 1) % $this->maxSize); return $this; } function deQueue() { if ($this->isEmpty()) { throw new Exception("Queue is Empty"); } $this->decrement("size"); $this->delete($this->front); $this->set("front", ($this->front + 1) % $this->maxSize); return $this; } function getTop() { return $this->get($this->front); } function getAll() { return $this->getPage(); } function getPage($offset = 0, $limit = 0) { if ($this->isEmpty() || $this->size < $offset) { return null; } $keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize); $num = 1; for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); $num++; if ($limit > 0 && $limit == $num) { break; } } return array_values($this->memcache->get($keys)); } function makeEmpty() { $keys = $this->getAllKeys(); foreach ($keys as $value) { $this->delete($value); } $this->delete("real"); $this->delete("front"); $this->delete("size"); $this->delete("maxSize"); } private function getAllKeys() { if ($this->isEmpty()) { return array(); } $keys[] = $this->getKeyByPos($this->front); for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); } return $keys; } private function add($pos, $data) { $this->memcache->add($this->getKeyByPos($pos), $data); return $this; } private function increment($pos) { return $this->memcache->increment($this->getKeyByPos($pos)); } private function decrement($pos) { $this->memcache->decrement($this->getKeyByPos($pos)); } private function set($pos, $data) { $this->memcache->set($this->getKeyByPos($pos), $data); return $this; } private function get($pos) { return $this->memcache->get($this->getKeyByPos($pos)); } private function delete($pos) { return $this->memcache->delete($this->getKeyByPos($pos)); } private function getKeyByPos($pos) { return $this->prefix . $this->name . $pos; } }
项目中需要实现一个简单的队列,为了这么点小小的功能, 再去新搭建一个 RabbitMQ, Redis或者MongoDB 之类的东西, 感觉有点动静太大,为了吃根香肠何必杀一头猪? 所以考虑用手头上现有的memcache 搭建一个简单的内存消息队列.
memcache功能太简单了,只能 set get 和delete, 只能保存key-value的数据, 不能保存列表。 当然也可以把一个列表给序列化了之后存进memcache, 但是会存在并发的问题, 每次保存数据(插队或者出队)的时候都要给数据加锁,在高并发的情况下很难保证数据的一致性!
但是memcache 有一个 increment 的操作,为某一个键对应的值进行加1(实际上是加法运算, 默认加1), 这个操作是原子性的, 所以我们可以通过这个来维护一个自增的id来保证数据的唯一。 再加上两个指针来维护起始键值, 这样就构建了一个简单的但相队列!!
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
<?php /** * memcache构建的简单内存队列 * * @author: jeffjing */ class memList { private $memcache; // memcache类 private $queKeyPrefix; //数据键前缀 private $startKey; //开始指针键 private $startKey; //结束指针键 public function __construct($key){ $this->queKeyPrefix = "MEMQUE_{$key}_"; $this->startKey = "MEMQUE_SK_{$key}"; $this->endKey = "MEMQUE_EK_{$key}"; } /** * 获取列表 * 先拿到开始结束指针, 然后去拿数据 * * @return array */ public function getList(){ $startP = $this->memcache->get($this->startKey); $endP = $this->memcache->get($this->endKey); empty($startP) && $startP = 0; empty($endP) && $endP = 0; $arr = array(); for($i = $startP ; $i < $endP; ++$i) { $key = $this->queKeyPrefix . $i; $arr[] = $this->memcache->get($key); } return $arr; } /** * 插入队列 * 结束指针后移,拿到一个自增的id * 再把值存到指针指定的位置 * * @return void */ public function in($value){ $index = $this->memcache->increment($this->endKey); $key = $this->queKeyPrefix . $index; $this->memcache->set($key, $value); } /** * 出队 * 很简单, 把开始值取出后开始指针后移 * * @return mixed */ public function out(){ $result = $this->memcache->get($this->startKey); $this->memcache->increment($this->startKey); return $result; } }