一 编码解码概念
编码(encode)在程序中其实就是序列化,将对象转为 字节数组,方便于网络传输;
解码(decode)在程序中实际上就是反序列化,将字节数组转为原始对象。
在jdk 自带的序列化机制需要实现 java.io.Serializable接口并生成序列化ID,就可以实现对象的序列化
但java原生的序列化机制有些缺点:
- 无法跨语言,扩展性差;
- 序列化后码流太大,性能低;
二 编码器与节码器
2.1 解码器
netty 中提供了抽象节码器类ByteToMessageDecoder
,ReplayingDecoder
继承了ByteToMessageDecoder类,是对其的一种扩展,但并非所有的 ByteBuf 操作都被支持;MessageToMessageDecoder
解码器是将一种消息转为另一种消息;
MessageToMessageDecoder 中默认实现了一些解码器,我们可以直接拿来使用;比如 之前文章我们使用过的 字符串解码器StringDecoder,baes64解码器Base64Encoder;
ByteToMessageDecoder 也提供 了一些解码器,比如我们之前用过的 定长解码器FixedLengthFrameDecoder,换行解码器LineBasedFrameDecoder;
整个解码的流程是先将入站的数据通过解码器解码后传给 ChannelInboundHandlerAdapter;
2.2 编码器
与解码器对应,netty 提供了与之相对应的编码器;
ByteToMessageEncoder和MessageToMessageDecoder相对应;
MessageToByteEncoder 和 MessageToMessageDecoder 相对应;
2.3 手动实现字符串编解码
编码器实现,直接将字符串转为字节数组传输;
/**
* @Author lsc
* <p> </p>
*/
public class MsgEncode extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
String msg = (String) o;
byte[] bytes = msg.getBytes(Charset.forName("UTF-8"));
byteBuf.writeBytes(bytes);
}
}
解码器实现, 将字节数组转为字符串;
/**
* @Author lsc
* <p> </p>
*/
public class MsgDeocde extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int size = byteBuf.readableBytes();
if (size<=2){
System.out.println("报文长度不满足抛弃");
return;
}
// 创建字节数组
byte[] bytes = new byte[size];
// 缓冲区数据读入字节数组
byteBuf.readBytes(bytes);
// 编码转为字符串
String body = (new String(bytes, "UTF-8"));
list.add(body);
}
}
然后将编码器,与解码器注入 pipeline; 服务端示例如下;
/**
* @Author lsc
* <p>通道初始化 </p>
* @Param
* @Return
*/
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 管道(Pipeline)持有某个通道的全部处理器
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加编码器和解码器
pipeline.addLast(new MsgEncode());
pipeline.addLast(new MsgDeocde());
// 添加处理器
pipeline.addLast(new NettyServerHandler());
}
}
客户端也需要注入;
public void connect(int port, String host) throws InterruptedException {
// 创建线程组
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
// netty启动辅助类
Bootstrap bootstrap = new Bootstrap();
//
bootstrap.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 处理IO事件
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加编码器和解码器
pipeline.addLast(new MsgEncode());
pipeline.addLast(new MsgDeocde());
// 处理器
pipeline.addLast(new NettyClientHandler());
}
});
// 异步操作
ChannelFuture connect = bootstrap.connect(host, port).sync();
// 关闭客户端
connect.channel().closeFuture().sync();
// 退出线程组
nioEventLoopGroup.shutdownGracefully();
}
客户端处理器直接发送字符串消息
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public NettyClientHandler() {
super();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.warn("Unexpected exception from downstream : [{}]" ,cause.getMessage());
}
/* *
* @Author lsc
* <p>触发回调 </p>
* @Param [ctx]
* @Return void
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String message = "关注公众号知识追寻者回复netty获取本教程源码";
// 写入数据
ctx.writeAndFlush(message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 打印
System.out.println("get the data from server: "+msg);
}
}
同理服务端处理器也是直接发送字符串消息
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(" get the data from client : " + msg);
// 构造响应数据
String responseData = "那天刚刚好遇见你";
ctx.writeAndFlush(responseData);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 写入 seocketChannel
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 异常关闭资源句柄
ctx.close();
}
}
服务启动后服务端效果示例图如下
客户端效果图示例如下