Netty Server Start 源码分析
针对程序
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
//or EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
NioEventLoopGroup()
它是基于NIO 选择器(Selector)的Channel对象
无参构造
new NioEventLoopGroup(),创建一个新的实例并且使用默认的线程数,使用默认的ThreadFactory(线程工程),和SelectorProvider并且这是由SelectorProvider是由SelectorProvider.provider()静态方法方法提供的
public NioEventLoopGroup() {
this(0);
}
//一直向里调用直到父类的MultithreadEventLoopGroup()构造函数,这个是决定线程数是多少的核心方法
//MultithreadEventLoopGroup类的静态代码块
static {
/*
SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)
如果第一个系统属性(需要自己设置)的值不存在则返回第二个参数的值(可用的处理器/系统核心数 * 2)
*/
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
//调到这个方法时,参数都是
//(0,null,SelectorProvider.provider(),DefaultSelectStrategyFactory.INSTANCE,RejectedExecutionHandlers.reject())
//后面三个参数都是对应的返回值,这里写的是这个参数如何来的,以便理解
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
//最后做初始化的构造方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
....
}
此时的nTreads为24是由 Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));获取的,因为我们没有配置系统属性io.netty.eventLoopThreads所以返回的是 (可用的处理器/系统核心数 * 2) 为24。最后会构造出24个线程为这个EventLoopGroup(事件循环组)工作
带参构造
NioEventLoopGroup(int) 如果创建的时候传入一个int值,那么它将使用这个int值个线程
不设置构造参数的话使用默认个线程(如果没有设置"io.netty.eventLoopThreads"系统属性那么他就会使用系统核心数*2的核心数)
为什么很多人设置为1,因为它是异步的只需要一个线程来不断的监听事件循环,当事件发生的时候获取到事件循环本身,然后将事件相应的处理工作丢给workerGroup
ServerBootstrap对象的方法
group()方法
group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法,设置这个parentGroup(bossGroup(接收远端发来的连接,将处理工作交给childGroup/child))与 child(workerGroup(与客户端打交道))。
这些EventLoopGroup 是用于处理针对于ServerChannel与Channel的所有的events(事件)以及IO(输入输出)
param:
parentGroup就是bossGroup childGroup是workerGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//如果AbstractBootstrap中的group属性没有被设置,则将parentGroup赋给group
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
//如果当前这个类的childGroup为null的话,将传进来的childGroup设置为当前这个类的childGroup
this.childGroup设置值 = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
channel()方法
/*
通过channelClass的class对象来创建Channel的实例。如果你的Channel实现了无无参数的构造函数
则可以使用this或者使用channelFactory()
*/
public B channel(Class<? extends C> channelClass) {
//将ReflectiveChannelFactory对象赋值给,成员属性channelFactory
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
/*
一个ChannelFactory,通过反射的形式调用其默认构造函数来实例化新的Channel。
*/
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
//建设者
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
//检查clazz是否为空,如果为空抛出NullPointerException异常内容是第二个参数,如果不为空返回第一个参数
ObjectUtil.checkNotNull(clazz, "clazz");
try {
//获取这个参数的构造函数赋给constructor,以便后面使用反射创建这个对象
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
}
handler()方法
添加针对bossGroup发挥作用的Handler处理器
childHandler()方法
添加针对workerGroup发挥作用的Handler处理器
/*
设置用于为通道的请求提供服务的ChannelHandler。
*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
//childHandler参数不为空,则赋给 this.childHandler
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
核心方法bind()
/*
创建一个新的Channel并将其绑定。
*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
/--------------------------------------------------------
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
/--------------------------------------------------------
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化并注册channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/*使用channel()方法添加的ReflectiveChannelFactory工厂反射的创建channel对象(这个channel对象是NioServerSocketChannel),
使用反射创建NioServerSocketChannel的时候会调用AbstractChannel父类的构造方法创建
与这个Channel所关联的ChannelPipeline对象(实际类型是DefaultChannelPipeline)
*/
channel = channelFactory.newChannel();
//初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// 如果newChannel崩溃,则channel可以为null(例如SocketException(“打开的文件太多”))
channel.unsafe().closeForcibly();
// 由于尚未注册频道,因此我们需要强制使用GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 由于尚未注册频道,因此我们需要强制使用GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
/*
正式开始注册
config() 返回当前ServerBootstrap的ServerBootstrapConfig对象
group() 返回当前ServerBootstrap父类AbstractBootstrap里面维护的group对象,就是我们调用group()方法设置的EventLoopGroup,
在本例中group()返回的是bossGroup
*/
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
/*
如果我们在这里,承诺没有失败,那就是下列情况之一:
1)如果尝试从事件循环注册,则此时注册已完成。也就是说,现在尝试bind()或connect()是安全的,因为通道已经注册。
2)如果我们尝试从另一个线程注册,则注册请求已成功添加到事件循环的任务队列以供以后执行。也就是说,现在尝试bind()或connect()是安全的:
因为bind()或connect()将在执行计划的注册任务之后执行
因为register()、bind()和connect()都绑定到同一个线程。
*/
return regFuture;
}
init()
/*
完成Options与Attributes相关的设定,
*/
@Override
void init(Channel channel) {
//ChannelOption是用于配置与channel相关的特别是ChannelConfig里面的这些网络层的基本的配置
//Option初始化的值可以在serverBootstrap初始化的时候使用.option()方法进行设置,不设置netty则使用底层给根据不同情况设定好的值
setChannelOptions(channel, newOptionsArray(), logger);
/*
Attribute/AttributeKey主要维护业务数据可以在程序运行过程中,动态的向里面添加key value对象,然后在后面用到的地方在取出来(类似rquest作用域)
实现了业务数据随着netty调用流程流转,实现数据共享(类似工作流引擎当中的jBPM、Activiti,在某个流程当中可以设置一些数据,然后在
后续的流程节点当中将数据取出来,实现了数据随着流程流转,可以在后面再取出来)
*/
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
//在调用init()方法之前的channelFactory.newChannel();的时候以及创建好了与这个Channel所关联的ChannelPipeline对象,所以可以直接使用
ChannelPipeline p = channel.pipeline();
//currentChildGroup就是我们创建的workerGroup
final EventLoopGroup currentChildGroup = childGroup;
//currentChildHandler是我们调用childHandler()方法设置的Handler处理器,这里是MyServerInitializer
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//这里只是将ChannelInitializer对象添加到管道当中,initChannel()方法并不会执行,而是后续的某一个时刻会被调用
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
//将生成的Channel对象对应的pipeline拿到
final ChannelPipeline pipeline = ch.pipeline();
//如果之前调用了handler()方法则将添加的对象addLast()到这个ChannelPipeline当中
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//向Channel中添加一个接收器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
config().group().register(channel)
//MultithreadEventLoopGroup类的方法
@Override
public ChannelFuture register(Channel channel) {
//这个register()方法调的是SingleThreadEventLoop类中的
return next().register(channel);
}
@Override
public EventLoop next() {
//返回一个
return (EventLoop) super.next();
}
@Override
public EventExecutor next() {
return chooser.next();
}
//DefaultEventExecutorChooserFactory类中的GenericEventExecutorChooser内部类的方法
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
//保存的就是这个事件循环组的所以线程,本程序中就是一个一个的NioEventLoop。就是new NioEventLoopGroup()的时候创建的那些线程对象
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
/*返回的是这个事件循环组中的某一个事件执行器(EventExecutor)根据Math.abs(idx.getAndIncrement() % executors.length)计算结果选择其中一个*/
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
//创建一个Promise,传入要注册的Channel与事当前的件循环组
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//调用cahnnel()得到刚刚创建promise对象传进去的Channel对象,然后调用它(AbstractNioChannel)的unsafe()方法得到一个NioUnsafe对象,这个Unsafe对象来自于它的父类AbstractChannel,AbstractNioChannel做了一个向下类型转换,最后调用register()方法
promise.channel().unsafe().register(this, promise);
return promise;
}
//register()方法是AbstractChannel中的-----------------------------------
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
/*
核心代码段
判断当前正在执行这行代码的线程是不是SingleThreadEventExecutor中维护的thread
如果是则直接调用register0()注册
如果不是则将这注册任务以一个任务的形式提交给eventLoop(SingleThreadEventExecutor)当中维护的那个线程对象,让它去执行解决了多线程并发的问题
*/
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//注册的最底层实现,完成注册
doRegister();
neverRegistered = false;
registered = true;
// 确保在实际通知诺言之前,先调用handlerAdded(...)。这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
//如果从未注册过频道,则仅触发channelActive。如果通道已注销并重新注册,则可以防止激活多个通道。
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 此通道之前已注册,并且已设置autoRead()。这意味着我们需要再次开始读取,以便处理入站数据。
//
// 可以接收客户端消息了
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
//doRegister()方法是由子类AbstractNioChannel实现的
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//javaChannel()返回的是SelectableChannel(ServerSocketChannelImpl),然后将这个channel注册到eventLoop().unwrappedSelector()返回的Selector上,0是感兴趣的事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
重要的类说明
ChannelOption<T>
ChannelOption允许以类型安全的方式配置一个ChannelConfig
到底支持哪一种ChannelOption取决于ChannelConfig的实际实现也可能依赖于它所属的运输层的本质。
用于存储这个channel与TCP/IP相关的一些基本的配置信息,它是不存储值的值在其它地方存储,它里面存放的是
值的类型(ChannelOption本身并不维护选项的值的信息,它只维护这个选项本身/这个名字本身)
param:
泛型 T ChannelOption值的类型(某一个选项/某一个设置项的类型)
ChannelConfig
对于channel的一套配置属性
下转换到更具体的配置类型,如SocketChannelConfig或使用setOptions(Map)设置与传输具体相关的属性:
Channel ch = ...;
SocketChannelConfig cfg = (SocketChannelConfig) ch.getConfig();
cfg.setTcpNoDelay(false);
Option map
一个Option map属性是动态的只能写的属性,可以进行Channel的配置而无需向下类型转换。
名称 | 相关setter方法 |
---|---|
ChannelOption.CONNECT_TIMEOUT_MILLIS | setConnectTimeoutMillis(int) |
ChannelOption.WRITE_SPIN_COUNT | setWriteSpinCount(int) |
ChannelOption.WRITE_BUFFER_WATER_MARK | setWriteBufferWaterMark(WriteBufferWaterMark) |
ChannelOption.ALLOCATOR | setAllocator(ByteBufAllocator) |
ChannelOption.AUTO_READ | setAutoRead(boolean) |
还有很多选项都位于ChannelConfig的子类当中,比如说你可以配置一些参数特定于TCP/IP scoket 参数
AttributeKey<T>
属性键可以用于在AttributeMap外面访问Attribute,请注意相同的名字不可能有多个键(keys)
param:
T Attribute类型,其可以通过该访问AttributeKey 。
Channel与ChannelContext作用域
直接向Channel上面附加的属性与向ChannelContext附加的属性,它们的作用域有什么不同
-
Netty4.0
对于整个channel来说它有一个map,用于维护它的属性和值的映射关系。而针对每一个ChannelHandlerContext也拥有自己的一个map,在Netty的组件当中只要有一个handler就会有一个与之相关和对应的ChannelHandlerContext。如果有10个handler那么Netty就会创建10ChannelHandlerContext同时在10个ChannelHandlerContext当中就会拥有10个不同的map用于分别存放这个handler在自己的作用域中所拥有的key value值,而channel本身又有一个独立的map映射信息。这种做法有两个问题:
- 当你在A handler当中set的值,在B handler当中是拿不到的,或者说你在channel当中设置的值,在handler中也是拿不到的
- 浪费内存,创建了太多个map每个ChannelHandlerContext都会有一个
-
Netty4.1之后
只会有一个map对象,而这个map对象会被channel以及这个channel上的所有handler所共享,而且key是不会重复的