我们使用memcached客户端进行get、set还是比较方便的。比如说一个get的操作:
MemcachedClient c = new MemcachedClient(new InetSocketAddress("127.0.0.1", 11211)); c.get("akey");
然而,这两句代码的背后是什么呢?看一下源码吧^_^
首先,随便搞个spymemcached的源码包下来。我用的是2.9的spymemcached。maven pom dependency节点:
<dependency> <groupId>net.spy</groupId> <artifactId>spymemcached</artifactId> <version>2.9.0</version> </dependency>
以下是我整理的spymemcached的get流程。
根据源码,主要流程在2个类中:MemcachedClient.java、MemcachedConnection.java。其中MemcachedConnection.java是最重要的。
简单流程如下所示:
启动:
1、创建mcClient。
MemcachedClient c = new MemcachedClient(new InetSocketAddress("127.0.0.1", 11211));
通过里面的工厂,创建MemcachedConnection线程对象。MemcachedConnection是继承于Thread类的,是一个线程。
MemcachedConnection这个线程对象启动后,无限循环监听操作队列(IO事件):
/** * Infinitely loop processing IO. */ @Override public void run() { while (running) { try { handleIO(); } catch (IOException e) { logRunException(e); } catch (CancelledKeyException e) { logRunException(e); } catch (ClosedSelectorException e) { logRunException(e); } catch (IllegalStateException e) { logRunException(e); } catch (ConcurrentModificationException e) { logRunException(e); } } getLogger().info("Shut down memcached client"); }
客户端调用get方法:
1、用户调用MemcachedClient.java的get方法。期望获取memcached中某个key的内容。
2、get方法调用asyncGet方法。把get请求通过countDownLatch,并且future对象获取数据(异步非阻塞)。
2.1、把key定位到某个memcached node中(不管1只还是多只memcached,都用数组存储了,即使只有1个memcached点都是在跑一致性hash的)。注意,这里是有故障转移机制的如果某个点的memcached不可用,会使用另外一个点:
protected void addOperation(final String key, final Operation o) { MemcachedNode placeIn = null; MemcachedNode primary = locator.getPrimary(key); if (primary.isActive() || failureMode == FailureMode.Retry) { placeIn = primary; } else if (failureMode == FailureMode.Cancel) { o.cancel(); } else { // Look for another node in sequence that is ready. for (Iterator<MemcachedNode> i = locator.getSequence(key); placeIn == null && i.hasNext();) { MemcachedNode n = i.next(); if (n.isActive()) { placeIn = n; } } // If we didn't find an active node, queue it in the primary node // and wait for it to come back online. if (placeIn == null) { placeIn = primary; this.getLogger().warn( "Could not redistribute " + "to another node, retrying primary node for %s.", key); } } assert o.isCancelled() || placeIn != null : "No node found for key " + key; if (placeIn != null) { addOperation(placeIn, o); } else { assert o.isCancelled() : "No node found for " + key + " (and not immediately cancelled)"; } }
2.2、把这个操作,放到MemcachedConnection的操作队列中。
(MemcachedClient更多的是一个facade,核心业务都在MemcachedConnection中)
处理任务:
1、通过NIO的selector模式。获取readIO事件。
2、handleReads方法,数据返回。
总结
spymemcached是基于nio的,一个无阻塞设计的memcached客户端。不会因为并发数过大而导致系统崩溃。