zoukankan      html  css  js  c++  java
  • Netty章节二十二:Netty自定义编解码器

    Netty自定义编解码器

    程序示例

    继承ByteToMessageDecoder 的解码器

    /**
     * Inbound decoder that reads one {@code long} from the incoming {@link ByteBuf}
     * and hands it to the next handler, provided at least 8 bytes are readable.
     */
    public class MyByteToLongDecoder extends ByteToMessageDecoder {

        /** Number of bytes occupied by one encoded long. */
        private static final int LONG_SIZE_IN_BYTES = 8;

        /**
         * Logs the invocation and the readable byte count, then decodes a single long if possible.
         *
         * @param ctx the handler context (unused here)
         * @param in  the buffer holding the received bytes
         * @param out the list that receives the decoded {@code Long}
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println("decode invoked !");

            final int available = in.readableBytes();
            System.out.println(available);

            // Calling in.readLong() unconditionally may fail with IndexOutOfBoundsException
            // when fewer than 8 bytes are readable, so leave the buffer untouched in that case.
            if (available < LONG_SIZE_IN_BYTES) {
                return;
            }

            out.add(in.readLong());
        }
    }
    
    

    ReplayingDecoder解码器的实现

    /**
     * {@link ReplayingDecoder}-based variant of the byte-to-long decoder.
     * Unlike the {@code ByteToMessageDecoder} version, it reads the long without an
     * explicit readable-bytes check.
     */
    public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {

        /**
         * Logs the invocation, reads one long from the buffer and emits it.
         *
         * @param ctx the handler context (unused here)
         * @param in  the buffer holding the received bytes
         * @param out the list that receives the decoded {@code Long}
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println("MyByteToLongDecoder2 extends ReplayingDecoder ! ");

            final long decoded = in.readLong();
            out.add(decoded);
        }
    }
    

    MessageToByteEncoder编码器的实现

    /**
     * Outbound encoder that writes a {@code Long} message into the outgoing buffer as 8 bytes.
     */
    public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {

        /**
         * Logs the invocation and the message, then writes the value to {@code out}.
         *
         * @param ctx the handler context (unused here)
         * @param msg the long value to encode
         * @param out the buffer the encoded bytes are written to
         */
        @Override
        protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
            System.out.println("encode invoked !");
            System.out.println(msg);

            final long payload = msg;
            out.writeLong(payload);
        }
    }
    

    MessageToMessageDecoder解码器的实现

    MessageToMessageDecoder 将一种消息类型解码为另一种消息类型,可以作为第二个解码器使用:在第一个解码器把 ByteBuf 转换为消息对象之后,再由它继续处理。
    MessageToMessageDecoder 可以完成数据类型转换等操作。
    泛型参数表示待解码的(即传入的)消息类型。

    /**
     * Second-stage decoder that converts an already-decoded {@code Long} message
     * into its {@code String} representation.
     */
    public class MyLongToStringDecoder extends MessageToMessageDecoder<Long> {

        /**
         * Logs the invocation and emits the string form of {@code msg}.
         *
         * @param ctx the handler context (unused here)
         * @param msg the long message to convert
         * @param out the list that receives the resulting {@code String}
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, Long msg, List<Object> out) throws Exception {
            System.out.println("MyLongToStringDecoder extends MessageToMessageDecoder  !");

            final String text = String.valueOf(msg);
            out.add(text);
        }
    }
    

    Server

    /**
     * Entry point of the demo server: binds to port 8899 and blocks until the
     * server channel is closed.
     */
    public class MyServer {

        /** TCP port the server listens on. */
        private static final int PORT = 8899;

        /**
         * Starts the server and waits for its channel to close.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if binding or waiting on the channel futures fails
         */
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new MyServerInitializer());

                // Block until the bind completes, then until the server channel is closed.
                ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                // Always release both event loop groups, even if startup failed.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    //----------------------------------------------------------------------
    /**
     * Installs the server-side handlers on every newly accepted channel.
     */
    public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

        /**
         * Adds, in order: the byte-to-long decoder, the long-to-string decoder,
         * the long-to-byte encoder and the business handler.
         *
         * @param ch the channel being initialized
         */
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Alternative: add new MyByteToLongDecoder() instead of the two decoders below;
            // in that case MyServerHandler's generic type must be changed to Long.
            ch.pipeline()
                    .addLast(new MyByteToLongDecoder2())
                    .addLast(new MyLongToStringDecoder())
                    .addLast(new MyLongToByteEncoder())
                    .addLast(new MyServerHandler());
        }
    }
    //----------------------------------------------------------------------
    /**
     * Server-side business handler: prints each received string together with the
     * sender's address and answers with a fixed long value.
     */
    public class MyServerHandler extends SimpleChannelInboundHandler<String> {

        /** Value sent back to the client for every received message. */
        private static final long REPLY = 654321L;

        /**
         * Handles one decoded request.
         *
         * @param ctx handler context; gives access to the channel and its remote address
         * @param msg the request sent by the client
         * @throws Exception declared by the overridden method
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            final String line = ctx.channel().remoteAddress() + "," + msg;
            System.out.println(line);

            ctx.writeAndFlush(REPLY);
        }

        /**
         * Prints the failure and closes the connection.
         *
         * @param ctx   handler context of the failing channel
         * @param cause the exception that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    Client

    /**
     * Entry point of the demo client: connects to localhost:8899 and blocks until
     * the connection is closed.
     */
    public class MyClient {

        /**
         * Connects to the server and waits for the channel to close.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if connecting or waiting on the channel futures fails
         */
        public static void main(String[] args) throws Exception {
            final EventLoopGroup group = new NioEventLoopGroup();
            try {
                final Bootstrap client = new Bootstrap();
                client.group(group);
                client.channel(NioSocketChannel.class);
                client.handler(new MyClientInitializer());

                // Wait for the connection to be established, then for it to be closed.
                final ChannelFuture connected = client.connect("localhost", 8899).sync();
                connected.channel().closeFuture().sync();
            } finally {
                // Release the event loop threads regardless of how we got here.
                group.shutdownGracefully();
            }
        }
    }
    //----------------------------------------------------------------------
    /**
     * Installs the client-side handlers on the newly created channel.
     */
    public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

        /**
         * Adds, in order: the byte-to-long decoder, the long-to-byte encoder and
         * the business handler.
         *
         * @param ch the channel being initialized
         */
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Alternative: add new MyByteToLongDecoder() in place of MyByteToLongDecoder2.
            ch.pipeline()
                    .addLast(new MyByteToLongDecoder2())
                    .addLast(new MyLongToByteEncoder())
                    .addLast(new MyClientHandler());
        }
    }
    //----------------------------------------------------------------------
    /**
     * Client-side business handler: sends one long when the channel becomes active
     * and prints every long received from the server.
     */
    public class MyClientHandler extends SimpleChannelInboundHandler<Long> {

        /** Value sent to the server as soon as the connection is active. */
        private static final long GREETING = 123456L;

        /**
         * Prints the long received from the server.
         *
         * @param ctx the handler context (unused here)
         * @param msg the decoded value sent by the server
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
            final String line = "client output:" + msg;
            System.out.println(line);
        }

        /**
         * Sends the initial message once the connection is established.
         *
         * @param ctx the handler context of the now-active channel
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Works: the value is a Long, so it is handled by the Long encoder.
            ctx.channel().writeAndFlush(GREETING);

            /*
                Experiment 1 (author's observation): neither side prints anything.
                The message is not sent; an UnsupportedOperationException is raised, reporting
                that Netty expects a ByteBuf or FileRegion and cannot write other types to
                the network. Note the literal below is an int, not a long.
             */
            //ctx.writeAndFlush(123456);

            /*
                Experiment 2 (author's observation): runs without error, but the custom encoder
                is bypassed; the ByteBuf is written out directly by HeadContext.write via
                unsafe.write(msg, promise). Because the peer decodes the bytes as longs,
                the received data is garbled.
             */
            //ctx.channel().writeAndFlush(Unpooled.copiedBuffer("helloworld", Charset.forName("utf-8")));
        }

        /**
         * Prints the failure and closes the connection.
         *
         * @param ctx   handler context of the failing channel
         * @param cause the exception that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
  • 相关阅读:
    WSGI 简介
    past.deploy
    python中self和cls
    heat template例子
    cinder-api 启动过程学习
    ubuntu安装cloud-init制作成openstack镜像---cloud-init篇
    sus 11.3如何安装与配置cloud-init
    DHCP工作原理
    交换
    路由器
  • 原文地址:https://www.cnblogs.com/mikisakura/p/13177533.html
Copyright © 2011-2022 走看看