1、参考地址
https://waylau.gitbooks.io/netty-4-user-guide/content/Architectural-Overview/Universal-Asynchronous-IO-API.html
https://github.com/waylau/netty-4-user-guide-demos
2、服务端
以上为关键步骤,服务端处理器,渠道设置,通道设置,也就是第4步。
以下为建一个服务端为例
2.1、初始化ServerBootstrap实例,此实例是netty应用开发的入口
2.2、创建Reactor主线程池,用于接收客户端的连接请求,其职责如下:
2.2.1、接收客户端TCP连接,初始化 Channel参数
2.2.2、将链路状态变更事件通知给 ChannelPipeline
2.3、创建Reactor从线程池
2.3.1、异步读取通信对端的数据报,发送读事件到 ChannelPipeline;
2.3.2、异步发送消息到通信对端,调用 ChannelPipeline 的消息发送接口;
2.3.3、执行系统调用Task;
2.3.4、执行定时任务Task,例如链路空闲状态监测定时任务.
2.4、设置netty主从线程池
2.5、设置Channel
2.6、设置childGroup的处理器(childHandler),用来管理通道
2.7、设置选项 SO_BACKLOG,指定等待被接受的连接队列的最大长度
2.8、设置选项 SO_KEEPALIVE,使连接保持活动状态
2.9、绑定端口
2.10、关闭线程池
3、客户端
对于服务端核心在于设置childHandler,对于客户端核心在于设置handler。
服务端示例
/**
 * Entry point of the chat server.
 *
 * <p>Configures a {@code ServerBootstrap} with two event loop groups, binds it to
 * {@code NettyConstant.PORT}, and blocks until the listening channel is closed.
 */
public class SimpleChatServer {

    /**
     * Starts the server and blocks the calling thread until the server channel closes.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ServerBootstrap boot = new ServerBootstrap();
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            boot.group(parentGroup, childGroup);
            boot.channel(NioServerSocketChannel.class);
            // Handler applied to every accepted (child) channel.
            boot.childHandler(new SimpleChatServerInitializer());
            // option(...) configures the listening channel; childOption(...) the accepted ones.
            boot.option(ChannelOption.SO_BACKLOG, 128);
            boot.childOption(ChannelOption.SO_KEEPALIVE, true);

            // Wait for the bind to complete, then wait until the server channel is closed.
            boot.bind(NettyConstant.PORT).sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Always release the event loop threads, even when bind fails.
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
/**
 * Builds the pipeline for each channel accepted by the chat server.
 *
 * <p>Handlers are installed in this order: a line-delimiter frame decoder
 * (constructed with 8192 and {@code Delimiters.lineDelimiter()}), a string decoder,
 * a string encoder, and finally the chat handler.
 */
public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the framing, codec and business handlers on the new channel.
     *
     * @param ch the freshly accepted channel
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
                .addLast("decoder", new StringDecoder())
                .addLast("encoder", new StringEncoder())
                .addLast("handler", new SimpleChatServerHandler());
    }
}
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1) /** * A thread-safe Set Using ChannelGroup, you can categorize Channels into a meaningful group. * A closed Channel is automatically removed from the collection, */ public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入 "); channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开 "); // A closed Channel is automatically removed from ChannelGroup, // so there is no need to do "channels.remove(ctx.channel());" } @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4) Channel incoming = ctx.channel(); for (Channel channel : channels) { if (channel != incoming){ channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + " "); } else { channel.writeAndFlush("[you]" + s + " "); } } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常"); // 当出现异常就关闭连接 cause.printStackTrace(); ctx.close(); } }
客户端示例
/**
 * Entry point of the chat client.
 *
 * <p>Connects to {@code NettyConstant.HOST}:{@code NettyConstant.PORT} and forwards every
 * line read from standard input to the server until standard input reaches end-of-file.
 */
public class SimpleChatClient {

    /**
     * Connects to the server and pumps console lines into the channel.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Bootstrap boot = new Bootstrap();
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        try {
            boot.group(parentGroup);
            boot.channel(NioSocketChannel.class);
            boot.handler(new SimpleChatClientInitializer());
            // SO_BACKLOG is intentionally not set: it sizes a listening socket's accept
            // queue and has no meaning for an outbound client connection.

            Channel channel = boot.connect(NettyConstant.HOST, NettyConstant.PORT).sync().channel();
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

            String line;
            // readLine() returns null at end-of-file; stop instead of sending "null" forever.
            while ((line = in.readLine()) != null) {
                // The server frames inbound data on line delimiters, so each message
                // must end with a newline to be delivered.
                channel.writeAndFlush(line + "\n");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            parentGroup.shutdownGracefully();
        }
    }
}
/**
 * Builds the pipeline for the chat client's channel.
 *
 * <p>Handlers are installed in this order: a line-delimiter frame decoder
 * (constructed with 8192 and {@code Delimiters.lineDelimiter()}), a string decoder,
 * a string encoder, and finally the client handler.
 */
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the framing, codec and business handlers on the channel.
     *
     * @param ch the channel being initialised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
                .addLast("decoder", new StringDecoder())
                .addLast("encoder", new StringEncoder())
                .addLast("handler", new SimpleChatClientHandler());
    }
}
/**
 * Client-side inbound handler that prints every decoded message to standard output.
 */
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints one received message followed by a line break.
     *
     * @param ctx     context of the channel the message arrived on (unused)
     * @param message the decoded message text
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception {
        System.out.println(message);
    }
}