Netty是一个高性能、异步事件驱动的网路通信框架 ,由于精力有限,本人并没有对其源 码做了特别细致的研究。如果下面的内容有错误或不严谨的地方,也请大家指正和谅解。
Netty的线程模型是Reactor主从模型的变种,去掉了线程池,使用串行化实现。
Reactor主从模型如下图所示
(mainReactor负责监听server socket,accept新连接;并将建立的socket分派给subReactor。
subReactor负责多路分离已连接的socket,读写网络数据,对业务处理功能,其扔给worker线程池完成。通常,subReactor个数上可与CPU个数等同)
Netty中Reactor模式的参与者主要有下面一些组件:
Selector
EventLoopGroup/EventLoop
ChannelPipeline
Selector即为NIO中提供的多路复用器,这里不再阐述。
EventLoopGroup/EventLoop
EventLoopGroup是一组EventLoop的抽象,在Netty服务器编程中我们需要BossEventLoopGroup和WorkerEventLoopGroup两个EventLoopGroup来进行工作。通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程,也就是说BossEventLoopGroup的线程数参数为1。BossEventLoop主要负责接收客户端的连接并
将SocketChannel交给WorkerEventLoopGroup来进行处理。
ChannelPipeline
ChannelPipeline在Reactor模式中担任请求处理器的角色,通过调用相关的handler进行相对应的处理。
以下是第一个入门程序
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { public static void main(String[] args) throws Exception { // 1 第一个线程组 是用于接收Client端连接的 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 2 第二个线程组 是用于实际的业务处理操作的 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 3 创建一个辅助类Bootstrap,就是对我们的Server进行一系列的配置 ServerBootstrap bbootStarp = new ServerBootstrap(); // 把俩个工作线程组加入进来 bbootStarp.group(bossGroup, workerGroup) // 指定使用NioServerSocketChannel这种类型的通道 .channel(NioServerSocketChannel.class) // 一定要使用 childHandler 去绑定具体的 事件处理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ServerHandler()); } }); // 绑定指定的端口 进行监听 ChannelFuture channelFuture = bbootStarp.bind(8765).sync(); // 让其不关闭 channelFuture.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; public class ServerHandler extends ChannelInboundHandlerAdapter { //处理客户端发送过来的数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf byteBuffer = (ByteBuf) msg; byte[] data = new byte[byteBuffer.readableBytes()]; byteBuffer.readBytes(data); String sData = new String(data,"utf-8"); System.out.println("server "+sData); //写操作完成以后就断开连接 ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes())).addListener(ChannelFutureListener.CLOSE); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { public static void main(String[] args) throws Exception { EventLoopGroup workgroup = new NioEventLoopGroup(); Bootstrap boot = new Bootstrap(); boot.group(workgroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture channelFuture = boot.connect("127.0.0.1", 8765).sync(); //buf channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes())); channelFuture.channel().closeFuture().sync(); workgroup.shutdownGracefully(); } }
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { // do something ByteBuf buf = (ByteBuf) msg; byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); String request = new String(data, "utf-8"); System.out.println("Client: " + request); } finally { // 如果没有调用write或者writeAndFlush 需要手动释放msg ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
可以看到,server 和client的创建连接或者发送请求的代码的都是相对固定的,实际开发中 我们专注写handler 就好。