zoukankan      html  css  js  c++  java
  • Java网关服务-AIO(三)

    Java网关服务-AIO(三)

    概述

    前两节中,我们已经获取了body的总长度,剩下的就是读出body,处理请求

    ChannelServerHandler

    ChannelServerHandler即从channel中读取请求,也向channle输出结果,因此它实现了InboundHandler, OutboundHandler

    /**
     * 读取请求的内容,业务处理
     */
    public class ChannelServerHandler implements CompletionHandler<Integer, ByteBuffer>, InboundHandler, OutboundHandler {
    
    	private final static Logger LOGGER = LoggerFactory.getLogger(ChannelServerHandler.class);
    
    	private AsynchronousSocketChannel channel;
    
    	public ChannelServerHandler(AsynchronousSocketChannel channel) {
    		this.channel = channel;
    	}
    
    	public void completed(Integer result, ByteBuffer attachment) {
    		//如果条件成立,说明客户端主动终止了TCP套接字,这时服务端终止就可以了
    		if (result == -1) {
    			System.out.println("remote is close");
    			closeChannel();
    			return;
    		}
    
    		Object resultData;
    		String req = (String) read(channel, attachment);
    		if (req == null) {
    			closeChannel();
    			return;
    		}
    
    		try {
    			LOGGER.info("socket:{}", channel.getRemoteAddress());
    
    			//同步处理请求
    			RequestHandler requestHandler = ApplicationUtils.getBean(RequestHandler.class);
    			resultData = requestHandler.execute(req);
    
    		} catch (Throwable t) {
    			resultData = Result.error("ERROR", Utils.error(t));
    			LOGGER.error("调用接口失败", t);
    		}
    
    		if (resultData == null) {
    			resultData = Result.failure("FAILURE", "调用失败,数据为空.");
    		}
    		try {
    			String resultContent =  resultData instanceOf String ? (String) resultData : JSON.toJSONString(resultData);
    			byte[] bytes = resultContent.getBytes("UTF-8");
    			ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
    			writeBuffer.put(bytes);
    			writeBuffer.flip();
    
    			write(channel, writeBuffer);
    		} catch (Exception e) {
    			LOGGER.error("对象转JSON失败,对象:{}", resultData, e);
    		}
    
    		closeChannel();
    	}
    
    	@Override
    	public Object read(AsynchronousSocketChannel socketChannel, ByteBuffer in) {
    		in.flip();
    		byte[] body = new byte[in.remaining()];
    		in.get(body);
    
    		String req = null;
    		try {
    			req = new String(body, "UTF-8");
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    		return req;
    	}
    
    	@Override
    	public Object write(AsynchronousSocketChannel socketChannel, ByteBuffer out) {
    		//write,write操作结束后关闭通道
    		channel.write(out, out, new CompletionHandler<Integer, ByteBuffer>() {
    			@Override
    			public void completed(Integer result, ByteBuffer attachment) {
    				closeChannel();
    			}
    
    			@Override
    			public void failed(Throwable exc, ByteBuffer attachment) {
    				closeChannel();
    			}
    		});
    		return null;
    	}
    
    	public void failed(Throwable exc, ByteBuffer attachment) {
    		closeChannel();
    	}
    
    	private void closeChannel() {
    		try {
    			this.channel.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    

    读取body

    		in.flip();
    		byte[] body = new byte[in.remaining()];
    		in.get(body);
    
    		String req = null;
    		try {
    			req = new String(body, "UTF-8");
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    		return req;	
    
    in.remaining()

    buffer中含有的字节数

    客户端、服务端由于跨语言和经验问题,没有使用复杂的跨语言序列化技术,双方约定使用UTF-8编码,通过将body转换为String,最终获得了客户端传递的字符串。

    处理请求

    经过自定义的请求处理逻辑,同步处理,最终将响应编码后,发送给客户端,write操作结束后,关闭连接

    总结

    使用AIO开发服务端时,主要涉及

    • 配置I/O事件完成的回调线程池
    • 从accept -> read 到 向client端响应 write -> close,尽量使用CompletionHanlder来异步处理,不要在处理某个事件完成的线程中,同步的调用,如future.get()
    • 如果是短连接,则需在write操作时注册write结束后的handler,在handler中关闭连接

    扩展

    长连接该如何处理

    • 长连接意味着client可以发多次请求,由于多次请求被server执行的顺序是不可控的,可能后发的请求先响应,因此需要在请求和响应时,加requestId,据此对应到请求的结果
    • 长连接不需要在write后关闭连接
    • 长连接需要开发定时的ping-pong心跳消息
    • 长连接在响应时比现在更复杂,也需要一个和请求类似或相同的协议来标识body长度

    测试

    测试用例

    	/**
    	 * mvn -Dtest=com.jd.jshop.web.sdk.test.ClientTest#pingReqSocket test
    	 *
    	 * @throws IOException
    	 */
    	@Test
    	@PerfTest(invocations = 20000, threads = 50)
    	public void pingReqSocket() throws IOException {
    		
    		byte[] content = "ping".getBytes("UTF-8");
    		String result = sendReq(content);
    
    		//断言 是否和预期一致
    		Assert.assertEquals("pong", result);
    	}
    
    	private String sendReq(byte[] content) throws IOException {
    		ByteBuffer writeBuffer = ByteBuffer.allocate(4 + content.length);
    		writeBuffer.putInt(content.length);
    		writeBuffer.put(content);
    		writeBuffer.flip();
    
    
    		Socket socket = new Socket();
    		socket.connect(new InetSocketAddress("127.0.0.1", 9801));
    		socket.getOutputStream().write(writeBuffer.array());
    		socket.getOutputStream().flush();
    		byte[] buf = new byte[1024];
    		int len = 0;
    		String result = null;
    		while ((len = socket.getInputStream().read(buf)) != -1) {
    			result = new String(buf, 0, len);
    			System.out.println(result);
    		}
    		return result;
    	}
    

    测试的方法是,在服务器上建立socket连接,向server发送ping,server返回pong
    测试服务器:centos, 2个物理核,4个逻辑核,内存16G

    分析aio的实现:
    在ping-pong测试中性能极高,优于并甩开netty

    以下是使用Netty开发的server端的测试用例,可以和上面的图片对比一下

    Measured invocations:	10,000	
    Thread Count:	10	
    		 
    	Measured
    (system)	Required
    Execution time:	1,646 ms	
    Throughput:	6,075 / s	
    Min. latency:	0 ms	
    Average latency:	1 ms	
    Median:	2 ms	
    90%:	2 ms	
    Max latency:	26 ms	
    
    ============================
    
    
    Started at:	Oct 16, 2018 5:27:03 PM
    Measured invocations:	20,000	
    Thread Count:	20	
    		 
    	Measured
    (system)	Required
    Execution time:	3,293 ms	
    Throughput:	6,073 / s	
    Min. latency:	0 ms	
    Average latency:	3 ms	
    Median:	3 ms	
    90%:	5 ms	
    Max latency:	54 ms	
    
    ============================
    
    Started at:	Oct 16, 2018 5:28:24 PM
    Measured invocations:	20,000	
    Thread Count:	10	
    		 
    	Measured
    (system)	Required
    Execution time:	3,051 ms	
    Throughput:	6,555 / s	
    Min. latency:	0 ms	
    Average latency:	1 ms	
    Median:	1 ms	
    90%:	2 ms	
    Max latency:	44 ms	
    
    ============================
    
    Started at:	Oct 16, 2018 5:30:06 PM
    Measured invocations:	20,000	
    Thread Count:	50	
    		 
    	Measured
    (system)	Required
    Execution time:	3,167 ms	
    Throughput:	6,315 / s	
    Min. latency:	0 ms	
    Average latency:	7 ms	
    Median:	7 ms	
    90%:	10 ms	
    Max latency:	64 ms	
    

    分析基于Netty的实现:
    吞吐量:6000+/s
    10个线程时:90%低于2ms,平均1ms
    20个线程时:90%低于5ms,平均3ms
    50个线程时:90%低于10ms,平均7ms

    线程越多,性能越低

    当前测试用例不太依赖内存
    执行10000+次请求,建立10000+连接,要求服务器对单个进程fd限制打开,防止报too many open files导致测试用例执行失败

        ulimit -n 20240
  • 相关阅读:
    C#和JAVA的RSA密钥、公钥转换
    JAVA RSA私钥 加密(签名) 对应 C# RSA私钥 加密(签名)
    Senparc.Weixin.MP SDK 微信公众平台开发教程(九):自定义菜单接口说明
    微信资料
    OAuth2.0实战之微信授权篇
    URL中出现了%E2%80%8E(Zero-Width Space)
    AntDesign vue学习笔记(六)Table 显示图片
    NPOI导出 The maximum column width for an individual cell is 255 characters
    AntDesign vue学习笔记(五)导航菜单动态加载
    AntDesign vue学习笔记(四)使用组件切换
  • 原文地址:https://www.cnblogs.com/windliu/p/9804561.html
Copyright © 2011-2022 走看看