1. 学习目的
- BIO、NIO和AIO的区别?
- NIO的组成?
- Netty的特点?
- Netty的线程模型?
- TCP 粘包/拆包的原因及解决方法?
- 了解哪几种序列化协议?
- 如何选择序列化协议?
- Netty的零拷贝实现?
- Netty的高性能表现在哪些方面?
- NIOEventLoopGroup源码?
2.学习Netty
- 为什么学习Netty
分布式的时代 分布式框架中要用到
Spring5 去servlet化,底层使用netty
SpringBoot 内部实现web容器
Zookeeper 底层通信
Dubbo 分布式服务框架,多协议支持(RPC) Netty
为成为架构师筑基 - Netty能帮助我们解决什么问题?
框架:简化开发一系列解决方案的集合
封装IO操作的框架
复杂的业务场景中,没有说用一个单独的IO APi,经常遇到的问题:手写线程(Thread),多线程处理、性能问题、单双工,影响业务的开发。
IO + 多线程来解决问题
类似的框架:Mina Netty的前身(不是一个人开发的) - 为什么要封装IO操作
阻塞和非阻塞:
自己回答:处理端在处理数据的时候,如果被处理对象的数据没有准备好,线程会阻塞在当前线程,譬如socket.accept()方法 ,在等待网络连接的时候,如果没有请求连接,线程会处于阻塞状态,如果是非阻塞模型,当前线程可以去处理其他事物
Input Output
是相对于内存而言
磁盘只是input output的一端,除了磁盘还有网络
IO模型
阻塞非阻塞:参照IO操作 读数据或取数据时候的一种处理机制
BIO阻塞:
流
NIO:
通道 Buffer 轮询
同步与异步
在处理数据的时候,在同一时间点能同时做多个处理:异步
在同一时间只能做一个处理:同步
- 多路复用体现:
ServerSocketChannel
SocketChannle (从多路复用器中拿到客户端的引用)
都来自于 (SelectionKey)key.channel
NIO的操作过于繁琐,于是有了Netty,对NIO进行了封装。
IO(BIO)Block IO 同步阻塞IO
NIO Non-Block IO 同步非阻塞IO (可以使用线程池的方式,实现异步)
AIO Async IO 异步非阻塞IO (事件驱动,回调实现异步)
3. NIO
3.1 NIO的核心组件(三件套)
- Buffer、Selector、Channel、
3.1.1 Buffer
- 缓冲区属性
容量:capacity:表示该缓冲区可以保存多少数据。
极限:limit:表示缓冲区的当前终点,不能对缓冲区中超过极限的数据进行读写操作。极限可以修改,有利于缓冲区的重用。
位置:position:表示缓冲区中下一个读写单元的位置,每次读写缓冲区的数据时,都会改变位置值,为下一次读写数据做准备。
属性关系为 capacity>=limit>=position>=0 - 改变缓冲区3个属性的方法
clear():把极限限制为容量值,再把位置设为0.
flip():把极限值设为位置值,再把位置设为0.
rewind():不改变极限,把位置设为0. - ByteBuffer类没有提供公开的构造方法,但是提供了两个获得ByteBuffer实例的静态工厂方法。
allocate(int capacity):返回一个ByteBuffer对象
directAllocate(int capacity):返回一个ByteBuffer对象,称为直接缓冲区 - 所有的缓冲区都提供了读写缓冲区的方法
get():相对读。从缓冲区的当前位置读取一个单元的数据,读完后把位置值加1。
get(index):决对读。从参数index指定的位置读取一个单元的数据。
put():相对写。向缓冲区的当前位置写入一个单元的数据,写完后把位置值加1.
put(int index):绝对写。向参数index指定的位置写入一个单元的数据。
- buffer操作的流程
3.1.2 Selector
3.1.3 Channel
是多路复用的基础
解决的问题:
- 请求过多,导致连接线程过多
- 处理数据较大,导致线程长时间占用 也会导致阻塞
selector模型,IO调用不会被阻塞,
3.2何为多路复用
3.3Netty支持的功能与特性
Netty
Netty封装了NIO,Reactor模型,声明BOSSS线程,worker线程,可以修改BOSS线程和worker线程的数量
- 编码过程
//Netty服务 //NIO //BIO
ServerBootstrap ServerSocketChannel ServerSocket
主线程处理类,这样的写法,底层使用反射
子线程处理类 ,Handler
无锁化串行编程
编码器
解码器
业务处理逻辑
针对主线程的最大配置数
针对子线程的配置,保持长连接
//SocketChannel的封装
ChannelHandlerContext
- 总结
Netty就是一个同时支持多协议的网络通信框架
官网:
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。
协议编解码
2. Netty支持的协议
官网图
Netty的高性能
比传统IO性能提升了8倍多。
- 传统RPC调用性能差的三宗罪
- 阻塞IO不具备弹性伸缩能力(一个请求,一个应答),高并发导致宕机
- Java序列化编码、解码的性能问题,直接是字节流的读写
- 传统IO线程模型过多占用CPU资源
- 高性能的三个主题:
IO模型 传输
数据协议 协议 (协议越简单,通信效率越高)
线程模型 线程 (编解码的处理) - 异步非阻塞通信
NioEventLoop聚合了多路复用器Selector,可以同时并发处理成百上千个客户端Channel,由于读写操作都是非阻塞的,可以充分提升IO线程的运行效率,避免由于频繁阻塞IO阻塞导致的线程挂起。 - 零拷贝(直接缓冲区)
- 接收和发送ByteBuffer使用堆外内存直接内存进行socket读写。
- AbstractNioByteChannel$NioByteUnsafe extends AbstractNioUnsafe
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
public abstract class DefaultMaxMessagesRecvByteBufAllocator
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
- public abstract class ByteBuf implements ReferenceCounted, Comparable
- 提供了组合Buffer对象,可以聚合多个ByteBuffer对象。
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf>{
private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer();
private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();
private final ByteBufAllocator alloc;
private final boolean direct;
private final int maxNumComponents;
}
添加 ByteBuf,不需要做内存拷贝,相关代码如下:
private CompositeByteBuf addComponents0(boolean increaseWriterIndex,
final int cIndex, ByteBuf[] buffers, int arrOffset) {
final int len = buffers.length, count = len - arrOffset;
int readableBytes = 0;
int capacity = capacity();
for (int i = 0; i < buffers.length; i++) {
readableBytes += buffers[i].readableBytes();
// Check if we would overflow.
// See https://github.com/netty/netty/issues/10194
checkForOverflow(capacity, readableBytes);
}
// only set ci after we've shifted so that finally block logic is always correct
int ci = Integer.MAX_VALUE;
try {
checkComponentIndex(cIndex);
shiftComps(cIndex, count); // will increase componentCount
int nextOffset = cIndex > 0 ? components[cIndex - 1].endOffset : 0;
for (ci = cIndex; arrOffset < len; arrOffset++, ci++) {
ByteBuf b = buffers[arrOffset];
if (b == null) {
break;
}
Component c = newComponent(ensureAccessible(b), nextOffset);
components[ci] = c;
nextOffset = c.endOffset;
}
return this;
} finally {
// ci is now the index following the last successfully added component
if (ci < componentCount) {
if (ci < cIndex + count) {
// we bailed early
removeCompRange(ci, cIndex + count);
for (; arrOffset < len; ++arrOffset) {
ReferenceCountUtil.safeRelease(buffers[arrOffset]);
}
}
updateComponentOffsets(ci); // only need to do this here for components after the added ones
}
if (increaseWriterIndex && ci > cIndex && ci <= componentCount) {
writerIndex += components[ci - 1].endOffset - components[cIndex].offset;
}
}
}
- transfreTo()直接将文件缓冲区的数据发送到目标Channel,避免循环导致内存拷贝的问题。
public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion {
public long transferTo(WritableByteChannel target, long position) throws IOException {
long count = this.count - position;
if (count < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position +
" (expected: 0 - " + (this.count - 1) + ')');
}
if (count == 0) {
return 0L;
}
if (refCnt() == 0) {
throw new IllegalReferenceCountException(0);
}
// Call open to make sure fc is initialized. This is a no-oop if we called it before.
open();
long written = file.transferTo(this.position + position, count, target);
if (written > 0) {
transferred += written;
} else if (written == 0) {
// If the amount of written data is 0 we need to check if the requested count is bigger then the
// actual file itself as it may have been truncated on disk.
//
// See https://github.com/netty/netty/issues/8868
validate(this, position);
}
return written;
}
}
Netty 文件传输 DefaultFileRegion 通过 transferTo()方法将文件发送到目标 Channel 中,下面重点看 FileChannel 的 transferTo()方法,它的 API DOC 说明如下:
public abstract class FileChannel{
public abstract long transferTo(long var1, long var3, WritableByteChannel var5) throws IOException;
}
对于很多操作系统它直接将文件缓冲区的内容发送到目标 Channel 中,而不需要通过拷贝的方式,这是一种更加高效 的传输方式,它实现了文件传输的“零拷贝”。
- 内存池
三个维度
- Pooled与unPooled(池化与非池化)
性能测试经验表明,采用内存池的 ByteBuf 相比于朝生夕灭的 ByteBuf,性能高 23 倍左右(性能数据与使用场景强相 关)。
UnPooled的内存分配
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
}
- newDirectBuffer
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
}
- PoolArena的allocate
abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
}
abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
}
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
}
通过 RECYCLER 的 get 方法循环使用 ByteBuf 对象,如果是非内存池实现,则直接创建一个新的 ByteBuf 对象。从缓 冲池中获取 ByteBuf 之后,调用 AbstractReferenceCountedByteBuf 的 setRefCnt 方法设置引用计数器,用于对象的引 用计数和内存回收(类似 JVM 垃圾回收机制)。
2. UnSage和非UnSafe(底层读写与应用程序读写)
- Heap与Direct(堆内存与堆外内存)
netty有pooled buffer 和 unpooled buffer
pooled 先初始化一定大小的空间 ,读写效率 ,内存分配 堆外内存读写高,但是内存管理比较麻烦。
- 高效的Reactor线程模型
- Reactor单线程模型
netty的接收连接,分发调度、读写都由一个线程完成。 - Reactor多线程模型
反应接待线程、反应线程池(处理读写,编解码) - 主从Reactor多线程模型
接收线程 主线程池(调度不同模块业务的资源) 派发不同的业务逻辑线程
从线程
- 无锁化的串行设计理念
Pipeline(管道) pipeline的添加是有顺序的。
pipeline的工作模式:责任链模式,双向链表 两种类型的handle
Inbound OutBound 负责数据处理的触发和回调 什么时候触发,什么时候回调
pipeline可以巧妙的解决权限问题,有一个好用的东西,事件传播,异常和消息可以一层层往上传。
MessageToMessageEncoder extends ChannelOutboundHandlerAdapter 是一个OutBound,OutBound说明数据需要一个回调处理了。
- 高效的并发编程 Netty的高效并发编程主要体现在如下几点:
- volatile的大量、正确使用
- CAS和原子类的广泛使用
- 线程安全容器的使用
- 通过读写锁提升并发性能
- 高性能的序列化框架
影响序列化性能的关键因素总结如下:
- 序列化后的码流大小(影响网络带宽的占用)
- 序列化&反序列化的性能(CPU资源占用)
- 是否支持跨语言(异构系统的对接和开发语言切换)
- 灵活的TCP参数配置能力
Netty核心之服务端Channel源码分析
服务端Channel的创建
bind()[用户代码入口]
initAndRegister()[初始化并注册]
newChannel()[创建服务端channel]
init()[初始化服务端Channel]
配置都会保存在 channel.config
- ServerBootStrap
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel
pipeline 初始化
反射创建
客户端BootStrap
Channel简介
在 Netty 中,Channel 是一个 Socket 的抽象,它为用户提供了关于 Socket 状态(是否是连接还是断开)以及对 Socket 的读写等操作。每当 Netty 建立了一个连接后, 都创建一个对应的 Channel 实例。 除了 TCP 协议以外,Netty 还支持很多其他的连接协议, 并且每种协议还有 NIO(非阻塞 IO)和 OIO(Old-IO, 即传统的 阻塞 IO)版本的区别。不同协议不同的阻塞类型的连接都有不同的 Channel 类型与之对应
NioSocketChannel的创建
Bootstrap 是 Netty 提供的一个便利的工厂类, 我们可以通过它来完成 Netty 的客户端或服务器端的 Netty 初始化。
Netty 客户端初始化时所需的所有内容:
1、EventLoopGroup:不论是服务器端还是客户端, 都必须指定 EventLoopGroup。在这个例子中, 指定了 NioEventLoopGroup, 表示一个 NIO 的 EventLoopGroup。
2、ChannelType: 指定 Channel 的类型。 因为是客户端,因此使用了 NioSocketChannel。
3、Handler: 设置处理数据的 Handler。
客户端通过 Bootstrap 启动后,都做了哪些工作?
NioSocketChannel 的 类层次结构
客户端连接代码的初始化 Bootstrap 中调用了一个 channel()方法,传入的参数是 NioSocketChannel.class,
在这个方法中其实就是初始化了一个 ReflectiveChannelFactory 的对象:
而 ReflectiveChannelFactory 实现了 ChannelFactory 接口, 它提供了唯一的方法, 即 newChannel()方法,
ChannelFactory, 顾名思义, 就是创建 Channel 的工厂类。进入到 ReflectiveChannelFactory 的 newChannel()方法中, 我们看到其实现代码如下:
public T newChannel() { // 删除了 try...catch 块 return clazz.newInstance(); }
1、Bootstrap 中的 ChannelFactory 实现类是 ReflectiveChannelFactory。
2、通过 channel()方法创建的 Channel 具体类型是 NioSocketChannel。
Channel 的实例化过程其实就是调用 ChannelFactory 的 newChannel()方法,而实例化的 Channel 具体类型又是和初 始化 Bootstrap 时传入的 channel()方法的参数相关。因此对于客户端的 Bootstrap 而言,创建的 Channel 实例就是 NioSocketChannel。
客户端 Channel 的初始化
至此, NioSocketChannel 就初始化就完成了, 我们可以稍微总结一下 NioSocketChannel 初始化所做的工作内容:
1、调用 NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打开一个新的 Java NIOSocketChannel。
2、AbstractChannel(Channel parent)中需要初始化的属性:
id:每个 Channel 都拥有一个唯一的 id。
parent:属性置为 null。
unsafe:通过 newUnsafe()实例化一个 unsafe 对象,它的类型是 AbstractNioByteChannel.NioByteUnsafe 内部类。
pipeline:是 new DefaultChannelPipeline(this)新创建的实例。
3、AbstractNioChannel 中的属性:
ch:赋值为 Java SocketChannel,即 NioSocketChannel 的 newSocket()方法返回的 Java NIO SocketChannel。
readInterestOp:赋值为 SelectionKey.OP_READ
ch:被配置为非阻塞,即调用 ch.configureBlocking(false)。
4、NioSocketChannel 中的属性:
config = new NioSocketChannelConfig(this, socket.socket())
Unsafe 字段的初始化
在实例化 NioSocketChannel 的过程中,会在父类 AbstractChannel 的构造方法中调用 newUnsafe()来获取一个 unsafe 实例。那么 unsafe 是怎么初始化的呢? 它的作用是什么?
其实 unsafe 特别关键,它封装了对 Java 底层 Socket 的操作,因此实际上是沟通 Netty 上层和 Java 底层的重要的 桥梁。
interface Unsafe {
/**
* Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
* receiving data.
*/
RecvByteBufAllocator.Handle recvBufAllocHandle();
/**
* Return the {@link SocketAddress} to which is bound local or
* {@code null} if none.
*/
SocketAddress localAddress();
/**
* Return the {@link SocketAddress} to which is bound remote or
* {@code null} if none is bound yet.
*/
SocketAddress remoteAddress();
/**
* Register the {@link Channel} of the {@link ChannelPromise} and notify
* the {@link ChannelFuture} once the registration was complete.
*/
void register(EventLoop eventLoop, ChannelPromise promise);
/**
* Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
* it once its done.
*/
void bind(SocketAddress localAddress, ChannelPromise promise);
/**
* Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
* If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
* pass {@code null} to it.
*
* The {@link ChannelPromise} will get notified once the connect operation was complete.
*/
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void disconnect(ChannelPromise promise);
/**
* Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void close(ChannelPromise promise);
/**
* Closes the {@link Channel} immediately without firing any events. Probably only useful
* when registration attempt failed.
*/
void closeForcibly();
/**
* Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
* {@link ChannelPromise} once the operation was complete.
*/
void deregister(ChannelPromise promise);
/**
* Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
* {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.
*/
void beginRead();
/**
* Schedules a write operation.
*/
void write(Object msg, ChannelPromise promise);
/**
* Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
*/
void flush();
/**
* Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
* It will never be notified of a success or error and so is only a placeholder for operations
* that take a {@link ChannelPromise} as argument but for which you not want to get notified.
*/
ChannelPromise voidPromise();
/**
* Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
*/
ChannelOutboundBuffer outboundBuffer();
}
}
这些方法其实都是对应到相关的 Java 底层的 Socket 的操作。
AbstractChannel 的构造方法中,在这里调用了 newUnsafe()获取一个新的 unsafe 对象,而 newUnsafe() 方法在 NioSocketChannel 中被重写了。来看代码:
- NioSocketChannel
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
Pipeline 的初始化
上面我们分析了 NioSocketChannel 的大体初始化过程, 但是我们漏掉了一个关键的部分,即 ChannelPipeline 的 初始化。在 Pipeline 的注释说明中写到“Each channel has its own pipeline and it is created automatically when a new channel is created.”,我们知道,在实例化一个 Channel 时,必然都要实例化一个 ChannelPipeline。而我们确实在 AbstractChannel 的构造器看到了 pipeline 字段被初始化为 DefaultChannelPipeline 的实例。那么我们就来看一下, DefaultChannelPipeline 构造器做了哪些工作。
*
public class DefaultChannelPipeline implements ChannelPipeline {
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
}
DefaultChannelPipeline 的构造器需要传入一个 channel,而这个 channel 其实就是我们实例化的 NioSocketChannel, DefaultChannelPipeline 会将这个 NioSocketChannel 对象保存在 channel 字段中。DefaultChannelPipeline 中还有两 个特殊的字段,即 head 和 tail,这两个字段是双向链表的头和尾。其实在 DefaultChannelPipeline 中,维护了一个以 AbstractChannelHandlerContext 为节点元素的双向链表,这个链表是 Netty 实现 Pipeline 机制的关键。
HeadContext 的类继承层次结构:
TailContext 的继承层次结构
链表中 head 是一个 ChannelOutboundHandler,而 tail 则是一个 ChannelInboundHandler。
HeadContext 的构造器:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
它调用了父类 AbstractChannelHandlerContext 的构造器,并传入参数 inbound = false,outbound = true。而 TailContext 的构造器与 HeadContext 的相反,它调用了父类 AbstractChannelHandlerContext 的构造器,并传入参数 inbound = true,outbound = false。即 header 是一个 OutBoundHandler,而 tail 是一个 InBoundHandler,关于这 一特征,大家要特别注意,先记住,后续我们分析到 Netty 的 Pipeline 时,我们会反复用到 inbound 和 outbound 这 两个属性。
EventLoop 的初始化
我们在一开始就实例化了一个 NioEventLoopGroup 对象,因此我们就从它的 构造器中追踪一下 EventLoop 的初始化过程。
NioEventLoop 有几个重载的构造器,不过内容都没有太大的区别,最终都是调用的父类 MultithreadEventLoopGroup 的构造器:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
如果我们传入的线程数 nThreads 是 0,那么 Netty 会为我们设置默认的线程数,DEFAULT_EVENT_LOOP_THREADS
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
Netty 首先会从系统属性中获取"io.netty.eventLoopThreads"的值,如果我们没有设置的话,那么就返回默认值:即处理 器核心数 * 2。回到 MultithreadEventLoopGroup 构造器中会继续调用父类 MultithreadEventExecutorGroup 的构造器:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
newChooser 方法里面看看其实现逻辑,
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
上面的代码逻辑主要表达的意思是:即如果 nThreads 是 2 的幂,则使用 PowerOfTwoEventExecutorChooser,否则 使用 GenericEventExecutorChooser。这两个 Chooser 都重写 next()方法。next()方法的主要功能就是讲数组索引循环位移.
这个运算逻辑其实很简单,就是每次让索引自增后和数组长度取模:idx.getAndIncrement() % executors.length。但 是就连一个非常简单的数组索引运算,Netty 都帮我们做了优化。因为在计算机底层,&与比%运算效率更高。
好了,分析到这里我们应该已经非常清楚 MultithreadEventExecutorGroup 中的处理逻辑,简单做一个总结:
1、创建一个大小为 nThreads 的 SingleThreadEventExecutor 数组。
2、根据 nThreads 的大小,创建不同的 Chooser,即如果 nThreads 是 2 的幂,则使用 PowerOfTwoEventExecutorChooser,反之使用 GenericEventExecutorChooser。不论使用哪个 Chooser,它们的功 能都是一样的,即从 children 数组中选出一个合适的 EventExecutor 实例。
3、调用 newChhild()方法初始化 children 数组。
根据上面的代码,我们也能知道:MultithreadEventExecutorGroup 内部维护了一个 EventExecutor 数组,而 Netty 的 EventLoopGroup的实现机制其实就建立在MultithreadEventExecutorGroup之上。每当Netty需要一个EventLoop 时, 会调用 next()方法获取一个可用的 EventLoop。
上面代码的最后一部分是 newChild()方法,这个是一个抽象方法,它的任务是实例化 EventLoop 对象。我们跟踪一下 它的代码。可以发现。这个方法在 NioEventLoopGroup 类中有实现,其内容很简单:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}
其实逻辑很简单就是实例化一个 NioEventLoop 对象, 然后返回 NioEventLoop 对象。
最后总结一下整个 EventLoopGroup 的初始化过程:
1、EventLoopGroup(其实是 MultithreadEventExecutorGroup)内部维护一个类型为 EventExecutor children 数组, 其大小是 nThreads,这样就构成了一个线程池。
2、如果我们在实例化 NioEventLoopGroup 时,如果指定线程池大小,则 nThreads 就是指定的值,反之是处理 器核心数 * 2。
3、MultithreadEventExecutorGroup 中会调用 newChild()抽象方法来初始化 children 数组。
4、抽象方法 newChild()是在 NioEventLoopGroup 中实现的,它返回一个 NioEventLoop 实例。
5、NioEventLoop 属性赋值:
provider:在 NioEventLoopGroup 构造器中通过 SelectorProvider.provider()获取一个 SelectorProvider。
selector:在 NioEventLoop 构造器中通过调用通过 provider.openSelector()方法获取一个 selector 对象。
Channel 注册到 Selector
在前面的分析中,我们提到 Channel 会在 Bootstrap 的 initAndRegister()中进行初始化,但是这个方法还会将初始 化好的 Channe 注册到 NioEventLoop 的 selector 中。接下来我们来分析一下 Channel 注册的过程。
AbstractBootstrap 的 initAndRegister()方法:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
当 Channel 初始化后,紧接着会调用 group().register()方法来向 selector 注册 Channel。我们继续跟踪的话,会 发现其调用链如下:
通过跟踪调用链, 最终我们发现是调用到了 unsafe 的 register 方法,那么接下来我们就仔细看一下
AbstractChannel$AbstractUnsafe.register()方法中到底做了什么?
AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
register0(promise);
}
首先,将 eventLoop 赋值给 Channel 的 eventLoop 属性,而我们知道这个 eventLoop 对象其实是 MultithreadEventLoopGroup 的 next()方法获取的,根据我们前面的分析,我们可以确定 next()方法返回的 eventLoop 对象是 NioEventLoop 实例。register()方法接着调用了 register0()方法:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
register0()方法又调用了 AbstractNioChannel 的 doRegister()方法:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
看到 javaChannel()这个方法在前面我们已经知道了,它返回的是一个 Java NIO 的 SocketChannel 对象,这里我 们将这个 SocketChannel 注册到与 eventLoop 关联的 selector 上了。
我们总结一下 Channel 的注册过程:
1、首先在 AbstractBootstrap 的 initAndRegister()方法中, 通过 group().register(channel),调用 MultithreadEventLoopGroup 的 register()方法。
2、在 MultithreadEventLoopGroup 的 register()中,调用 next()方法获取一个可用的 SingleThreadEventLoop, 然 后调用它的 register()方法。
3、在 SingleThreadEventLoop 的 register()方法中,调用 channel.unsafe().register(this, promise)方法来获取channel 的 unsafe()底层操作对象,然后调用 unsafe 的 register()。
4、在 AbstractUnsafe 的 register()方法中, 调用 register0()方法注册 Channel 对象。
5、在 AbstractUnsafe 的 register0()方法中,调用 AbstractNioChannel 的 doRegister()方法。
6、AbstractNioChannel 的 doRegister()方法通过 javaChannel().register(eventLoop().selector, 0, this)将 Channel 对应的 Java NIO 的 SocketChannel 注册到一个 eventLoop 的 selector 中,并且将当前 Channel 作为 attachment 与 SocketChannel 关联。
总的来说,Channel 注册过程所做的工作就是将 Channel 与对应的 EventLoop 关联,因此这也体现了,在 Netty 中,每个 Channel 都会关联一个特定的 EventLoop,并且这个 Channel 中的所有 IO 操作都是在这个 EventLoop 中执 行的;当关联好 Channel 和 EventLoop 后,会继续调用底层 Java NIO 的 SocketChannel 对象的 register()方法,将底 层 Java NIO 的 SocketChannel 注册到指定的 selector 中。通过这两步,就完成了 Netty 对 Channel 的注册过程。
NIO的弊端
同步单线程