zoukankan      html  css  js  c++  java
  • socket的NIO操作

    一、前言

          Java中直接使用socket进行通信的场景应该不是很多,在公司的一个项目中有这种需求,所以根据自己的理解和相关资料的参考,基于NIO 实现了一组工具类库,具体的协议还未定义,后续再整理

     

    二、实现思路

    包结构如下:

      

     

    Listener: 事件监听接口

    AcceptListener(请求事件接口),TCPServerProtocol实现类中当服务端接收到连接请求并成功建立通信之后通知注册的此事件集合;

    ReadListener(读取事件接口),TCPProtocol实现类中读取接收到的信息完成之后通知注册的此事件集合;

    SendListener(发送事件接口),TCPProtocol实现类中调用发送信息方法之后通知注册的此事件集合;

     

    Protocol:TCP处理接口

    TCPProtocol(读取、输出TCP处理接口),定义了NIO中关于输入、输出处理以及相关监听事件的维护

    TCPServerProtocol(接收TCP请求处理接口),定义了NIO中TCP请求处理以及相关监听事件的维护

     

    Util:辅助类

    HelperUtil(一些基本操作工具类),定义获取KEY值,获取本地IP,核对结束帧等

    SocketConfig(socket连接配置类),定义IP地址、端口、处理接口、是否自动重置等配置信息

    SocketStat(socket连接状态管理类),管理socket的生命周期,提供socket控制方法

    SocketLogicException(socket异常类),定义此类库中可能出现的异常

     

    Socket:对外服务类

    NSocketBlockClient(socket阻塞客户端类),提供阻塞的客户端实现

    NSocketClient(socket非阻塞客户端类),提供非阻塞的客户端实现

    NSocketService(socket非阻塞服务端类),提供非阻塞的服务端实现

    SelectOptionListener(发送事件接口的实现),用于改变selector的interestOps为OP_WRITE

    SocketClientTask(客户端线程任务),用于管理客户端的输入、输出事件以及异常处理

    SocketServiceTask(服务端线程任务),用于管理服务端的输入、输出、请求事件以及异常处理

     

    核心类图:

     

    三、使用方式

    (1) 通过实现AcceptListener、ReadListener、SendListener事件接口来注入发送、读取、接收业务

    (2) 通过设置SocketConfig对象属性来进行SOCKET通信配置

    (3) 通过NSocketBlockClient、NSocketClient、NSocketService对象来进行信息发送

      服务端例子:

     

    SocketConfig config=new SocketConfig(true, "10.33.6.178", 8899);
    TCPServerProtocol protocol=new DefaultServerProtocol(new AcceptListener(){
                @Override
                public void handleEvent(SocketStat socket) {
                    socket.getConfig().getProtocol().addReadListener(new TestRead());
                }
                
             });
       NSocketService server=new NSocketService(config, protocol);
    View Code

      其中 TestRead 为实现ReadListener的类 

    public void handleEvent(byte[] message,TCPProtocol tcpProtocol) {
            
            System.out.println(new String(message,Charset.forName("GBK")));
            
            tcpProtocol.sendMessage("hello".getBytes(Charset.forName("GBK")));                
    }
    View Code

      非阻塞客户端例子:

    SocketConfig config=new SocketConfig(true, "10.33.6.178", 8899);
      config.getProtocol().addReadListener(new ReadListener() {
                
                @Override
                public void handleEvent(byte[] message, TCPProtocol tcpProtocol) {
                    
                    System.out.println("recive from server:"+ new String(message));                
                }
       });
       NSocketClient client=new NSocketClient(config);
       
       client.sendMessage("test nsocket".getBytes());
    View Code

     关于NIO处理的核心类:

    public class DefaultServerProtocol implements TCPServerProtocol{ 
        
        private LinkedList<AcceptListener> acceptList=new LinkedList<AcceptListener>();       
        
        public DefaultServerProtocol(AcceptListener... acceptColl){
            for(AcceptListener al:acceptColl){
                this.addAcceptListener(al);
            }
        }
    
        @Override
        public SocketStat handleAccept(SelectionKey key) throws IOException {
            SocketChannel channel=((ServerSocketChannel)key.channel()).accept();  
            channel.configureBlocking(false);  
            Socket socket=channel.socket();
            SocketConfig config=new SocketConfig(false,socket.getInetAddress().getHostAddress(),socket.getPort(),this.createProtocol());        
            SocketStat stat=new SocketStat(config, key.selector(), channel);
            System.out.println("远程客户端地址:".concat(socket.getInetAddress().getHostAddress()));
            notifyAccept(stat);
            return stat;
        }
    
        @Override
        public void addAcceptListener(AcceptListener al) {
            this.acceptList.add(al);       
        }
    
        @Override
        public void notifyAccept(SocketStat socket) {
            for(AcceptListener al: acceptList){
                al.handleEvent(socket);
            }
        }
        
        /**
         * 
         * 获取关于TCP 的读取和写入操作协议,可以override 返回自己的实现
         *
         * @return      
         * @since  Ver 1.0
         */
        public TCPProtocol createProtocol(){
            return new DefaultTCPProtocol();
        }
    }
    View Code
    public class DefaultTCPProtocol implements TCPProtocol {       
        
        /**读取缓存区*/
        private ByteBuffer readBuff;        
        
        /**待发送消息队列*/
        protected Queue<ByteBuffer> messageQueue=new LinkedBlockingQueue<ByteBuffer>();
        
        /***/
        private Object lockObje=new Object();
        
        /**读取监听*/
        private LinkedList<ReadListener> readList=new LinkedList<ReadListener>();
        
        /**发送监听*/
        private LinkedList<SendListener> sendList=new LinkedList<SendListener>();
        
        public DefaultTCPProtocol(ReadListener... readColl){        
            readBuff=ByteBuffer.allocate(1024);          
            for(ReadListener rl : readColl){
                this.addReadListener(rl);
            }
        }   
        
    
        @Override
        public void handleRead(SelectionKey key) throws IOException {        
            SocketChannel clientChn=(SocketChannel)key.channel();
            ByteArrayOutputStream out=new ByteArrayOutputStream();
            try{   
                synchronized(this.readBuff){
                    readBuff.clear();
                    int bytesRead=clientChn.read(readBuff);    
                    if(bytesRead==-1){
                        throw new IOException("远程已关闭");
                    }
                    while(bytesRead>0){
                        readBuff.flip();                
                        out.write(readBuff.array(), 0, readBuff.limit());
                        readBuff.clear();
                        bytesRead=clientChn.read(readBuff);
                    }            
                }                               
                key.interestOps(SelectionKey.OP_READ);
                notifyRead(out.toByteArray());
            }finally{
                out.close();
            }
        }       
        
    
        @Override
        public  void handleWrite(SelectionKey key) throws IOException {
            SocketChannel clientChn=(SocketChannel)key.channel();
            ByteBuffer message=null;
            while(!messageQueue.isEmpty()){
                synchronized(lockObje){
                    if(!messageQueue.isEmpty()){
                        message=this.messageQueue.peek();                                        
                        clientChn.write(message);
                        if(message.hasRemaining()){                        
                           break; 
                        }                    
                        messageQueue.poll();
                    }                                      
                }               
            }        
            key.interestOps(SelectionKey.OP_READ);
        }
    
        /**
         * 
         * 发送消息(此时不是真正发送,而是放在一个待发送的队列中)
         *
         * @param message 信息      
         * @since  Ver 1.0
         */
        @Override
        public void sendMessage(byte[] message) {        
            messageQueue.add(ByteBuffer.wrap(message));
            this.notifySend(message);
        }
    
    
        @Override
        public void addReadListener(ReadListener rl) {        
            readList.add(rl);        
        }
    
    
        @Override
        public void addSendListener(SendListener sl) {
            sendList.add(sl);
        }
    
    
        @Override
        public void removeReadListener(ReadListener rl) {
            
            readList.remove(rl);
            
        }
    
    
        @Override
        public void removeSendListener(SendListener rl) {
            
             sendList.remove(rl);
            
        }
        
        private void notifyRead(byte[] message) {               
            for(ReadListener rl : readList){
                rl.handleEvent(message,this);
            }
        }
    
        private void notifySend(byte[] message) {
            
            for(SendListener rl : sendList){
                rl.handleEvent(message);
            }
            
        }
    
    }
    View Code

     

  • 相关阅读:
    压缩与解压缩 ZipHelper
    ESFramework介绍之(15)-- IRAS
    ESFramework介绍之(8)-- 客户端插件IPassiveAddin
    使用Eclipse开发Jsp
    深入探讨 java.lang.ref 包
    java多线程总结二
    模拟弹子台球java多线程应用
    简单的邮件客户端
    最近创办了一个java学习邮件列表
    优秀的XML~~Altova XMLSpy 2010英文企业版+有效破解方法
  • 原文地址:https://www.cnblogs.com/WGZ_Home/p/3400389.html
Copyright © 2011-2022 走看看