简单编写一个案例实现:
(1) 服务器端启动监听6666端口, 收到客户端信息之后打印下客户端信息且回传一段服务器端的信息
(2) 客户端启动之后连接到6666端口,且发送一段信息,收到服务器端信息之后打印下服务器发送的信息
1. 代码实现
0. pom
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.11.Final</version> </dependency>
logback.xml 注意是关闭日志
<configuration> <appender name="consoleLog" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern> %msg%n </pattern> </encoder> </appender> <root level="error"> <appender-ref ref="consoleLog"></appender-ref> </root> </configuration>
1. 服务器端程序
1. Server
package netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { public static void main(String[] args) throws InterruptedException { // 1. 创建bossGrou好eworkerGroup // bossGroup只负责连接请求,其他交给workerGroup, 两个都是无线循环 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建服务器端启动对象用于设置参数 ServerBootstrap bootstrap = new ServerBootstrap(); // 使用链式编程设置参数 bootstrap.group(bossGroup, workerGroup)// 设置两个组 .channel(NioServerSocketChannel.class) // 设置服务器的通道 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态 .childHandler(new ChannelInitializer<SocketChannel>() { // 设置通道测试对象(匿名对象) @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 给pipeline添加一个handler socketChannel.pipeline().addLast(new ServerHandler()); } }); System.out.println("服务端is ok。。。"); // 启动服务器并绑定端口。绑定一个端口并且同步,生成一个ChannelFuture对象 ChannelFuture channelFuture = bootstrap.bind(6666).sync(); // 对关闭通道进行监控 channelFuture.channel().closeFuture().sync(); } }
2. Handler
package netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类 */ public class ServerHandler extends ChannelInboundHandlerAdapter { /** * 读取事件 * * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息 * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer) * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("ctx = " + ctx); // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer) ByteBuf byteBuf = (ByteBuf) msg; System.out.println("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + ctx.channel().remoteAddress()); } /** * 数据读取完毕事件 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 将数据写到客户端(write + flush) ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!", CharsetUtil.UTF_8)); } /** * 发生异常事件 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
2. 客户端程序
client
package netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { public static void main(String[] args) throws InterruptedException { // 创建一个事件循环组 EventLoopGroup eventExecutors = new NioEventLoopGroup(); try { // 创建一个启动Bootstrap(注意是Netty包下的) Bootstrap bootstrap = new Bootstrap(); // 链式设置参数 bootstrap.group(eventExecutors) // 设置线程组 .channel(NioSocketChannel.class) // 设置通道class .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ClientHandler()); } }); System.out.println("客户端is ok..."); // 启动客户端连接服务器(ChannelFuture 是netty的异步模型) ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync(); // 监听关闭通道 channelFuture.channel().closeFuture().sync(); } finally { // 关闭 eventExecutors.shutdownGracefully(); } } }
Handler
package netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类 */ public class ClientHandler extends ChannelInboundHandlerAdapter { /** * 通道就绪事件 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("ClientHandler ctx: " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器!", CharsetUtil.UTF_8)); } /** * 读取事件 * * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息 * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer) * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer) ByteBuf byteBuf = (ByteBuf) msg; System.out.println("服务器会送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("服务器地址:" + ctx.channel().remoteAddress()); } /** * 发生异常事件 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
3. 测试
(1) 启动服务器端:
服务端is ok。。。
(2) 启动客户端
客户端is ok...
(3) 之后服务器先接到消息,服务器端日志:
ctx = ChannelHandlerContext(ServerHandler#0, [id: 0x5028f46e, L:/127.0.0.1:6666 - R:/127.0.0.1:58692])
客户端发送的消息是:hello, 服务器!
客户端地址:/127.0.0.1:58692
(4) 客户端日志
ClientHandler ctx: ChannelHandlerContext(ClientHandler#0, [id: 0x8b3d82f9, L:/127.0.0.1:58692 - R:/127.0.0.1:6666])
服务器会送的消息是:hello, 客户端!
服务器地址:/127.0.0.1:6666
2. 重要类查看
1. new NioEventLoopGroup(); 默认会创建当前服务器逻辑处理器数量*2个NioEventLoop (循环事件)
最终获取数量是:
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
测试如下:(我的机子是4个逻辑处理器的)
对于服务器端的BossGroup来说,没必要用满8个,可以通过参数进行修改。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
再次查看:(也证实了每个NioEventLoop都有自己的Selector、executor任务处理器、taskQueue工作队列等信息,其实也就是多线程的一套信息)
2. ServerHandler处理时打印线程名称可以看到是用了workerGroup线程池
修改netty.ServerHandler#channelRead
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("当前线程: " + Thread.currentThread().getName()); System.out.println("ctx = " + ctx); // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer) ByteBuf byteBuf = (ByteBuf) msg; System.out.println("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + ctx.channel().remoteAddress()); }
可以看到服务器打印的线程名如下:(直到打印到8,线程池创满8个)
服务端is ok。。。 当前线程: nioEventLoopGroup-3-1 ctx = ChannelHandlerContext(ServerHandler#0, [id: 0xe1252cb5, L:/127.0.0.1:6666 - R:/127.0.0.1:59393]) 客户端发送的消息是:hello, 服务器! 客户端地址:/127.0.0.1:59393 当前线程: nioEventLoopGroup-3-2 ctx = ChannelHandlerContext(ServerHandler#0, [id: 0x890f8270, L:/127.0.0.1:6666 - R:/127.0.0.1:59470]) 客户端发送的消息是:hello, 服务器! 客户端地址:/127.0.0.1:59470
3. ChannelHandlerContext 类
ChannelHandlerContext 包含非常重要的信息,如下:
可以看到包含我们创建的ServerHandler
有next、prev属性(双向链表结构)
有pipeline管道,管道内部有channel通道信息以及head、tail头尾等信息。channel包含的重要信息如下:
补充: 我另一台机子的逻辑处理器是8个,查看默认创建的 bossGroup 包含16 个NioEventLoop
(1) 通过任务管理器查看:
如图,1个插槽表示1个CPU; 内核是4表示4核CPU; 8个逻辑处理器表示8线程(一般一个核心对应了一个线程,而intel开发出了超线程技术,1个核心能够做到2个线程计算,所以4个核心则能够做到8个线程)
(2) 也可以通过Java程序获取可用的processors(处理器数量)
int i = Runtime.getRuntime().availableProcessors(); // 8