zoukankan      html  css  js  c++  java
  • Netty5+Jboss(Marshalling)完成对象序列化传输

      TCP在网络通讯的时候,通常在解决TCP粘包、拆包问题的时候,一般会用以下几种方式:

      1、 消息定长 例如每个报文的大小固定为200个字节,如果不够,空位补空格;

      2、 在消息尾部添加特殊字符进行分割,如添加回车;

      3、 将消息分为消息体和消息头,在消息头里面包含表示消息长度的字段,然后进行业务逻辑的处理。

      在Netty中我们主要利用对象的序列化进行对象的传输,虽然Java本身的序列化也能完成,但是Java序列化有很多问题,如后字节码流太大,以及序列化程度太低等。Jboss的序列化有程度较高、序列化后码流较小。这里利用Jboss的Marshalling测试一个简单的对象序列化。

      新建Maven工程,引入Netty5和Jboss的Marshalling。

      注:这里的Marshalling的版本,如果版本太低,可能会出现消息发送失败的问题。我在测试的时候起先用的是1.3.9,结果就是消息发送失败,打印异常信息发现是空指针的问题。

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>5.0.0.Alpha2</version>
            </dependency>
            <dependency>
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling-serial</artifactId>
                <version>2.0.0.Beta2</version>
            </dependency>        

      1、服务端

    package com.netty.parry.ende4;
    
    import com.netty.parry.ende3.MarshallingCodeCFactory;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class Server {
    
        public void start(int port) throws Exception {
            // 配置NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            try {
                // 服务器辅助启动类配置
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChildChannelHandler());
                // 绑定端口 同步等待绑定成功
                ChannelFuture f = b.bind(port).sync(); 
                // 等到服务端监听端口关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅释放线程资源
                workGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
        /**
         * 网络事件处理器
         */
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 添加Jboss的序列化,编解码工具
                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                // 处理网络IO
                ch.pipeline().addLast(new ServerHandler());
            }
        }
    
        public static void main(String[] args) throws Exception {
            new Server().start(8765);
        }
    }

      2、服务端IO处理类

    package com.netty.parry.ende4;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class ServerHandler extends ChannelHandlerAdapter {
        
        // 用于获取客户端发送的信息
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 用于获取客户端发来的数据信息
            Message body = (Message) msg;
            System.out.println("Server接受的客户端的信息 :" + body.toString());
    
            // 写数据给客户端
            Message response = new Message("欢迎您,与服务端连接成功");
            // 当服务端完成写操作后,关闭与客户端的连接
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

      3、客户端

    package com.netty.parry.ende4;
    
    import com.netty.parry.ende3.MarshallingCodeCFactory;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class Client {
        /**
         * 连接服务器
         * 
         * @param port
         * @param host
         * @throws Exception
         */
        public void connect(int port, String host) throws Exception {
            // 配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                // 客户端辅助启动类 对客户端配置
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new MyChannelHandler());
                // 异步链接服务器 同步等待链接成功
                ChannelFuture f = b.connect(host, port).sync();
                // 等待链接关闭
                f.channel().closeFuture().sync();
    
            } finally {
                group.shutdownGracefully();
                System.out.println("客户端优雅的释放了线程资源...");
            }
    
        }
    
        /**
         * 网络事件处理器
         */
        private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                System.out.println("MyChannelHandler");
                // 添加Jboss的序列化,编解码工具
                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                // 处理网络IO
                ch.pipeline().addLast(new ClientHandler());
            }
        }
    
        public static void main(String[] args) throws Exception {
            new Client().connect(8765, "127.0.0.1");
        }
    }

      4、客户端IO处理类

    package com.netty.parry.ende4;
    
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter {
    
        // 客户端与服务端,连接成功的售后
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 发送消息
            Message request1 = new Message("666");
            ctx.writeAndFlush(request1).addListener(new ChannelFutureListener() {
                
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("成功发送到服务端消息");
                    } else {
                        System.out.println("失败服务端消息失败:"+future.cause().getMessage());
                        future.cause().printStackTrace();
                    }
                }
            });
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                Message response = (Message) msg;
                System.out.println(response);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

      5、消息体

    package com.netty.parry.ende4;
    
    import java.io.Serializable;
    
    public class Message implements Serializable{
    
        /**
         * 
         */
        private static final long serialVersionUID = -5296315429304117678L;
        
        private String body;
    
        public String getBody() {
            return body;
        }
    
        public void setBody(String body) {
            this.body = body;
        }
    
        public Message(String body) {
            super();
            this.body = body;
        }
    
        public Message() {
            super();
        }
    
        @Override
        public String toString() {
            return "Message [body=" + body + "]";
        }
    }

      

  • 相关阅读:
    MySQL varchar类型数据转tinyint类型
    Spring Boot实战笔记(一)-- Spring简介
    Maven学习(八)-- 使用Nexus搭建Maven私服
    Maven学习(七)-- 使用Maven构建多模块项目
    浮点型 float和double类型的内存结构和精度问题
    Maven学习(六)-- Maven与Eclipse整合
    Maven学习(五)-- 聚合与继承
    关于inline函数
    《The Cg Tutorial》阅读笔记——环境贴图 Environment Mapping
    记录最近工作中遇到的一些坑
  • 原文地址:https://www.cnblogs.com/parryyang/p/8064305.html
Copyright © 2011-2022 走看看