zoukankan      html  css  js  c++  java
  • Netty(四、序列化与反序列化)

    序列化就是将对象的状态信息转换成可以存储或传输的过程。

    Netty序列化对象一般有以下几种方式:

    JDK

    JBoss Marshalling

    Protocol Buffers

    kryo

    JDK

    实体类

    Request

    package com.wk.test.nettyTest.jdk;
    
    import java.io.Serializable;
    
    public class Request implements Serializable {
        private String id;
        private String name;
        private String info;
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getInfo() {
            return info;
        }
    
        public void setInfo(String info) {
            this.info = info;
        }
    }

    Response

    package com.wk.test.nettyTest.jdk;
    
    import java.io.Serializable;
    
    public class Response implements Serializable{
        
        private static final long serialVersionUID = 1L;
        
        private String id;
        private String name;
        private String responseMessage;
        
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getResponseMessage() {
            return responseMessage;
        }
        public void setResponseMessage(String responseMessage) {
            this.responseMessage = responseMessage;
        }
        
    
    }

    服务端 

    NettyServerTest

    package com.wk.test.nettyTest.jdk;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class NettyServerTest {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class);
    
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
    
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //设置日志
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            sc.pipeline().addLast(new ObjectEncoder());
                            sc.pipeline().addLast(new ReadTimeoutHandler(5));
                            sc.pipeline().addLast(new ServerHandler());
                        }
                    });
    
            ChannelFuture cf = b.bind(8090).sync();
    
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }

    ServerHandler

    package com.wk.test.nettyTest.jdk;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request request = (Request)msg;
            System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo());
            Response response = new Response();
            response.setId(request.getId());
            response.setName("response" + request.getName());
            response.setResponseMessage("响应内容" + request.getInfo());
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        
        
    }

    客户端

    NettyClientTest

    package com.wk.test.nettyTest.jdk;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class NettyClientTest {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class);
    
        private static class SingletonHolder {
            static final NettyClientTest instance = new NettyClientTest();
        }
    
        public static NettyClientTest getInstance() {
            return SingletonHolder.instance;
        }
    
        private EventLoopGroup group;
        private Bootstrap b;
        private ChannelFuture cf;
    
        private NettyClientTest() {
            group = new NioEventLoopGroup();
            b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            sc.pipeline().addLast(new ObjectEncoder());
                            //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)
                            sc.pipeline().addLast(new ReadTimeoutHandler(5));
                            sc.pipeline().addLast(new ClientHandler());
                        }
                    });
        }
    
        public void connect() {
            try {
                this.cf = b.connect("127.0.0.1", 8090).sync();
                System.out.println("远程服务器已经连接, 可以进行数据交换..");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public ChannelFuture getChannelFuture() {
    
            if (this.cf == null) {
                this.connect();
            }
            if (!this.cf.channel().isActive()) {
                this.connect();
            }
    
            return this.cf;
        }
    
        public static void main(String[] args) throws InterruptedException {
            final NettyClientTest c = NettyClientTest.getInstance();
            ChannelFuture future = c.getChannelFuture();
    
            Request request = new Request();
            request.setId("1");
            request.setName("上杉绘梨衣");
            request.setInfo("04.24,和Sakura去东京天空树,世界上最暖和的地方在天空树的顶上。");
            future.channel().writeAndFlush(request).sync();
    
            Request request2 = new Request();
            request2.setId("2");
            request2.setName("上杉绘梨衣");
            request2.setInfo("04.26,和Sakura去明治神宫,有人在那里举办婚礼。");
            future.channel().writeAndFlush(request2);
    
            Request request3 = new Request();
            request3.setId("3");
            request3.setName("上杉绘梨衣");
            request3.setInfo("04.25,和Sakura去迪士尼,鬼屋很可怕,但是有Sakura在,所以不可怕。");
            future.channel().writeAndFlush(request3);
    
            Request request4 = new Request();
            request4.setId("4");
            request4.setName("上杉绘梨衣");
            request4.setInfo("Sakura最好了。");
            future.channel().writeAndFlush(request4);
    
            future.channel().closeFuture().sync();
    
        }
    }

    ClientHandler

    package com.wk.test.nettyTest.jdk;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelInboundHandlerAdapter {
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                Response resp = (Response)msg;
                System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
        
    }

    JBoss Marshalling

    这种序列化效率比JDK快三倍左右,这里暂不介绍。

    protobuf

    谷歌开源的一种二进制数据格式,是目前序列化最快的。

    相较于json和xml来说,序列化后体积小,传输速率快。序列化后不可读,必须反序列化才可读。

    使用

    1.下载

    下载地址:https://github.com/google/protobuf/releases

    这里下载protoc-3.11.4-win64,windows系统使用的protoc.exe

    2.编写proto格式文件

    我们需要编写一个.proto格式的协议文件,通过该协议文件来生产java类,具体的语法和规则可以参考官方文档。这里只举个例子:

    Request.proto

    syntax = "proto3";
    
    option java_package = "com.wk.test.nettyTest.proto";
    
    option java_outer_classname = "Request";
    
    message MessageRequest{
        uint64 id = 1;
        string name = 2;
        string info = 3;
    }
    syntax = "proto3";是使用的协议版本是3
    java_package 是生成文件的包路径
    java_outer_classname 是类名
    message MessageRequest{
        uint64 id = 1;
        string name = 2;
        string info = 3;
    }
    消息体内容:
    64 int类型的id
    string 姓名和内容
    后面的数字代表一个应答序号,同一级别下不可重复

    3.生成协议文件对应的消息类

    CMD命令到我们下载好的protoc.exe目录下,执行命令

    protoc.exe ./Request.proto --java_out=./

    生成Requst.java

    4.编写代码

    准备工作已经结束了,我们将.proto文件和生成的java文件放入相对应的程序中就可以开始开发了

    开发

    pom.xml

            <!-- protobuf -->
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>3.11.4</version>
            </dependency>

    这里注意要跟下载的protoc.exe版本一致

    实体类

    就是生成的java和proto文件

    服务端

    NettyServerTest

    package com.wk.test.nettyTest.proto;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class NettyServerTest {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class);
    
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
    
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //设置日志
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            sc.pipeline().addLast(new ProtobufDecoder(Request.MessageRequest.getDefaultInstance()));
    
                            sc.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            sc.pipeline().addLast(new ProtobufEncoder());
                            sc.pipeline().addLast(new ReadTimeoutHandler(5));
                            sc.pipeline().addLast(new ServerHandler());
                        }
                    });
    
            ChannelFuture cf = b.bind(8090).sync();
    
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }

    ServerHandler

    package com.wk.test.nettyTest.proto;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request.MessageRequest request = (Request.MessageRequest)msg;
            System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo());
            ctx.writeAndFlush(request);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        
        
    }

    客户端

    NettyClientTest

    package com.wk.test.nettyTest.proto;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class NettyClientTest {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class);
    
        private static class SingletonHolder {
            static final NettyClientTest instance = new NettyClientTest();
        }
    
        public static NettyClientTest getInstance() {
            return SingletonHolder.instance;
        }
    
        private EventLoopGroup group;
        private Bootstrap b;
        private ChannelFuture cf;
    
        private NettyClientTest() {
            group = new NioEventLoopGroup();
            b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            sc.pipeline().addLast(new ProtobufDecoder(Request.MessageRequest.getDefaultInstance()));
    
                            sc.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            sc.pipeline().addLast(new ProtobufEncoder());
                            //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)
                            sc.pipeline().addLast(new ReadTimeoutHandler(5));
                            sc.pipeline().addLast(new ClientHandler());
                        }
                    });
        }
    
        public void connect() {
            try {
                this.cf = b.connect("127.0.0.1", 8090).sync();
                System.out.println("远程服务器已经连接, 可以进行数据交换..");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public ChannelFuture getChannelFuture() {
    
            if (this.cf == null) {
                this.connect();
            }
            if (!this.cf.channel().isActive()) {
                this.connect();
            }
    
            return this.cf;
        }
    
        public static void main(String[] args) throws InterruptedException {
            final NettyClientTest c = NettyClientTest.getInstance();
            ChannelFuture future = c.getChannelFuture();
    
            Request.MessageRequest.Builder builder =Request.MessageRequest.newBuilder();
            builder.setId(1);
            builder.setName("上杉绘梨衣");
            builder.setInfo("04.24,和Sakura去东京天空树,世界上最暖和的地方在天空树的顶上。");
            future.channel().writeAndFlush(builder.build()).sync();
    
            Request.MessageRequest.Builder builder2 =Request.MessageRequest.newBuilder();
            builder2.setId(2);
            builder2.setName("上杉绘梨衣");
            builder2.setInfo("04.26,和Sakura去明治神宫,有人在那里举办婚礼。");
            future.channel().writeAndFlush(builder2.build());
    
            Request.MessageRequest.Builder builder3 =Request.MessageRequest.newBuilder();
            builder3.setId(3);
            builder3.setName("上杉绘梨衣");
            builder3.setInfo("04.25,和Sakura去迪士尼,鬼屋很可怕,但是有Sakura在,所以不可怕。");
            future.channel().writeAndFlush(builder3.build());
    
            Request.MessageRequest.Builder builder4 =Request.MessageRequest.newBuilder();
            builder4.setId(4);
            builder4.setName("上杉绘梨衣");
            builder4.setInfo("Sakura最好了。");
            future.channel().writeAndFlush(builder4.build());
    
            future.channel().closeFuture().sync();
    
        }
    }

    ClientHandler

    package com.wk.test.nettyTest.proto;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelInboundHandlerAdapter {
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                Request.MessageRequest request = (Request.MessageRequest)msg;
                System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo());
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
        
    }

    优缺点

    优点:protobuf是目前序列化最快的没有之一,较json,xml传输体积小,速率高,适合高性能通讯的应用场景

    缺点:如果修改消息内容,则需要重新生成java类。proto文件和java文件不对应则报错。

    Kryo(推荐使用)

    kryo是基于proto的序列化框架,目前的dubbo中就是使用的它,速率仅次于protobuf,体积小,且不用通过proto文件生成java类。

    pom.xml

    <!-- kryo -->
            <dependency>
                <groupId>com.esotericsoftware</groupId>
                <artifactId>kryo</artifactId>
                <version>5.0.0-RC5</version>
            </dependency>

    实体类 Request

    package com.wk.test.nettyTest.kryo;
    
    import java.io.Serializable;
    
    public class Request implements Serializable {
        private String id;
        private String name;
        private String info;
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getInfo() {
            return info;
        }
    
        public void setInfo(String info) {
            this.info = info;
        }
    }

    封装kryo

    因为kryo是线程不安全的,因此我们要对kryo进行一层封装

    Serializer

    序列化接口类

    package com.wk.test.nettyTest.kryo;
    
    public interface Serializer {
        //序列化接口
        byte[] serialize(Object object);
        //反序列化接口
        <T> T deserialize(byte[] bytes);
    }

    KryoSerializer

    序列化实现类,通过ThreadLocal 使每个kryo都有一个线程副本,不会相互影响。

    package com.wk.test.nettyTest.kryo;
    
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.esotericsoftware.kryo.serializers.BeanSerializer;
    import org.apache.commons.io.IOUtils;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    
    public class KryoSerializer implements Serializer {
    
        private final Class<?> clazz;
    
        public KryoSerializer(Class<?> clazz){
            this.clazz = clazz;
        }
    
    
        final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>(){
            @Override
            protected Kryo initialValue(){
                Kryo kryo = new Kryo();
                kryo.register(clazz, new BeanSerializer(kryo,clazz));
                return kryo;
            }
        };
    
        private Kryo getKryo(){
            return kryoThreadLocal.get();
        }
    
        @Override
        public byte[] serialize(Object object) {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Output output = new Output(byteArrayOutputStream);
            try {
                Kryo kryo = getKryo();
                kryo.writeObjectOrNull(output,object,object.getClass());
                output.flush();
                return byteArrayOutputStream.toByteArray();
            }finally {
                IOUtils.closeQuietly(output);
                IOUtils.closeQuietly(byteArrayOutputStream);
            }
    
        }
    
        @Override
        public <T> T deserialize(byte[] bytes) {
            if(bytes ==null){
                return null;
            }
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
            Input input = new Input(byteArrayInputStream);
            try {
                Kryo kryo = getKryo();
                return (T) kryo.readObjectOrNull(input,clazz);
            }finally {
                IOUtils.closeQuietly(input);
                IOUtils.closeQuietly(byteArrayInputStream);
            }
        }
    }

    KryoSerializerFactory

    工厂类,通过传入class来获取相对应的序列化工具类

    package com.wk.test.nettyTest.kryo;
    
    public class KryoSerializerFactory {
        public static Serializer getSerializer(Class<?> clazz){
            return new KryoSerializer(clazz);
        }
    }

    编码、解码类(也可以称为序列化、反序列化类)

    KryoMsgEncoder

    package com.wk.test.nettyTest.kryo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    public class KryoMsgEncoder extends MessageToByteEncoder<Request> {
    
        private Serializer serializer = KryoSerializerFactory.getSerializer(Request.class);
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Request request, ByteBuf byteBuf) throws Exception {
            byte[] body = serializer.serialize(request);
            int headLength = body.length;
            //相当于消息头
            byteBuf.writeInt(headLength);
            //相当于消息体
            byteBuf.writeBytes(body);
        }
    }

    KryoMsgDecoder

    package com.wk.test.nettyTest.kryo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    
    public class KryoMsgDecoder extends ByteToMessageDecoder {
    
        private Serializer serializer = KryoSerializerFactory.getSerializer(Request.class);
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            //标记读取的指针的位置
            byteBuf.markReaderIndex();
            //获取消息头,也就是长度
            int dataLength = byteBuf.readInt();
            if(dataLength <=0){
                //长度不对则当前消息有问题,关闭通道
                channelHandlerContext.close();
            }
            //长度小于真实长度则重新加载读取指针
            if(byteBuf.readableBytes() < dataLength){
                byteBuf.resetReaderIndex();
                return;
            }
            byte[] body = new byte[dataLength];
            byteBuf.readBytes(body);
            Request request = serializer.deserialize(body);
            list.add(request);
        }
    }

    服务端

    NettyKryoServer

    package com.wk.test.nettyTest.kryo;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class NettyKryoServer {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyKryoServer.class);
    
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
    
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //设置日志
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new KryoMsgDecoder());
                            sc.pipeline().addLast(new KryoMsgEncoder());
                            sc.pipeline().addLast(new ReadTimeoutHandler(5));
                            sc.pipeline().addLast(new KryoServerHandler());
                        }
                    });
    
            ChannelFuture cf = b.bind(8090).sync();
    
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }

    KryoServerHandler

    package com.wk.test.nettyTest.kryo;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class KryoServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request request = (Request)msg;
            System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo());
    
            ctx.writeAndFlush(request);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        
        
    }

    客户端

    NettyKryoClient

    package com.wk.test.nettyTest.kryo;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class NettyKryoClient {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyKryoClient.class);
    
        private static class SingletonHolder {
            static final NettyKryoClient instance = new NettyKryoClient();
        }
    
        public static NettyKryoClient getInstance() {
            return SingletonHolder.instance;
        }
    
        private EventLoopGroup group;
        private Bootstrap b;
        private ChannelFuture cf;
    
        private NettyKryoClient() {
            group = new NioEventLoopGroup();
            b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new KryoMsgDecoder());
                            sc.pipeline().addLast(new KryoMsgEncoder());
                            //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)
                            sc.pipeline().addLast(new ReadTimeoutHandler(5));
                            sc.pipeline().addLast(new KryoClientHandler());
                        }
                    });
        }
    
        public void connect() {
            try {
                this.cf = b.connect("127.0.0.1", 8090).sync();
                System.out.println("远程服务器已经连接, 可以进行数据交换..");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public ChannelFuture getChannelFuture() {
    
            if (this.cf == null) {
                this.connect();
            }
            if (!this.cf.channel().isActive()) {
                this.connect();
            }
    
            return this.cf;
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            final NettyKryoClient c = NettyKryoClient.getInstance();
            ChannelFuture future = c.getChannelFuture();
            Request request = new Request();
            request.setId("1");
            request.setName("上杉绘梨衣");
            request.setInfo("04.24,和Sakura去东京天空树,世界上最暖和的地方在天空树的顶上。");
            future.channel().writeAndFlush(request).sync();
    
            Request request2 = new Request();
            request2.setId("2");
            request2.setName("上杉绘梨衣");
            request2.setInfo("04.26,和Sakura去明治神宫,有人在那里举办婚礼。");
            future.channel().writeAndFlush(request2);
    
            Request request3 = new Request();
            request3.setId("3");
            request3.setName("上杉绘梨衣");
            request3.setInfo("04.25,和Sakura去迪士尼,鬼屋很可怕,但是有Sakura在,所以不可怕。");
            future.channel().writeAndFlush(request3);
    
            Request request4 = new Request();
            request4.setId("4");
            request4.setName("上杉绘梨衣");
            request4.setInfo("Sakura最好了。");
            future.channel().writeAndFlush(request4);
    
            future.channel().closeFuture().sync();
        }
    }

    KryoClientHandler

    package com.wk.test.nettyTest.kryo;
    
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    public class KryoClientHandler extends ChannelInboundHandlerAdapter {
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                Request resp = (Request)msg;
                System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getInfo());
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
        
    }
  • 相关阅读:
    ASP.NET中在一般处理程序中使用session的简单介绍
    oracle 11gR2安装图文教程
    ORACEL 创建表空间
    Echarts使用心得总结(二)
    SqlServer 2008无法远程连接到服务器
    浅谈HTTP中Get与Post的区别
    java设计模式-观察者模式学习
    读牛人博客有感
    mysql的with rollup
    java中的枚举enum
  • 原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12851501.html
Copyright © 2011-2022 走看看