zoukankan      html  css  js  c++  java
  • NIO中几个非常重要的技术点

    参考:http://ifeve.com/selectors/

    参考:https://www.ibm.com/developerworks/cn/education/java/j-nio/j-nio.html

    netty的NioEventLoop类的实现也是类似

    这些都是在实践中踩过雷的,今天某应用再次踩雷,把遇到的几个雷都收集一下,给后来者参考。

    1.即使是accept事件,没有真正的read和write,Channel也要关闭,否则unix domain socket会被泄漏(WINDOWS更可怕),因为NIO的每个

    Channel上都有两个FD用来监听事件(接收和发送走不同的FD)。

    2.cancel事件导致CPU占用100%,http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933 

    其原因就是调用key.cancel()时底层在下一次seelect前并没有真正的取消。导致等待select事件返回却又没有返回我们注册的key.这个事件不断地

    循环触发,CPU一直处理返回 key为0的select()调用。解决方法有两种,一是在key.cancel()后立即selectNow();但是如果是多线程并发操作,有

    可能这两行语句中间线程被切换,使得key.cancel()后没有立即执行 selectNow().这在多Selector情况下是可能的。另一种就是jetty处理方式,如果

    select()返回0且连续几次出现这样的情况(有事件触发返回,却不是返回我们注册的KEY),就将有效的key重新注册到一个新的selector上。其实

    glassfish在处理多次次次次write返回为0的情况时也是这种策略。

    示例代码:(真实的项目中)

    int selectTimeout = connectionConfig.getSelectTimeout();  
    int allProcessMaxTime = connectionConfig.getAllProcessMaxTime();  
    //selector在实现时有bug,epool底层可能会发送一个错误的信号导致select方法提前返回,但没有  
    //返回注册的事件,而且不断循环造成CPU100%  
    int slelectZeroCount = 0;  
    int maxZeroCount = 20;  
    int fixed = 0;  
      
    while (selector.isOpen() && selector.keys().size() != 0 && allProcessMaxTime > 0) {  
        long start = System.currentTimeMillis();  
        // 查询看是否有已经准备好的通道,指定超时时间  
        int count = selector.select(selectTimeout);  
      
        if (count == 0) {  
            slelectZeroCount++;  
        } else {  
            slelectZeroCount = 0;  
            //保证是连续的count==0时才将slelectZeroCount++,如果其中有一次返回注册事件测已经正常  
        }  
        if (slelectZeroCount > maxZeroCount && fixed == 0) {  
            //没有尝试修复动作,则先进行修复干预  
            for (SelectionKey key : selector.keys()) {  
                if (key.isValid() && key.interestOps() == 0) {  
                    key.cancel();  
                }  
            }  
            fixed = 1;  
        } else if (slelectZeroCount > maxZeroCount && fixed == 1) {  
            //如果已经干预过仍然连续返回0,注意如果不返回0的话slelectZeroCount就被置0.  
            //重新获取一个selector,将当前事件重新注册到新的selector上。并销毁当前selector  
            Selector newSelector = this.getSelector();  
            this.changeSelector(selector, newSelector);  
            selector = newSelector;  
        }  
        //对channel进行正常处理  
    

    重新注册的代码:

    private synchronized void changeSelector(Selector oldSelector, Selector newSelector) {  
            for (SelectionKey key : oldSelector.keys()) {  
                if (!key.isValid() || key.interestOps() == 0) {  
                    continue;  
                }  
                Object att = key.attachment();  
                try {  
                    if (att == null) {  
                        key.channel().register(newSelector, key.interestOps());  
                    } else {  
                        key.channel().register(newSelector, key.interestOps(), att);  
                    }  
                } catch (ClosedChannelException e) {  
                    SocketChannel sc = (SocketChannel) key.channel();  
                    sc.close();  
                }  
            }  
            try {  
                oldSelector.close();  
            } catch (IOException e) {  
                logger.error(e.getMessage());  
            }  
      
        }  
    

     同样对于网络状态不好时,连续写操作返回0的处理: 

     

    private void flushData(Selector selector, SocketChannel socketChannel, ByteBuffer byteBuffer)  
            throws IOException {  
      
        int count = 0;  
        int maxCount = 20;  
      
        while (byteBuffer.hasRemaining()) {  
            int len = socketChannel.write(byteBuffer);  
            if (len < 0) {  
                throw new EOFException("write channel is closed.");  
            }  
            // 如果不对len==0(即当前网络不可用)的情况处理,则while(byteBuffer.hasRemaining())可能一直  
            // 循环下去而消耗大量的CPU.  
            if (len == 0) {  
                count++;  
            } else {  
                count = 0;  
            }  
            if (count > maxCount) {  
                throw new IOException("can't connect to target.");  
            }  
        }  
      
    }  

    我自己写的代码:

    package com.eshore.ismp.hbinterface.crm;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.Set;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import com.eshore.ismp.hbinterface.service.BizCommonService;
    
    /**
     * @author mercy
     *接收CRM优惠工单信息
     */
    public class CrmServerTest {
    	private static final Logger logger = LoggerFactory.getLogger(CrmServerTest.class);  
    	private Selector selector=null;
    	private ServerSocketChannel serverSocketChannel=null;
    	private int port=10003;
    	//private Charset charset=Charset.forName("GBK");//返回一个字符类型对象
    	public CrmServerTest() throws IOException{
    		serverSocketChannel=ServerSocketChannel.open();
    		serverSocketChannel.socket().setReuseAddress(true);
    		serverSocketChannel.configureBlocking(false);//设置无阻塞模式
    		serverSocketChannel.socket().bind(new InetSocketAddress(port));
    		logger.info("服务已启动...");
    	}
    	public void start(BizCommonService bizCommonService) throws IOException{
    		buildSelector();
    		this.service(bizCommonService);
    	}
    	public void buildSelector()throws IOException{
    		logger.info("构建selector");
    		selector=Selector.open();
    		//创建Selector对象
    		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
    	//重建selector
    	public void rebuildSelector(){
    		logger.info("重建selector");
    		Selector oldSelector=selector;
    		if (oldSelector == null) {
                return;
            }
    		try {
    			selector=Selector.open();
    		} catch (IOException e1) {
    			e1.printStackTrace();
    		}
    		int nChannels = 0;
    		for (SelectionKey key: oldSelector.keys()) {
    			//返回key的附加对象
    			 Object a = key.attachment();
    			 try{
    				 if (!key.isValid() || key.channel().keyFor(selector) != null) {
    	                 continue;
    	             }
    				 int interestOps = key.interestOps();
    	             key.cancel();
    	             key.channel().register(selector, interestOps, a);
    	             //就不再更新通道的selectKey
    	             nChannels ++;
    	         } catch (Exception e) {
    	             logger.warn("Failed to re-register a Channel to the new Selector.", e);
    	         }
    		}
    		try {
                // time to close the old selector as everything else is registered to the new one
                oldSelector.close();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", t);
                }
            }
    		  logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    	public void service(BizCommonService bizCommonService) throws IOException{
    		int start=0;
    		while(true){
    			if(selector.select()==0){
    				start++;
    				logger.info("continue....");
    				continue;
    			}
    			if(start==5000){
    				start=0;
    				logger.info("重建selector");
    				rebuildSelector();
    			}
    			Set readyKeys=selector.selectedKeys();
    			Iterator it=readyKeys.iterator();
    			while(it.hasNext()){
    				SelectionKey key=null;
    				try{
    					key=(SelectionKey)it.next();
    					it.remove();//删除集合中的key
    					if(key.isAcceptable()){//是否可以接收客户端的socket连接
    						ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
    						SocketChannel socketChannel=ssc.accept();
    						//logger.info("接收到的客户端连接,来自:"+socketChannel.socket().getInetAddress()+":"+socketChannel.socket().getPort());
    						socketChannel.configureBlocking(false);//设置无阻塞模式
    						ByteBuffer buffer=ByteBuffer.allocate(6000);//创建一个ByteBuffer对象用于存放数据(数据存放缓冲区)
    						socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);//注册事件,Selector会监控事件是否发生
    					}
    					if(key.isReadable()){//key的channel是否可读
    						receive(key);
    					}
    					if(key.isWritable()){//key的channel是否可写
    						send(key,bizCommonService);
    					}
    				}catch(IOException e){
    					logger.info(" crm exception....");
    					e.printStackTrace();
    					try{
    						if(key!=null){
    							key.cancel();
    							key.channel().close();
    							selector.selectNow();
    						}
    					}catch(Exception ex){
    						ex.printStackTrace();
    					}
    				}
    			}
    		}
    
    	}
    	/**
    	 * @param key
    	 * @throws IOException
    	 * @author mercy
    	 * 根据读取的数据处理完返回给客户端
    	 */
    	public void send(SelectionKey key,BizCommonService bizCommonService) throws IOException{
    		ByteBuffer buffer=(ByteBuffer) key.attachment();
    		SocketChannel socketChannel=(SocketChannel) key.channel();
    		buffer.flip();
    		String data=decode(buffer);//解码客户端发过来的数据
    		if(data.length()==0){
    			return ;
    		}
    		String outputData=data;//.substring(0, data.indexOf("
    ")+1);
    		//logger.info("客户端发送的数据:"+outputData+",length:"+outputData.length());
    		//String reply="FFFF02141433570200012400050301IBSS01662       001023CS0214143357*0189086510002001100301420170214143500004003099005007success";
    		//ByteBuffer outputBuffer=encode("echo:"+reply);//返回给客户端的数据
    		boolean result = bizCommonService.sendOperToCacheAysn(String.valueOf(outputData));
    		//创建响应报文
    		String res = bizCommonService.createResponseStr(String.valueOf(outputData),result);
    		ByteBuffer outputBuffer=encode(res);//返回给客户端的数据
    		while(outputBuffer.hasRemaining()){
    			//System.out.println("=="+decode(outputBuffer));
    			socketChannel.write(outputBuffer);
    		}
    		ByteBuffer temp=encode(outputData);
    		buffer.position(temp.limit());
    		buffer.compact();
    		if(outputData.length()==0){
    			key.cancel();
    			socketChannel.close();
    			logger.info("关闭与某客户端的连接");
    		}
    	}
    	/**
    	 * @param key
    	 * @throws IOException
    	 * @author mercy
    	 * 读取客户端发来的数据
    	 */
    	public void receive(SelectionKey key) throws IOException{
    		ByteBuffer buffer=(ByteBuffer) key.attachment();
    		SocketChannel socketChannel=(SocketChannel) key.channel();
    		ByteBuffer readBuffer=ByteBuffer.allocate(6000);//创建自定义内存的buffer(存放读到的数据)
    		socketChannel.read(readBuffer);
    		readBuffer.flip();
    		buffer.limit(buffer.capacity());//设置buffer的极限为buffer的容量
    		buffer.put(readBuffer);//复制到缓存区
    	}
    	public String decode(ByteBuffer buffer){//解码
    		CharBuffer charBuffer=Charset.forName("GBK").decode(buffer);
    		return charBuffer.toString();
    		
    	}
    	public ByteBuffer encode(String str){//编码
    		return Charset.forName("GBK").encode(str);
    	}
    	public static void main(String[] args) throws IOException {
    		final Logger log = LoggerFactory.getLogger(CrmServerTest.class);  
    		try{
    			ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
    					new String[] { "applicationContext.xml" });
    			context.start();
    			BizCommonService bizCommonService = (BizCommonService) context.getBean("bizCommonService");
    			new CrmServerTest().start(bizCommonService);  
    		}catch(Exception e){
    			log.error("start agent interface server error:",e);
    			System.exit(-1);
    		}
    	}
    }
    

    一般来说

     while(true){
                if(selector.select()==0){
                    start++;
                    logger.info("continue....");
                    continue;
                }
    

      没有

    selector.select()>0
    

    好,因为万一 selector.select() 不阻塞了返回0用while(true)会反复执行 if(selector.select()==0) 这一段

  • 相关阅读:
    全栈程工程师
    月薪8000的程序员和月薪2万的程序员差别在哪里?
    原型中的访问
    关于 基本类型和复合类型 用== 判断的小问题
    使用原型解决构造函数问题
    前端工程师学习路线 --书籍
    程序员成长之路
    GIT学习(1) Pull和Fetch
    OO面向对象编程:第四单元总结及课程总结
    OO面向对象编程:第三单元总结
  • 原文地址:https://www.cnblogs.com/JAYIT/p/8250174.html
Copyright © 2011-2022 走看看