TCP是面向连接的协议,并且保证数据按序到达。
但是,TCP传输的是字节流,本身不保留消息边界,因此必须明确每个包的长度,否则会发生粘包/拆包现象。
下面是一个自定义协议的示例,其中包含了拆包的处理。
所有的类:

协议类:
/**
 * Simple mutable holder for one protocol message: an integer length and a
 * byte-array payload.
 *
 * <p>The two fields are independent; this class does not check that
 * {@code length} matches {@code content.length}, and it stores the array
 * reference as given (no defensive copy).
 */
public class PersonProtocol {

    /** Payload bytes; {@code null} until {@link #setContent(byte[])} is called. */
    private byte[] content;

    /** Declared payload size; {@code 0} until {@link #setLength(int)} is called. */
    private int length;

    /**
     * @return the payload array previously stored, or {@code null} if none was set
     */
    public byte[] getContent() {
        return this.content;
    }

    /**
     * Stores the given payload array by reference.
     *
     * @param content the payload bytes
     */
    public void setContent(byte[] content) {
        this.content = content;
    }

    /**
     * @return the declared payload length
     */
    public int getLength() {
        return this.length;
    }

    /**
     * Sets the declared payload length.
     *
     * @param length the number of payload bytes
     */
    public void setLength(int length) {
        this.length = length;
    }
}
解码器类:
/**
 * Inbound decoder that turns the length-prefixed byte stream into
 * {@link PersonProtocol} objects.
 *
 * <p>Frame layout: a 4-byte int length header followed by that many payload
 * bytes. The method performs no {@code readableBytes()} checks itself; it
 * relies on the {@code ReplayingDecoder} base class for the case where a frame
 * has not fully arrived yet.
 */
public class PersonProtocolDecode extends ReplayingDecoder<Void> {

    /**
     * Reads one frame from {@code in} and appends the decoded message to {@code out}.
     *
     * @param ctx the handler context (unused here)
     * @param in  the inbound buffer positioned at the start of a frame
     * @param out receives the decoded {@link PersonProtocol}
     * @throws IllegalArgumentException if the length header is negative
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("decode invoke!");

        // Unpacking: read the length header first, then read exactly that many payload bytes.
        int length = in.readInt();

        // The header comes from the peer; a negative value would otherwise surface as an
        // opaque NegativeArraySizeException from the array allocation below.
        if (length < 0) {
            throw new IllegalArgumentException("Invalid frame: negative content length " + length);
        }
        // NOTE(review): there is no upper bound on length, so a peer can request a very
        // large allocation with a 4-byte header. Consider enforcing a protocol maximum.

        byte[] content = new byte[length];
        // Fetch the payload.
        in.readBytes(content);

        PersonProtocol personProtocol = new PersonProtocol();
        personProtocol.setLength(length);
        personProtocol.setContent(content);
        out.add(personProtocol);
    }
}
编码器类:
/**
 * Outbound encoder that writes a {@link PersonProtocol} as a 4-byte int length
 * header followed by the payload bytes.
 */
public class PersonProtocolEncode extends MessageToByteEncoder<PersonProtocol> {

    /** Payload written when the message carries no content. */
    private static final byte[] EMPTY_CONTENT = new byte[0];

    /**
     * Writes one frame for {@code msg} into {@code out}.
     *
     * <p>The header is taken from the actual payload size rather than from
     * {@code msg.getLength()}: the whole array is always written, so any other
     * header value would desynchronise the receiver's framing. A {@code null}
     * payload is encoded as an empty frame.
     *
     * @param ctx the handler context (unused here)
     * @param msg the message to encode
     * @param out the buffer the frame is written into
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception {
        System.out.println("encode invoke!");

        byte[] content = msg.getContent();
        if (content == null) {
            content = EMPTY_CONTENT;
        }

        out.writeInt(content.length);
        out.writeBytes(content);
        // No flush is needed here: at this point the data is only being processed
        // inside the JVM (written into the buffer), no I/O is involved yet.
    }
}
服务器处理类:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
import java.util.UUID;
/**
 * Server-side business handler: logs every received {@link PersonProtocol}
 * and answers it with a freshly generated random UUID string.
 */
public class ServerHandler extends SimpleChannelInboundHandler<PersonProtocol> {

