zoukankan      html  css  js  c++  java
  • Netty编解码技术和UDP实现

    背景

    作为网络传输框架,免不了传输对象,对象在传输之前就要序列化,这个序列化的过程就是编码过程。接收到编码后的数据就需要解码,还原传输的数据。

    这里所说的编解码技术,本质上就是对象的序列化与反序列化技术(Java序列化是其中一种实现)。序列化的目的有两个,一是进行网络传输,二是对象持久化。

    但是Java的序列化缺点很多,如无法跨语言、序列化后码流太大、序列化性能太低。

    主流的序列化框架:

      JBoss的Marshalling包

      google的Protobuf

      基于Protobuf的Kryo

      MessagePack框架

    JBoss Marshalling的实现

    代码示例:

    public class Server {
    
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
    
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    // 设置日志
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            sc.pipeline().addLast(new ServerHandler());
                        }
                    });
    
            ChannelFuture cf = b.bind(8765).sync();
    
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
    
        }
    }
    
    
    /**
     * Inbound handler of the Marshalling example server.
     *
     * <p>For every received {@link Req} it prints the request fields, un-gzips the
     * attachment, writes it to {@code <user.dir>/receive/001.jpg} and answers with a
     * {@link Resp} carrying the same id.
     */
    public class ServerHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Intentionally empty: nothing is done when the connection becomes active.
        }

        /**
         * Handles one decoded request.
         *
         * @param ctx context used to write the response
         * @param msg decoded message; cast to {@link Req} without a type check
         * @throws Exception if decompression or the file write fails
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Req req = (Req) msg;
            System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
            byte[] attachment = GzipUtils.ungzip(req.getAttachment());

            // Every request overwrites the same target file.
            String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg";
            // try-with-resources closes the file even when write() throws.
            try (FileOutputStream fos = new FileOutputStream(path)) {
                fos.write(attachment);
            }

            Resp resp = new Resp();
            resp.setId(req.getId());
            resp.setName("resp" + req.getId());
            resp.setResponseMessage("响应内容" + req.getId());
            // The connection stays open after the response; attach
            // ChannelFutureListener.CLOSE to the returned future to close it instead.
            ctx.writeAndFlush(resp);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // Intentionally empty.
        }

        /**
         * Reports the failure and closes the connection.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception raised in the pipeline
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Print the cause so that a dropped connection can be diagnosed.
            cause.printStackTrace();
            ctx.close();
        }

    }
    
    
    /**
     * TCP client of the JBoss Marshalling example.
     *
     * <p>Connects to 127.0.0.1:8765 and sends five {@link Req} messages, each carrying
     * the gzip-compressed content of {@code <user.dir>/sources/001.jpg}.
     */
    public class Client {

        /**
         * Sends the requests and blocks until the channel is closed.
         *
         * @param args unused
         * @throws Exception if connecting, reading the file or compressing it fails
         */
        public static void main(String[] args) throws Exception {

            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });

                ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

                // Every request carries the same file, so read and compress it once.
                // Files.readAllBytes reads the whole file and closes it, unlike
                // available() followed by a single read(), which may return fewer bytes.
                String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar
                        + "001.jpg";
                File file = new File(path);
                byte[] attachment = GzipUtils.gzip(java.nio.file.Files.readAllBytes(file.toPath()));

                for (int i = 0; i < 5; i++) {
                    Req req = new Req();
                    req.setId("" + i);
                    req.setName("pro" + i);
                    req.setRequestMessage("数据信息" + i);
                    req.setAttachment(attachment);
                    cf.channel().writeAndFlush(req);
                }

                // Wait here until the channel is closed.
                cf.channel().closeFuture().sync();
            } finally {
                // Release the event loop threads even when connecting or sending fails.
                group.shutdownGracefully();
            }
        }
    }
    
    
    /**
     * Inbound handler of the Marshalling example client: prints every received
     * {@link Resp}.
     */
    public class ClientHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Intentionally empty: nothing is done when the connection becomes active.
        }

        /**
         * Prints the fields of one decoded response.
         *
         * @param ctx channel context (unused)
         * @param msg decoded message; cast to {@link Resp} without a type check
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                Resp resp = (Resp) msg;
                System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
            } finally {
                // The message is not passed further down the pipeline, so release it here.
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // Intentionally empty.
        }

        /**
         * Reports the failure and closes the connection.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception raised in the pipeline
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Print the cause so that a dropped connection can be diagnosed.
            cause.printStackTrace();
            ctx.close();
        }

    }
    
    
    /**
     * Factory for the Netty JBoss Marshalling codec handlers used by the example
     * client and server.
     */
    public final class MarshallingCodeCFactory {

        /** Name of the provided marshaller factory; "serial" selects the Java serialization factory. */
        private static final String FACTORY_NAME = "serial";

        /** Version number set on every MarshallingConfiguration. */
        private static final int CONFIGURATION_VERSION = 5;

        /** Maximum length of a single serialized message accepted by the decoder. */
        private static final int MAX_OBJECT_SIZE = 1024 * 1024 * 1;

        /**
         * Creates the JBoss Marshalling decoder.
         *
         * @return a new MarshallingDecoder limited to {@code MAX_OBJECT_SIZE}
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // The provider is built from the marshaller factory and the configuration.
            UnmarshallerProvider unmarshallers =
                    new DefaultUnmarshallerProvider(lookupFactory(), newConfiguration());
            return new MarshallingDecoder(unmarshallers, MAX_OBJECT_SIZE);
        }

        /**
         * Creates the JBoss Marshalling encoder, which serializes POJOs implementing
         * the serialization interface into binary form.
         *
         * @return a new MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            MarshallerProvider marshallers =
                    new DefaultMarshallerProvider(lookupFactory(), newConfiguration());
            return new MarshallingEncoder(marshallers);
        }

        /** Obtains the marshaller factory through the static Marshalling utility method. */
        private static MarshallerFactory lookupFactory() {
            return Marshalling.getProvidedMarshallerFactory(FACTORY_NAME);
        }

        /** Builds a fresh configuration with the version number applied. */
        private static MarshallingConfiguration newConfiguration() {
            MarshallingConfiguration settings = new MarshallingConfiguration();
            settings.setVersion(CONFIGURATION_VERSION);
            return settings;
        }
    }
    
    /**
     * Serializable request message sent from the example client to the server.
     */
    public class Req implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Identifier of the request. */
        private String id;

        /** Name attached to the request. */
        private String name;

        /** Text of the request. */
        private String requestMessage;

        /** Binary payload; the example client stores gzip-compressed file bytes here. */
        private byte[] attachment;

        // ---- getters ----

        /** @return the request identifier, or {@code null} if never set */
        public String getId() {
            return this.id;
        }

        /** @return the request name, or {@code null} if never set */
        public String getName() {
            return this.name;
        }

        /** @return the request text, or {@code null} if never set */
        public String getRequestMessage() {
            return this.requestMessage;
        }

        /** @return the stored payload array itself (no copy), or {@code null} if never set */
        public byte[] getAttachment() {
            return this.attachment;
        }

        // ---- setters ----

        /** @param id the request identifier to store */
        public void setId(String id) {
            this.id = id;
        }

        /** @param name the request name to store */
        public void setName(String name) {
            this.name = name;
        }

        /** @param requestMessage the request text to store */
        public void setRequestMessage(String requestMessage) {
            this.requestMessage = requestMessage;
        }

        /** @param attachment the payload array to store; the reference is kept as is */
        public void setAttachment(byte[] attachment) {
            this.attachment = attachment;
        }
    }
    
    /**
     * Serializable response message sent from the example server back to the client.
     */
    public class Resp implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Identifier of the response; the example server copies the request id here. */
        private String id;

        /** Name attached to the response. */
        private String name;

        /** Text of the response. */
        private String responseMessage;

        // ---- getters ----

        /** @return the response identifier, or {@code null} if never set */
        public String getId() {
            return this.id;
        }

        /** @return the response name, or {@code null} if never set */
        public String getName() {
            return this.name;
        }

        /** @return the response text, or {@code null} if never set */
        public String getResponseMessage() {
            return this.responseMessage;
        }

        // ---- setters ----

        /** @param id the response identifier to store */
        public void setId(String id) {
            this.id = id;
        }

        /** @param name the response name to store */
        public void setName(String name) {
            this.name = name;
        }

        /** @param responseMessage the response text to store */
        public void setResponseMessage(String responseMessage) {
            this.responseMessage = responseMessage;
        }

    }
    View Code

     工具类:

    /**
     * Helpers for compressing and decompressing byte arrays with GZIP.
     */
    public class GzipUtils {

        /** Size of the chunk buffer used when draining a stream. */
        private static final int BUFFER_SIZE = 1024;

        /**
         * Compresses a byte array with GZIP.
         *
         * @param data the bytes to compress; must not be {@code null}
         * @return the GZIP-compressed bytes
         * @throws Exception if writing to the compressor fails
         */
        public static byte[] gzip(byte[] data) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            // Closing the GZIP stream finishes the compressed data and closes bos,
            // also when write() throws.
            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
                gzip.write(data);
            }
            return bos.toByteArray();
        }

        /**
         * Decompresses GZIP data.
         *
         * @param data GZIP-compressed bytes; must not be {@code null}
         * @return the decompressed bytes
         * @throws Exception if {@code data} is not valid GZIP or reading fails
         */
        public static byte[] ungzip(byte[] data) throws Exception {
            try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
                return readFully(gzip);
            }
        }

        /** Reads a stream until end-of-stream and returns everything that was read. */
        private static byte[] readFully(java.io.InputStream in) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[BUFFER_SIZE];
            int num;
            while ((num = in.read(buf, 0, buf.length)) != -1) {
                bos.write(buf, 0, num);
            }
            return bos.toByteArray();
        }

        /**
         * Manual round-trip check: reads {@code <user.dir>/sources/006.jpg}, compresses
         * and decompresses it, prints the three sizes and writes the restored bytes to
         * {@code <user.dir>/receive/006.jpg}.
         *
         * @param args unused
         * @throws Exception if a file cannot be read or written, or (de)compression fails
         */
        public static void main(String[] args) throws Exception {

            // Read the source file.
            String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar
                    + "006.jpg";
            File file = new File(readPath);
            byte[] data;
            // Loop until end-of-stream; available() plus a single read() may return fewer bytes.
            try (FileInputStream in = new FileInputStream(file)) {
                data = readFully(in);
            }

            System.out.println("文件原始大小:" + data.length);

            // Compress, then restore.
            byte[] ret1 = GzipUtils.gzip(data);
            System.out.println("压缩之后大小:" + ret1.length);

            byte[] ret2 = GzipUtils.ungzip(ret1);
            System.out.println("还原之后大小:" + ret2.length);

            // Write the restored file.
            String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar
                    + "006.jpg";
            try (FileOutputStream fos = new FileOutputStream(writePath)) {
                fos.write(ret2);
            }
        }
    }
    View Code

     UDP的实现

    代码示例:

    /**
     * UDP server of the proverb-lookup example: binds a datagram channel and lets
     * {@link ServerHandler} answer incoming queries.
     */
    public class Server {

        /**
         * Binds a broadcast-enabled datagram channel to the given port and blocks
         * until that channel is closed.
         *
         * @param port local UDP port to bind
         * @throws Exception if binding fails or the waiting thread is interrupted
         */
        public void run(int port) throws Exception {
            EventLoopGroup workers = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(workers);
                bootstrap.channel(NioDatagramChannel.class);
                bootstrap.option(ChannelOption.SO_BROADCAST, true);
                bootstrap.handler(new ServerHandler());
                // Bind, then wait for the bound channel to close.
                bootstrap.bind(port).sync().channel().closeFuture().await();
            } finally {
                workers.shutdownGracefully();
            }
        }

        /**
         * Runs a server on port 8765 and afterwards one on port 8764.
         *
         * <p>{@link #run(int)} blocks until its channel is closed, so the second
         * server is only started once the first call has returned.
         *
         * @param args unused
         */
        public static void main(String[] args) throws Exception {
            Server first = new Server();
            first.run(8765);
            Server second = new Server();
            second.run(8764);
        }
    }
    
    
    /**
     * UDP handler that answers the proverb-lookup query with a randomly chosen
     * proverb, sent back to the sender of the query.
     */
    public class ServerHandler extends
        SimpleChannelInboundHandler<DatagramPacket> {

        /** Proverbs to choose from. */
        private static final String[] DICTIONARY = {
            "只要功夫深,铁棒磨成针。",
            "旧时王谢堂前燕,飞入寻常百姓家。",
            "洛阳亲友如相问,一片冰心在玉壶。",
            "一寸光阴一寸金,寸金难买寸光阴。",
            "老骥伏枥,志在千里。烈士暮年,壮心不已!"
        };

        /** Picks one entry of {@link #DICTIONARY} at random. */
        private String nextQuote() {
            return DICTIONARY[ThreadLocalRandom.current().nextInt(DICTIONARY.length)];
        }

        /**
         * Prints the received text and, if it is the lookup query, replies with a proverb.
         *
         * @param ctx    context used to send the reply
         * @param packet received datagram; its content is decoded as UTF-8
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet)
            throws Exception {
            String query = packet.content().toString(CharsetUtil.UTF_8);
            System.out.println(query);
            if (!"谚语字典查询?".equals(query)) {
                // Anything other than the lookup query is ignored.
                return;
            }
            String answer = "谚语查询结果: " + nextQuote();
            DatagramPacket reply =
                    new DatagramPacket(Unpooled.copiedBuffer(answer, CharsetUtil.UTF_8), packet.sender());
            ctx.writeAndFlush(reply);
        }

        /**
         * Closes the channel and prints the stack trace of the failure.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception raised in the pipeline
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
            ctx.close();
            cause.printStackTrace();
        }
    }
    
    
    /**
     * UDP client of the proverb-lookup example: broadcasts one query and waits for
     * {@link ClientHandler} to close the channel after an answer arrived.
     */
    public class Client {

        /**
         * Broadcasts the lookup query to the given port and waits up to 15000 ms
         * for the channel to be closed.
         *
         * @param port UDP port the query is sent to
         * @throws Exception if binding or sending fails, or the thread is interrupted
         */
        public void run(int port) throws Exception {
            EventLoopGroup workers = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(workers);
                bootstrap.channel(NioDatagramChannel.class);
                bootstrap.option(ChannelOption.SO_BROADCAST, true);
                bootstrap.handler(new ClientHandler());

                // Port 0 is passed as the local port to bind.
                Channel channel = bootstrap.bind(0).sync().channel();

                // Broadcast the UDP query to all machines on the network segment.
                DatagramPacket query = new DatagramPacket(
                        Unpooled.copiedBuffer("谚语字典查询?", CharsetUtil.UTF_8),
                        new InetSocketAddress("255.255.255.255", port));
                channel.writeAndFlush(query).sync();

                boolean closedInTime = channel.closeFuture().await(15000);
                if (!closedInTime) {
                    System.out.println("查询超时!");
                }
            } finally {
                workers.shutdownGracefully();
            }
        }

        /**
         * Sends one query to port 8765.
         *
         * @param args unused
         */
        public static void main(String[] args) throws Exception {
            Client client = new Client();
            client.run(8765);
        }
    }
    
    
    /**
     * UDP handler of the proverb-lookup client: prints the first datagram that
     * carries a lookup result and then closes the channel.
     */
    public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {

        /**
         * Decodes the datagram as UTF-8 and handles it if it is a lookup result.
         *
         * @param ctx context used to close the channel
         * @param msg received datagram
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
            String text = msg.content().toString(CharsetUtil.UTF_8);
            if (!text.startsWith("谚语查询结果: ")) {
                // Not a lookup result: ignore it.
                return;
            }
            System.out.println(text);
            ctx.close();
        }

        /**
         * Prints the stack trace of the failure and closes the channel.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception raised in the pipeline
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    View Code
  • 相关阅读:
    跨浏览器OCX
    安装QT5.02
    Ubuntu登陆密码忘记
    QTableView
    VMware安装时Error 1324. The path My Documents contains a invalid character的原因和解决方法
    VS2005 与虚拟机的那点事
    创建掩码位图来实现透明绘图
    【转载】spring注解整理
    记录spring test类无法插入数据问题
    Uniapp 原生开发uniapp.arr 新老兼容问题
  • 原文地址:https://www.cnblogs.com/lostyears/p/8482158.html
Copyright © 2011-2022 走看看