zoukankan      html  css  js  c++  java
  • 分布式架构基础 一、分布式架构的基石之远程通信协议

      分布式架构的特点是:服务的分布式(分布在不同的计算机节点)、以及远程通信实现数据的交互。
    那么就意味着,原本在一个war包下的:用户、订单、商品、库存等业务模块,按照业务维度拆分成用户服务、订单服务、商品服务、库存服务等独立的进程,部署在不同的计算机节点上。而拆分之后的服务,必然会涉及到远程通信的需要,比如用户服务,需要查询订单列表,则会调用订单服务的远程接口,获得订单信息。
    在这个过程中,会涉及到远程通信,所以远程通信的技术是整个分布式架构的一个基础,如果没有远程通信,那么分布式架构也就不存在了。

    对于内部服务通信来说,我们一般会采用RPC通信,而在RPC通信的实现中,必然会涉及到TCP传输协议、三次握手、四次挥手、TCP的四层网络模型、BIO、NIO、AIO、序列化、反序列化等。

    一、一次http请求通信的完整过程

    域名解析 -->
    发起TCP的3次握手 -->
    TCP连接建立后 发起http请求 -->
    服务器响应http请求,浏览器得到html代码 -->
    浏览器解析html代码,并请求html代码中的资源(如js、css、图片等) -->
    浏览器对页面进行渲染呈现给用户

    7层网络协议

    TCP三次握手、四次挥手

    为什么四次挥手
    TCP协议是一种面向连接的、可靠的、基于字节流的运输层通信协议。TCP是全双工模式,这就意味着,当主机1发出FIN报文段时,只是表示主机1已经没有数据要发送了,主机1告诉主机2,它的数据已经全部发送完毕了;但是,这个时候主机1还是可以接受来自主机2的数据;
    当主机2返回ACK报文段时,表示它已经知道主机1没有数据发送了,但是主机2还是可以发送数据到主机1的;
    当主机2也发送了FIN报文段时,这个时候就表示主机2也没有数据要发送了,就会告诉主机1,我也没有数据要发送了,之后彼此就会愉快的中断这次TCP连接。

    二、TCP/IP分层管理

    三、Java中使用TCP协议进行通信

    四、BIO、NIO机制

    BIO:同步阻塞式IO,服务器实现模式为一个连接一个线程,即客户端有socket连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善。
    NIO:同步非阻塞式IO,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。

    1. BIO

    a. 一个简单的通信样例

    先以一个简单的例子来演示进程或服务之间的通信:
    使用ServerSocket和Socket来演示一次简单的通信,端口号为8081,首先 server启动; client --发送数据至--> server; server接受; server --发送数据至--> client; client接收。
    ServerDemo

    import java.io.*;
    import java.net.InetAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class ServerDemo {
    
        public static void main(String[] args) {
    
            ServerSocket serverSocket = null;
            try {
    
                serverSocket = new ServerSocket(8081);
    
                Socket socket = serverSocket.accept();  // 阻塞
                InetAddress client = socket.getInetAddress();
                System.out.println(String.format("Server: 接收到来自 [%s] 的建立连接请求", client));
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();  // 阻塞
                System.out.println(String.format("Server: 接收到来自 [%s] 的消息 [%s]", client, receiveMsg));
    
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                String sendMsg = "Hi this's server, has accept your's msg[" + receiveMsg + "]";
                writer.write(sendMsg + "
    "); // 末尾加 
     来告诉通信对手本次消息已发送完成
                writer.flush();
                System.out.println(String.format("ServerDemo: 发送回执至 [%s], 消息为 [%s]", client, sendMsg));
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != serverSocket) { serverSocket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    
    

    ClientDemo

    import java.io.*;
    import java.net.Socket;
    
    public class ClientADemo {
    
        public static void main(String[] args) {
    
            Socket socket = null;
            try {
                socket = new Socket("127.0.0.1", 8081);
    
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                String sendMsg = "Hello this's clientA";
                bw.write(sendMsg + "
    ");
                bw.flush();
                System.out.println("ClientA: 发送消息 " + sendMsg);
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();  // 阻塞
                System.out.println("ClientA: 接收到回执 " + receiveMsg);
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != socket) { socket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    

    执行结果:

    server端:
    Server: 接收到来自 [/127.0.0.1] 的建立连接请求
    Server: 接收到来自 [/127.0.0.1] 的消息 [Hello this's clientA]
    ServerDemo: 发送回执至 [/127.0.0.1], 消息为 [Hi this's server, has accept your's msg[Hello this's clientA]]
    
    client端:
    ClientA: 发送消息 Hello this's clientA
    ClientA: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientA]
    

    b. socket的阻塞

    其实我们以debug方式启动上例中的server和client时会发现,当没有client建立连接,server在serverSocket.accept();处是处于阻塞状态的; 当client没有发消息,server在reader.readLine(); 读取消息时,也是处于阻塞状态的。
    下面以两个client连接server方式来演示通信的阻塞,模拟场景:
    创建两个客户端ClientA,ClientB,由ClientA建立与server的连接,clientA建立连接后等待10s再发送消息至客户端; 再此期间clientB与server通信;
    为了保证服务端Server在一次通信完成后继续通信,Server改为自旋。

    Server:

    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketAddress;
    import java.util.Date;
    
    public class ServerDemo {
    
        public static void main(String[] args) {
    
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(8081);
    
                while (!Thread.currentThread().isInterrupted()) { // 自旋
    
                    Socket socket = serverSocket.accept();  // A01. 第一次while循环内,ClientA先建立连接; A03. 当与clientA通信完毕,进入第二次while循环,才能收到ClientB建立的连接请求,开始与B的通信
                    SocketAddress client = socket.getRemoteSocketAddress();
                    System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的建立连接请求 at[%s]", client, new Date()));
    
                    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    String receiveMsg = reader.readLine();  // A02. 之后由于ClientA等待了10s才发消息,所以server读取不到ClientA的消息,阻塞了10s,之后才能继续执行
                    System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的消息 [%s] at[%s]", client, receiveMsg, new Date()));
    
                    BufferedWriter writer = new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())));
                    String sendMsg = "Hi this's server, has accept your's msg[" + receiveMsg + "]";
                    writer.write(sendMsg + " 
    ");  // 末尾加 
     来告诉通信对手本次消息已发送完成
                    writer.flush();
                    System.out.println(String.format("ServerDemo: 发送回执至 [%s], 消息为 [%s] at[%s]", client, sendMsg, new Date()));
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != serverSocket) { serverSocket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    
    }
    

    ClientA:

    import java.io.*;
    import java.net.Socket;
    import java.util.concurrent.TimeUnit;
    import java.util.Date;
    
    public class ClientADemo {
    
        public static void main(String[] args) {
    
            Socket socket = null;
            try {
                socket = new Socket("127.0.0.1", 8081);
    
                TimeUnit.SECONDS.sleep(10);
    
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                String sendMsg = "Hello this's clientA";
                bw.write(sendMsg + "
    ");
                bw.flush();
                System.out.println("ClientA: 发送消息 " + sendMsg + " " + new Date());
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();
                System.out.println("ClientA: 接收到回执 " + receiveMsg + " " + new Date());
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != socket) { socket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    

    ClientB:

    import java.io.*;
    import java.net.Socket;
    import java.util.Date;
    
    public class ClientBDemo {
    
        public static void main(String[] args) {
    
            Socket socket = null;
            try {
                socket = new Socket("127.0.0.1", 8081);
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                String sendMsg = "Hello this's clientB";
                bw.write(sendMsg + "
    ");
                bw.flush();
                System.out.println("ClientB: 发送消息 " + sendMsg + " " + new Date());
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();
                System.out.println("ClientB: 接收到回执 " + receiveMsg + " " + new Date());
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != socket) { socket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    
    

    执行结果:

    服务端:
    ServerDemo: 接收到来自 [/127.0.0.1:9892] 的建立连接请求 at[Sun Jun 14 10:57:13 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:9892] 的消息 [Hello this's clientA] at[Sun Jun 14 10:57:23 CST 2019]
    ServerDemo: 发送回执至 [/127.0.0.1:9892], 消息为 [Hi this's server, has accept your's msg[Hello this's clientA]] at[Sun Jun 14 10:57:23 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:9898] 的建立连接请求 at[Sun Jun 14 10:57:23 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:9898] 的消息 [Hello this's clientB] at[Sun Jun 14 10:57:23 CST 2019]
    ServerDemo: 发送回执至 [/127.0.0.1:9898], 消息为 [Hi this's server, has accept your's msg[Hello this's clientB]] at[Sun Jun 14 10:57:23 CST 2019]
    
    客户端A: 
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 10:57:23 CST 2019
    ClientA: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientA]  Sun Jun 14 10:57:23 CST 2019
    
    客户端B:
    ClientB: 发送消息 Hello this's clientB Sun Jun 14 10:57:14 CST 2019
    ClientB: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientB]  Sun Jun 14 10:57:23 CST 2019
    
    

    通过Server的运行结果可以看到,在57分13秒时收到A建立连接的请求,之后等待了10s到57分23时才收到A发出的请求之后完成与A的通信;在与A通信完成后,才继续执行了与B的通信;
    而B其实在57分14秒就已经把信息发送给了Server,等到57分23秒才完成这次通信。可见ServerSocket在处理请求时时阻塞并且串行的,这也就是所谓的BIO。
    而之所以服务端会等待10s才给B响应,是因为Server是在一个线程内执行的,Server在执行到String receiveMsg = reader.readLine(); 时会一直等待A发来的消息才会唤醒,所以不会accept B的建立连接请求。
    此时我们就可以进行些许优化,为每个socket单独建立一个线程,异步响应每个请求。

    c. 每个socket建立一个线程来执行

    一般我们使用线程池来对BIO进行优化。
    ServerDemo:

    
    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketAddress;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ServerDemo {
    
        static ExecutorService pool = Executors.newFixedThreadPool(20);
    
        public static void main(String[] args) {
    
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(8081);
    
                while (!Thread.currentThread().isInterrupted()) { // 自旋
    
                    Socket socket = serverSocket.accept();  // 阻塞
    
                    pool.execute(new SocketThread(socket));
    
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != serverSocket) { serverSocket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    
    class SocketThread implements Runnable {
    
        private Socket socket;
    
        public SocketThread(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
    
            try {
                SocketAddress client = socket.getRemoteSocketAddress();
                System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的建立连接请求 at[%s]", client, new Date()));
    
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String receiveMsg = reader.readLine();  // 阻塞
                System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的消息 [%s] at[%s]", client, receiveMsg, new Date()));
    
                BufferedWriter writer = new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())));
                String sendMsg = "Hi this's server, has accept your's msg[" + receiveMsg + "]";
                writer.write(sendMsg + " 
    ");  // 末尾加 
     来告诉通信对手本次消息已发送完成
                writer.flush();
                System.out.println(String.format("ServerDemo: 发送回执至 [%s], 消息为 [%s] at[%s]", client, sendMsg, new Date()));
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
    
        }
    }
    

    我们为每个socket建立单独的线程来响应,这样每个socket的交互是异步的,也就不存在之前建立连接时的阻塞问题了。
    仍然执行上例子中的A、B客户端

    Server:
    ServerDemo: 接收到来自 [/127.0.0.1:10363] 的建立连接请求 at[Sun Jun 14 11:34:20 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:10369] 的建立连接请求 at[Sun Jun 14 11:34:22 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:10369] 的消息 [Hello this's clientB] at[Sun Jun 14 11:34:22 CST 2019]
    ServerDemo: 发送回执至 [/127.0.0.1:10369], 消息为 [Hi this's server, has accept your's msg[Hello this's clientB]] at[Sun Jun 14 11:34:22 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:10363] 的消息 [Hello this's clientA] at[Sun Jun 14 11:34:30 CST 2019]
    ServerDemo: 发送回执至 [/127.0.0.1:10363], 消息为 [Hi this's server, has accept your's msg[Hello this's clientA]] at[Sun Jun 14 11:34:30 CST 2019]
    
    ClientA:
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 11:34:30 CST 2019
    ClientA: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientA]  Sun Jun 14 11:34:30 CST 2019
    
    ClientB:
    ClientB: 发送消息 Hello this's clientB Sun Jun 14 11:34:22 CST 2019
    ClientB: 接收到回执 Hi this's server, has accept your's msg[Hello this's clientB]  Sun Jun 14 11:34:22 CST 2019
    

    可见ClientB无需等待Server与A通信完成后再与Server通信。

    d. 服务端每次接收到数据异步执行

    通过加入线程池的改造,我们使得每个socket可以单独与服务端进行通信,但是实际场景中可能Server在接收到client的消息后,需要进行相应的业务逻辑处理,也就是String receiveMsg = reader.readLine(); 之后的逻辑,如果这段逻辑处理起来耗时较长,那么下次client发送的数据就不能及时接受。此时也可以借助阻塞队列BlockIngQueue来帮助异步执行。

    模拟场景:
    ClientA每次3秒往Server发送一次消息;
    Server处理收到的消息需要10秒;每次server收到消息后直接放入阻塞队列总,由另一个线程异步处理,不影响下次消息的接受。

    ServerDemo :

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketAddress;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class ServerDemo {
    
        private int port = 8081;
        private ExecutorService pool = Executors.newFixedThreadPool(20);
        private LinkedBlockingQueue<String> receiveMsgQueue = new LinkedBlockingQueue();
    
        public static void main(String[] args) {
    
            new ServerDemo();
    
        }
    
        public ServerDemo() {
    
            try {
                ServerSocket serverSocket = new ServerSocket(port);
    
                // 为每个客户端创建线程
                pool.execute(new ConnectorThread(serverSocket));
    
                // 客户端发来的消息,单独线程处理
                pool.execute(new ProcessMsgThread(receiveMsgQueue));
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    
        }
    
        class ConnectorThread implements Runnable {
    
            private ServerSocket serverSocket;
    
            public ConnectorThread(ServerSocket serverSocket) {
                this.serverSocket = serverSocket;
            }
    
            @Override
            public void run() {
    
                while (!Thread.currentThread().isInterrupted()) {
    
                    try {
                        Socket socket = serverSocket.accept();
                        pool.execute(new ReceiveMsgThread(socket));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
    
                }
    
            }
    
        }
    
        class ReceiveMsgThread implements Runnable {
    
            private Socket socket;
    
            public ReceiveMsgThread(Socket socket) {
                this.socket = socket;
            }
    
            @Override
            public void run() {
    
                try {
                    SocketAddress client = socket.getRemoteSocketAddress();
                    System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的建立连接请求 at[%s]", client, new Date()));
    
                    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    
                    while (!Thread.currentThread().isInterrupted()) {
                        String receiveMsg = reader.readLine();  // 阻塞
                        System.out.println(String.format("ServerDemo: 接收到来自 [%s] 的消息 [%s] at[%s]", client, receiveMsg, new Date()));
    
                        // 服务端处理消息需要20s,直接放入阻塞队列异步执行,然后继续接受下次消息
                        //TimeUnit.SECONDS.sleep(20);
                        receiveMsgQueue.put(receiveMsg);
    
                    }
    
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
            }
        }
    
        class ProcessMsgThread implements Runnable {
    
            private LinkedBlockingQueue<String> blockingQueue;
    
            public ProcessMsgThread(LinkedBlockingQueue receiveMsgQueue) {
                this.blockingQueue = receiveMsgQueue;
            }
    
            @Override
            public void run() {
    
                while (!Thread.currentThread().isInterrupted()) {
    
                    try {
                        String msg = blockingQueue.take();
                        // TODO do something, process msg
                        TimeUnit.SECONDS.sleep(10);
                        System.out.println("消息处理完成");
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
    
            }
        }
    
    }
    
    
    

    ClientDemo:

    
    import java.io.*;
    import java.net.Socket;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    public class ClientADemo {
    
        public static void main(String[] args) {
    
            Socket socket = null;
            try {
                socket = new Socket("127.0.0.1", 8081);
    
                while (true) {
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                    String sendMsg = "Hello this's clientA";
                    bw.write(sendMsg + "
    ");
                    bw.flush();
                    System.out.println("ClientA: 发送消息 " + sendMsg + " " + new Date());
    
                    TimeUnit.SECONDS.sleep(3);
    
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != socket) { socket.close(); }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    
    

    运行结果:

    Server:
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的建立连接请求 at[Sun Jun 14 12:23:54 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:23:54 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:23:57 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:00 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:03 CST 2019]
    消息处理完成
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:06 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:09 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:12 CST 2019]
    消息处理完成
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:15 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:18 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:21 CST 2019]
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:24 CST 2019]
    消息处理完成
    ServerDemo: 接收到来自 [/127.0.0.1:11049] 的消息 [Hello this's clientA] at[Sun Jun 14 12:24:27 CST 2019]
    
    ClientA:
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:23:54 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:23:57 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:00 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:03 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:06 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:09 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:12 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:15 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:18 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:21 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:24 CST 2019
    ClientA: 发送消息 Hello this's clientA Sun Jun 14 12:24:27 CST 2019
    

    可见,通过阻塞队列来辅助处理消息,不会影响server端接受消息。

    2. NIO

    在使用BIO通信的过程中,在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身也会避免频繁创建销毁线程的开销。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。

    NIO是一种同步非阻塞的I/O模型,在Java 1.4 中引入了NIO框架,对应 java.nio 包,提供了 Channel , Selector,Buffer等抽象。NIO中的N可以理解为Non-blocking,不单纯是New。它支持面向缓冲的,基于通道的I/O操作方法。 NIO提供了与传统BIO模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

    NIOServer

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class NIOServer {
    
        static Selector selector;
    
        public static void main(String[] args) {
    
            try {
    
                selector = Selector.open();
    
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false); // 设为非阻塞
                serverSocketChannel.bind(new InetSocketAddress(8085));
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //把连接事件注册到多路复用器上
    
                while (true) {
                    selector.select(); // 阻塞机制
                    Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeySet.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isAcceptable()) { // 连接事件
                            handleAccept(key);
                        } else if (key.isReadable()) {  // 读事件
                            handleRead(key);
                        }
                    }
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private static void handleAccept(SelectionKey key) {
    
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
    
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                String sendMsg = "Hi this is NIO server.";
                socketChannel.write(ByteBuffer.wrap(sendMsg.getBytes()));
                socketChannel.register(selector, SelectionKey.OP_READ); // 注册读事件
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private static void handleRead(SelectionKey key) {
    
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer =ByteBuffer.allocate(1024);
            try {
                socketChannel.read(byteBuffer);
                System.out.println("Server: receive mgs[" + new String(byteBuffer.array()) + "]");
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
    
    }
    
    

    NIOClient

    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class NIOClient {
    
        static Selector selector;
    
        public static void main(String[] args) {
    
            try {
                selector = Selector.open();
                SocketChannel socketChannel =SocketChannel.open();
                socketChannel.configureBlocking(false);
                socketChannel.connect(new InetSocketAddress("127.0.0.1", 8085));
                socketChannel.register(selector, SelectionKey.OP_CONNECT); // 注册连接事件
                while (true) {
                    selector.select();
                    Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeySet.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isConnectable()) { // 连接事件
                            handleConnect(key);
                        } else if (key.isReadable()){// 读取事件
                            handleRead(key);
                        }
                    }
    
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private static void handleConnect(SelectionKey key) throws IOException {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            if (socketChannel.isConnectionPending()) {
                socketChannel.finishConnect();
            }
            socketChannel.configureBlocking(false);
            String sendMsg = "Hi this's client.";
            socketChannel.write(ByteBuffer.wrap(sendMsg.getBytes()));
            socketChannel.register(selector, SelectionKey.OP_READ);// 注册读事件
        }
    
        private static void handleRead(SelectionKey key) throws IOException {
    
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(byteBuffer);
            System.out.println("Client: receive msg[" + new String(byteBuffer.array()) + "]");
    
        }
    
    }
    

    多路复用器: Selector
    多路复用机制:多路指的时多个网络连接,复用指复用同一个线程。
    多路复用技术类似于使用多个鱼竿钓鱼,只有有鱼的时候鱼竿响动,我们才提起对应的鱼竿。

  • 相关阅读:
    数字滤波器
    PCL点云库:Kd树
    KNN算法与Kd树
    分布式锁与实现(一)——基于Redis实现
    Redis分布式锁的正确实现方式
    redis常用命令大全
    使用 Redis 实现排行榜功能
    RabbitMQ下的生产消费者模式与订阅发布模式
    java高级精讲之高并发抢红包~揭开Redis分布式集群与Lua神秘面纱
    Java进阶面试题大集合-offer不再是问题
  • 原文地址:https://www.cnblogs.com/Qkxh320/p/distributed_basic01_NIO.html
Copyright © 2011-2022 走看看