netty: marshalling传递对象,传输附件GzipUtils
前端与服务端传输对象或文件时,双方需要对数据进行编解码,也就是序列化与反序列化(文件附件还可以在发送前压缩、接收后解压)。可以使用Java自带的序列化,再由netty去传输,但Java序列化硬伤太多(无法跨语言,码流太大,性能太低),所以最好使用主流的编解码框架来配合netty使用。此处使用的是JBoss Marshalling框架。
用到的包:
<!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling</artifactId> <version>2.0.0.CR1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>2.0.0.CR1</version> </dependency>
用到的压缩工具类:
GzipUtils.java
/**
 * Helpers for GZIP-compressing and restoring in-memory byte arrays.
 */
public class GzipUtils {

    /**
     * Compresses the given bytes into GZIP format.
     *
     * @param data bytes to compress; must not be {@code null}
     * @return the GZIP-encoded bytes
     * @throws Exception if the compressor fails
     */
    public static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // try-with-resources: close() writes the GZIP trailer and releases the
        // deflater even when write() throws.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        // Read the buffer only after the stream is closed so the trailer is included.
        return bos.toByteArray();
    }

    /**
     * Restores bytes previously produced by {@link #gzip(byte[])}.
     *
     * @param data GZIP-encoded bytes; must not be {@code null}
     * @return the decompressed bytes
     * @throws Exception if {@code data} is not valid GZIP content or decompression fails
     */
    public static byte[] ungzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int num;
            while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                bos.write(buf, 0, num);
            }
        }
        return bos.toByteArray();
    }

    /**
     * Manual smoke test: reads a sample file from {@code sources/}, compresses and
     * restores it, prints the three sizes and writes the restored copy to {@code receive/}.
     *
     * @param args unused
     * @throws Exception if the sample file cannot be read or the copy cannot be written
     */
    public static void main(String[] args) throws Exception {
        // Read the file. Files.readAllBytes reads the whole file, whereas
        // available() + a single read() may return fewer bytes than the file holds.
        String readPath = System.getProperty("user.dir") + File.separatorChar + "sources"
                + File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
        File file = new File(readPath);
        byte[] data = java.nio.file.Files.readAllBytes(file.toPath());
        System.out.println("文件原始大小:" + data.length);

        // Exercise compression and decompression.
        byte[] ret1 = GzipUtils.gzip(data);
        System.out.println("压缩之后大小:" + ret1.length);
        byte[] ret2 = GzipUtils.ungzip(ret1);
        System.out.println("还原之后大小:" + ret2.length);

        // Write the restored file.
        String writePath = System.getProperty("user.dir") + File.separatorChar + "receive"
                + File.separatorChar + "Netty+3.1中文用户手册.doc.jpg";
        try (FileOutputStream fos = new FileOutputStream(writePath)) {
            fos.write(ret2);
        }
    }
}
Request.java类
/**
 * Serializable request bean holding an id, a name, a text message and a binary attachment.
 *
 * <p>All properties start out as {@code null}. The attachment accessors store and return
 * the array reference as-is, without copying.
 */
public class Request implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String requestMessage;
    private byte[] attachment;

    /** @param id identifier to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the stored identifier, or {@code null} if none was set */
    public String getId() {
        return this.id;
    }

    /** @param name name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the stored name, or {@code null} if none was set */
    public String getName() {
        return this.name;
    }

    /** @param requestMessage message text to store */
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }

    /** @return the stored message text, or {@code null} if none was set */
    public String getRequestMessage() {
        return this.requestMessage;
    }

    /** @param attachment attachment bytes to store (kept by reference) */
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }

    /** @return the stored attachment array itself, or {@code null} if none was set */
    public byte[] getAttachment() {
        return this.attachment;
    }
}
Response.java类
/**
 * Serializable response bean holding an id, a name and a text message.
 *
 * <p>All properties start out as {@code null}.
 */
public class Response implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    /** @param id identifier to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the stored identifier, or {@code null} if none was set */
    public String getId() {
        return this.id;
    }

    /** @param name name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the stored name, or {@code null} if none was set */
    public String getName() {
        return this.name;
    }

    /** @param responseMessage message text to store */
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }

    /** @return the stored message text, or {@code null} if none was set */
    public String getResponseMessage() {
        return this.responseMessage;
    }
}
MarshallingCodeCFactory.java
序列化编码解码类
/**
 * Builds the Netty marshalling encoder and decoder used on both ends of the connection.
 * Both builders look up the "serial" marshaller factory and use configuration version 5.
 */
public final class MarshallingCodeCFactory {

