上一篇写的是IoAcceptor是服务器端的接收代码,今天要写的是IoConnector,是客户端的连接器。在昨天,我们还留下一些问题没有解决,这些问题今天同样会产生,但是都要等到讲到session的时候才能逐步揭开。先回顾一下问题:
l 我们已经在AbstractPollingIoAcceptor中看到了,mina是将连接(命令)和业务(读写)分不同线程处理的,但是我们还没有看到mina是如何实现对这些线程的管理的。
l 在昨天的最后,我们看到在NioSocketAcceptor中的具体NIO实现,但是我们还没有看到mina是在哪里调用了这些具体操作的。当然这也是mina对连接线程管理的一部分。
这些问题今天也会出现,因为从名字上就能看出,IoConnector和IoAcceptor的构造相差不大,所以在写connector的分析时,主要会从结构和差异上入手,最后再给出昨天没写完的删减版的Acceptor。先回顾一下客户端连接的代码:
02 |
IoConnector
connector = new NioSocketConnector(); |
04 |
connector.setConnectTimeout( 30000 ); |
06 |
connector.getFilterChain().addLast( |
08 |
new ProtocolCodecFilter( new MessageCodecFactory( |
09 |
new InfoMessageDecoder(Charset.forName( "utf-8" )), |
10 |
new InfoMessageEncoder(Charset.forName( "utf-8" ))))); |
12 |
connector.setHandler( new ClientHandler()); |
13 |
IoSession
session = null ; |
15 |
ConnectFuture
future = connector.connect( new InetSocketAddress( |
17 |
future.awaitUninterruptibly(); |
18 |
session
= future.getSession(); |
还是先看IoConnector的结构图,图来自mina官网,用XMind绘制的。在写构成之前,我们还是先看一下mina官网对这些connector的介绍:
As we have to use an IoAcceptor for servers, you have to implement the IoConnector. Again, we have many implementation classes :
- NioSocketConnector : the non-blocking Socket transport Connector
- NioDatagramConnector : the non-blocking UDP transport * Connector*
- AprSocketConnector : the blocking Socket transport * Connector*, based on APR
- ProxyConnector : a Connector providing proxy support
- SerialConnector : a Connector for a serial transport
- VmPipeConnector : the in-VM * Connector*
其中,NioSocketConnector是我们最常用到的,proxy方式虽然在mina的源码中也花了大篇幅去撰写,但可惜的是很少有相关的文档,所以学习的成本还挺高的。今天我们主要还是按照上图画的两条路来看NioSocketConnector。
和昨天一样,我们还是从左边的路走起,看interface IoConnector,这个接口主要定义了连接的方法以及socket连接时用到的参数。在mina中通过IoFuture来描述、侦听在IoSession上实现的异步IO操作,所以这IoConnector中的connect方法都返回了一个ConnectFuture实例。

而在SocketConnector接口中的定义中就显得更简单了,它和IoConnector之间也是接口的继承关系,在SocketConnector中就定义了两类方法,一个对远程地址的get和set,一个拿到session的配置。这些都容易理解。
再来看右边,AbstractIoConnector,这个抽象类主要作用就是实现IoConnector里定义的操作,至于他又继承了AbstractIoService,一是为了用到父类(AbstractIoService)的方法,二是为了将那些父类没有实现的方法继续传递下去,让它(AbstractIoConnector)的子类去实现。所以看多了,好多结构也能看明白了,这里我觉得主要要学习的还是接口、抽象类之间的引用关系。
继续看AbstractIoConnector,这个类主要是实现了connect的逻辑操作(封装了连接前后的一些必要执行步骤和check一些状态),具体的连接操作还是让子类去实现,这个和上篇写的AbstractIoAcceptor一模一样,在AbstractIoAcceptor中,主要也是封装了bind的逻辑操作,真正的bind过程是让子类去实现的简单看下代码:
01 |
public final ConnectFuture
connect(SocketAddress remoteAddress, SocketAddress localAddress, |
02 |
IoSessionInitializer<? extends ConnectFuture>
sessionInitializer) { |
04 |
throw new IllegalStateException( "The
connector has been disposed." ); |
07 |
if (remoteAddress
== null )
{ |
08 |
throw new IllegalArgumentException( "remoteAddress" ); |
11 |
if (!getTransportMetadata().getAddressType().isAssignableFrom(remoteAddress.getClass()))
{ |
12 |
throw new IllegalArgumentException( "remoteAddress
type: " +
remoteAddress.getClass() + "
(expected: " |
13 |
+
getTransportMetadata().getAddressType() + ")" ); |
16 |
if (localAddress
!= null &&
!getTransportMetadata().getAddressType().isAssignableFrom(localAddress.getClass())) { |
17 |
throw new IllegalArgumentException( "localAddress
type: " +
localAddress.getClass() + "
(expected: " |
18 |
+
getTransportMetadata().getAddressType() + ")" ); |
21 |
if (getHandler()
== null )
{ |
22 |
if (getSessionConfig().isUseReadOperation())
{ |
23 |
setHandler( new IoHandler()
{ |
24 |
public void exceptionCaught(IoSession
session, Throwable cause) throws Exception
{ |
28 |
public void messageReceived(IoSession
session, Object message) throws Exception
{ |
32 |
public void messageSent(IoSession
session, Object message) throws Exception
{ |
36 |
public void sessionClosed(IoSession
session) throws Exception
{ |
40 |
public void sessionCreated(IoSession
session) throws Exception
{ |
44 |
public void sessionIdle(IoSession
session, IdleStatus status) throws Exception
{ |
48 |
public void sessionOpened(IoSession
session) throws Exception
{ |
53 |
throw new IllegalStateException( "handler
is not set." ); |
57 |
return connect0(remoteAddress,
localAddress, sessionInitializer); |
61 |
protected abstract ConnectFuture
connect0(SocketAddress remoteAddress, SocketAddress localAddress, |
62 |
IoSessionInitializer<? extends ConnectFuture>
sessionInitializer); |
Connect0才是最后具体的操作,而这一步操作在这个类中被没有给出实现。具体实现放在了AbstractPollingIoConnector上。和昨天一样,这些设计都是对称的,我们还是看三点:
l implementing client transport using a polling strategy
l A Executor will be used for running client connection, and an AbstractPollingIoProcessor will be used for processing connected client I/O operations like reading, writing and closing.
l All the low level methods for binding, connecting, closing need to be provided by the subclassing implementation
至于内部的具体实现,那跟acceptor中没什么区别,连使用的工具类都差别不大,这部分就很容易读懂了,只不过一个是bind一个是connect。
最后我们看NioSocketConnector,具体连接的实现类,只有一个成员变量和NioSocketAcceptor一样:
01 |
private volatile Selector
selector; |
04 |
protected SocketChannel
newHandle(SocketAddress localAddress) throws Exception
{ |
05 |
SocketChannel
ch = SocketChannel.open(); |
07 |
int receiveBufferSize
= (getSessionConfig()).getReceiveBufferSize(); |
08 |
if (receiveBufferSize
> 65535 )
{ |
09 |
ch.socket().setReceiveBufferSize(receiveBufferSize); |
12 |
if (localAddress
!= null )
{ |
13 |
ch.socket().bind(localAddress); |
15 |
ch.configureBlocking( false ); |
只是需要注意,这里面专门有个内部类来处理selectionkey,将遍历的过程都抽离出来了,这个和我们用NIO的一般写法稍有不同,这样做的好处也是为了复用:
01 |
private static class SocketChannelIterator implements Iterator<SocketChannel>
{ |
03 |
private final Iterator<SelectionKey>
i; |
05 |
private SocketChannelIterator(Collection<SelectionKey>
selectedKeys) { |
06 |
this .i
= selectedKeys.iterator(); |
12 |
public boolean hasNext()
{ |
19 |
public SocketChannel
next() { |
20 |
SelectionKey
key = i.next(); |
21 |
return (SocketChannel)
key.channel(); |
27 |
public void remove()
{ |
---------------------------------------------------------
补一个上篇就应该发的acceptor的阉割版,写这样的东西主要还是为了理清楚结构。我主要是把内容简化了,但是结构都没有变,核心的成员变量也没有少:
起点IoService:
01 |
package org.apache.mina.core.rewrite.service; |
04 |
*
IO Service --handler/processor/acceptor/connector |
09 |
public interface IoService
{ |
11 |
void addListener(IoServiceListener
listener); |
14 |
void dispose( boolean awaitTermination); |
17 |
IoHandler
getHandler(); |
19 |
void setHandler(IoHandler
handler); |
22 |
int getManagedSessionCount(); |
左边的路
01 |
package org.apache.mina.core.rewrite.service; |
03 |
import java.io.IOException; |
04 |
import java.net.SocketAddress; |
10 |
*
Acceptor 主要用于:Accepts incoming connection, communicates with clients, and |
11 |
*
fires events to IoHandler |
15 |
public interface IoAcceptor extends IoService
{ |
17 |
SocketAddress
getLocalAddress(); |
19 |
Set<SocketAddress>
getLocalAddresses(); |
21 |
void bind(SocketAddress
localAddress) throws IOException; |
23 |
void bind(Iterable<? extends SocketAddress>
localAddresses) throws IOException; |
25 |
void unbind(SocketAddress
localAddress); |
28 |
/**没有写到IoSession
所以暂时不用*/ |
SocketAcceptor:
01 |
package org.apache.mina.rewrite.transport.socket; |
03 |
import java.net.InetSocketAddress; |
05 |
import org.apache.mina.core.rewrite.service.IoAcceptor; |
07 |
public interface SocketAcceptor extends IoAcceptor
{ |
09 |
InetSocketAddress
getLocalAddress(); |
11 |
void setDefaultLocalAddress(InetSocketAddress
localAddress); |
13 |
public boolean isReuseAddress(); |
再看右边的
001 |
package org.apache.mina.core.rewrite.service; |
003 |
import java.util.concurrent.Executor; |
004 |
import java.util.concurrent.ExecutorService; |
005 |
import java.util.concurrent.Executors; |
006 |
import java.util.concurrent.TimeUnit; |
007 |
import java.util.concurrent.atomic.AtomicInteger; |
009 |
public abstract class AbstractIoService implements IoService
{ |
011 |
private static final AtomicInteger
id = new AtomicInteger(); |
013 |
private final String
threadName; |
015 |
private final Executor
executor; |
017 |
private final boolean createdExecutor; |
019 |
private IoHandler
handler; |
022 |
protected final Object
disposalLock = new Object(); |
024 |
private volatile boolean disposing; |
026 |
private volatile boolean disposed; |
031 |
*
sessionConfig IoSessionConfig |
033 |
*
used for handling execution of IO event. can be null |
035 |
protected AbstractIoService(Object
param, Executor executor) { |
039 |
if (executor
== null )
{ |
040 |
this .executor
= Executors.newCachedThreadPool(); |
041 |
createdExecutor
= true ; |
043 |
this .executor
= executor; |
044 |
createdExecutor
= false ; |
047 |
threadName
= getClass().getSimpleName() + "-" +
id.incrementAndGet(); |
051 |
public void addListener(IoServiceListener
listener) { |
055 |
/**注意这个不是override来的*/ |
056 |
protected final void ececuteWorker(Runnable
worker, String suffix){ |
058 |
String
actualThreadName=threadName; |
060 |
actualThreadName=actualThreadName+ "-" +suffix; |
062 |
executor.execute(worker); |
066 |
public void dispose( boolean awaitTermination)
{ |
071 |
synchronized (disposalLock)
{ |
077 |
} catch (Exception
e) { |
083 |
if (createdExecutor)
{ |
084 |
ExecutorService
e = (ExecutorService) executor; |
087 |
if (awaitTermination)
{ |
090 |
e.awaitTermination(Integer.MAX_VALUE,
TimeUnit.SECONDS); |
092 |
} catch (InterruptedException
e1) { |
094 |
Thread.currentThread().interrupt(); |
101 |
protected abstract void dispose0() throws Exception; |
104 |
public IoHandler
getHandler() { |
109 |
public void setHandler(IoHandler
handler) { |
110 |
if (handler
== null )
{ |
111 |
throw new IllegalArgumentException( "handler
cannot be null" ); |
115 |
throw new IllegalStateException( "when
service is active, cannot be set handler" ); |
118 |
this .handler
= handler; |