netty 使用 tcp/ip 协议传输数据。而 tcp/ip 协议是类似水流一样的数据传输方式。多次访问的时候有可能出现数据粘包的问题,解决这种问题的方式如下:
定长数据流
客户端和服务器,提前协调好,每个消息长度固定。(如:长度 10)。如果客户端或服务器写出的数据不足 10,则使用空白字符补足(如:使用空格)。
/** * 1. 单线程组 * 2. Bootstrap配置启动信息 * 3. 注册业务处理Handler * 4. connect连接服务,并发起请求 */ import java.nio.charset.Charset; import java.util.Scanner; import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Client4FixedLength { // 处理请求和处理服务端响应的线程组 private EventLoopGroup group = null; // 服务启动相关配置信息 private Bootstrap bootstrap = null; public Client4FixedLength(){ init(); } private void init(){ group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); // 绑定线程组 bootstrap.group(group); // 设定通讯模式为NIO bootstrap.channel(NioSocketChannel.class); } public ChannelFuture doRequest(String host, int port) throws InterruptedException{ this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelHandler[] handlers = new ChannelHandler[3]; handlers[0] = new FixedLengthFrameDecoder(3); // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象 handlers[1] = new StringDecoder(Charset.forName("UTF-8")); handlers[2] = new Client4FixedLengthHandler(); ch.pipeline().addLast(handlers); } }); ChannelFuture future = this.bootstrap.connect(host, port).sync(); return future; } public void release(){ this.group.shutdownGracefully(); } public static void main(String[] args) { Client4FixedLength client = null; ChannelFuture future = null; try{ client = new Client4FixedLength(); future = client.doRequest("localhost", 9999); Scanner s = null; while(true){ s = new Scanner(System.in); System.out.print("enter message send to server > "); String 
line = s.nextLine(); byte[] bs = new byte[5]; byte[] temp = line.getBytes("UTF-8"); if(temp.length <= 5){ for(int i = 0; i < temp.length; i++){ bs[i] = temp[i]; } } future.channel().writeAndFlush(Unpooled.copiedBuffer(bs)); TimeUnit.SECONDS.sleep(1); } }catch(Exception e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != client){ client.release(); } } } }
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class Client4FixedLengthHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try{ String message = msg.toString(); System.out.println("from server : " + message); }finally{ // 用于释放缓存。避免内存溢出 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exceptionCaught method run..."); // cause.printStackTrace(); ctx.close(); } }
/** * 1. 双线程组 * 2. Bootstrap配置启动信息 * 3. 注册业务处理Handler * 4. 绑定服务监听端口并启动服务 */ import java.nio.charset.Charset; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Server4FixedLength { // 监听线程组,监听客户端请求 private EventLoopGroup acceptorGroup = null; // 处理客户端相关操作线程组,负责处理与客户端的数据通讯 private EventLoopGroup clientGroup = null; // 服务启动相关配置信息 private ServerBootstrap bootstrap = null; public Server4FixedLength(){ init(); } private void init(){ acceptorGroup = new NioEventLoopGroup(); clientGroup = new NioEventLoopGroup(); bootstrap = new ServerBootstrap(); // 绑定线程组 bootstrap.group(acceptorGroup, clientGroup); // 设定通讯模式为NIO bootstrap.channel(NioServerSocketChannel.class); // 设定缓冲区大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效) bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024) .option(ChannelOption.SO_RCVBUF, 16*1024) .option(ChannelOption.SO_KEEPALIVE, true); } public ChannelFuture doAccept(int port) throws InterruptedException{ bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelHandler[] acceptorHandlers = new ChannelHandler[3]; // 定长Handler。通过构造参数设置消息长度(单位是字节)。发送的消息长度不足可以使用空格补全。 acceptorHandlers[0] = new FixedLengthFrameDecoder(5); // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象 acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8")); acceptorHandlers[2] = new Server4FixedLengthHandler(); ch.pipeline().addLast(acceptorHandlers); } }); 
ChannelFuture future = bootstrap.bind(port).sync(); return future; } public void release(){ this.acceptorGroup.shutdownGracefully(); this.clientGroup.shutdownGracefully(); } public static void main(String[] args){ ChannelFuture future = null; Server4FixedLength server = null; try{ server = new Server4FixedLength(); future = server.doAccept(9999); System.out.println("server started."); future.channel().closeFuture().sync(); }catch(InterruptedException e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != server){ server.release(); } } } }
import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class Server4FixedLengthHandler extends ChannelHandlerAdapter { // 业务处理逻辑 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String message = msg.toString(); System.out.println("from client : " + message.trim()); String line = "ok "; ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8"))); } // 异常处理逻辑 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("server exceptionCaught method run..."); // cause.printStackTrace(); ctx.close(); } }
特殊结束符
客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘$_$’、 ‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。
import java.nio.charset.Charset; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Server4Delimiter { // 监听线程组,监听客户端请求 private EventLoopGroup acceptorGroup = null; // 处理客户端相关操作线程组,负责处理与客户端的数据通讯 private EventLoopGroup clientGroup = null; // 服务启动相关配置信息 private ServerBootstrap bootstrap = null; public Server4Delimiter(){ init(); } public void init(){ acceptorGroup = new NioEventLoopGroup(); clientGroup = new NioEventLoopGroup(); bootstrap = new ServerBootstrap(); // 绑定线程组 bootstrap.group(acceptorGroup, clientGroup); // 设定通讯模式为NIO bootstrap.channel(NioServerSocketChannel.class); // 设定缓冲区大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效) bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024) .option(ChannelOption.SO_RCVBUF, 16*1024) .option(ChannelOption.SO_KEEPALIVE, true); } public ChannelFuture doAccept(int port) throws InterruptedException{ bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){ @Override public void initChannel(SocketChannel ch) throws Exception{ // 数据分隔符, 定义的数据分隔符一定是一个ByteBuf类型的数据对象。 ByteBuf delimiter =Unpooled.copiedBuffer("$E$".getBytes()); ChannelHandler[] acceptorHandlers =new ChannelHandler[3]; // 处理固定结束标记符号的Handler。这个Handler没有@Sharable注解修饰, // 必须每次初始化通道时创建一个新对象 // 使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度。netty建议数据有最大长度。 acceptorHandlers[0]=new DelimiterBasedFrameDecoder(1024, delimiter); // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象 
acceptorHandlers[1]=new StringDecoder(Charset.forName("UTF-8")); acceptorHandlers[2]=new Server4DelimiterHandler(); ch.pipeline().addLast(acceptorHandlers); } }); ChannelFuture future =bootstrap.bind(port).sync(); return future; } public void release(){ this.acceptorGroup.shutdownGracefully(); this.clientGroup.shutdownGracefully(); } public static void main(String[] args) { ChannelFuture future = null; Server4Delimiter server = null; try{ server = new Server4Delimiter(); future = server.doAccept(9999); System.out.println("server started."); future.channel().closeFuture().sync(); }catch(InterruptedException e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != server){ server.release(); } } } }
import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class Server4DelimiterHandler extends ChannelHandlerAdapter { // 业务处理逻辑 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String message = msg.toString(); System.out.println("from client : " + message); String line = "server message $E$ test delimiter handler!! $E$ second message $E$"; ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8"))); } // 异常处理逻辑 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("server exceptionCaught method run..."); // cause.printStackTrace(); ctx.close(); } }
import java.nio.charset.Charset; import java.util.Scanner; import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Client4Delimiter { // 处理请求和处理服务端响应的线程组 private EventLoopGroup group = null; // 服务启动相关配置信息 private Bootstrap bootstrap = null; public Client4Delimiter(){ init(); } private void init(){ group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); // 绑定线程组 bootstrap.group(group); // 设定通讯模式为NIO bootstrap.channel(NioSocketChannel.class); } public ChannelFuture doRequest(String host, int port) throws InterruptedException{ this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 数据分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes()); ChannelHandler[] handlers = new ChannelHandler[3]; handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter); // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象 handlers[1] = new StringDecoder(Charset.forName("UTF-8")); handlers[2] = new Client4DelimiterHandler(); ch.pipeline().addLast(handlers); } }); ChannelFuture future = this.bootstrap.connect(host, port).sync(); return future; } public void release(){ this.group.shutdownGracefully(); } public static void main(String[] args) { Client4Delimiter client = null; ChannelFuture future = null; try{ client = new Client4Delimiter(); future = client.doRequest("localhost", 9999); Scanner s = null; while(true){ s = new Scanner(System.in); System.out.print("enter message 
send to server > "); String line = s.nextLine(); future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8"))); TimeUnit.SECONDS.sleep(1); } }catch(Exception e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != client){ client.release(); } } } }
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class Client4DelimiterHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try{ String message = msg.toString(); System.out.println("from server : " + message); }finally{ // 用于释放缓存。避免内存溢出 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exceptionCaught method run..."); // cause.printStackTrace(); ctx.close(); } }
协议
相对最成熟的数据传递方式。由服务器的开发者提供一个固定格式的协议标准。客户端和服务器发送数据和接收数据的时候,都依据协议制定和解析消息。
自定义协议格式:
协议格式: HEADcontent-length:xxxxHEADBODYxxxxxxBODY
/** * 1. 双线程组 * 2. Bootstrap配置启动信息 * 3. 注册业务处理Handler * 4. 绑定服务监听端口并启动服务 */ import java.nio.charset.Charset; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; public class Server4Protocol { // 监听线程组,监听客户端请求 private EventLoopGroup acceptorGroup = null; // 处理客户端相关操作线程组,负责处理与客户端的数据通讯 private EventLoopGroup clientGroup = null; // 服务启动相关配置信息 private ServerBootstrap bootstrap = null; public Server4Protocol() { init(); } private void init() { acceptorGroup = new NioEventLoopGroup(); clientGroup = new NioEventLoopGroup(); bootstrap = new ServerBootstrap(); // 绑定线程组 bootstrap.group(acceptorGroup, clientGroup); // 设定通讯模式为NIO bootstrap.channel(NioServerSocketChannel.class); // 设定缓冲区大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效) bootstrap.option(ChannelOption.SO_SNDBUF, 16 * 1024).option(ChannelOption.SO_RCVBUF, 16 * 1024) .option(ChannelOption.SO_KEEPALIVE, true); } public ChannelFuture doAccept(int port, final ChannelHandler... 
acceptorHandlers) throws InterruptedException { bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(acceptorHandlers); } }); ChannelFuture future = bootstrap.bind(port).sync(); return future; } public void release() { this.acceptorGroup.shutdownGracefully(); this.clientGroup.shutdownGracefully(); } public static void main(String[] args) { ChannelFuture future = null; Server4Protocol server = null; try { server = new Server4Protocol(); future = server.doAccept(9999, new Server4ProtocolHandler()); System.out.println("server started."); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != server){ server.release(); } } } }
/** * @Sharable注解 - * 代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。 * 如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。 * */ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @Sharable public class Server4ProtocolHandler extends ChannelHandlerAdapter { // 业务处理逻辑 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String message = msg.toString(); System.out.println("server receive protocol content : " + message); message = ProtocolParser.parse(message); if(null == message){ System.out.println("error request from client"); return ; } System.out.println("from client : " + message); String line = "server message"; line = ProtocolParser.transferTo(line); System.out.println("server send protocol content : " + line); ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8"))); } // 异常处理逻辑 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("server exceptionCaught method run..."); cause.printStackTrace(); ctx.close(); } static class ProtocolParser{ public static String parse(String message) { String[] temp=message.split("HEADBODY"); temp[0]=temp[0].substring(4); temp[1]=temp[1].substring(0,temp[1].length()-4); int length=Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1)); if(length != temp[1].length()){ return null; } return temp[1]; } public static String transferTo(String message){ message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY"; return message; } } }
import java.nio.charset.Charset; import java.util.Scanner; import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; public class Client4Protocol { // 处理请求和处理服务端响应的线程组 private EventLoopGroup group = null; // 服务启动相关配置信息 private Bootstrap bootstrap = null; public Client4Protocol(){ init(); } private void init(){ group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); // 绑定线程组 bootstrap.group(group); // 设定通讯模式为NIO bootstrap.channel(NioSocketChannel.class); } public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{ this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(handlers); } }); ChannelFuture future = this.bootstrap.connect(host, port).sync(); return future; } public void release(){ this.group.shutdownGracefully(); } public static void main(String[] args) { Client4Protocol client = null; ChannelFuture future = null; try{ client = new Client4Protocol(); future = client.doRequest("localhost", 9999, new Client4ProtocolHandler()); Scanner s = null; while(true){ s = new Scanner(System.in); System.out.print("enter message send to server > "); String line = s.nextLine(); line = Client4ProtocolHandler.ProtocolParser.transferTo(line); System.out.println("client send protocol content : " + line); future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8"))); TimeUnit.SECONDS.sleep(1); } }catch(Exception e){ 
e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != client){ client.release(); } } } }
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class Client4ProtocolHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try{ String message = msg.toString(); System.out.println("client receive protocol content : " + message); message = ProtocolParser.parse(message); if(null == message){ System.out.println("error response from server"); return ; } System.out.println("from server : " + message); }finally{ // 用于释放缓存。避免内存溢出 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exceptionCaught method run..."); // cause.printStackTrace(); ctx.close(); } static class ProtocolParser{ public static String parse(String message){ String[] temp = message.split("HEADBODY"); temp[0] = temp[0].substring(4); temp[1] = temp[1].substring(0, (temp[1].length()-4)); int length = Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1)); if(length != temp[1].length()){ return null; } return temp[1]; } public static String transferTo(String message){ message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY"; return message; } } }
序列化对象
JBoss Marshalling 序列化
Java 是面向对象的开发语言。传递的数据如果是 Java 对象,应该是最方便且可靠。
/** * 1. 双线程组 * 2. Bootstrap配置启动信息 * 3. 注册业务处理Handler * 4. 绑定服务监听端口并启动服务 */ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import utils.SerializableFactory4Marshalling; public class Server4Serializable { // 监听线程组,监听客户端请求 private EventLoopGroup acceptorGroup = null; // 处理客户端相关操作线程组,负责处理与客户端的数据通讯 private EventLoopGroup clientGroup = null; // 服务启动相关配置信息 private ServerBootstrap bootstrap = null; public Server4Serializable(){ init(); } private void init(){ acceptorGroup = new NioEventLoopGroup(); clientGroup = new NioEventLoopGroup(); bootstrap = new ServerBootstrap(); // 绑定线程组 bootstrap.group(acceptorGroup, clientGroup); // 设定通讯模式为NIO bootstrap.channel(NioServerSocketChannel.class); // 设定缓冲区大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效) bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024) .option(ChannelOption.SO_RCVBUF, 16*1024) .option(ChannelOption.SO_KEEPALIVE, true); } public ChannelFuture doAccept(int port, final ChannelHandler... 
acceptorHandlers) throws InterruptedException{ bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder()); ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder()); ch.pipeline().addLast(acceptorHandlers); } }); ChannelFuture future=bootstrap.bind(port).sync(); return future; } public void release(){ this.acceptorGroup.shutdownGracefully(); this.clientGroup.shutdownGracefully(); } public static void main(String[] args){ ChannelFuture future = null; Server4Serializable server = null; try{ server = new Server4Serializable(); future = server.doAccept(9999,new Server4SerializableHandler()); System.out.println("server started."); future.channel().closeFuture().sync(); }catch(InterruptedException e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != server){ server.release(); } } } }
/** * @Sharable注解 - * 代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。 * 如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。 * */ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import utils.GzipUtils; import utils.RequestMessage; import utils.ResponseMessage; @Sharable public class Server4SerializableHandler extends ChannelHandlerAdapter{ // 业务处理逻辑 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("from client : ClassName - " + msg.getClass().getName() + " ; message : " + msg.toString()); if(msg instanceof RequestMessage){ RequestMessage request = (RequestMessage)msg; //byte[] attachment = GzipUtils.unzip(request.getAttachment()); //System.out.println(new String(attachment)); } ResponseMessage response = new ResponseMessage(0L, "test response"); ctx.writeAndFlush(response); } // 异常处理逻辑 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("server exceptionCaught method run..."); cause.printStackTrace(); ctx.close(); } }
/** * 1. 单线程组 * 2. Bootstrap配置启动信息 * 3. 注册业务处理Handler * 4. connect连接服务,并发起请求 */ import java.util.Random; import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import utils.GzipUtils; import utils.RequestMessage; import utils.SerializableFactory4Marshalling; public class Client4Serializable { // 处理请求和处理服务端响应的线程组 private EventLoopGroup group = null; // 服务启动相关配置信息 private Bootstrap bootstrap = null; public Client4Serializable() { init(); } private void init() { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); // 绑定线程组 bootstrap.group(group); // 设定通讯模式为NIO bootstrap.channel(NioSocketChannel.class); } public ChannelFuture doRequest(String host, int port, final ChannelHandler... 
handlers) throws InterruptedException { this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder()); ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder()); ch.pipeline().addLast(handlers); } }); ChannelFuture future = this.bootstrap.connect(host, port).sync(); return future; } public void release(){ this.group.shutdownGracefully(); } public static void main(String[] args) { Client4Serializable client = null; ChannelFuture future = null; try{ client = new Client4Serializable(); future = client.doRequest("localhost", 9999, new Client4SerializableHandler()); String attachment = "test attachment"; byte[] attBuf = attachment.getBytes(); //attBuf = GzipUtils.zip(attBuf); RequestMessage msg = new RequestMessage(new Random().nextLong(), "test",new byte[0]); //"test", attBuf); future.channel().writeAndFlush(msg); TimeUnit.SECONDS.sleep(1); future.addListener(ChannelFutureListener.CLOSE); }catch(Exception e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != client){ client.release(); } } } }
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/** Prints the type and content of every object received from the server. */
public class Client4SerializableHandler extends ChannelHandlerAdapter {

    /** Prints the class name and string form of the inbound message. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String typeName = msg.getClass().getName();
        System.out.println("from server : ClassName - " + typeName + " ; message : " + msg.toString());
    }

    /** Prints the cause and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }
}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** GZIP compression helpers that work on in-memory byte arrays. */
public class GzipUtils {

    /**
     * Manual round-trip check: reads a file, compresses and decompresses it, prints the
     * three lengths and writes the restored bytes to a second file.
     */
    public static void main(String[] args) throws Exception {
        // Backslashes are escaped; an unescaped "\3" would be an octal escape, not a path separator.
        byte[] temp = readFully("D:\\3\\1.jpg");
        System.out.println("长度 : " + temp.length);
        byte[] zipArray = GzipUtils.zip(temp);
        System.out.println("压缩后的长度 : " + zipArray.length);
        byte[] unzipArray = GzipUtils.unzip(zipArray);
        System.out.println("解压缩后的长度 : " + unzipArray.length);
        FileOutputStream fos = new FileOutputStream("D:\\3\\101.jpg");
        try {
            fos.write(unzipArray);
            fos.flush();
        } finally {
            fos.close();
        }
    }

    /**
     * Reads a whole file into memory. A read loop is used because a single
     * {@code read(byte[])} call is not guaranteed to fill the array.
     */
    private static byte[] readFully(String path) throws Exception {
        FileInputStream fis = new FileInputStream(path);
        try {
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int length;
            while ((length = fis.read(buffer)) != -1) {
                collected.write(buffer, 0, length);
            }
            return collected.toByteArray();
        } finally {
            fis.close();
        }
    }

    /**
     * Decompresses GZIP data.
     *
     * @param source the compressed data
     * @return the restored, uncompressed data
     * @throws Exception if {@code source} is not valid GZIP data
     */
    public static byte[] unzip(byte[] source) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPInputStream zipIn = new GZIPInputStream(new ByteArrayInputStream(source));
        try {
            byte[] buffer = new byte[256];
            int length;
            while ((length = zipIn.read(buffer, 0, buffer.length)) != -1) {
                out.write(buffer, 0, length);
            }
        } finally {
            // Closed even when decoding fails.
            zipIn.close();
        }
        return out.toByteArray();
    }

    /**
     * Compresses data with GZIP.
     *
     * @param source the data to compress
     * @return the compressed data
     * @throws Exception if compression fails
     */
    public static byte[] zip(byte[] source) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream zipOut = new GZIPOutputStream(out);
        try {
            zipOut.write(source);
            // finish() writes the remaining compressed data and the GZIP trailer.
            zipOut.finish();
        } finally {
            // Closed even when writing fails.
            zipOut.close();
        }
        return out.toByteArray();
    }
}
import java.io.Serializable;

/** Serializable request carrying an id, a text message and a binary attachment. */
public class RequestMessage implements Serializable {

    private static final long serialVersionUID = 7084843947860990140L;

    private Long id;
    private String message;
    private byte[] attachment;

    /** Creates an empty request; all fields are {@code null}. */
    public RequestMessage() {
    }

    /**
     * Creates a fully populated request.
     *
     * @param id         request identifier
     * @param message    text payload
     * @param attachment binary payload; the array is stored as given, not copied
     */
    public RequestMessage(Long id, String message, byte[] attachment) {
        this.id = id;
        this.message = message;
        this.attachment = attachment;
    }

    public Long getId() {
        return this.id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getMessage() {
        return this.message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public byte[] getAttachment() {
        return this.attachment;
    }

    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }

    /** Returns the id and message; the attachment is not included. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("RequestMessage [id=");
        text.append(id).append(", message=").append(message).append("]");
        return text.toString();
    }
}
import java.io.Serializable;

/** Serializable response carrying an id and a text message. */
public class ResponseMessage implements Serializable {

    private static final long serialVersionUID = -8134313953478922076L;

    private Long id;
    private String message;

    /** Creates an empty response; both fields are {@code null}. */
    public ResponseMessage() {
    }

    /**
     * Creates a fully populated response.
     *
     * @param id      response identifier
     * @param message text payload
     */
    public ResponseMessage(Long id, String message) {
        this.id = id;
        this.message = message;
    }

    public Long getId() {
        return this.id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getMessage() {
        return this.message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    /** Returns the id and message in a bracketed form. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ResponseMessage [id=");
        text.append(id).append(", message=").append(message).append("]");
        return text.toString();
    }
}
import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; public class SerializableFactory4Marshalling { /** * 创建Jboss Marshalling解码器MarshallingDecoder * * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { // 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。 // jboss-marshalling-serial 包提供 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); // 创建了MarshallingConfiguration对象,配置了版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); // 序列化版本。只要使用JDK5以上版本,version只能定义为5。 configuration.setVersion(5); // 根据marshallerFactory和configuration创建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); // 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1); return decoder; } /** * 创建Jboss Marshalling编码器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return 
encoder; } }
定时断线重连
客户端断线重连机制。
适用场景:客户端数量多,且需要传递的数据量级较大,可以周期性地发送数据时使用。
只有在对数据的即时性要求不高的时候,才可使用。
优点:可以使用数据缓存,不必每条数据都进行一次数据交互;可以定时回收资源,资源利用率高。相对来说,即时性可以通过其他方式保证。如:120 秒自动断线;数据变化 1000 次请求服务器一次;300 秒中自动发送不足 1000 次的变化数据。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler; import utils.SerializableFactory4Marshalling; public class Server4Timer { // 监听线程组,监听客户端请求 private EventLoopGroup acceptorGroup = null; // 处理客户端相关操作线程组,负责处理与客户端的数据通讯 private EventLoopGroup clientGroup = null; // 服务启动相关配置信息 private ServerBootstrap bootstrap = null; public Server4Timer(){ init(); } private void init(){ acceptorGroup = new NioEventLoopGroup(); clientGroup = new NioEventLoopGroup(); bootstrap = new ServerBootstrap(); // 绑定线程组 bootstrap.group(acceptorGroup, clientGroup); // 设定通讯模式为NIO bootstrap.channel(NioServerSocketChannel.class); // 设定缓冲区大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效) bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024) .option(ChannelOption.SO_RCVBUF, 16*1024) .option(ChannelOption.SO_KEEPALIVE, true); // 增加日志Handler,日志级别为info // bootstrap.handler(new LoggingHandler(LogLevel.INFO)); } public ChannelFuture doAccept(int port) throws InterruptedException{ bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder()); ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder()); // 定义一个定时断线处理器,当多长时间内,没有任何的可读取数据,自动断开连接。 // 构造参数,就是间隔时长。 默认的单位是秒。 // 自定义间隔时长单位。 new ReadTimeoutHandler(long times, TimeUnit unit); ch.pipeline().addLast(new ReadTimeoutHandler(3)); ch.pipeline().addLast(new Server4TimerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); return future; } public void 
release(){ this.acceptorGroup.shutdownGracefully(); this.clientGroup.shutdownGracefully(); } public static void main(String[] args){ ChannelFuture future = null; Server4Timer server = null; try{ server = new Server4Timer(); future = server.doAccept(9999); System.out.println("server started."); future.channel().closeFuture().sync(); }catch(InterruptedException e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != server){ server.release(); } } } }
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import utils.ResponseMessage; public class Server4TimerHandler extends ChannelHandlerAdapter { // 业务处理逻辑 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("from client : ClassName - " + msg.getClass().getName() + " ; message : " + msg.toString()); ResponseMessage response = new ResponseMessage(0L, "test response"); ctx.writeAndFlush(response); } // 异常处理逻辑 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("server exceptionCaught method run..."); // cause.printStackTrace(); ctx.close(); } }
import java.util.Random; import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.WriteTimeoutHandler; import utils.RequestMessage; import utils.SerializableFactory4Marshalling; public class Client4Timer { // 处理请求和处理服务端响应的线程组 private EventLoopGroup group = null; // 服务启动相关配置信息 private Bootstrap bootstrap = null; private ChannelFuture future = null; public Client4Timer(){ init(); } private void init(){ group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); // 绑定线程组 bootstrap.group(group); // 设定通讯模式为NIO bootstrap.channel(NioSocketChannel.class); // bootstrap.handler(new LoggingHandler(LogLevel.INFO)); } public void setHandlers() throws InterruptedException{ this.bootstrap.handler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder()); ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder()); // 写操作自定断线。 在指定时间内,没有写操作,自动断线。 ch.pipeline().addLast(new WriteTimeoutHandler(3)); ch.pipeline().addLast(new Client4TimerHandler()); } }); } public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{ if(future == null){ future = this.bootstrap.connect(host, port).sync(); } if(!future.channel().isActive()){ future = this.bootstrap.connect(host, port).sync(); } return future; } public void release(){ this.group.shutdownGracefully(); } public static void main(String[] args) { Client4Timer client = null; ChannelFuture future = null; try{ client = new Client4Timer(); client.setHandlers(); future = client.getChannelFuture("localhost", 9999); for(int i = 0; i < 3; i++){ 
RequestMessage msg = new RequestMessage(new Random().nextLong(), "test"+i, new byte[0]); future.channel().writeAndFlush(msg); TimeUnit.SECONDS.sleep(2); } TimeUnit.SECONDS.sleep(5); future = client.getChannelFuture("localhost", 9999); RequestMessage msg = new RequestMessage(new Random().nextLong(), "test", new byte[0]); future.channel().writeAndFlush(msg); }catch(Exception e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != client){ client.release(); } } } }
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Client-side handler: announces a new connection, logs every server message and
 * closes the connection on errors.
 */
public class Client4TimerHandler extends ChannelHandlerAdapter {

    /**
     * Invoked once the connection has been established; prints a marker line.
     * A typical place for connection confirmation and resource initialization.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active");
    }

    /** Prints the class name and text of each message received from the server. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String className = msg.getClass().getName();
        String text = msg.toString();
        System.out.println("from server : ClassName - " + className + " ; message : " + text);
    }

    /** Prints a marker line and the stack trace, then closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }
}