  • NIO Source Code Analysis (02-2): BIO Source Code Analysis, Socket

    Netty series index (https://www.cnblogs.com/binarylei/p/10117436.html)

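    The previous article analyzed the ServerSocket source code in detail. Like ServerSocket, Socket is only a facade; the real implementation is again SocksSocketImpl, so setImpl, createImpl, new, bind, and listen all work in much the same way. This article focuses on connect and on the methods that read from the I/O stream.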

    1. Minimal BIO usage

    // 1. Connect to the server
    Socket socket = new Socket();
    socket.connect(new InetSocketAddress(HOST, PORT), 0);
    BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
    
    // 2. Send the request data
    out.println("Client sending request data...");
    
    // 3. Receive the server's data
    String response = in.readLine();
    System.out.println("Client: " + response);
    

    OK, that is all the code. The source analysis below is based on this demo.
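
    The demo leaves HOST, PORT, and the server side undefined. To run it end to end, something like the following minimal sketch may help. The class name BioLoopbackDemo and the one-shot echo server are assumptions added for illustration, not part of the original demo; the client part follows the same three steps.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: the echo server exists only so the client can run on its own.
class BioLoopbackDemo {

    // Sends one line to a throwaway local server and returns the reply line.
    static String roundTrip(String request) {
        // Port 0 asks the OS for any free port; loopback keeps the traffic local.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread serverThread = new Thread(() -> serveOnce(server));
            serverThread.setDaemon(true);
            serverThread.start();

            try (Socket socket = new Socket()) {
                // 1. Connect to the server (timeout 0 = block until connected or failed)
                socket.connect(new InetSocketAddress(server.getInetAddress(), server.getLocalPort()), 0);
                BufferedReader in = reader(socket);
                PrintWriter out = writer(socket);
                // 2. Send the request data
                out.println(request);
                // 3. Receive the server's data
                return in.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Accepts a single connection and answers its first line with an "echo: " prefix.
    private static void serveOnce(ServerSocket server) {
        try (Socket peer = server.accept()) {
            writer(peer).println("echo: " + reader(peer).readLine());
        } catch (IOException e) {
            // Demo only: a failure here shows up on the client as a null or failed read.
        }
    }

    private static BufferedReader reader(Socket s) throws IOException {
        return new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    private static PrintWriter writer(Socket s) throws IOException {
        // autoFlush = true makes println push the line onto the socket immediately.
        return new PrintWriter(new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    public static void main(String[] args) {
        System.out.println("Client: " + roundTrip("hello")); // prints "Client: echo: hello"
    }
}
```

    The client can connect before the server thread reaches accept because the listening socket already queues incoming connections in its backlog.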

    2. The connect method

    2.1 The Socket.connect method

    // timeout = 0 blocks indefinitely; timeout > 0 sets the connect timeout in milliseconds
    public void connect(SocketAddress endpoint, int timeout) throws IOException {
      
        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        InetAddress addr = epoint.getAddress();
        int port = epoint.getPort();
    
        // 1. Create the underlying socket
        if (!created)
            createImpl(true);
    
        // 2. oldImpl defaults to false, so the first branch is normally taken.
        //    checkOldImpl looks for a connect(SocketAddress address, int timeout) method
        //    on impl and sets oldImpl to true only if none is found
        if (!oldImpl)
            impl.connect(epoint, timeout);
        else if (timeout == 0) {
            if (epoint.isUnresolved())
                impl.connect(addr.getHostName(), port);
            else
                impl.connect(addr, port);
        } else
            throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
        connected = true;
        bound = true;
    }
    

    Summary: like ServerSocket, Socket first calls createImpl to create the underlying socket object, then delegates the connect operation to impl.
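
    The delegation described above can be illustrated with a stripped-down sketch. MiniSocket and MiniImpl are hypothetical stand-ins invented for this example (they are not JDK classes); the sketch only records which calls the facade forwards and touches no network.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

class FacadeSketch {

    // Hypothetical stand-in for SocketImpl: the object that does the real work.
    interface MiniImpl {
        void create();
        void connect(InetSocketAddress endpoint, int timeoutMillis);
    }

    // Hypothetical stand-in for Socket: keeps state flags and delegates to impl.
    static class MiniSocket {
        private final MiniImpl impl;
        private boolean created;
        private boolean connected;
        private boolean bound;

        MiniSocket(MiniImpl impl) {
            this.impl = impl;
        }

        void connect(InetSocketAddress endpoint, int timeoutMillis) {
            if (!created) {                         // create the underlying socket lazily
                impl.create();
                created = true;
            }
            impl.connect(endpoint, timeoutMillis);  // the facade itself opens no connection
            connected = true;
            bound = true;                           // a connected socket also has a local address
        }

        boolean isConnected() { return connected; }
        boolean isBound() { return bound; }
    }

    // Records the delegated calls instead of touching the network.
    static String demo() {
        List<String> calls = new ArrayList<>();
        MiniSocket socket = new MiniSocket(new MiniImpl() {
            @Override public void create() { calls.add("create"); }
            @Override public void connect(InetSocketAddress endpoint, int timeoutMillis) {
                calls.add("connect");
            }
        });
        socket.connect(InetSocketAddress.createUnresolved("example.invalid", 80), 0);
        return String.join(",", calls) + "," + socket.isConnected() + "," + socket.isBound();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "create,connect,true,true"
    }
}
```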

    2.2 The AbstractPlainSocketImpl.connect method

    protected void connect(SocketAddress address, int timeout) throws IOException {
        boolean connected = false;
        try {
            InetSocketAddress addr = (InetSocketAddress) address;
            this.port = addr.getPort();
            this.address = addr.getAddress();
    
            connectToAddress(this.address, port, timeout);
            connected = true;
        } finally {
            if (!connected) {
                close();
            }
        }
    }
    
    private void connectToAddress(InetAddress address, int port, int timeout) throws IOException {
        if (address.isAnyLocalAddress()) {
            doConnect(InetAddress.getLocalHost(), port, timeout);
        } else {
            doConnect(address, port, timeout);
        }
    }
    

    Summary: connect hands the actual connection work to doConnect.

    synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
        synchronized (fdLock) {
            if (!closePending && (socket == null || !socket.isBound())) {
                NetHooks.beforeTcpConnect(fd, address, port);
            }
        }
        try {
            acquireFD();
            try {
                socketConnect(address, port, timeout);
                /* socket may have been closed during poll/select */
                synchronized (fdLock) {
                    if (closePending) {
                        throw new SocketException ("Socket closed");
                    }
                }
                if (socket != null) {
                    socket.setBound();
                    socket.setConnected();
                }
            } finally {
                releaseFD();
            }
        } catch (IOException e) {
            close();
            throw e;
        }
    }
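
    One detail in connectToAddress is easy to miss: a wildcard target such as 0.0.0.0 is replaced by the local host before doConnect is called. The sketch below reproduces that decision with public InetAddress methods only; the class and method names (WildcardTargetDemo, literal, effectiveTarget) are hypothetical.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

class WildcardTargetDemo {

    // Parses an IP literal; no DNS lookup is needed for literal addresses.
    static InetAddress literal(String ip) {
        try {
            return InetAddress.getByName(ip);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("cannot resolve: " + ip, e);
        }
    }

    // Same decision as connectToAddress: a wildcard target is swapped for the local host.
    static InetAddress effectiveTarget(InetAddress requested) throws UnknownHostException {
        return requested.isAnyLocalAddress() ? InetAddress.getLocalHost() : requested;
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(literal("0.0.0.0").isAnyLocalAddress());   // true
        System.out.println(literal("127.0.0.1").isAnyLocalAddress()); // false
        // The result for a wildcard target depends on the machine's host name setup.
        System.out.println(effectiveTarget(literal("0.0.0.0")));
    }
}
```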
    

    2.3 The DualStackPlainSocketImpl.socketConnect method

    void socketConnect(InetAddress address, int port, int timeout)
        throws IOException {
        int nativefd = checkAndReturnNativeFD();
    
        if (address == null)
            throw new NullPointerException("inet address argument is null.");
    
        int connectResult;
        if (timeout <= 0) {
            connectResult = connect0(nativefd, address, port);
        } else {
            configureBlocking(nativefd, false);
            try {
                connectResult = connect0(nativefd, address, port);
                if (connectResult == WOULDBLOCK) {
                    waitForConnect(nativefd, timeout);
                }
            } finally {
                configureBlocking(nativefd, true);
            }
        }
       
        if (localport == 0)
            localport = localPort0(nativefd);
    }
    
    Supplement 1: the native implementation of connect0 in the JDK

    JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_connect0
      (JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port) {
        SOCKETADDRESS sa;
        int rv;
        int sa_len = sizeof(sa);
    
        if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa,
                                     &sa_len, JNI_TRUE) != 0) {
          return -1;
        }
    
        rv = connect(fd, (struct sockaddr *)&sa, sa_len);
        if (rv == SOCKET_ERROR) {
            int err = WSAGetLastError();
            if (err == WSAEWOULDBLOCK) {
                return java_net_DualStackPlainSocketImpl_WOULDBLOCK;
            } else if (err == WSAEADDRNOTAVAIL) {
                JNU_ThrowByName(env, JNU_JAVANETPKG "ConnectException",
                    "connect: Address is invalid on local machine, or port is not valid on remote machine");
            } else {
                NET_ThrowNew(env, err, "connect");
            }
            return -1;  // return value not important.
        }
        return rv;
    }
    

    Summary: rv = connect(fd, (struct sockaddr *)&sa, sa_len) is the call that establishes the remote connection.

    Supplement 2: the native implementation of waitForConnect in the JDK

    As with ServerSocket.waitForNewConnection, the timeout is implemented with the select function of the Winsock library.

    JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect
      (JNIEnv *env, jclass clazz, jint fd, jint timeout) {
        int rv, retry;
        int optlen = sizeof(rv);
        fd_set wr, ex;
        struct timeval t;
    
        FD_ZERO(&wr);
        FD_ZERO(&ex);
        FD_SET(fd, &wr);
        FD_SET(fd, &ex);
        t.tv_sec = timeout / 1000;
        t.tv_usec = (timeout % 1000) * 1000;
    
        rv = select(fd+1, 0, &wr, &ex, &t);
        if (rv == 0) {
            JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                            "connect timed out");
            shutdown( fd, SD_BOTH );
            return;
        }
        if (!FD_ISSET(fd, &ex)) {
            return;         /* connection established */
        }
    
        for (retry=0; retry<3; retry++) {
            NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR,
                           (char*)&rv, &optlen);
            if (rv) {
                break;
            }
            Sleep(0);
        }
    
        if (rv == 0) {
            JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                            "Unable to establish connection");
        } else {
            NET_ThrowNew(env, rv, "connect");
        }
    }
    

    Summary: rv = select(fd+1, 0, &wr, &ex, &t) blocks the calling thread until the connection completes, fails, or the timeout expires.
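
    The timed branch of socketConnect (switch to non-blocking mode, start the connect, wait with select, switch back to blocking mode) can be imitated in pure Java. The sketch below uses NIO's SocketChannel and Selector, which is a substitute technique chosen for illustration and not how the BIO classes are implemented; TimedConnectSketch is a hypothetical name and timeoutMillis is assumed to be greater than 0.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class TimedConnectSketch {

    // Connects with a timeout (assumed > 0) and returns the channel in blocking mode.
    static SocketChannel connect(InetSocketAddress target, int timeoutMillis) throws IOException {
        SocketChannel channel = SocketChannel.open();
        try {
            channel.configureBlocking(false);          // like configureBlocking(nativefd, false)
            if (!channel.connect(target)) {            // false: still in progress (the WOULDBLOCK case)
                try (Selector selector = Selector.open()) {
                    channel.register(selector, SelectionKey.OP_CONNECT);
                    // Like the native select call: 0 ready keys is treated as a timeout here.
                    if (selector.select(timeoutMillis) == 0) {
                        throw new SocketTimeoutException("connect timed out");
                    }
                }                                      // closing the selector deregisters the channel
                channel.finishConnect();               // throws if the connection attempt failed
            }
            channel.configureBlocking(true);           // like configureBlocking(nativefd, true)
            return channel;
        } catch (IOException | RuntimeException e) {
            channel.close();
            throw e;
        }
    }

    // Connects to a throwaway listener on the loopback interface.
    static boolean demo() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             SocketChannel channel = connect(
                     new InetSocketAddress(server.getInetAddress(), server.getLocalPort()), 1000)) {
            return channel.isConnected() && channel.isBlocking();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected and blocking again: " + demo());
    }
}
```

    A production version would also need to handle selector wakeups and thread interruption, which this sketch reports as a timeout.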

    3. SocketInputStream

    3.1 Constructor

    SocketInputStream(AbstractPlainSocketImpl impl) throws IOException {
        super(impl.getFileDescriptor());
        this.impl = impl;
        socket = impl.getSocket();
    }
    

    Summary: internally, SocketInputStream is also a wrapper around impl. SocketInputStream.read ultimately calls the read operation of the underlying socket.

    3.2 The read method

    int read(byte b[], int off, int length, int timeout) throws IOException {
        int n;
    
        // EOF already encountered
        if (eof) {
            return -1;
        }
    
        // connection reset
        if (impl.isConnectionReset()) {
            throw new SocketException("Connection reset");
        }
    
        // bounds check
        if (length <= 0 || off < 0 || off + length > b.length) {
            if (length == 0) {
                return 0;
            }
            throw new ArrayIndexOutOfBoundsException();
        }
    
        boolean gotReset = false;
    
        // acquire file descriptor and do the read
        FileDescriptor fd = impl.acquireFD();
        try {
            n = socketRead(fd, b, off, length, timeout);
            if (n > 0) {
                return n;
            }
        } catch (ConnectionResetException rstExc) {
            gotReset = true;
        } finally {
            impl.releaseFD();
        }
    
        /*
         * We receive a "connection reset" but there may be bytes still
         * buffered on the socket
         */
        if (gotReset) {
            impl.setConnectionResetPending();
            impl.acquireFD();
            try {
                n = socketRead(fd, b, off, length, timeout);
                if (n > 0) {
                    return n;
                }
            } catch (ConnectionResetException rstExc) {
            } finally {
                impl.releaseFD();
            }
        }
    
        /*
         * If we get here we are at EOF, the socket has been closed,
         * or the connection has been reset.
         */
        if (impl.isClosedOrPending()) {
            throw new SocketException("Socket closed");
        }
        if (impl.isConnectionResetPending()) {
            impl.setConnectionReset();
        }
        if (impl.isConnectionReset()) {
            throw new SocketException("Connection reset");
        }
        eof = true;
        return -1;
    }
    
    private int socketRead(FileDescriptor fd, byte b[], int off, int len,
            int timeout) throws IOException {
        return socketRead0(fd, b, off, len, timeout);
    }
    
    Supplement 3: the native implementation of socketRead0 in the JDK

    // src/windows/native/java/net/SocketInputStream.c
    JNIEXPORT jint JNICALL Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,
            jobject fdObj, jbyteArray data, jint off, jint len, jint timeout) {
        char *bufP;
        char BUF[MAX_BUFFER_LEN];
        jint fd, newfd;
        jint nread;
    
        if (IS_NULL(fdObj)) {
            JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
            return -1;
        }
        fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
        if (fd == -1) {
            NET_ThrowSocketException(env, "Socket closed");
            return -1;
        }
    
        /*
         * If the caller buffer is larger than our stack buffer then we allocate
         * from the heap (up to a limit). If memory is exhausted we always use
         * the stack buffer.
         */
        if (len <= MAX_BUFFER_LEN) {
            bufP = BUF;
        } else {
            if (len > MAX_HEAP_BUFFER_LEN) {
                len = MAX_HEAP_BUFFER_LEN;
            }
            bufP = (char *)malloc((size_t)len);
            if (bufP == NULL) {
                /* allocation failed so use stack buffer */
                bufP = BUF;
                len = MAX_BUFFER_LEN;
            }
        }
    
        if (timeout) {
            if (timeout <= 5000 || !isRcvTimeoutSupported) {
                int ret = NET_Timeout (fd, timeout);
    
                if (ret <= 0) {
                    if (ret == 0) {
                        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                                        "Read timed out");
                    } else if (ret == JVM_IO_ERR) {
                        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
                    } else if (ret == JVM_IO_INTR) {
                        JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
                                        "Operation interrupted");
                    }
                    if (bufP != BUF) {
                        free(bufP);
                    }
                    return -1;
                }
    
                /*check if the socket has been closed while we were in timeout*/
                newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
                if (newfd == -1) {
                    NET_ThrowSocketException(env, "Socket Closed");
                    if (bufP != BUF) {
                        free(bufP);
                    }
                    return -1;
                }
            }
        }
    
        // The key line: recv reads data from the socket fd
        nread = recv(fd, bufP, len, 0);
    
        if (nread > 0) {
            (*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
        } else {
            if (nread < 0) {
                // Check if the socket has been closed since we last checked.
                // This could be a reason for recv failing.
                if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1) {
                    NET_ThrowSocketException(env, "Socket closed");
                } else {
                    switch (WSAGetLastError()) {
                        case WSAEINTR:
                            JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                                "socket closed");
                            break;
                        case WSAECONNRESET:
                        case WSAESHUTDOWN:
                            /*
                             * Connection has been reset - Windows sometimes reports
                             * the reset as a shutdown error.
                             */
                            JNU_ThrowByName(env, "sun/net/ConnectionResetException", "");
                            break;
                        case WSAETIMEDOUT :
                            JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
                            break;
                        default:
                            NET_ThrowCurrent(env, "recv failed");
                    }
                }
            }
        }
        if (bufP != BUF) {
            free(bufP);
        }
        return nread;
    }
    

    Summary: socketRead0 is long, but we only need to focus on the core call nread = recv(fd, bufP, len, 0); after all, we are not C/C++ specialists.
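
    From the Java side, the branches of read and socketRead0 show up as three outcomes: a timeout exception, a positive byte count, or -1 at EOF. The following sketch exercises all three on a loopback connection. ReadOutcomesDemo is a hypothetical class, and the native path actually taken depends on the JDK version and platform; only the Java-level behavior is shown.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

class ReadOutcomesDemo {

    // Returns "<first read outcome>,<bytes received>,<how the stream ended>".
    static String demo() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) {
            InputStream in = client.getInputStream();
            byte[] buf = new byte[16];

            // A non-zero SO_TIMEOUT bounds each read. Nothing has been sent yet,
            // so this read is expected to time out.
            client.setSoTimeout(200);
            String first;
            try {
                first = "read " + in.read(buf);
            } catch (SocketTimeoutException e) {
                first = "timeout";   // the socket stays usable after a read timeout
            }

            // The peer sends three bytes and closes in an orderly way,
            // so the client sees the data first and EOF afterwards.
            peer.getOutputStream().write(new byte[] {1, 2, 3});
            peer.close();

            int total = 0;
            int n;
            while ((n = in.read(buf)) > 0) {   // a single read may return fewer bytes than sent
                total += n;
            }
            return first + "," + total + "," + (n == -1 ? "eof" : "unexpected");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "timeout,3,eof"
    }
}
```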

    4. SocketOutputStream

    It works much like SocketInputStream, so we will not analyze it further.


    Record a little every day, with care. The content may not matter much, but the habit does!

  • Original article: https://www.cnblogs.com/binarylei/p/11144334.html