zoukankan      html  css  js  c++  java
  • NIO客户端和服务端的一些例子

    转自https://www.oschina.net/question/54100_33531

    客户端

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    
    /**
     * NIO TCP 客户端
     * 
     * @date    2010-2-3
     * @time    下午03:33:26
     * @version 1.00
     */
    public class TCPClient{
      // 信道选择器
      private Selector selector;
      
      // 与服务器通信的信道
      SocketChannel socketChannel;
      
      // 要连接的服务器Ip地址
      private String hostIp;
      
      // 要连接的远程服务器在监听的端口
      private int hostListenningPort;
      
      /**
       * 构造函数
       * @param HostIp
       * @param HostListenningPort
       * @throws IOException
       */
      public TCPClient(String HostIp,int HostListenningPort)throws IOException{
        this.hostIp=HostIp;
        this.hostListenningPort=HostListenningPort;   
        
        initialize();
      }
      
      /**
       * 初始化
       * @throws IOException
       */
      private void initialize() throws IOException{
        // 打开监听信道并设置为非阻塞模式
        socketChannel=SocketChannel.open(new InetSocketAddress(hostIp, hostListenningPort));
        socketChannel.configureBlocking(false);
        
        // 打开并注册选择器到信道
        selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_READ);
        
        // 启动读取线程
        new TCPClientReadThread(selector);
      }
      
      /**
       * 发送字符串到服务器
       * @param message
       * @throws IOException
       */
      public void sendMsg(String message) throws IOException{
        ByteBuffer writeBuffer=ByteBuffer.wrap(message.getBytes("UTF-16"));
        socketChannel.write(writeBuffer);
      }
      
      public static void main(String[] args) throws IOException{
        TCPClient client=new TCPClient("192.168.0.1",1978);
        
        client.sendMsg("你好!Nio!醉里挑灯看剑,梦回吹角连营");
      }
    }
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    
    public class TCPClientReadThread implements Runnable{
      private Selector selector;
      
      public TCPClientReadThread(Selector selector){
        this.selector=selector;
        
        new Thread(this).start();
      }
      
      public void run() {
        try {
          while (selector.select() > 0) {
            // 遍历每个有可用IO操作Channel对应的SelectionKey
            for (SelectionKey sk : selector.selectedKeys()) {
              
              // 如果该SelectionKey对应的Channel中有可读的数据
              if (sk.isReadable()) {
                // 使用NIO读取Channel中的数据
                SocketChannel sc = (SocketChannel) sk.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                sc.read(buffer);
                buffer.flip();
                
                // 将字节转化为为UTF-16的字符串   
                String receivedString=Charset.forName("UTF-16").newDecoder().decode(buffer).toString();
                
                // 控制台打印出来
                System.out.println("接收到来自服务器"+sc.socket().getRemoteSocketAddress()+"的信息:"+receivedString);
                
                // 为下一次读取作准备
                sk.interestOps(SelectionKey.OP_READ);
              }
              
              // 删除正在处理的SelectionKey
              selector.selectedKeys().remove(sk);
            }
          }
        } catch (IOException ex) {
          ex.printStackTrace();
        }   
      }
    }
    View Code

    服务端

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.util.Iterator;
    
    /**
     * TCP服务器端 
     * 
     * @date    2010-2-3
     * @time    上午08:39:48
     * @version 1.00
     */
    public class TCPServer{
      // 缓冲区大小
      private static final int BufferSize=1024;
      
      // 超时时间,单位毫秒
      private static final int TimeOut=3000;
      
      // 本地监听端口
      private static final int ListenPort=1978;
      
      public static void main(String[] args) throws IOException{
        // 创建选择器
        Selector selector=Selector.open();
        
        // 打开监听信道
        ServerSocketChannel listenerChannel=ServerSocketChannel.open();
        
        // 与本地端口绑定
        listenerChannel.socket().bind(new InetSocketAddress(ListenPort));
        
        // 设置为非阻塞模式
        listenerChannel.configureBlocking(false);
        
        // 将选择器绑定到监听信道,只有非阻塞信道才可以注册选择器.并在注册过程中指出该信道可以进行Accept操作
        listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        // 创建一个处理协议的实现类,由它来具体操作
        TCPProtocol protocol=new TCPProtocolImpl(BufferSize);
        
        // 反复循环,等待IO
        while(true){
          // 等待某信道就绪(或超时)
          if(selector.select(TimeOut)==0){
            System.out.print("独自等待.");
            continue;
          }
          
          // 取得迭代器.selectedKeys()中包含了每个准备好某一I/O操作的信道的SelectionKey
          Iterator<SelectionKey> keyIter=selector.selectedKeys().iterator();
          
          while(keyIter.hasNext()){
            SelectionKey key=keyIter.next();
            
            try{
              if(key.isAcceptable()){
                // 有客户端连接请求时
                protocol.handleAccept(key);
              }
              
              if(key.isReadable()){
                // 从客户端读取数据
                protocol.handleRead(key);
              }
              
              if(key.isValid() && key.isWritable()){
                // 客户端可写时
                protocol.handleWrite(key);
              }
            }
            catch(IOException ex){
              // 出现IO异常(如客户端断开连接)时移除处理过的键
              keyIter.remove();
              continue;
            }
            
            // 移除处理过的键
            keyIter.remove();
          }
        }
      }
    }
    
    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    
    /**
     * TCPServerSelector与特定协议间通信的接口
     * 
     * @date    2010-2-3
     * @time    上午08:42:42
     * @version 1.00
     */
    public interface TCPProtocol{
      /**
       * 接收一个SocketChannel的处理
       * @param key
       * @throws IOException
       */
      void handleAccept(SelectionKey key) throws IOException;
      
      /**
       * 从一个SocketChannel读取信息的处理
       * @param key
       * @throws IOException
       */
      void handleRead(SelectionKey key) throws IOException;
      
      /**
       * 向一个SocketChannel写入信息的处理
       * @param key
       * @throws IOException
       */
      void handleWrite(SelectionKey key) throws IOException;
    }
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Date;
    
    /**
     * TCPProtocol的实现类
     * 
     * @date    2010-2-3
     * @time    上午08:58:59
     * @version 1.00
     */
    public class TCPProtocolImpl implements TCPProtocol{
      private int bufferSize;
      
      public TCPProtocolImpl(int bufferSize){
        this.bufferSize=bufferSize;
      }
    
      public void handleAccept(SelectionKey key) throws IOException {
        SocketChannel clientChannel=((ServerSocketChannel)key.channel()).accept();
        clientChannel.configureBlocking(false);
        clientChannel.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocate(bufferSize));
      }
    
      public void handleRead(SelectionKey key) throws IOException {
        // 获得与客户端通信的信道
        SocketChannel clientChannel=(SocketChannel)key.channel();
        
        // 得到并清空缓冲区
        ByteBuffer buffer=(ByteBuffer)key.attachment();
        buffer.clear();
        
        // 读取信息获得读取的字节数
        long bytesRead=clientChannel.read(buffer);
        
        if(bytesRead==-1){
          // 没有读取到内容的情况
          clientChannel.close();
        }
        else{
          // 将缓冲区准备为数据传出状态
          buffer.flip();
          
          // 将字节转化为为UTF-16的字符串   
          String receivedString=Charset.forName("UTF-16").newDecoder().decode(buffer).toString();
          
          // 控制台打印出来
          System.out.println("接收到来自"+clientChannel.socket().getRemoteSocketAddress()+"的信息:"+receivedString);
          
          // 准备发送的文本
          String sendString="你好,客户端. @"+new Date().toString()+",已经收到你的信息"+receivedString;
          buffer=ByteBuffer.wrap(sendString.getBytes("UTF-16"));
          clientChannel.write(buffer);
          
          // 设置为下一次读取或是写入做准备
          key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
      }
    
      public void handleWrite(SelectionKey key) throws IOException {
        // do nothing
      }
    }

    服务端 例子2

    package com.stevex.app.nio;
     
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
     
    public class XiaoNa {
        private ByteBuffer readBuffer;
        private Selector selector;
         
        public static void main(String[] args){
            XiaoNa xiaona = new XiaoNa();
            xiaona.init();
            xiaona.listen();
        }
         
        private void init(){
            readBuffer = ByteBuffer.allocate(1024);
            ServerSocketChannel servSocketChannel;
             
            try {
                servSocketChannel = ServerSocketChannel.open();
                servSocketChannel.configureBlocking(false);
                //绑定端口
                servSocketChannel.socket().bind(new InetSocketAddress(8383));
                 
                selector = Selector.open();
                servSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }      
        }
     
        private void listen() {
            while(true){
                try{
                    selector.select();             
                    Iterator ite = selector.selectedKeys().iterator();
                     
                    while(ite.hasNext()){
                        SelectionKey key = (SelectionKey) ite.next();                  
                        ite.remove();//确保不重复处理
                         
                        handleKey(key);
                    }
                }
                catch(Throwable t){
                    t.printStackTrace();
                }                          
            }              
        }
     
        private void handleKey(SelectionKey key)
                throws IOException, ClosedChannelException {
            SocketChannel channel = null;
             
            try{
                if(key.isAcceptable()){
                    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                    channel = serverChannel.accept();//接受连接请求
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                }
                else if(key.isReadable()){
                    channel = (SocketChannel) key.channel();
                    readBuffer.clear();
                    /*当客户端channel关闭后,会不断收到read事件,但没有消息,即read方法返回-1
                     * 所以这时服务器端也需要关闭channel,避免无限无效的处理*/              
                    int count = channel.read(readBuffer);
                     
                    if(count > 0){
                        //一定需要调用flip函数,否则读取错误数据
                        readBuffer.flip();
                        /*使用CharBuffer配合取出正确的数据
                        String question = new String(readBuffer.array());  
                        可能会出错,因为前面readBuffer.clear();并未真正清理数据
                        只是重置缓冲区的position, limit, mark,
                        而readBuffer.array()会返回整个缓冲区的内容。
                        decode方法只取readBuffer的position到limit数据。
                        例如,上一次读取到缓冲区的是"where", clear后position为0,limit为 1024,
                        再次读取“bye"到缓冲区后,position为3,limit不变,
                        flip后position为0,limit为3,前三个字符被覆盖了,但"re"还存在缓冲区中,
                        所以 new String(readBuffer.array()) 返回 "byere",
                        而decode(readBuffer)返回"bye"。            
                        */
                        CharBuffer charBuffer = CharsetHelper.decode(readBuffer); 
                        String question = charBuffer.toString(); 
                        String answer = getAnswer(question);
                        channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
                    }
                    else{
                        //这里关闭channel,因为客户端已经关闭channel或者异常了
                        channel.close();               
                    }                      
                }
            }
            catch(Throwable t){
                t.printStackTrace();
                if(channel != null){
                    channel.close();
                }
            }      
        }
         
        private String getAnswer(String question){
            String answer = null;
             
            switch(question){
            case "who":
                answer = "我是小娜
    ";
                break;
            case "what":
                answer = "我是来帮你解闷的
    ";
                break;
            case "where":
                answer = "我来自外太空
    ";
                break;
            case "hi":
                answer = "hello
    ";
                break;
            case "bye":
                answer = "88
    ";
                break;
            default:
                    answer = "请输入 who, 或者what, 或者where";
            }
             
            return answer;
        }
    }
  • 相关阅读:
    docker 安装部署 mysql(配置文件启动)
    mac Charles抓包
    docker 安装部署 redis(配置文件启动)
    安装mysql5.6
    Linux命令
    git命令汇总
    Java程序占用的内存可能会大于Xmx
    排序
    二分查找
    Elasticsearch核心技术(一):Elasticsearch环境搭建
  • 原文地址:https://www.cnblogs.com/legion/p/9234741.html
Copyright © 2011-2022 走看看