    /** Maximum length of a serialized message accepted by the decoder (1 MiB). */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    /**
     * Creates a decoder that unmarshals incoming messages.
     *
     * @return a new {@link MarshallingDecoder} limited to {@code MAX_OBJECT_SIZE} bytes per message
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider unmarshallers =
                new DefaultUnmarshallerProvider(serialFactory(), versionFiveConfiguration());
        // The two constructor arguments are the provider and the maximum serialized length.
        return new MarshallingDecoder(unmarshallers, MAX_OBJECT_SIZE);
    }

    /**
     * Creates an encoder that marshals outgoing messages.
     *
     * @return a new {@link MarshallingEncoder}
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider marshallers =
                new DefaultMarshallerProvider(serialFactory(), versionFiveConfiguration());
        return new MarshallingEncoder(marshallers);
    }

    /** Looks up the marshaller factory registered under the name "serial". */
    private static MarshallerFactory serialFactory() {
        return Marshalling.getProvidedMarshallerFactory("serial");
    }

    /** Creates a fresh marshalling configuration with its version set to 5. */
    private static MarshallingConfiguration versionFiveConfiguration() {
        MarshallingConfiguration settings = new MarshallingConfiguration();
        settings.setVersion(5);
        return settings;
    }
}
开始开发client,server功能
Server.java
/**
 * Starts the server on port 8765 with the marshalling codec and {@link ServerHandler}
 * installed on every accepted channel, then blocks until the server channel closes.
 */
public class Server {

    /**
     * Binds the server and waits for it to be closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while binding or waiting
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Install the marshalling decoder/encoder ahead of the business handler.
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

            ChannelFuture cf = b.bind(8765).sync();
            cf.channel().closeFuture().sync();
        } finally {
            // Always stop the event loops: if bind() fails (e.g. the port is taken) or the
            // wait is interrupted, their threads would otherwise keep running.
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
ServerHandler.java
需要继承ChannelHandlerAdapter类
/**
 * Server-side handler: logs each incoming {@link Request}, stores its decompressed
 * attachment under {@code receive/} and answers with a {@link Response}.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    /**
     * Prints the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Handles one decoded request.
     *
     * @param ctx channel context used to write the response
     * @param msg the decoded message; cast to {@link Request}
     * @throws Exception if the attachment cannot be decompressed or written
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        System.out.println("Server: " + request.getId() + "," + request.getName() + "," + request.getRequestMessage());

        // Receive the attachment and write it to a file. A request without an
        // attachment is still answered instead of failing with a NullPointerException.
        byte[] packed = request.getAttachment();
        if (packed != null) {
            saveAttachment(request.getId(), GzipUtils.ungzip(packed));
        }

        // Send the reply.
        Response response = new Response();
        response.setId(request.getId());
        response.setName("response: " + request.getName());
        response.setResponseMessage("相应的内容: " + request.getRequestMessage());
        ctx.writeAndFlush(response);
    }

    /** Writes {@code content} to {@code <user.dir>/receive/<id>.png}. */
    private static void saveAttachment(String id, byte[] content) throws Exception {
        File dir = new File(System.getProperty("user.dir"), "receive");
        // Best effort: if the directory cannot be created, opening the file below fails.
        dir.mkdirs();
        // The id comes from the remote peer; keep only the last path component so a
        // value such as "../../x" cannot escape the receive directory.
        String fileName = new File(id + ".png").getName();
        try (FileOutputStream outputStream = new FileOutputStream(new File(dir, fileName))) {
            outputStream.write(content);
        }
    }
}
Client.java类
/**
 * Connects to the server on 127.0.0.1:8765 and sends five {@link Request} objects, each
 * carrying the GZIP-compressed contents of {@code resources/1.png}, then waits for the
 * channel to close.
 */
public class Client {

    /**
     * Runs the client.
     *
     * @param args unused
     * @throws Exception if the attachment cannot be read or the connection fails
     */
    public static void main(String[] args) throws Exception {
        String path = System.getProperty("user.dir") + File.separatorChar + "resources" + File.separatorChar + "1.png";
        // Read and compress the attachment once: every request carries the same file.
        // Files.readAllBytes reads the whole file, whereas available() + a single read()
        // may deliver fewer bytes than the file holds.
        byte[] attachment = GzipUtils.gzip(java.nio.file.Files.readAllBytes(new File(path).toPath()));

        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(worker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Install the marshalling decoder/encoder ahead of the business handler.
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

            for (int i = 0; i < 5; i++) {
                Request request = new Request();
                request.setId(i + "");
                request.setName("pro" + i);
                request.setRequestMessage("数据信息Client~Server:" + i);
                // Attach the compressed file.
                request.setAttachment(attachment);
                cf.channel().writeAndFlush(request);
            }

            System.out.println("user.dir: " + path);

            cf.channel().closeFuture().sync();
        } finally {
            // Always stop the event loop, including when connect() fails.
            worker.shutdownGracefully();
        }
    }
}
ClientHandler.java类
需要继承ChannelHandlerAdapter类
/**
 * Client-side handler that prints every {@link Response} it receives.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    /**
     * Prints the id, name and message of the received response, then releases the message.
     *
     * @param ctx channel context (unused)
     * @param msg the decoded message; cast to {@link Response}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Response reply = (Response) msg;
            String line = "Client : " + reply.getId() + "," + reply.getName() + "," + reply.getResponseMessage();
            System.out.println(line);
        } finally {
            // Release the message whether or not the cast and print succeeded.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Prints the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
目录如下: