zoukankan      html  css  js  c++  java
  • Java网关服务-AIO(三)

    Java网关服务-AIO(三)

    概述

    前两节中,我们已经获取了body的总长度,剩下的就是读出body,处理请求

    ChannelServerHandler

    ChannelServerHandler即从channel中读取请求,也向channle输出结果,因此它实现了InboundHandler, OutboundHandler

    /**
     * 读取请求的内容,业务处理
     */
    public class ChannelServerHandler implements CompletionHandler<Integer, ByteBuffer>, InboundHandler, OutboundHandler {
    
    	private final static Logger LOGGER = LoggerFactory.getLogger(ChannelServerHandler.class);
    
    	private AsynchronousSocketChannel channel;
    
    	public ChannelServerHandler(AsynchronousSocketChannel channel) {
    		this.channel = channel;
    	}
    
    	public void completed(Integer result, ByteBuffer attachment) {
    		//如果条件成立,说明客户端主动终止了TCP套接字,这时服务端终止就可以了
    		if (result == -1) {
    			System.out.println("remote is close");
    			closeChannel();
    			return;
    		}
    
    		Object resultData;
    		String req = (String) read(channel, attachment);
    		if (req == null) {
    			closeChannel();
    			return;
    		}
    
    		try {
    			LOGGER.info("socket:{}", channel.getRemoteAddress());
    
    			//同步处理请求
    			RequestHandler requestHandler = ApplicationUtils.getBean(RequestHandler.class);
    			resultData = requestHandler.execute(req);
    
    		} catch (Throwable t) {
    			resultData = Result.error("ERROR", Utils.error(t));
    			LOGGER.error("调用接口失败", t);
    		}
    
    		if (resultData == null) {
    			resultData = Result.failure("FAILURE", "调用失败,数据为空.");
    		}
    		try {
    			String resultContent =  resultData instanceOf String ? (String) resultData : JSON.toJSONString(resultData);
    			byte[] bytes = resultContent.getBytes("UTF-8");
    			ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
    			writeBuffer.put(bytes);
    			writeBuffer.flip();
    
    			write(channel, writeBuffer);
    		} catch (Exception e) {
    			LOGGER.error("对象转JSON失败,对象:{}", resultData, e);
    		}
    
    		closeChannel();
    	}
    
    	@Override
    	public Object read(AsynchronousSocketChannel socketChannel, ByteBuffer in) {
    		in.flip();
    		byte[] body = new byte[in.remaining()];
    		in.get(body);
    
    		String req = null;
    		try {
    			req = new String(body, "UTF-8");
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    		return req;
    	}
    
    	@Override
    	public Object write(AsynchronousSocketChannel socketChannel, ByteBuffer out) {
    		//write,write操作结束后关闭通道
    		channel.write(out, out, new CompletionHandler<Integer, ByteBuffer>() {
    			@Override
    			public void completed(Integer result, ByteBuffer attachment) {
    				closeChannel();
    			}
    
    			@Override
    			public void failed(Throwable exc, ByteBuffer attachment) {
    				closeChannel();
    			}
    		});
    		return null;
    	}
    
    	public void failed(Throwable exc, ByteBuffer attachment) {
    		closeChannel();
    	}
    
    	private void closeChannel() {
    		try {
    			this.channel.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    

    读取body

    		in.flip();
    		byte[] body = new byte[in.remaining()];
    		in.get(body);
    
    		String req = null;
    		try {
    			req = new String(body, "UTF-8");
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    		return req;	
    
    in.remaining()

    buffer中含有的字节数

    客户端、服务端由于跨语言和经验问题,没有使用复杂的跨语言序列化技术,双方约定使用UTF-8编码,通过将body转换为String,最终获得了客户端传递的字符串。

    处理请求

    经过自定义的请求处理逻辑,同步处理,最终将响应编码后,发送给客户端,write操作结束后,关闭连接

    总结

    使用AIO开发服务端时,主要涉及

    • 配置I/O事件完成的回调线程池
    • 从accept -> read 到 向client端响应 write -> close,尽量使用CompletionHanlder来异步处理,不要在处理某个事件完成的线程中,同步的调用,如future.get()
    • 如果是短连接,则需在write操作时注册write结束后的handler,在handler中关闭连接

    扩展

    长连接该如何处理

    • 长连接意味着client可以发多次请求,由于多次请求被server执行的顺序是不可控的,可能后发的请求先响应,因此需要在请求和响应时,加requestId,据此对应到请求的结果
    • 长连接不需要在write后关闭连接
    • 长连接需要开发定时的ping-pong心跳消息
    • 长连接在响应时比现在更复杂,也需要一个和请求类似或相同的协议来标识body长度

    测试

    测试用例

    	/**
    	 * mvn -Dtest=com.jd.jshop.web.sdk.test.ClientTest#pingReqSocket test
    	 *
    	 * @throws IOException
    	 */
    	@Test
    	@PerfTest(invocations = 20000, threads = 50)
    	public void pingReqSocket() throws IOException {
    		
    		byte[] content = "ping".getBytes("UTF-8");
    		String result = sendReq(content);
    
    		//断言 是否和预期一致
    		Assert.assertEquals("pong", result);
    	}
    
    	private String sendReq(byte[] content) throws IOException {
    		ByteBuffer writeBuffer = ByteBuffer.allocate(4 + content.length);
    		writeBuffer.putInt(content.length);
    		writeBuffer.put(content);
    		writeBuffer.flip();
    
    
    		Socket socket = new Socket();
    		socket.connect(new InetSocketAddress("127.0.0.1", 9801));
    		socket.getOutputStream().write(writeBuffer.array());
    		socket.getOutputStream().flush();
    		byte[] buf = new byte[1024];
    		int len = 0;
    		String result = null;
    		while ((len = socket.getInputStream().read(buf)) != -1) {
    			result = new String(buf, 0, len);
    			System.out.println(result);
    		}
    		return result;
    	}
    

    测试的方法是,在服务器上建立socket连接,向server发送ping,server返回pong
    测试服务器:centos, 2个物理核,4个逻辑核,内存16G

    分析aio的实现:
    在ping-pong测试中性能极高,优于并甩开netty

    以下是使用Netty开发的server端的测试用例,可以和上面的图片对比一下

    Measured invocations:	10,000	
    Thread Count:	10	
    		 
    	Measured
    (system)	Required
    Execution time:	1,646 ms	
    Throughput:	6,075 / s	
    Min. latency:	0 ms	
    Average latency:	1 ms	
    Median:	2 ms	
    90%:	2 ms	
    Max latency:	26 ms	
    
    ============================
    
    
    Started at:	Oct 16, 2018 5:27:03 PM
    Measured invocations:	20,000	
    Thread Count:	20	
    		 
    	Measured
    (system)	Required
    Execution time:	3,293 ms	
    Throughput:	6,073 / s	
    Min. latency:	0 ms	
    Average latency:	3 ms	
    Median:	3 ms	
    90%:	5 ms	
    Max latency:	54 ms	
    
    ============================
    
    Started at:	Oct 16, 2018 5:28:24 PM
    Measured invocations:	20,000	
    Thread Count:	10	
    		 
    	Measured
    (system)	Required
    Execution time:	3,051 ms	
    Throughput:	6,555 / s	
    Min. latency:	0 ms	
    Average latency:	1 ms	
    Median:	1 ms	
    90%:	2 ms	
    Max latency:	44 ms	
    
    ============================
    
    Started at:	Oct 16, 2018 5:30:06 PM
    Measured invocations:	20,000	
    Thread Count:	50	
    		 
    	Measured
    (system)	Required
    Execution time:	3,167 ms	
    Throughput:	6,315 / s	
    Min. latency:	0 ms	
    Average latency:	7 ms	
    Median:	7 ms	
    90%:	10 ms	
    Max latency:	64 ms	
    

    分析基于Netty的实现:
    吞吐量:6000+/s
    10个线程时:90%低于2ms,平均1ms
    20个线程时:90%低于5ms,平均3ms
    50个线程时:90%低于10ms,平均7ms

    线程越多,性能越低

    当前测试用例不太依赖内存
    执行10000+次请求,建立10000+连接,要求服务器对单个进程fd限制打开,防止报too many open files导致测试用例执行失败

        ulimit -n 20240
  • 相关阅读:
    Android开发 ViewConfiguration View的配置信息类
    Android 开发 倒计时功能 转载
    Android 开发 关于7.0 FileUriExposedException异常 详解
    Android 开发 实现文本搜索功能
    Android 开发 Activity里获取View的宽度和高度 转载
    Android 开发 存储目录的详解
    Android 开发 Fresco框架点击小图显示全屏大图实现 ZoomableDraweeView
    Android 开发 将window变暗
    Android 开发 DisplayMetrics获取Android设备的屏幕高宽与其他信息
    Android 开发 DP、PX、SP转换详解
  • 原文地址:https://www.cnblogs.com/windliu/p/9804561.html
Copyright © 2011-2022 走看看