G MEMCACHEQ AS MESSAGE QUEUE
PHP,消息队列,MEMCACHEQ
使用消息队列(MESSAGE QUEUE)可以把某些耗时的工作推后,然后在后台慢慢地去执行,
这样就不会让你的用户等待太久。
今天介绍PHP的消息队列: MEMCACHEQ。
MEMCACHEQ
MEMCACHEQ的特性:
1 简单易用
2 处理速度快
3 多条队列
4 并发性能好
5 与memcache的协议兼容。这就意味着只要装了memcache的extension就可以了,不需要额外的插件。
6 在zend framework中使用也很方便。
MEMCACHEQ依赖于libevent和BerkleyDB。
BerkleyDB用于持久化存储队列的数据。 这样在MEMCACHEQ崩溃或者服务器挂掉的时候,
不至于造成数据的丢失。这一点很重要,很重要。
MemcacheQ will run as a daemon and communicates over the UDP protocol. It can handle concurrent connections and can be used in a load-balanced environment.
启动参数:
memcacheq -d -r -H /data/memcacheq -N -R -v -L 1024 -u nobody &>
/data/memcacheq/error.log
解释一下:
-d 守护进程
-r Maximize core file limit
-v 详细输出
-u 以用户的身份来运行
-H BDB 文件的保存目录
-N Performance improvement
-R 太久的日志文件会被删除。
-L 日志缓存大小,默认是32K。 1024表示1024K。
其它参数:
-h help
-vv 更加详细的输出
不使用-d的话,输出会直接显示到控制台。
ZEND_QUEUE
zend framework有一个与MEMCACHEQ通信的adapter:
Zend_Queue_Adapter_Memcacheq
http://framework.zend.com/manual/en/zend.queue.adapters.html
下面用一个真实的例子来演示如何在zend framework中使用MEMCACHEQ。
一个新闻网站,每条新闻显示的时候,都要显示出来此新闻的访问量,同时还要把它的访问量加1。
这个访问量是保存到news表中的,与news的其它信息保存在一起。
这个数据变化得非常快,缓存的意义不大。
如果每次用户查看新闻的时候,都去数据库中执行一次update visitCount+1的操作,
肯定是比较费力的。
用户必须等待update完成之后,才能看到新闻的内容。
使用MEMCACHEQ之后呢,我们可以把每一次访问都记录到消息队列中,然后在后台再周期性去更新数据库。
写入消息队列的速度是非常快的,比直接更新mysql快得多得多。
在viewNews.php中:
<?php // MemcacheQ config $queueOptions = array( 'name' => 'example-queue', 'driverOptions' => array( 'host' => '127.0.0.1', 'port' => 22201 ) ); // Instantiate Zend_Queue $queue = new Zend_Queue('MemcacheQ', $queueOptions); // Find out if the queue exists if (!$queue->getAdapter()->isExists($queueOptions['name'])) { // If not, create it $queue->createQueue($queueOptions['name']); } // Build a query string (key=val&key=val) because we need a scalar value //用户在$timestamp访问了$page页面。 $params = http_build_query( array( 'timestamp' => $timestamp, 'page' => $page ) ); // Send the data to the queue $queue->send($params);
这样就把这一次访问保存到了消息队列[example-queue]中。
然后再搞个cron,去处理队列中的消息。
<?php // MemcacheQ config $queueOptions = array( 'name' => 'example-queue', 'driverOptions' => array( 'host' => '127.0.0.1', 'port' => 22201 ) ); // Instantiate Zend_Queue $queue = new Zend_Queue('MemcacheQ', $queueOptions); // Retrieve 5 items from the queue $messages = $queue->receive(5); // $message is now a instance of Zend_Queue_Message_Iterator // @TODO: Use a nice FilterIterator ;) foreach ($messages as $job) { if ('creating queue' == $job->body || false === $job->body) { // Skip message continue; } // Parse the query string to a array parse_str($job->body, $data); // Execute the heavy process //更新数据库 $this->registerHit($data['timestamp'], $data['page']); }