学习Netty框架,三连问:
什么是Netty框架?
为什么要用Netty框架?
怎么用Netty框架?
什么是Netty框架?
Netty 是一个广受欢迎的异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。
Netty 是一个基于 NIO 的网络编程框架,使用 Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程。
作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
为什么要用Netty框架?
因为Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了JDK 原生 NIO 程序的问题。
JDK 原生 NIO 程序的问题:
JDK 原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:
1)NIO 的类库和 API 繁杂,使用麻烦:你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
2)需要具备其他的额外技能做铺垫:例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。
3)可靠性能力补齐,开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。
4)JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。
Netty的主要特点有:
1)设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。
2)使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
3)高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
4)安全:完整的 SSL/TLS 和 StartTLS 支持。
5)社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。
Netty 常见的使用场景如下:
1)互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
2)游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。
3)大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
有兴趣的读者可以了解一下目前有哪些开源项目使用了 Netty的Related Projects。
怎么用?(简单入门)
可参考学习 netty 官方API: http://netty.io/4.1/api/index.html
配置 pom.xml
1 <dependency> 2 <groupId>io.netty</groupId> 3 <artifactId>netty-all</artifactId> 4 <version>4.1.31.Final</version> 5 </dependency>
配置 ServerConnection.java
package com.example.demo.net; import java.net.InetSocketAddress; import org.apache.log4j.Logger; import com.example.demo.handler.ServerMsgHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class ServerConnection { Logger logger = Logger.getLogger(ServerConnection.class); private final int port ; private EventLoopGroup bossGroup ; private EventLoopGroup workerGroup ; public ServerConnection(int port) { this.port = port ; } /*** * NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器, * Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。 在这个例子中我们实现了一个服务端的应用, * 因此会有2个NioEventLoopGroup会被使用。 第一个经常被叫做‘boss’,用来接收进来的连接。 * 第二个经常被叫做‘worker’,用来处理已经被接收的连接, 一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。 * 如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现, * 并且可以通过构造函数来配置他们的关系。 */ public void run() { System.out.println("启动服务端Netty连接"); bossGroup = new NioEventLoopGroup() ; workerGroup = new NioEventLoopGroup() ; /** * ServerBootstrap 是一个服务端启动NIO服务的辅助启动类 , 可以在这个服务中直接使用Channel */ ServerBootstrap bootstrap = new ServerBootstrap() ; /** * 这一步是必须的,如果没有设置group将会报java.lang.IllegalStateException: group not set异常 */ bootstrap = bootstrap.group(bossGroup, workerGroup) ; /*** * ServerSocketChannel以NIO的selector为基础进行实现的,用来接收新的连接 * 这里告诉Channel如何获取新的连接. */ bootstrap = bootstrap.channel(NioServerSocketChannel.class) ; /*** * 绑定端口 */ bootstrap = bootstrap.localAddress(new InetSocketAddress(port)) ; /*** * 你可以设置这里指定的通道实现的配置参数。 我们正在写一个TCP/IP的服务端, * 因此我们被允许设置socket的参数选项比如tcpNoDelay和keepAlive。 * 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。 */ bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128) ; /*** * option()是提供给NioServerSocketChannel用来接收进来的连接。 * childOption()是提供给由父管道ServerChannel接收到的连接, * 在这个例子中也是NioServerSocketChannel。 */ bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true) ; /*** * 这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。 ChannelInitializer是一个特殊的处理类, * 目的是帮助使用者配置一个新的Channel。 * 也许你想通过增加一些处理类比如NettyServerHandler来配置一个新的Channel * 或者其对应的ChannelPipeline来实现你的网络程序。 当你的程序变的复杂时,可能你会增加更多的处理类到pipline上, * 然后提取这些匿名类到最顶层的类上。 */ bootstrap = bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline() ; pipeline.addLast("decoder", new StringDecoder()) ; pipeline.addLast("encoder", new StringEncoder()) ; pipeline.addLast("handler", new ServerMsgHandler()) ; } }) ; bootstrap.bind().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if ( future.isSuccess() ) { System.out.println("服务端开始监听") ; // logger.info("服务端开始监听") ; }else { logger.error("服务端无法使用监听端口",future.cause()) ; } } }) ; } public void shutdown() { logger.info("关闭 Server 端口"); bossGroup.shutdownGracefully() ; workerGroup.shutdownGracefully() ; } }
配置 ServerMsgHandler.java
package com.example.demo.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ServerMsgHandler extends ChannelInboundHandlerAdapter { /** * 这里我们覆盖了chanelRead()事件处理方法。 每当从客户端收到新的数据时, 这个方法会在收到消息时被调用, * 这个例子中,收到的消息的类型是ByteBuf * * @param ctx * 通道处理的上下文信息 * @param msg * 接收的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务端接收的消息:"+msg.toString()) ; //向客户端发送消息 String str = msg.toString() ; if ( "高性能NIO框架——Netty".equals(str) ) { ctx.writeAndFlush( "客户端 , 你好!") ; } // ctx.writeAndFlush(msg.toString()+"你好!") ; } /*** * 这个方法会在发生异常时触发 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { /** * exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO * 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来 并且把关联的 channel * 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不 同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。 */ // 出现异常就关闭 cause.printStackTrace() ; ctx.close() ; } }
配置 ClientConnection.java
package com.example.demo.net; import java.net.InetSocketAddress; import org.apache.log4j.Logger; import com.example.demo.handler.ClientMsgHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class ClientConnection { Logger logger = Logger.getLogger(ClientConnection.class); private final int port ; private EventLoopGroup bossGroup ; private EventLoopGroup workerGroup ; private Channel channel ; public ClientConnection(int port) { this.port = port ; } public Channel getChannel() { return this.channel ; } /*** * NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器, * Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。 在这个例子中我们实现了一个服务端的应用, * 因此会有2个NioEventLoopGroup会被使用。 * 第一个经常被叫做‘boss’,用来接收进来的连接。 * 第二个经常被叫做‘worker’,用来处理已经被接收的连接, 一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。 * 如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现, * 并且可以通过构造函数来配置他们的关系。 * @throws InterruptedException */ public void run() { System.out.println("启动客户端Netty连接"); bossGroup = new NioEventLoopGroup() ; workerGroup = new NioEventLoopGroup() ; /** * Bootstrap 是客户端一个启动NIO服务的辅助启动类 , 可以在这个服务中直接使用Channel */ Bootstrap bootstrap = new Bootstrap() ; // ServerBootstrap bootstrap = new ServerBootstrap() ; /** * 这一步是必须的,如果没有设置group将会报java.lang.IllegalStateException: group not set异常 */ // bootstrap.group(bossGroup, workerGroup) bootstrap.group(bossGroup) /*** * ServerSocketChannel以NIO的selector为基础进行实现的,用来接收新的连接 * 这里告诉Channel如何获取新的连接. */ // .channel(NioServerSocketChannel.class) .channel(NioSocketChannel.class) /*** * 绑定端口,等价于bootstrap.bind("127.0.0.1", port) ,若下面用了,就要把这个注释掉,不然会报错 */ // .localAddress(new InetSocketAddress(port)) .remoteAddress("127.0.0.1", port) /*** * 你可以设置这里指定的通道实现的配置参数。 我们正在写一个TCP/IP的服务端, * 因此我们被允许设置socket的参数选项比如tcpNoDelay和keepAlive。 * 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。 */ // .option(ChannelOption.SO_BACKLOG, 128) /*** * option()是提供给NioServerSocketChannel用来接收进来的连接。 * childOption()是提供给由父管道ServerChannel接收到的连接, * 在这个例子中也是NioServerSocketChannel。 */ // .childOption(ChannelOption.SO_KEEPALIVE, true) /*** * 这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。 ChannelInitializer是一个特殊的处理类, * 目的是帮助使用者配置一个新的Channel。 * 也许你想通过增加一些处理类比如NettyServerHandler来配置一个新的Channel * 或者其对应的ChannelPipeline来实现你的网络程序。 当你的程序变的复杂时,可能你会增加更多的处理类到pipline上, * 然后提取这些匿名类到最顶层的类上。 */ // .childHandler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline() ; pipeline.addLast("decoder", new StringDecoder()) ; pipeline.addLast("encoder", new StringEncoder()) ; // pipeline.addLast("handler", new ClientMsgHandler()) ; pipeline.addLast(new ClientMsgHandler()) ; } }) ; /* bootstrap.bind().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if ( future.isSuccess() ) { System.out.println("客户端开始监听") ; logger.info("客户端开始监听") ; }else { logger.error("客户端无法使用监听端口",future.cause()) ; } } }) ; */ /* //绑定端口,开始接收进来的连接 ChannelFuture cFuture; try { // cFuture = bootstrap.connect(host, port).sync(); // cFuture = bootstrap.bind("127.0.0.1", port).sync(); cFuture = bootstrap.bind().sync() ; //在这里拿到这个channel,是为了 等下 测试消息发送 用的 channel = cFuture.channel(); } catch (InterruptedException e) { e.printStackTrace(); } */ ChannelFuture cf; try { cf = bootstrap.connect().sync(); channel = cf.channel(); channel.writeAndFlush("ClientConnection客户端已成功启动!"); cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if ( future.isSuccess() ) { System.out.println("客户端开始监听") ; // logger.info("客户端开始监听") ; }else { // logger.error("客户端无法使用监听端口",future.cause()) ; } } }) ; } catch (InterruptedException e) { e.printStackTrace(); } } public void shutdown() { System.out.println("关闭 Client 端口"); logger.info("关闭 Client 端口"); bossGroup.shutdownGracefully() ; workerGroup.shutdownGracefully() ; } }
配置 ClientMsgHandler.java
package com.example.demo.handler; import org.apache.log4j.Logger; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ClientMsgHandler extends ChannelInboundHandlerAdapter { Logger logger = Logger.getLogger(ClientMsgHandler.class) ; /** * 这里我们覆盖了chanelRead()事件处理方法。 每当从客户端收到新的数据时, 这个方法会在收到消息时被调用, * 这个例子中,收到的消息的类型是ByteBuf * * @param ctx * 通道处理的上下文信息 * @param msg * 接收的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // super.channelRead(ctx, msg); System.out.println("客户端接收的消息:"+msg.toString()) ; //向服务端发送消息 ctx.writeAndFlush("服务端 , 你好!") ; } /*** * 这个方法会在发生异常时触发 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { /** * exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO * 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来 并且把关联的 channel * 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不 同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。 */ // 出现异常就关闭 cause.printStackTrace() ; ctx.close() ; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // super.channelActive(ctx); // logger.info("client channel active"); System.out.println("client channel active"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("client channel inactive"); ctx.close() ; } }
编写测试类 DemoTest.java
1 package com.example.demo.netty; 2 3 import org.junit.runner.RunWith; 4 import org.springframework.boot.test.context.SpringBootTest; 5 import org.springframework.test.context.ActiveProfiles; 6 import org.springframework.test.context.junit4.SpringRunner; 7 8 import com.example.demo.DemoApplicationTests; 9 import com.example.demo.net.ClientConnection; 10 import com.example.demo.net.ServerConnection; 11 12 import io.netty.channel.Channel; 13 14 @RunWith(SpringRunner.class) 15 @SpringBootTest(classes = DemoApplicationTests.class) 16 @ActiveProfiles("test") 17 public class DemoTest { 18 19 public static void main(String[] args) { 20 int port = 2222 ; 21 Thread serverThread = new Thread( new Runnable() { 22 @Override 23 public void run() { 24 new ServerConnection(port).run() ; 25 } 26 } ) ; 27 serverThread.start() ; 28 ClientConnection clientConnection = new ClientConnection(port) ; 29 // Thread clientThread = new Thread( new Runnable() { 30 // @Override 31 // public void run() { 32 // new ClientConnection(port).run() ; 33 // } 34 // } ) ; 35 // clientThread.start() ; 36 37 clientConnection.run(); 38 Channel channel = clientConnection.getChannel() ; 39 channel.writeAndFlush("高性能NIO框架——Netty"); 40 41 // new Thread( ()->{ 42 // new ServerConnection(port) ; 43 // } ).start() ; 44 // new Thread( ()->{ 45 // new ClientConnection(port) ; 46 // } ).start() ; 47 48 } 49 }
服务端与客户端的区别:
1. 在客户端只创建了一个NioEventLoopGroup实例,因为客户端并不需要使用I/O多路复用模型,需要有一个Reactor来接受请求。只需要单纯的读写数据即可
2. 在客户端只需要创建一个Bootstrap对象,它是客户端辅助启动类,功能类似于ServerBootstrap。
共同学习,共同进步,若有补充,欢迎指出,谢谢!