一 .入口
在我们的服务器启动的代码之中,存在如下的代码:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)
我们调用channel方法传入来一个NioServerSocketChannel的字节码对象.
服务端的channel正是使用这个字节码对象通过反射的方式进行创建的.
我们回到主程序的入口bind()方法.
二 .初始化Channel对象
ChannelFuture regFuture = initAndRegister();
在bind()方法之中的第一行代码如上.
在该方法之中包含核心的逻辑分成三个:
创建Channel对象
初始化Channel对象
注册Channel对象.
我们下面分析Channel的创建.
channel = channelFactory.newChannel();
通过ChannelFactory对象调用newChannel()方法创建得到Channel对象.
public T newChannel() { try { return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
我们发现是通过反射创建的对象.
下面我们需要分析一下Channel的默认构造函数.
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
我们首先看看newSocket()方法的实现.
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
实际上直接调用jdk的方法创建一个ServerSocketChannel对象.
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); // 创建NioServerSocketChannelConfig 与当前的对象进行绑定,后面都使用这个配置对象进行Channel的配置 config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
在该构造函数之中包含两个逻辑,一个调用父类的构造函数完成构造,另外一个就是创建conifg对象,保存我们对服务端channel的配置信息.
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); // 保存jdk的Channel this.ch = ch; // 保存感兴趣的事件 this.readInterestOp = readInterestOp; try { // 配置Channel为非阻塞的模式 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
我们需要看看父类的构造函数.
protected AbstractChannel(Channel parent) { this.parent = parent; // 创建Channel的id id = newId(); // 创建unsafe()对象,这个对象在后面会介绍 unsafe = newUnsafe(); // 创建ChannelPipeline对象,也就是说一个Channel对象会和一个ChannelPipeline进行绑定 pipeline = newChannelPipeline(); }
构造了一个Channel最为核心的组件,unsafe对象和pipeline对象.
好了现在我们完成了Channel的创建过程,下面分析Channel的初始化工作.
init(channel);
这个方法实际上是一个抽象方法,由ServerBootStrap进行实现.
void init(Channel channel) throws Exception { // 保存属性 final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); // 这个也就是我们说的workerGroup对象 final EventLoopGroup currentChildGroup = childGroup; // 这个就是我们保存的childChannel,也就是初始化的ChannelHandler. final ChannelHandler currentChildHandler = childHandler; // 保存childChannel的属性,也就是那些tcp属性 final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } // 向管道之中增加一个Channel的初始化器 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); // 我们通过handler()方法,存储的Handler对象 ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 会向pipeline对象之中增加一个ServerBootstrapAcceptor 对象 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( // channel , 当前的处理器 子channel的配置属性 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
代码比较多,但是核心的代码吗主要分成下面的几个部分.
[1] 保存服务端channel的配置信息
[2]保存客户端连接的配置信息.
[3]如果服务器端拥有handle,那么配置进去.
[4] 向服务端channel之中添加一个ServerBootstrapAcceptor对象,携带的参数都是客户端连接信息.
在这个方法之中我们调用了execute()方法,在后面的内容之中我们会分析事件循环组的内容.
服务端channel的注册:
doBind0(regFuture, channel, localAddress, promise);
下面分析注册的过程.
channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // 这个操作是在boss线程完成的 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });
实际上是向事件循环组之中放入了一个任务,我们现在不管事件循环组的功能,就当做一个线程池就好了,就向向线程组之中提交任务一样.
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
pipeline.bind(localAddress, promise);
tail.bind(localAddress, promise);
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; } // 找寻outbound的处理器 final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
找到一个ountbind()处理器,然后会调用next.invokeBind()方法.
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } }
现在pipeline之中仅仅只有tail,head,he之前增加的一个ServerbootAcceptor.
三个之中唯一的ountbind()处理器就是Head.
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
我们发现现在调用了unsafe()对象进行绑定.
protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
现在我们终于找到实际调用jdk的底层帮助实现了channel的注册.
以上就是我们通过bind()方法分析得出的结论,下文我们分析事件循环组的功能.