zoukankan      html  css  js  c++  java
  • netty: 编解码之jboss marshalling, 用marshalling进行对象传输

    jboss marshalling是jboss内部的一个序列化框架,速度也十分快,这里netty也提供了支持,使用十分方便。

    TCP在网络通讯的时候,通常在解决TCP粘包、拆包问题的时候,一般会用以下几种方式:

      1、 消息定长 例如每个报文的大小固定为200个字节,如果不够,空位补空格;

      2、 在消息尾部添加特殊字符进行分割,如添加回车;

      3、 将消息分为消息体和消息头,在消息头里面包含表示消息长度的字段,然后进行业务逻辑的处理。

      在Netty中我们主要利用对象的序列化进行对象的传输,虽然Java本身的序列化也能完成,但是Java序列化有很多问题,如后字节码流太大,以及序列化程度太低等。Jboss的序列化有程度较高、序列化后码流较小。这里利用Jboss的Marshalling测试一个简单的对象序列化。

    引入marshalling

    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    	<dependency>
    	    <groupId>io.netty</groupId>
    	    <artifactId>netty-all</artifactId>
    	    <version>5.0.0.Alpha2</version>
    	</dependency>
    	
    	<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
    	<dependency>
    	    <groupId>org.jboss.marshalling</groupId>
    	    <artifactId>jboss-marshalling</artifactId>
    	    <version>2.0.0.CR1</version>
    	</dependency>
        
        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
    	<dependency>
    	    <groupId>org.jboss.marshalling</groupId>
    	    <artifactId>jboss-marshalling-serial</artifactId>
    	    <version>2.0.0.CR1</version>	    
    	</dependency>
    

      

    server:

    public class Server {
    
    	public static void main(String[] args) throws InterruptedException {
    		//1.第一个线程组是用于接收Client端连接的  
            EventLoopGroup bossGroup = new NioEventLoopGroup();   
            //2.第二个线程组是用于实际的业务处理的  
            EventLoopGroup workerGroup = new NioEventLoopGroup();  
            ServerBootstrap b = new ServerBootstrap();  
            b.group(bossGroup, workerGroup);//绑定两个线程池  
            b.channel(NioServerSocketChannel.class);//指定NIO的模式,如果是客户端就是NioSocketChannel  
    //        b.option(ChannelOption.SO_BACKLOG, 1024);//TCP的缓冲区设置  
    //        b.option(ChannelOption.SO_SNDBUF, 32*1024);//设置发送缓冲的大小  
    //        b.option(ChannelOption.SO_RCVBUF, 32*1024);//设置接收缓冲区大小  
    //        b.option(ChannelOption.SO_KEEPALIVE, true);//保持连续  
            b.childHandler(new ChannelInitializer<SocketChannel>() {  
                protected void initChannel(SocketChannel ch) throws Exception {
                	//设置Marshalling的编码和解码
                    ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    ch.pipeline().addLast(new ServerHandler());
                }
            });
            ChannelFuture future = b.bind(8765).sync();//绑定端口  
            future.channel().closeFuture().sync();//等待关闭(程序阻塞在这里等待客户端请求)  
            bossGroup.shutdownGracefully();//关闭线程  
            workerGroup.shutdownGracefully();//关闭线程 
    
    	}
    }
    

      

    ServerHandler

    public class ServerHandler extends ChannelHandlerAdapter{
    
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
        }
     
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
        	Send send = (Send) msg;
        	System.out.println("client发送:"+send);
        	
        	Receive receive = new Receive();
        	receive.setId(send.getId());
        	receive.setMessage(send.getMessage());
        	receive.setName(send.getName());
        	ctx.writeAndFlush(receive);
        }
    	
    }
    

      

    client

    public class Client {
    
    	public static void main(String[] args) throws InterruptedException {
    		EventLoopGroup worker = new NioEventLoopGroup();
    		Bootstrap b = new Bootstrap();
    		b.group(worker)
    		.channel(NioSocketChannel.class)
    		.handler(new ChannelInitializer<SocketChannel>() {
    			@Override
    			protected void initChannel(SocketChannel sc) throws Exception {
    				//ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); 
    				//sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
    				//sc.pipeline().addLast(new StringDecoder());
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
    				sc.pipeline().addLast(new ClientHandler());
    			}
    		});
    		ChannelFuture f=b.connect("127.0.0.1",8765).sync();
    		for(int i=1;i<=5;i++){
    			Send send = new Send();
    	        send.setId(i);
    	        send.setMessage("message"+i);
    	        send.setName("name"+i);
    	        f.channel().writeAndFlush(send);
    		}
    		f.channel().closeFuture().sync();
    		worker.shutdownGracefully();
    
    	}
    }
    

      

    clientHandler

    public class ClientHandler extends ChannelHandlerAdapter{
    	
    	@Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
        	Receive receive = (Receive) msg;
            System.out.println("server反馈:"+receive);
    
        }
    	
    }
    

      

    send

    public class Send implements Serializable {
    	 
    	/**
    	 * serialVersionUID:TODO(用一句话描述这个变量表示什么)
    	 * 
    	 * @since 1.0.0
    	 */
     
    	private static final long serialVersionUID = 1L;
     
    	private Integer id;
    	private String name;
    	private String message;
     
    	public Integer getId() {
    		return id;
    	}
     
    	public void setId(Integer id) {
    		this.id = id;
    	}
     
    	public String getName() {
    		return name;
    	}
     
    	public void setName(String name) {
    		this.name = name;
    	}
     
    	public String getMessage() {
    		return message;
    	}
     
    	public void setMessage(String message) {
    		this.message = message;
    	}
     
    	@Override
    	public String toString() {
    		return "Send [id=" + id + ", name=" + name + ", message=" + message + "]";
    	}
     
    }
    

      

    receive

    public class Receive implements Serializable{
     
    	/**
    	 * serialVersionUID:TODO(用一句话描述这个变量表示什么)
    	 * @since 1.0.0
    	 */
    	
    	private static final long serialVersionUID = 1L;
    	private Integer id;
    	private String name;
    	private String message;
    	private byte[] sss;
    	
    	public byte[] getSss() {
    		return sss;
    	}
    	public void setSss(byte[] sss) {
    		this.sss = sss;
    	}
    	public Integer getId() {
    		return id;
    	}
    	public void setId(Integer id) {
    		this.id = id;
    	}
    	public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    	public String getMessage() {
    		return message;
    	}
    	public void setMessage(String message) {
    		this.message = message;
    	}
    	@Override
    	public String toString() {
    		return "Receive [id=" + id + ", name=" + name + ", message=" + message + ", sss=" + Arrays.toString(sss) + "]";
    	}
    	
    }
    

      

    marshalling工厂类

    public final class MarshallingCodeCFactory {
    
    	 /**
         * 创建Jboss Marshalling解码器MarshallingDecoder
         * @return MarshallingDecoder
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
        	//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
    		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
    		//创建了MarshallingConfiguration对象,配置了版本号为5 
    		final MarshallingConfiguration configuration = new MarshallingConfiguration();
    		configuration.setVersion(5);
    		//根据marshallerFactory和configuration创建provider
    		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
    		//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
    		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
    		return decoder;
        }
    
        /**
         * 创建Jboss Marshalling编码器MarshallingEncoder
         * @return MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
    		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
    		final MarshallingConfiguration configuration = new MarshallingConfiguration();
    		configuration.setVersion(5);
    		MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
    		//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
    		MarshallingEncoder encoder = new MarshallingEncoder(provider);
    		return encoder;
        }
    }
    

      

    运行结果

    server反馈:Receive [id=1, name=name1, message=message1, sss=null]
    server反馈:Receive [id=2, name=name2, message=message2, sss=null]
    server反馈:Receive [id=3, name=name3, message=message3, sss=null]
    server反馈:Receive [id=4, name=name4, message=message4, sss=null]
    server反馈:Receive [id=5, name=name5, message=message5, sss=null]
    

      

  • 相关阅读:
    ListView之setEmptyView的问题
    AdapterView的相关知识。
    分享一个程序猿的真实的爱情故事
    C#中的explicit和implicit了解一下吧
    通俗易懂,什么是.NET Core以及.NET Core能做什么
    目前下载VS2017你可能会遇到这个坑
    .NET Core中使用Dapper操作Oracle存储过程最佳实践
    分享一个.NET平台开源免费跨平台的大数据分析框架.NET for Apache Spark
    从ASP.NET Core2.2到3.0你可能会遇到这些问题
    What?VS2019创建新项目居然没有.NET Core3.0的模板?Bug?
  • 原文地址:https://www.cnblogs.com/achengmu/p/10945547.html
Copyright © 2011-2022 走看看