zoukankan      html  css  js  c++  java
  • 分布式架构基础 一、分布式架构的基石之远程通信协议

      分布式架构的特点是:服务的分布式(分布在不同的计算机节点)、以及远程通信实现数据的交互。
    那么就意味着,原本在一个war包下的:用户、订单、商品、库存等业务模块,按照业务维度拆分成用户服务、订单服务、商品服务、库存服务等独立的进程,部署在不同的计算机节点上。而拆分之后的服务,必然会涉及到远程通信的需要,比如用户服务,需要查询订单列表,则会调用订单服务的远程接口,获得订单信息。
    在这个过程中,会涉及到远程通信,所以远程通信的技术是整个分布式架构的一个基础,如果没有远程通信,那么分布式架构也就不存在了。

    对于内部服务通信来说,我们一般会采用RPC通信,而在RPC通信的实现中,必然会涉及到TCP传输协议、三次握手、四次挥手、TCP的四层网络模型、BIO、NIO、AIO、序列化、反序列化等。

    一、一次http请求通信的完整过程

    域名解析 -->
    发起TCP的3次握手 -->
    TCP连接建立后 发起http请求 -->
    服务器响应http请求,浏览器得到html代码 -->
    浏览器解析html代码,并请求html代码中的资源(如js、css、图片等) -->
    浏览器对页面进行渲染呈现给用户

    7层网络协议

    TCP三次握手、四次挥手

    为什么四次挥手
    TCP协议是一种面向连接的、可靠的、基于字节流的运输层通信协议。TCP是全双工模式,这就意味着,当主机1发出FIN报文段时,只是表示主机1已经没有数据要发送了,主机1告诉主机2,它的数据已经全部发送完毕了;但是,这个时候主机1还是可以接受来自主机2的数据;
    当主机2返回ACK报文段时,表示它已经知道主机1没有数据发送了,但是主机2还是可以发送数据到主机1的;
    当主机2也发送了FIN报文段时,这个时候就表示主机2也没有数据要发送了,就会告诉主机1,我也没有数据要发送了,之后彼此就会愉快的中断这次TCP连接。

    二、TCP/IP分层管理

    三、Java中使用TCP协议进行通信

    四、BIO、NIO机制

    BIO:同步阻塞式IO,服务器实现模式为一个连接一个线程,即客户端有socket连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善。
    NIO:同步非阻塞式IO,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。

    1. BIO

    a. 一个简单的通信样例

    先以一个简单的例子来演示进程或服务之间的通信:
    使用ServerSocket和Socket来演示一次简单的通信,端口号为8081,首先 server启动; client --发送数据至--> server; server接受; server --发送数据至--> client; client接收。
    ServerDemo

    import java.io.*;
    import java.net.InetAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class ServerDemo {
    
        public static void main(String[] args) {
    
            ServerSocket serverSocket = null;
            try {
    
                serverSocket = new ServerSocket(8081);
    
                Socket socket = serverSocket.accept();  // 阻塞
                InetAddress client = socket.getInetAddress();
                System.out.println(String.format("Server: 接收到来自 [%s] 的建立连接请求", client));
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();  // 阻塞
                System.out.println(String.format("Server: 接收到来自 [%s] 的消息 [%s]", client, receiveMsg));
    
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                String sendMsg = "Hi this's server, has accept your's msg[" + receiveMsg + "]";
                writer.write(sendMsg + "
    "); // 末尾加 
     来告诉通信对手本次消息已发送完成
                writer.flush();
                System.out.println(String.format("ServerDemo: 发送回执至 [%s], 消息为 [%s]", client, sendMsg));
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != serverSocket) { serverSocket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    
    

    ClientDemo

    import java.io.*;
    import java.net.Socket;
    
    public class ClientADemo {
    
        public static void main(String[] args) {
    
            Socket socket = null;
            try {
                socket = new Socket("127.0.0.1", 8081);
    
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                String sendMsg = "Hello this's clientA";
                bw.write(sendMsg + "
    ");
                bw.flush();
                System.out.println("ClientA: 发送消息 " + sendMsg);
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();  // 阻塞
                System.out.println("ClientA: 接收到回执 " + receiveMsg);
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != socket) { socket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    

    执行结果:

    server端:
    Server: 接收到来自 [/127.0.0.1] 的建立连接请求
    Server: 接收到来自 [/127.0.0.1] 的消息 [Hello this's clientA]
    ServerDemo: 发送回执至 [/127.0.0.1], 消息为 [Hi this's server, has accept your's msg[Hello this's clientA]]
    
    client端:
    ClientA: 发送消息 Hello this's clientA
    ClientA: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientA]
    

    b. socket的阻塞

    其实我们以debug方式启动上例中的server和client时会发现,当没有client建立连接,server在serverSocket.accept();处是处于阻塞状态的; 当client没有发消息,server在reader.readLine(); 读取消息时,也是处于阻塞状态的。
    下面以两个client连接server方式来演示通信的阻塞,模拟场景:
    创建两个客户端ClientA,ClientB,由ClientA建立与server的连接,clientA建立连接后等待10s再发送消息至客户端; 再此期间clientB与server通信;
    为了保证服务端Server在一次通信完成后继续通信,Server改为自旋。

    Server:

    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketAddress;
    import java.util.Date;
    
    public class ServerDemo {
    
        public static void main(String[] args) {
    
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(8081);
    
                while (!Thread.currentThread().isInterrupted()) { // 自旋
    
                    Socket socket = serverSocket.accept();  // A01. 第一次while循环内,ClientA先建立连接; A03. 当与clientA通信完毕,进入第二次while循环,才能收到ClientB建立的连接请求,开始与B的通信
                    SocketAddress client = socket.getRemoteSocketAddress();
                    System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的建立连接请求 at[%s]", client, new Date()));
    
                    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    String receiveMsg = reader.readLine();  // A02. 之后由于ClientA等待了10s才发消息,所以server读取不到ClientA的消息,阻塞了10s,之后才能继续执行
                    System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的消息 [%s] at[%s]", client, receiveMsg, new Date()));
    
                    BufferedWriter writer = new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())));
                    String sendMsg = "Hi this's server, has accept your's msg[" + receiveMsg + "]";
                    writer.write(sendMsg + " 
    ");  // 末尾加 
     来告诉通信对手本次消息已发送完成
                    writer.flush();
                    System.out.println(String.format("ServerDemo: 发送回执至 [%s], 消息为 [%s] at[%s]", client, sendMsg, new Date()));
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != serverSocket) { serverSocket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    
    }
    

    ClientA:

    import java.io.*;
    import java.net.Socket;
    import java.util.concurrent.TimeUnit;
    import java.util.Date;
    
    public class ClientADemo {
    
        public static void main(String[] args) {
    
            Socket socket = null;
            try {
                socket = new Socket("127.0.0.1", 8081);
    
                TimeUnit.SECONDS.sleep(10);
    
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                String sendMsg = "Hello this's clientA";
                bw.write(sendMsg + "
    ");
                bw.flush();
                System.out.println("ClientA: 发送消息 " + sendMsg + " " + new Date());
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();
                System.out.println("ClientA: 接收到回执 " + receiveMsg + " " + new Date());
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != socket) { socket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    

    ClientB:

    import java.io.*;
    import java.net.Socket;
    import java.util.Date;
    
    public class ClientBDemo {
    
        public static void main(String[] args) {
    
            Socket socket = null;
            try {
                socket = new Socket("127.0.0.1", 8081);
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                String sendMsg = "Hello this's clientB";
                bw.write(sendMsg + "
    ");
                bw.flush();
                System.out.println("ClientB: 发送消息 " + sendMsg + " " + new Date());
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();
                System.out.println("ClientB: 接收到回执 " + receiveMsg + " " + new Date());
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != socket) { socket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    
    

    执行结果:

    服务端:
    ServerDemo: 接收到来自 [/127.0.0.1:9892] 的建立连接请求 at[Sun Jun 14 10:57:13 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:9892] 的消息 [Hello this's clientA] at[Sun Jun 14 10:57:23 CST 2019]
    ServerDemo: 发送回执至 [/127.0.0.1:9892], 消息为 [Hi this's server, has accept your's msg[Hello this's clientA]] at[Sun Jun 14 10:57:23 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:9898] 的建立连接请求 at[Sun Jun 14 10:57:23 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:9898] 的消息 [Hello this's clientB] at[Sun Jun 14 10:57:23 CST 2019]
    ServerDemo: 发送回执至 [/127.0.0.1:9898], 消息为 [Hi this's server, has accept your's msg[Hello this's clientB]] at[Sun Jun 14 10:57:23 CST 2019]
    
    客户端A: 
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 10:57:23 CST 2019
    ClientA: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientA]  Sun Jun 14 10:57:23 CST 2019
    
    客户端B:
    ClientB: 发送消息 Hello this's clientB Sun Jun 14 10:57:14 CST 2019
    ClientB: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientB]  Sun Jun 14 10:57:23 CST 2019
    
    

    通过Server的运行结果可以看到,在57分13秒时收到A建立连接的请求,之后等待了10s到57分23时才收到A发出的请求之后完成与A的通信;在与A通信完成后,才继续执行了与B的通信;
    而B其实在57分14秒就已经把信息发送给了Server,等到57分23秒才完成这次通信。可见ServerSocket在处理请求时时阻塞并且串行的,这也就是所谓的BIO。
    而之所以服务端会等待10s才给B响应,是因为Server是在一个线程内执行的,Server在执行到String receiveMsg = reader.readLine(); 时会一直等待A发来的消息才会唤醒,所以不会accept B的建立连接请求。
    此时我们就可以进行些许优化,为每个socket单独建立一个线程,异步响应每个请求。

    c. 每个socket建立一个线程来执行

    一般我们使用线程池来对BIO进行优化。
    ServerDemo:

    
    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketAddress;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ServerDemo {
    
        static ExecutorService pool = Executors.newFixedThreadPool(20);
    
        public static void main(String[] args) {
    
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(8081);
    
                while (!Thread.currentThread().isInterrupted()) { // 自旋
    
                    Socket socket = serverSocket.accept();  // 阻塞
    
                    pool.execute(new SocketThread(socket));
    
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != serverSocket) { serverSocket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    
    class SocketThread implements Runnable {
    
        private Socket socket;
    
        public SocketThread(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
    
            try {
                SocketAddress client = socket.getRemoteSocketAddress();
                System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的建立连接请求 at[%s]", client, new Date()));
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();  // 阻塞
                System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的消息 [%s] at[%s]", client, receiveMsg, new Date()));
    
                BufferedWriter writer = new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())));
                String sendMsg = "Hi this's server, has accept your's msg[" + receiveMsg + "]";
                writer.write(sendMsg + " 
    ");  // 末尾加 
     来告诉通信对手本次消息已发送完成
                writer.flush();
                System.out.println(String.format("ServerDemo: 发送回执至 [%s], 消息为 [%s] at[%s]", client, sendMsg, new Date()));
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
    
        }
    }
    

    我们为每个socket建立单独的线程来响应,这样每个socket的交互是异步的,也就不存在之前建立连接时的阻塞问题了。
    仍然执行上例子中的A、B客户端

    Server:
    ServerDemo: 接收到来自 [/127.0.0.1:10363] 的建立连接请求 at[Sun Jun 14 11:34:20 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:10369] 的建立连接请求 at[Sun Jun 14 11:34:22 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:10369] 的消息 [Hello this's clientB] at[Sun Jun 14 11:34:22 CST 2019]
    ServerDemo: 发送回执至 [/127.0.0.1:10369], 消息为 [Hi this's server, has accept your's msg[Hello this's clientB]] at[Sun Jun 14 11:34:22 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:10363] 的消息 [Hello this's clientA] at[Sun Jun 14 11:34:30 CST 2019]
    ServerDemo: 发送回执至 [/127.0.0.1:10363], 消息为 [Hi this's server, has accept your's msg[Hello this's clientA]] at[Sun Jun 14 11:34:30 CST 2019]
    
    ClientA:
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 11:34:30 CST 2019
    ClientA: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientA]  Sun Jun 14 11:34:30 CST 2019
    
    ClientB:
    ClientB: 发送消息 Hello this's clientB Sun Jun 14 11:34:22 CST 2019
    ClientB: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientB]  Sun Jun 14 11:34:22 CST 2019
    

    可见ClientB无需等待Server与A通信完成后再与Server通信。

    d. 服务端每次接收到数据异步执行

    通过加入线程池的改造,我们使得每个socket可以单独与服务端进行通信,但是实际场景中可能Server在接收到client的消息后,需要进行相应的业务逻辑处理,也就是String receiveMsg = reader.readLine(); 之后的逻辑,如果这段逻辑处理起来耗时较长,那么下次client发送的数据就不能及时接受。此时也可以借助阻塞队列BlockIngQueue来帮助异步执行。

    模拟场景:
    ClientA每次3秒往Server发送一次消息;
    Server处理收到的消息需要10秒;每次server收到消息后直接放入阻塞队列总,由另一个线程异步处理,不影响下次消息的接受。

    ServerDemo :

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketAddress;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class ServerDemo {
    
        private int port = 8081;
        private ExecutorService pool = Executors.newFixedThreadPool(20);
        private LinkedBlockingQueue<String> receiveMsgQueue = new LinkedBlockingQueue();
    
        public static void main(String[] args) {
    
            new ServerDemo();
    
        }
    
        public ServerDemo() {
    
            try {
                ServerSocket serverSocket = new ServerSocket(port);
    
                // 为每个客户端创建线程
                pool.execute(new ConnectorThread(serverSocket));
    
                // 客户端发来的消息,单独线程处理
                pool.execute(new ProcessMsgThread(receiveMsgQueue));
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    
        }
    
        class ConnectorThread implements Runnable {
    
            private ServerSocket serverSocket;
    
            public ConnectorThread(ServerSocket serverSocket) {
                this.serverSocket = serverSocket;
            }
    
            @Override
            public void run() {
    
                while (!Thread.currentThread().isInterrupted()) {
    
                    try {
                        Socket socket = serverSocket.accept();
                        pool.execute(new ReceiveMsgThread(socket));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
    
                }
    
            }
    
        }
    
        class ReceiveMsgThread implements Runnable {
    
            private Socket socket;
    
            public ReceiveMsgThread(Socket socket) {
                this.socket = socket;
            }
    
            @Override
            public void run() {
    
                try {
                    SocketAddress client = socket.getRemoteSocketAddress();
                    System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的建立连接请求 at[%s]", client, new Date()));
    
                    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    
                    while (!Thread.currentThread().isInterrupted()) {
                        String receiveMsg = reader.readLine();  // 阻塞
                        System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的消息 [%s] at[%s]", client, receiveMsg, new Date()));
    
                        // 服务端处理消息需要20s,直接放入阻塞队列异步执行,然后继续接受下次消息
                        //TimeUnit.SECONDS.sleep(20);
                        receiveMsgQueue.put(receiveMsg);
    
                    }
    
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
            }
        }
    
        class ProcessMsgThread implements Runnable {
    
            private LinkedBlockingQueue<String> blockingQueue;
    
            public ProcessMsgThread(LinkedBlockingQueue receiveMsgQueue) {
                this.blockingQueue = receiveMsgQueue;
            }
    
            @Override
            public void run() {
    
                while (!Thread.currentThread().isInterrupted()) {
    
                    try {
                        String msg = blockingQueue.take();
                        // TODO do something, process msg
                        TimeUnit.SECONDS.sleep(10);
                        System.out.println("消息处理完成");
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
    
            }
        }
    
    }
    
    
    

    ClientDemo:

    
    import java.io.*;
    import java.net.Socket;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    public class ClientADemo {
    
        public static void main(String[] args) {
    
            Socket socket = null;
            try {
                socket = new Socket("127.0.0.1", 8081);
    
                while (true) {
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                    String sendMsg = "Hello this's clientA";
                    bw.write(sendMsg + "
    ");
                    bw.flush();
                    System.out.println("ClientA: 发送消息 " + sendMsg + " " + new Date());
    
                    TimeUnit.SECONDS.sleep(3);
    
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != socket) { socket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    
    

    运行结果:

    Server:
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的建立连接请求 at[Sun Jun 14 12:23:54 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:23:54 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:23:57 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:00 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:03 CST 2019]
    消息处理完成
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:06 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:09 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:12 CST 2019]
    消息处理完成
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:15 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:18 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:21 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:24 CST 2019]
    消息处理完成
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:27 CST 2019]
    
    ClientA:
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:23:54 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:23:57 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:00 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:03 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:06 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:09 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:12 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:15 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:18 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:21 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:24 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:27 CST 2019
    

    可见,通过阻塞队列来辅助处理消息,不会影响server端接受消息。

    2. NIO

    在使用BIO通信的过程中,在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身也会避免频繁创建销毁线程的开销。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。

    NIO是一种同步非阻塞的I/O模型,在Java 1.4 中引入了NIO框架,对应 java.nio 包,提供了 Channel , Selector,Buffer等抽象。NIO中的N可以理解为Non-blocking,不单纯是New。它支持面向缓冲的,基于通道的I/O操作方法。 NIO提供了与传统BIO模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

    NIOServer

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class NIOServer {
    
        static Selector selector;
    
        public static void main(String[] args) {
    
            try {
    
                selector = Selector.open();
    
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false); // 设为非阻塞
                serverSocketChannel.bind(new InetSocketAddress(8085));
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //把连接事件注册到多路复用器上
    
                while (true) {
                    selector.select(); // 阻塞机制
                    Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeySet.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isAcceptable()) { // 连接事件
                            handleAccept(key);
                        } else if (key.isReadable()) {  // 读事件
                            handleRead(key);
                        }
                    }
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private static void handleAccept(SelectionKey key) {
    
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
    
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                String sendMsg = "Hi this is NIO server.";
                socketChannel.write(ByteBuffer.wrap(sendMsg.getBytes()));
                socketChannel.register(selector, SelectionKey.OP_READ); // 注册读事件
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private static void handleRead(SelectionKey key) {
    
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer =ByteBuffer.allocate(1024);
            try {
                socketChannel.read(byteBuffer);
                System.out.println("Server: receive mgs[" + new String(byteBuffer.array()) + "]");
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
    
    }
    
    

    NIOClient

    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class NIOClient {
    
        static Selector selector;
    
        public static void main(String[] args) {
    
            try {
                selector = Selector.open();
                SocketChannel socketChannel =SocketChannel.open();
                socketChannel.configureBlocking(false);
                socketChannel.connect(new InetSocketAddress("127.0.0.1", 8085));
                socketChannel.register(selector, SelectionKey.OP_CONNECT); // 注册连接事件
                while (true) {
                    selector.select();
                    Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeySet.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isConnectable()) { // 连接事件
                            handleConnect(key);
                        } else if (key.isReadable()){// 读取事件
                            handleRead(key);
                        }
                    }
    
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private static void handleConnect(SelectionKey key) throws IOException {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            if (socketChannel.isConnectionPending()) {
                socketChannel.finishConnect();
            }
            socketChannel.configureBlocking(false);
            String sendMsg = "Hi this's client.";
            socketChannel.write(ByteBuffer.wrap(sendMsg.getBytes()));
            socketChannel.register(selector, SelectionKey.OP_READ);// 注册读事件
        }
    
        private static void handleRead(SelectionKey key) throws IOException {
    
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(byteBuffer);
            System.out.println("Client: receive msg[" + new String(byteBuffer.array()) + "]");
    
        }
    
    }
    

    多路复用器: Selector
    多路复用机制:多路指的时多个网络连接,复用指复用同一个线程。
    多路复用技术类似于使用多个鱼竿钓鱼,只有有鱼的时候鱼竿响动,我们才提起对应的鱼竿。

  • 相关阅读:
    基本MVVM 和 ICommand用法举例(转)
    WPF C# 命令的运行机制
    628. Maximum Product of Three Numbers
    605. Can Place Flowers
    581. Shortest Unsorted Continuous Subarray
    152. Maximum Product Subarray
    216. Combination Sum III
    448. Find All Numbers Disappeared in an Array
    268. Missing Number
    414. Third Maximum Number
  • 原文地址:https://www.cnblogs.com/Qkxh320/p/distributed_basic01_NIO.html
Copyright © 2011-2022 走看看