zoukankan      html  css  js  c++  java
  • Socket+NIO实现客户端与服务器的通信的Demo

    使用NIO的一个最大优势就是客户端于服务器自己的不再是阻塞式的,也就意味着服务器无需通过为每个客户端的链接而开启一个线程。而是通过一个叫Selector的轮循器来不断的检测那个Channel有消息处理。当发现有消息要处理时,通过selectedKeys()方法就可以获取所有有消息要处理的Set集合了。由于select操作只管对selectedKeys的集合进行添加而不负责移除,所以当某个消息被处理后我们需要从该集合里去掉。

    我们首先编写服务器端的代码:

    public class NioService {
        public  void init() throws IOException {
            Charset charset = Charset.forName("UTF-8");
            // 创建一个选择器,可用close()关闭,isOpen()表示是否处于打开状态,他不隶属于当前线程
            Selector selector = Selector.open();
            // 创建ServerSocketChannel,并把它绑定到指定端口上
            ServerSocketChannel server = ServerSocketChannel.open();
            server.socket().bind(new InetSocketAddress(7777),1024);
            // 设置为非阻塞模式, 这个非常重要
            server.configureBlocking(false);
            // 在选择器里面注册关注这个服务器套接字通道的accept事件
            // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel
            server.register(selector, SelectionKey.OP_ACCEPT);
    
    
            while (true) {
                selector.select(1000);
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    //如果key对应的Channel包含客户端的链接请求
                    // OP_ACCEPT 这个只有ServerSocketChannel才有可能触发
                    key=it.next();
                    // 由于select操作只管对selectedKeys进行添加,所以key处理后我们需要从里面把key去掉
                    it.remove();
                    if (key.isAcceptable()) {
                        ServerSocketChannel ssc  = (ServerSocketChannel) key.channel();
                        // 得到与客户端的套接字通道
                        SocketChannel channel = ssc.accept();
                        channel.configureBlocking(false);
                        channel.register(selector, SelectionKey.OP_READ);
                        //将key对应Channel设置为准备接受其他请求
                        key.interestOps(SelectionKey.OP_ACCEPT);
                    }
                    if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        String content = "";
                        try {
                            int readBytes = channel.read(byteBuffer);
                            if (readBytes > 0) {
                                byteBuffer.flip(); //为write()准备
                                byte[] bytes = new byte[byteBuffer.remaining()];
                                byteBuffer.get(bytes);
                                content+=new String(bytes);
                                System.out.println(content);
                                //回应客户端
                                doWrite(channel);
                            }
                            // 写完就把状态关注去掉,否则会一直触发写事件(改变自身关注事件)
                            key.interestOps(SelectionKey.OP_READ);
                        } catch (IOException i) {
                            //如果捕获到该SelectionKey对应的Channel时出现了异常,即表明该Channel对于的Client出现了问题
                            //所以从Selector中取消该SelectionKey的注册
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }
        }
        private  void doWrite(SocketChannel sc) throws IOException{
            byte[] req ="服务器已接受".getBytes();
            ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
            byteBuffer.put(req);
            byteBuffer.flip();
            sc.write(byteBuffer);
            if(!byteBuffer.hasRemaining()){
                System.out.println("Send 2 Service successed");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    启动该类的init方法后程序进入死循环,现在我们编写客户端的代码:

    public class NioClient {
        // 创建一个套接字通道,注意这里必须使用无参形式
        private Selector selector = null;
        static  Charset charset = Charset.forName("UTF-8");
        private volatile boolean stop = false;
        public  ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<String>(8);
        public  void  init() throws IOException{
            selector = Selector.open();
            SocketChannel channel = SocketChannel.open();
            // 设置为非阻塞模式,这个方法必须在实际连接之前调用(所以open的时候不能提供服务器地址,否则会自动连接)
            channel.configureBlocking(false);
            if(channel.connect(new InetSocketAddress("127.0.0.1",7777))){
                channel.register(selector, SelectionKey.OP_READ);
               //发送消息
              doWrite(channel, "66666666");
            }else {
                channel.register(selector, SelectionKey.OP_CONNECT);
            }
    
    
            //启动一个接受服务器反馈的线程
          //  new Thread(new ReceiverInfo()).start();
    
            while (!stop){
                selector.select(1000);
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while (it.hasNext()){
                    key = it.next();
                    it.remove();
                    SocketChannel sc = (SocketChannel) key.channel();
                    // OP_CONNECT 两种情况,链接成功或失败这个方法都会返回true
                    if (key.isConnectable()){
                        // 由于非阻塞模式,connect只管发起连接请求,finishConnect()方法会阻塞到链接结束并返回是否成功
                        // 另外还有一个isConnectionPending()返回的是是否处于正在连接状态(还在三次握手中)
                        if (channel.finishConnect()) {
                           /* System.out.println("准备发送数据");
                            // 链接成功了可以做一些自己的处理
                            channel.write(charset.encode("I am Coming"));
                            // 处理完后必须吧OP_CONNECT关注去掉,改为关注OP_READ
                            key.interestOps(SelectionKey.OP_READ);*/
                              sc.register(selector,SelectionKey.OP_READ);
                        //    new Thread(new DoWrite(channel)).start();
                          doWrite(channel, "66666666");
                        }else {
                            //链接失败,进程推出
                            System.exit(1);
                        }
                    } if(key.isReadable()){
                    //读取服务端的响应
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                         int readBytes = sc.read(buffer);
                        String content = "";
                        if (readBytes>0){
                            buffer.flip();
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            content+=new String(bytes);
                            stop=true;
                        }else if(readBytes<0) {
                            //对端链路关闭
                            key.channel();
                            sc.close();
                        }
                        System.out.println(content);
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
            }
        }
    private  void doWrite(SocketChannel sc,String data) throws IOException{
            byte[] req =data.getBytes();
            ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
            byteBuffer.put(req);
            byteBuffer.flip();
            sc.write(byteBuffer);
            if(!byteBuffer.hasRemaining()){
                System.out.println("Send 2 client successed");
            }
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    启动客户端类的init()方法之后就会向服务器发送一个字符串,服务器接受到之后会向客户端回应一个。运行如上代码,结果正确。

    直接在使用阻塞式IO的时候,在客户端与服务器之间传输时使用了一个经典范式,客户端使用维护一个队列来发送数据给服务器.

    代码示例如下:

    private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(8);
     public void connect() throws IOException {
            // 三次握手
            if (socket == null || socket.isClosed()) {
                socket = new Socket(InfoUtils.SOCKET_IP, InfoUtils.SOCKET_PORT);
            }
            //发送消息
           new Thread(new SendMessage()).start();
    
    }
     public class SendMessage implements Runnable{
            @Override
            public void run() {
                try {
                    OutputStream os = socket.getOutputStream();
                    while (true){
                        String content = queue.take();
                        os.write(content.getBytes());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
  • 相关阅读:
    C库函数中字符串处理函数集合(转)
    浅谈C++底层机制
    vc2008快捷键
    Delphi格式输出的用法
    Devenv 命令行开关
    DX皮肤控制
    C#格式化
    linq查询DataView
    WCF中的session用法
    VS2012clickonce发布问题
  • 原文地址:https://www.cnblogs.com/songjy2116/p/7719834.html
Copyright © 2011-2022 走看看