zoukankan      html  css  js  c++  java
  • netty 的 JBoss Marshalling 编码解码

    一. JBoss Marshalling 简介.

      JBoss Marshalling 是一个 Java 对象序列化包, 对 JDK 默认的序列化框架进行了优化, 但又保持跟 java.io.Serializable 接口的兼容, 同时增加了一些可调的参数和附加的特性, 这些参数和特性可通过工厂类进行配置.

    二. JBoss Marshalling 的使用.

    1. 下载 org.jboss.marshalling (添加 Maven 依赖)

    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>2.0.0.Beta2</version>
    </dependency>

    2. 定义 POJO 对象,进行编解码.

    SubScriptReq
    package object.server.impl;
    
    import java.io.Serializable;
    
    public class SubScriptReq implements Serializable {
        /**
         * 
         */
        private static final long serialVersionUID = 4686274228090335845L;
        private Integer subReq;
        private String userName;
        private String productName;
        private String address;
    
        public Integer getSubReq() {
            return subReq;
        }
    
        public void setSubReq(Integer subReq) {
            this.subReq = subReq;
        }
    
        public String getUserName() {
            return userName;
        }
    
        public void setUserName(String userName) {
            this.userName = userName;
        }
    
        public String getProductName() {
            return productName;
        }
    
        public void setProductName(String productName) {
            this.productName = productName;
        }
    
        public String getAddress() {
            return address;
        }
    
        public void setAddress(String address) {
            this.address = address;
        }
    
        @Override
        public String toString() {
            return "SubScriptReq [subReq=" + subReq + ", userName=" + userName
                    + ", productName=" + productName + ", address=" + address + "]";
        }
    
    }
    SubscriptResp
    package object.server.impl;
    
    import java.io.Serializable;
    
    public class SubscriptResp implements Serializable {
        /**
         * 
         */
        private static final long serialVersionUID = 4923081103118853877L;
        private Integer subScriptID;
        private String respCode;
        private String desc;
    
        public Integer getSubScriptID() {
            return subScriptID;
        }
    
        public void setSubScriptID(Integer subScriptID) {
            this.subScriptID = subScriptID;
        }
    
        public String getRespCode() {
            return respCode;
        }
    
        public void setRespCode(String respCode) {
            this.respCode = respCode;
        }
    
        public String getDesc() {
            return desc;
        }
    
        public void setDesc(String desc) {
            this.desc = desc;
        }
    
        @Override
        public String toString() {
            return "SubscriptResp [subScriptID=" + subScriptID + ", respCode="
                    + respCode + ", desc=" + desc + "]";
        }
    
    }

    3. Marshalling 构造工具

    package object.server.impl;
    
    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    public class MarshallingCodeCFactory {
        public static MarshallingDecoder buildMarshallingDecoder() {
            /*
             * 通过 Marshalling 工具类的 getProvidedMarshallerFactory
             * 静态方法获取MarshallerFactory 实例, , 参数 serial 表示创建的是 Java 序列化工厂对象.它是由
             * jboss-marshalling-serial 包提供
             */
            final MarshallerFactory marshallerFactory = Marshalling
                    .getProvidedMarshallerFactory("serial");
            /*
             * 创建
             */
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
    
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(
                    marshallerFactory, configuration);
            /*
             * provider : 提供商 maxSize : 单个对象最大尺寸
             */
            int maxSize = 1024 << 2;
            MarshallingDecoder decoder = new MarshallingDecoder(provider, maxSize);
            return decoder;
        }
    
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling
                    .getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(
                    marshallerFactory, configuration);
            MarshallingEncoder decoder = new MarshallingEncoder(provider);
            return decoder;
        }
    
    }

    4. Netty 服务端代码:

    package object.server.impl;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    
    public class SubReqServer {
        public void start(int port) {
            NioEventLoopGroup workGroup = new NioEventLoopGroup();
            NioEventLoopGroup bossGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            // 配置 NioServerSocketChannel 的 tcp 参数, BACKLOG 的大小
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            MarshallingCodeCFactory.buildMarshallingDecoder());
                    ch.pipeline().addLast(
                            MarshallingCodeCFactory.buildMarshallingEncoder());
                    ch.pipeline().addLast(new SubReqHandler());
                }
            });
            // 绑定端口,随后调用它的同步阻塞方法 sync 等等绑定操作成功,完成之后 Netty 会返回一个 ChannelFuture
            // 它的功能类似于的 Future,主要用于异步操作的通知回调.
            ChannelFuture channelFuture;
            try {
                channelFuture = bootstrap.bind(port).sync();
                // 等待服务端监听端口关闭,调用 sync 方法进行阻塞,等待服务端链路关闭之后 main 函数才退出.
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            SubReqServer server = new SubReqServer();
            server.start(9091);
        }
    }

    serverHandler

    package object.server.impl;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class SubReqHandler extends ChannelHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
            super.exceptionCaught(ctx, cause);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
    
            System.out.println(msg);
            SubscriptResp sub = new SubscriptResp();
            sub.setDesc("desc");
            sub.setSubScriptID(999);
            sub.setRespCode("0");
            ctx.writeAndFlush(sub);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
    }

    5. client 

    package object.client.impl;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import object.server.impl.MarshallingCodeCFactory;
    
    public class SubReqClient {
        public void connect(String host, int port) {
            NioEventLoopGroup workGroup = new NioEventLoopGroup();
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            MarshallingCodeCFactory.buildMarshallingDecoder());
                    ch.pipeline().addLast(
                            MarshallingCodeCFactory.buildMarshallingEncoder());
                    ch.pipeline().addLast(new SubReqClientHandler());
                }
            });
    
            // 发起异步链接操作
            ChannelFuture future;
            try {
                future = bootstrap.connect(host, port).sync();
                // 等待客户端链路关闭
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                workGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            new SubReqClient().connect("localhost", 9091);
        }
    }

    clientHandler 

    package object.client.impl;
    
    import object.server.impl.SubScriptReq;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class SubReqClientHandler extends ChannelHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            SubScriptReq req = new SubScriptReq();
            for (int i = 0; i < 100; i++) {
    
                req.setSubReq(999);
                req.setProductName("productName");
                req.setUserName("userName");
                req.setAddress("address");
                ctx.writeAndFlush(req);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println(msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
    }

    注 :  MarshallingDecoder 是自带半包处理的.

      

  • 相关阅读:
    CF280C Game on Tree 概率与期望
    bzoj 3420: Poi2013 Triumphal arch 树形dp+二分
    bzoj 2111: [ZJOI2010]Perm 排列计数 Lucas
    bzoj 3709: [PA2014]Bohater 贪心
    bzoj 1396/2865: 识别子串 后缀自动机+线段树
    【教程】如何搭建一个自己的网站
    C#单例设计模式
    C#双缓冲代码
    Hibernate的查询功能
    hibernate事务规范写法
  • 原文地址:https://www.cnblogs.com/mjorcen/p/4545344.html
Copyright © 2011-2022 走看看