1、参考地址
https://waylau.gitbooks.io/netty-4-user-guide/content/Architectural-Overview/Universal-Asynchronous-IO-API.html
https://github.com/waylau/netty-4-user-guide-demos
2、服务端

以上为关键步骤,服务端处理器,渠道设置,通道设置,也就是第4步。
以下为建一个服务端为例
2.1、初始化ServerBootstrap实例,此实例是netty应用开发的入口
2.2、创建Reactor主线程池,用于接收客户端请求的线程池职责
2.2.1、接收客户端TCP连接,初始化 Channel参数
2.2.2、将链路状态变更事件通知给 Channelpipeline
2.3、创建Reactor从线程池
2.3.1、异步读取通信对端的数据报,发送读事件到 Channelpipeline;
2.3.2、异步发送消息到通信对端,调用 Channelpipeline的消息发送接口;
2.3.3、执行系统调用Task;
2.3.4、执行定时任务Task,例如链路空闲状态监测定时任务.
2.4、设置netty主从线程池
2.5、设置Channel
2.6、childGroup的处理器,用来管理通道
2.7、设置选项 SO_BACKLOG 设置线程队列的链接个数
2.8、SO_KEEPALIVE 设置保持链接活动连接状态
2.9、绑定端口
2.10、关闭线程池
3、客户端

对于服务端核心在于设置childHandler,对于客户端核心在于设置handler。
服务端示例
public class SimpleChatServer{
public static void main(String[] args) {
ServerBootstrap boot = new ServerBootstrap();
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
boot.group(parentGroup, childGroup);
boot.channel(NioServerSocketChannel.class);
boot.childHandler(new SimpleChatServerInitializer());
boot.option(ChannelOption.SO_BACKLOG, 128);
boot.childOption(ChannelOption.SO_KEEPALIVE, true);
boot.bind(NettyConstant.PORT).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatServerHandler());
}
}
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)
/**
* A thread-safe Set Using ChannelGroup, you can categorize Channels into a meaningful group.
* A closed Channel is automatically removed from the collection,
*/
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
Channel incoming = ctx.channel();
// Broadcast a message to multiple Channels
channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入
");
channels.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
Channel incoming = ctx.channel();
// Broadcast a message to multiple Channels
channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开
");
// A closed Channel is automatically removed from ChannelGroup,
// so there is no need to do "channels.remove(ctx.channel());"
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
if (channel != incoming){
channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "
");
} else {
channel.writeAndFlush("[you]" + s + "
");
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常");
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}
客户端示例
public class SimpleChatClient {
public static void main(String[] args) {
Bootstrap boot = new Bootstrap();
EventLoopGroup parentGroup = new NioEventLoopGroup();
try {
boot.group(parentGroup);
boot.channel(NioSocketChannel.class);
boot.handler(new SimpleChatClientInitializer());
boot.option(ChannelOption.SO_BACKLOG, 128);
Channel channel = boot.connect(NettyConstant.HOST, NettyConstant.PORT).sync().channel();
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while(true){
channel.writeAndFlush(in.readLine() + "
");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
parentGroup.shutdownGracefully();
}
}
}
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatClientHandler());
}
}
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(s);
}
}