Netty自定义编解码器
程序示例
继承ByteToMessageDecoder 的解码器
public class MyByteToLongDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("decode invoked !");
System.out.println(in.readableBytes());
//out.add(in.readLong()); 没有加判断可能会出问题IndexOutOfBoundsException
if(in.readableBytes() >= 8){
out.add(in.readLong());
}
}
}
ReplayingDecoder解码器的实现
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 extends ReplayingDecoder ! ");
out.add(in.readLong());
}
}
MessageToByteEncoder编码器的实现
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("encode invoked !");
System.out.println(msg);
out.writeLong(msg);
}
}
MessageToMessageDecoder编码器的实现
MessageToMessageDecoder 消息类型解码为消息类型 这个可以用于第二个解码器,当第一个解码器将ByteBuf转化为消息类型之后使用
MessageToMessageDecoder 可以做数据类型转换等等其它操作
泛型表示待解析的消息类型,要传入进来的
public class MyLongToStringDecoder extends MessageToMessageDecoder<Long> {
@Override
protected void decode(ChannelHandlerContext ctx, Long msg, List<Object> out) throws Exception {
System.out.println("MyLongToStringDecoder extends MessageToMessageDecoder !");
out.add(String.valueOf(msg));
}
}
Server
public class MyServer {
public static void main(String[] args) throws Exception {
HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
//----------------------------------------------------------------------
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//pipeline.addLast(new MyByteToLongDecoder()); 如果使用它需要将MyServerHandler的泛型改成Long,并把下面两行的代码注掉
pipeline.addLast(new MyByteToLongDecoder2());
pipeline.addLast(new MyLongToStringDecoder());
pipeline.addLast(new MyLongToByteEncoder());
pipeline.addLast(new MyServerHandler());
}
}
//----------------------------------------------------------------------
public class MyServerHandler extends SimpleChannelInboundHandler<String> {
/**
* @param ctx 上下文,可以获取远程的信息,地址、连接对象
* @param msg 客户端发来的请求对象
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "," + msg);
ctx.writeAndFlush(654321L);
}
/**
* 出现异常的情况下怎么办
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Client
public class MyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new MyClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}
//----------------------------------------------------------------------
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//pipeline.addLast(new MyByteToLongDecoder());需要把下面两行的代码注掉
pipeline.addLast(new MyByteToLongDecoder2());
pipeline.addLast(new MyLongToByteEncoder());
pipeline.addLast(new MyClientHandler());
}
}
//----------------------------------------------------------------------
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("client output:" + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush(123456L); //成功
/*
两端都无内容输出。
报错,消息不会被发出,产生一个UnsupportedOperationException异常,
不支持的消息异常,并提示出Netty的期望消息类型是ByteBuf或FileRegion,其它类型无法被发送到网络
*/
//ctx.writeAndFlush(123456);
/*
正常运行,但是没有走自定义的编码器,而是直接在HeadContext的write方法中 unsafe.write(msg, promise); 将ByteBuf写出去了
但是对方是以Long的方式解码的,所以会出现数据混乱,不正确问题
*/
//ctx.channel().writeAndFlush(Unpooled.copiedBuffer("helloworld", Charset.forName("utf-8")));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}