zoukankan      html  css  js  c++  java
  • RPC基于http协议通过netty支持文件上传下载

    本人在中间件研发组(主要开发RPC),近期遇到一个需求:RPC基于http协议通过netty支持文件上传下载

    经过一系列的资料查找学习,终于实现了该功能

    通过netty实现文件上传下载,主要在编解码时处理,具体的代码如下:

    ①文件上传

     @Override
        public RPCRequest doDecodeRequest(FullHttpRequest request) {
            RPCRequest rpcRequest = new RPCRequest();
            String biz_prefix = BIZ_PREFIX;
            String rpc_prefix = RPC_PREFIX;
            if("true".equals(request.headers().get(RPCParamType.header_compatible.getName()))){
                biz_prefix = "";
                rpc_prefix = "";
            }
            for (Entry<String, String> entry : request.headers().entries()) {
                if(entry.getKey().startsWith(rpc_prefix)){
                    rpcRequest.addRpcHeader(entry.getKey().substring(rpc_prefix.length()), entry.getValue());
                }
                if(entry.getKey().startsWith(biz_prefix)){
                    rpcRequest.addBizHeader(entry.getKey().substring(biz_prefix.length()), entry.getValue());
                }
            }
            QueryStringDecoder decoderQuery = new QueryStringDecoder(request.uri());
            // serviceId
            String serviceId = decoderQuery.path();
            if (serviceId.startsWith(RPCConstant.PATH_SEPARATOR)) {
                serviceId = serviceId.substring(1);
            }
            if (serviceId.endsWith(RPCConstant.PATH_SEPARATOR)) {
                serviceId = serviceId.substring(0, serviceId.length() - 1);
            }
            rpcRequest.addRpcHeader(RPCParamType.service_id.getName(), serviceId);
    
            // requestId
            String requestId = request.headers().get(rpc_prefix + RPCConstant.RequestId);
            if (null == requestId || "".equals(requestId)) {
                requestId = "0";
            }
            rpcRequest.setId(Long.valueOf(requestId));
            // add by wangzp in 2017/3/30
            if (HttpMethod.GET == request.method()) {
                rpcRequest.addBizHeader("HTTP_METHOD", "GET");
                Map<String, String> map = new HashMap<String, String>();
                for (Map.Entry<String, List<String>> paramsEntry : decoderQuery.parameters().entrySet()) {
                    map.put(paramsEntry.getKey(), paramsEntry.getValue().get(0));
                    rpcRequest.getBizHeader().put(paramsEntry.getKey(), paramsEntry.getValue().get(0));
                }
                //TODO 需要引入序列化机制,否则data部分找不到相匹配的序列化
                //TODO 暂时用json
                rpcRequest.setData(JSON.toJSONString(map));
            } else {
            	try {
            		decoder = new HttpPostRequestDecoder(factory, request);
            		readingChunks = HttpUtil.isTransferEncodingChunked(request); 
            		if(decoder != null && readingChunks){
            			HttpContent chunk = (HttpContent) request; 
            			  try{  
                              decoder.offer(chunk);  
                          } catch (ErrorDataDecoderException e) {  
                        	  decoder.destroy();
                          }  
            		}
            		File file = readHttpDataChunkByChunk(); //从解码器decoder中读出数据
    				rpcRequest.addBizHeader(RPCParamType.upload_file.getName(), file.getPath());
    			} catch (Exception e) {
    				decoder.destroy();
    			}
            }
            return rpcRequest;
        }
    

     ②文件下载

     
        @Override
        public FullHttpResponse doEncodeResponse(RPCResponse response) {
        	FullHttpResponse httpResponse ;
        	String biz_prefix = BIZ_PREFIX;
    	    String rpc_prefix = RPC_PREFIX;
            if(response.getBizHeader(RPCParamType.header_compatible.getName()) == null ? RPCParamType.header_compatible.getBooleanValue()
                    : Boolean.parseBoolean(response.getBizHeader(RPCParamType.header_compatible.getName()))){
                biz_prefix = "";
                rpc_prefix = "";
            }
            String downloadFile = response.getBizHeader(RPCParamType.download_file.getName());
            if(!StringUtils.isEmpty(downloadFile)){
               	File file = new File(downloadFile);
        		String fileName = file.getName();
    		 	ByteBuf buffer;
    			try {
    				buffer = Unpooled.copiedBuffer(Files.readAllBytes(file.toPath()));
    				httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,buffer);
    			} catch (IOException e) {
    				throw new RuntimeException(fileName + "is not exist");
    			}
    			httpResponse.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream; charset=utf-8");
    		    try {
    				httpResponse.headers().add(HttpHeaderNames.CONTENT_DISPOSITION, "attachment; filename="" + new String(fileName.getBytes("gb2312"),"ISO-8859-1") + """);
    			} catch (UnsupportedEncodingException e) {
    				throw new RuntimeException(fileName + "download fail");
    			}
        	}else{
        		//serialize
    	        byte[] body = (byte[]) SerializerUtils.serialize(getProtocolUrl().getParameter(RPCParamType.serializer_name.getName(), RPCParamType.serializer_name.getValue()), response.getData());
    	
    	        String reasonPhrase = response.getBizHeader(RPCParamType.http_reasonPhrase.getName());
    	        response.getBizHeader().remove(RPCParamType.http_reasonPhrase.getName());    //reasonPhrase不需要放入RPCResponse中传输
    	        httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
    	                new HttpResponseStatus(Integer.valueOf(response.getBizHeader(RPCConstant.Code) == null ? response.getRpcHeader(RPCConstant.Code) : response.getBizHeader(RPCConstant.Code)),
    	                        reasonPhrase == null ? RPCParamType.http_reasonPhrase.getValue() : reasonPhrase),
    	                Unpooled.wrappedBuffer(body));
        	}
    	 	for (Entry<String, String> entry : response.getBizHeader().entrySet()) {
                httpResponse.headers().add(biz_prefix + entry.getKey(), entry.getValue());
            }
            for (Entry<String, String> entry : response.getRpcHeader().entrySet()) {
                httpResponse.headers().add(rpc_prefix + entry.getKey(), entry.getValue());
            }
            httpResponse.headers().add(rpc_prefix + RPCConstant.RequestId, response.getRequestId());
            httpResponse.headers().add(CLZNAME, response.getClzName());
            httpResponse.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            httpResponse.headers().add(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes());
        	return httpResponse;
        }
    

     ③其他辅助方法

    public class HTTPExCodec extends HTTPCodec {
    
        private boolean readingChunks;  
        private static final HttpDataFactory factory =  
                new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE) ; 
      
        private HttpPostRequestDecoder decoder; 
        
        //此处包含上面两个方法
        。。。。
        private File readHttpDataChunkByChunk() {  
            try {  
                while (decoder.hasNext()) {  
                    InterfaceHttpData data = decoder.next();  
                    if (data != null) {  
                        try {  
                            File file = writeHttpData(data);  
                            return file;
                        } finally {  
                            data.release();  
                        }  
                    }  
                }  
            } catch (EndOfDataDecoderException e1) {  
            	throw new RuntimeException("write file error");  
            }
            return null;
        }  
          
        private File writeHttpData(InterfaceHttpData data) {  
            String uploadFileName = getUploadFileName(data);  
            FileUpload fileUpload = (FileUpload) data;  
            if (fileUpload.isCompleted()) {  
                File dir = new File(System.getProperty("user.dir") + File.separator);  
                if (!dir.exists()) {  
                    dir.mkdir();  
                }  
                File dest = new File(dir, uploadFileName);  
                try {  
                    fileUpload.renameTo(dest);  
                    return dest;
                } catch (IOException e) {  
                    throw new RuntimeException("write file error");  
                }  
            } 
            return null;
        }  
          
        private String getUploadFileName(InterfaceHttpData data) {  
            String content = data.toString();  
            String temp = content.substring(0, content.indexOf("
    "));  
            content = temp.substring(temp.lastIndexOf("=") + 2, temp.lastIndexOf("""));  
            return content;  
        }  
    }
      
    
  • 相关阅读:
    java Spring 基于注解的配置(一)
    java Spring 在WEB应用中的实例化
    java Spring 生命周期
    java Spring bean作用域
    java Spring集合
    java Spring配置数据单元
    Velocity 入门(一)
    java strtus2 DynamicMethodInvocation配置(二)
    Android WindowManager 小结
    Android 快速选择联系人
  • 原文地址:https://www.cnblogs.com/cowboys/p/7605745.html
Copyright © 2011-2022 走看看