zoukankan      html  css  js  c++  java
  • NIO框架之MINA源代码解析(一):背景

    

    “你们的agent占了好多系统的port。把我们的非常多业务系统都给整死了,给我们造成了非常大的损失。要求你们的相关领导下周过来道歉”   --   来自我们的一个客户。


     怎么可能呢,我们都不相信,我们的agent仅仅占一个port啊!


    事实胜过雄辩。经过查证。确实是因为我们的agent占了好多系统的port。我看了一下日志。基本把系统可用的port占完了!




    为什么呢?MINA框架私自开的!


    因为我们的agent端使用了NIO通信框架MINA,但并没有使用好,造成了这一差点儿毁灭行的灾难。


    还是先看代码吧。

    /**
     * 异步发送消息
     * @param agent
     * @param request
     */
    public void sendMessageToAgent(Agent agent, HyRequest request) {
    	IoSession session = null;
    	IoConnector connector=null;
    	long startTime = System.currentTimeMillis();
    	try {
    		// 创建一个非堵塞的客户端程序
    		 connector = new NioSocketConnector();
    		// 设置链接超时时间
    		connector.setConnectTimeoutMillis(connectTimeoutMillis);
    
    		ObjectSerializationCodecFactory objsCodec = new ObjectSerializationCodecFactory();
    		objsCodec.setDecoderMaxObjectSize(DEFAULTDECODER);
    		objsCodec.setEncoderMaxObjectSize(DEFAULTDECODER);
    		ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(
    				objsCodec);
    		// 数据转换。编码设置
    		connector.getFilterChain()
    				.addLast("codec", codecFilter);
    		// 消息
    		connector.setHandler(clientHandler);
    		
    		SocketAddress socketAddress = new InetSocketAddress(
    				agent.getIpAddr(), agent.getAgentPort());
    		ConnectFuture future = connector.connect(socketAddress);
    		future.awaitUninterruptibly();
    		session = future.getSession();
    		String json = mapper.writeValueAsString(request);
    		session.write(json);
    		
    		long endTime = System.currentTimeMillis();
    		
    		logerr.debug("send-time:" + (endTime - startTime));
    		
    	} catch (Exception e) {
    		logerr.error("host:" + agent.getIpAddr() + ", AgentPORT:" + agent.getAgentPort()
    				+ ", 连接异常..."+e.getMessage());
    		clientHandler.handlerConnectError(agent, request);
    		
    	}
    }


    public class MinaClientHandler extends IoHandlerAdapter {
    	// 日志
    	private Logger log = Logger.getLogger(getClass());
    	
    	private MinaResponseProcesser minaResponseProcesser;
    	
    	ObjectMapper mapper=null;
    	
    	@Override
    	public void messageReceived(IoSession session, Object message)
    			throws Exception {
    		String msg = message.toString();
    		log.info("receive message from " + session.getRemoteAddress().toString() + ",message:" + message);
    		if(null == mapper){
    			 mapper = new ObjectMapper();
    		}
    		//请求消息转换为HyResponse对象
    		HyResponse response = mapper.readValue(msg, HyResponse.class);		
    		String remoteIp= ((InetSocketAddress)session.getRemoteAddress()).getAddress().getHostAddress();		
    		response.setRemoteIp(remoteIp);
    		HyRequest request = minaResponseProcesser.processResponse(response);
    		if(request == null){
    			//关闭当前session
    			closeSessionByServer(session,response);
    		}else{
    			session.write(mapper.writeValueAsString(request));
    		}
    	}
    }
    


    上面的逻辑就是,当要发送一个消息时,创建一个新的connector,并获取一个session发送消息后直接返回,在MinaClientHandler类的messageReceived里面处理接受到的响应数据,并进行业务处理。最后假设不须要再次发送请求,则关闭当前session。


    事实上出现本文一開始的问题就是在这里造成的。


    在出现我们的agent占用大量port后,我们这边的project人员就迅速定位到了这个问题,并非常快修复了。但修复并不理想,但修复过后的代码。


    /**
     * 异步发送消息
     * @param agent
     * @param request
     */
    public void sendMessageToAgent(Agent agent, HyRequest request) {
    	IoSession session = null;
    	IoConnector connector=null;
    	long startTime = System.currentTimeMillis();
    	try {
    		// 创建一个非堵塞的客户端程序
    		 connector = new NioSocketConnector();
    		// 设置链接超时时间
    		connector.setConnectTimeoutMillis(connectTimeoutMillis);
    
    		ObjectSerializationCodecFactory objsCodec = new ObjectSerializationCodecFactory();
    		objsCodec.setDecoderMaxObjectSize(DEFAULTDECODER);
    		objsCodec.setEncoderMaxObjectSize(DEFAULTDECODER);
    		ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(
    				objsCodec);
    		// 数据转换,编码设置
    		connector.getFilterChain()
    				.addLast("codec", codecFilter);
    		// 消息
    		connector.setHandler(clientHandler);
    		
    		SocketAddress socketAddress = new InetSocketAddress(
    				agent.getIpAddr(), agent.getAgentPort());
    		ConnectFuture future = connector.connect(socketAddress);
    		future.awaitUninterruptibly();
    		session = future.getSession();
    		String json = mapper.writeValueAsString(request);
    		session.write(json);
    		// 等待断开连接
    		session.getCloseFuture().awaitUninterruptibly();
    		long endTime = System.currentTimeMillis();
    		
    		logerr.debug("send-time:" + (endTime - startTime));
    		//connector.dispose();
    	} catch (Exception e) {
    		logerr.error("host:" + agent.getIpAddr() + ", AgentPORT:" + agent.getAgentPort()
    				+ ", 连接异常..."+e.getMessage());
    		clientHandler.handlerConnectError(agent, request);
    		
    	}finally{
    		if(null!=session){
    			session.close(true);
    			session=null;
    		}
    		if(null !=connector){
    			connector.dispose();
    		}
    	}
    }


    仅仅改了一个地方。就是在发送完消息后,加了一个等待断开连接语句和finally语句块-关闭session和connector。


    尽管不会出现程序占用大量的系统port这个问题。但会造成另外一个问题-当有一个消息队列须要异步调用上面语句发送消息时,有原来的异步(发送完直接返回,相当于高速并发发送)变成伪异步(发送完消息后并等待消息返回处理后返回,相当于顺序处理队列里面的消息)。


    上面的改动并非我们想要的结果,但至少修复了占用大量port的问题。


    因为怀着想彻底修复这个问题的想法,我想还是深入了解一下MINA源代码吧。



  • 相关阅读:
    UVa OJ 120
    ACM--string常见用法
    log4j
    总结13.11.9
    Java 动态生成 PDF 文件
    Linux_CentOS-服务器搭建 <七>
    关于dao层的封装和前端分页的结合(文章有点长,耐心点哦)
    Linux_CentOS-服务器搭建 <六>
    Linux_CentOS-服务器搭建 <五> 补充
    JQ获取CKeditor的值
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/8447912.html
Copyright © 2011-2022 走看看