    /** Charset used for both decoding requests and encoding replies. */
    private static final Charset UTF_8 = Charset.forName("utf-8");

    /** Number of messages this handler instance has received so far. */
    private int count = 0;

    /**
     * Prints the incoming message and its running count, then writes back a
     * UUID-based reply through the channel.
     *
     * @param ctx the handler context whose channel receives the reply
     * @param msg the decoded inbound message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
        int receivedLength = msg.getLength();
        String receivedText = new String(msg.getContent(), UTF_8);
        System.out.println("接收到的消息长度:" + receivedLength + ",消息内容: " + receivedText);

        count++;
        System.out.println("消息的次数:" + count);

        // Build the reply payload from a random UUID.
        UUID uuid = UUID.randomUUID();
        byte[] payload = uuid.toString().getBytes(UTF_8);
        System.out.println("发给客户端的数据为:长度" + payload.length + ",内容:" + uuid);

        PersonProtocol result = new PersonProtocol();
        result.setLength(payload.length);
        result.setContent(payload);
        ctx.channel().writeAndFlush(result);
    }
}
客户端处理类:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
/**
 * Client-side business handler: sends a few greeting messages as soon as the
 * channel becomes active and logs every reply it receives.
 */
public class ClientHandler extends SimpleChannelInboundHandler<PersonProtocol> {

    /** Charset used for both encoding requests and decoding replies. */
    private static final Charset UTF_8 = Charset.forName("utf-8");

    /** Number of replies this handler instance has received so far. */
    private int count = 0;

    /**
     * Prints the received message and the running reply count.
     *
     * @param ctx the handler context (unused here)
     * @param msg the decoded inbound message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
        System.out.println("接收到的消息长度" + msg.getLength()
                + ",消息内容: " + new String(msg.getContent(), UTF_8));
        System.out.println("消息的次数:" + ++count);
    }

    /**
     * Splits a fixed greeting on commas and sends each fragment as its own
     * {@link PersonProtocol} message, flushing after every one.
     *
     * @param ctx the handler context used to write the messages
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
        String str = "你好吗?,我很好!,你好吗?我很好!";

        // Split once up front instead of re-splitting on every loop test and body.
        String[] parts = str.split(",");
        for (int i = 0; i < parts.length; i++) {
            String temp = parts[i];
            System.out.println(temp + ",i:" + i);

            // Encode once so the length header and the content come from the same array.
            byte[] payload = temp.getBytes(UTF_8);
            PersonProtocol pp = new PersonProtocol();
            pp.setLength(payload.length);
            pp.setContent(payload);
            ctx.writeAndFlush(pp);
        }
    }
}
服务端启动类:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Server entry point: binds to port 12345 and installs the decoder, encoder
 * and {@link ServerHandler} on every accepted channel.
 */
public class NettyServer {

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if binding or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // dispatching event loop group
        EventLoopGroup workGroup = new NioEventLoopGroup();  // event loop group that handles channel events
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap(); // initialise the server
            serverBootstrap.group(bossGroup, workGroup)              // attach both loop groups to the server
                    .channel(NioServerSocketChannel.class)           // channel type: NIO mode
                    .childHandler(new ChannelInitializer<SocketChannel>() { // handler chain for each channel
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new PersonProtocolDecode()); // decoder
                            pipeline.addLast(new PersonProtocolEncode()); // encoder
                            pipeline.addLast(new ServerHandler());        // business logic, usually placed last
                        }
                    });

            // Bind to local port 12345 and wait for the bind to complete.
            ChannelFuture channelFuture = serverBootstrap.bind(12345).sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Always release the event loops, including when bind()/sync() throws
            // (for example because the port is already in use).
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
客户端启动类:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Client entry point: connects to localhost:12345 and installs the decoder,
 * encoder and {@link ClientHandler} on the channel.
 */
public class NettyClient {

    /**
     * Connects to the server and blocks until the channel is closed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new PersonProtocolDecode());
                            pipeline.addLast(new PersonProtocolEncode());
                            pipeline.addLast(new ClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect("localhost", 12345).sync();
            // Keep the client blocked (running) until the channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Always release the event loop, including when connect()/sync() throws
            // (for example because the server is not running).
            eventLoopGroup.shutdownGracefully();
        }
    }
}
先运行服务端,再运行客户端,即可完成测试。