分布式-通信(NIO&BIO&网络模型&零拷贝)
前面聊到redis和rabbit,那我们是如何他他们进行通信的呢?所以想聊聊这个问题。在我们分布式架构中,一个重要的点就是通信,客户端和服务端的通信、微服务之间、中间件。而通信直接影响到用户的体验,比如我的服务器只能支持100个用户同时和我通信,而这个时候,有1000个用户,那剩下的900的用户,肯定要等待。所以今天会聊到关于通信的一些东西:BIO、NIO、TCP挥手和握手、零拷贝、以及七层网络模型。
Java中通信-Socket
java中提供一种通信模型名为Socket, 我们可以通过Socket去进行一些网络通信。
服务端
public class SocketServer { public static void main(String[] args) { ServerSocket serverSocket= null; try { serverSocket = new ServerSocket(8080); //这里是阻塞等待 Socket socket=serverSocket.accept(); //对客户端进行信息的接受 BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream())); String readLine = bufferedReader.readLine(); System.out.println("接受客户端消息:"+readLine); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我收到了信息 "); bufferedWriter.flush(); } catch (IOException e) { e.printStackTrace(); } } }客户端
public class SocketClient { public static void main(String[] args) { try { Socket socket= new Socket("localhost",8080); //对服务端进行消息的发送 BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我是客户端一,发送消息! "); bufferedWriter.flush(); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream())); String result = bufferedReader.readLine(); System.out.println("服务端返回消息:"+result); } catch (IOException e) { e.printStackTrace(); } } }socket的简单通信整体流程:
- 服务端:在服务端建立一个监听(这个时候服务端是一个阻塞状态)
- 客户端:建立一个连接,这个时候,服务端的阻塞被唤醒,得到一个socket。然后客户端通过OutputStream进行传输
- 服务端:通过inputStream去获取信息,因为tcp是一个双工的,所以可以通过一个OutputStream去写回到客户端
- 客户端:通过inputStream获取服务端的返回数据
网络分层(当他们建立连接的时候,底层涉及到这个点)不管是tcp还是udp他们的传输都会涉及到七层的网络模型
- 应用层:为我们的应用程序提供服务
- 表示层:对数据进行格式化转换、加密
- 会话层:建立、管理、以及维护会话
- 传输层:建立、管理和维护端到端的连接
- 网络层:IP选址及路由选择
- 数据链路层:提供介质访问和链路管理
- 物理层:底层的物理传输
负载和网络分层
- 二层负载是利用的Mac头(数据链路层)、
- 三层负载是Ip(网络层)我们可以在第三层对ip进行修改进行数据包的路由、
- 四层负载(传输层)TCP属于这一层,我们利用出传输层中的ip和端口号去进行一个负载均衡的计算(Nginx其实就是基于这一层进行的负载)
- 七层负载(应用层):根据URL进行负载,就行controller一样
对于TCP/IP有四层和上面的这七层相对应:为什么叫TCP/IP?因为TCP是传输层,在我们的网络层(IP)层之上
- 应用层:应用层、表示层、会话层
- 传输层:传输层
- 网络层:网络层
- 网络接口层:数据链路层、物理层
当客户端发送一个tcp请求会经过一下几个步骤:
- 发起一个请求,会把tcp头和数据报文放在一起
- 向下走会增加一个ip头,拼接到上面的数据中
- 接着拼接Mac头(服务端的网卡地址),当服务端进行签收的时候会去确认是不是和自己Mac地址一样
- 这些数据就都变成二进制进行传输
服务端经过的步骤
- 接收到二进制数据
- 向上传输对Mac头进行解析,这一步拿传递过来的Mac地址和当前的Mac地址进行匹配,如果匹配继续传输
- 上面一层对Ip头进行解析,如果ip是自己则向上传递,不是则转发到别的地址
- 最上面一层获取tcp头,去匹配服务端的进程,进程中去获取数据报文进行处理。
问题:我们怎么知道服务端的Mac地址呢?
那就是ARP协议(网络层),流程为,他将含目标IP地址的ARP请求广播到网络上的所有主机,想符合的主机则会返回Mac信息,从而得知Mac地址。IP地址表示服务器所在的位置,而Mac地址表示的是唯一的身份证明,每个主机都是唯一的(Mac是刻在网卡上的)。
为什么我们在实际的网络过程中考虑的是Tcp而不是Udp?
因为TCP网络传输的可靠性:
- 三次握手
- 我们的两个节点通信,需要通过三个数据包来确定连接的建立。用服务器A和B举个例子
- 服务器A给B说:我要和你通信 (第一次握手)这一步是确定服务器B是否是可使用状态,如果不可用服务器A会不断重试
- 服务器B相应:一切正常(第二次握手) 这一步是告诉服务器A我这边可以进行连接,防止服务器A不断的重试。
- 服务器A说:那我们开始通信吧(第三次握手)服务器A收到了服务器B的返回消息,他需要告诉服务器B他收到了消息,以为不排除在服务器A在收到B的相应后挂了的可能性,
- 流量控制
- 断开机制(四次挥手):
- A发送关闭消息给B(第一次挥手)这个时候B就知道A没有消息要发送了
- B发送消息给A说:我已经收到关闭消息了,但是等我通知你后,你再进行关闭(第二次挥手)这个时候不直接关闭,因为B可能还有数据没有处理完成,
- B发送消息给A说可以我处理数据完成,可以进行关闭。(第三次挥手)
- A给B说我关闭连接了(第四次挥手)
IO阻塞怎么办【BIO】?
上面我们运行的代码是一个客户端对一个服务端,(服务端在等待获取数据的时候是阻塞状态)但是在真实的场景中,不可能只有一个用户连接服务器,那是不是可以当一个请求过来的时候,开启线程去处理
服务端
View Codepublic class ServerSocketDemo { static ExecutorService executorService= Executors.newFixedThreadPool(20); public static void main(String[] args) { ServerSocket serverSocket=null; try { //localhost: 8080 serverSocket=new ServerSocket(8080); while(true) { Socket socket = serverSocket.accept(); //监听客户端连接(连接阻塞) System.out.println(socket.getPort()); executorService.execute(new SocketThread(socket)); //异步 } } catch (IOException e) { e.printStackTrace(); } finally { //TODO if(serverSocket!=null){ try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } public class SocketThread implements Runnable{ private Socket socket; public SocketThread(Socket socket) { this.socket = socket; } @Override public void run() { try { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));//输入流 String s = bufferedReader.readLine(); //被阻塞了 String clientStr = s; //读取客户端的一行数据 System.out.println("接收到客户端的信息:" + clientStr); //写回去 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我收到了信息 "); bufferedWriter.flush(); bufferedReader.close(); bufferedWriter.close(); }catch (Exception e){ } } }客户端
View Codepublic class SocketClientDemo1 { public static void main(String[] args) { try { Socket socket=new Socket("localhost",8080); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我是客户端1,发送了一个消息 "); bufferedWriter.flush(); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//输入流 String serverLine=bufferedReader.readLine(); //读取服务端返回的数据 System.out.println("服务端返回的数据:"+serverLine); } catch (IOException e) { e.printStackTrace(); } } } public class SocketClientDemo { public static void main(String[] args) { try { Socket socket=new Socket("localhost",8080); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我是客户端,发送了一个消息 "); bufferedWriter.flush(); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//输入流 String serverLine=bufferedReader.readLine(); //读取服务端返回的数据(被阻塞了) System.out.println("服务端返回的数据:"+serverLine); } catch (IOException e) { e.printStackTrace(); } } }流程如下:
当客户端传递请求到服务端,首先在阻塞在accept这里,然后开启线程去处理IO请求,这里可会阻塞,那线程的数量如何控制?如果连接数大于线程数,势必会有一些请求丢失,那如何提升连接数?这就出现了非阻塞IO(NIO)
有两个阻塞的地方:IO阻塞、连接阻塞
非阻塞IO(NIO)
非阻塞指的是:连接非阻塞、IO非阻塞
服务端:
View Codepublic class NewIoServer1 { public static void main(String[] args) { try { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(8080)); while (true){ SocketChannel socketChannel=serverSocketChannel.accept(); if (socketChannel!=null){ ByteBuffer byteBuffer=ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); //读取写到的数据 System.out.println(new String(byteBuffer.array())); //翻转管道 byteBuffer.flip(); socketChannel.write(byteBuffer); }else{ Thread.sleep(1000); System.out.println("连接未就绪"); } } } catch (IOException | InterruptedException e) { e.printStackTrace(); } } }客户端
View Codepublic class NioClient { public static void main(String[] args) { try { SocketChannel socketChannel=SocketChannel.open(); /*socketChannel.configureBlocking(false);*/ socketChannel.connect(new InetSocketAddress("localhost",8080)); //如果连接已经建立 if (socketChannel.isConnectionPending()){ //完成连接 socketChannel.finishConnect(); } //这里并不意味着连接已经建立 ByteBuffer byteBuffer=ByteBuffer.allocate(1024); byteBuffer.put("Hi,I am client".getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); //读取服务端返回的数据 (这里其实是阻塞的) // 有一个坑,就是如果上面设置了非阻塞,下面这里在等待服务端返回结果,服务端是不会返回结果的,因为不阻塞的话,这里的连接就已经关闭了 // 所以想要收到服务端的返回结果就注释上面的configureBlocking int read = socketChannel.read(byteBuffer); if (read>0){ System.out.println("服务端的数据::"+new String(byteBuffer.array())); } else { System.out.println("服务端没有数据返回"); } } catch (IOException e) { e.printStackTrace(); } } }
我们可以看出服务端在不断的询问连接,但是采用这种轮询的方式,会不会很消耗性能?所以这里就引出了多路复用机制。
多路复用【selector】
多路复用指的是一个线程管理多个通道,那一个线程就是我们的selector,简单来说就是把channel注册到selector上,注册的事件分为,读事件、写事件、连接事件、接收事件、一旦有一个事件通知的话,我们的selector就由阻塞变成非阻塞,这个时候他就拿到对应的通道进行处理,当他处理的时候一定是一个可以执行的通道,不像我们上图展示的那种(出现连接未就绪的情况发生).流程如下:
代码流程如下:
【客户端】:注册一个连接时间到他的selector中,客户端的selector发现有一个连接事件,然后就处理这个事件发送消息到服务端,并且注册一个读的事件
【服务端】:注册一个接受事件,当客户端发送一个消息过来后,服务端的selector就注册一个接受事件,在这个事件中他给客户端发送一个【收到】的消息,并且也注册一个读的事件。他的selector发现了这个读的事件后就开始读取服务端传递过来的数据。
【客户端】:selector发现了读的事件后,就开始读取服务端传递过来的【收到】的信息。
服务端:
View Codepublic class NIOServer { //多路复用 static Selector selector; public static void main(String[] args) { try { //打开多路复用 selector= Selector.open(); ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); //这是设置非阻塞 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(8080)); //把接收事件注册到多路复用器上 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true){ //只有事件到达的时候他才会被唤醒,否则是阻塞状态。 selector.select(); //这里是所有可以使用的channel,下面对这些可以使用的事件进行轮询操作 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey next = iterator.next(); // 一旦操作就对把事件进行移除 iterator.remove(); if (next.isAcceptable()){//写事件 copeWithAccept(next); }else if (next.isReadable()){//读事件 copeWithRead(next); } } } } catch (IOException e) { e.printStackTrace(); } } //处理写事件 private static void copeWithAccept(SelectionKey selectionKey){ //可以使用的channel ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); try { //这里就是接受一个连接 SocketChannel accept = channel.accept(); //打开非阻塞 accept.configureBlocking(false); //注册一个事件(因为上面写了一些东西,现在读的话也要注册一个读的事件) //这个时候上面轮询的时候就发现有一个读的事件准备就绪了 accept.register(selector,SelectionKey.OP_READ); accept.write(ByteBuffer.wrap("Hello,I am server".getBytes())); } catch (IOException e) { e.printStackTrace(); } } //处理读事件 private static void copeWithRead(SelectionKey selectionKey){ //这里拿到的通道就是上面注册(copeWithAccept)的要读的通道 SocketChannel socketChannel= (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer=ByteBuffer.allocate(1024); try { socketChannel.read(byteBuffer); System.out.println(new String(byteBuffer.array())); } catch (IOException e) { e.printStackTrace(); } } }客户端:
View Codepublic class NioClient { //多路复用 static Selector selector; public static void main(String[] args) { try { //打开多路复用 selector=Selector.open(); SocketChannel socketChannel=SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("localhost",8080)); //注册一个连接事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); while (true){ //还是当有事件发生的时候才触发 selector.select(); //一样,轮询所有注册的事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey next = iterator.next(); iterator.remove(); if (next.isConnectable()){//连接事件 copeWithAccept(next); }else if (next.isReadable()){//读事件 copeWithRead(next); } } } } catch (IOException e) { e.printStackTrace(); } } //处理连接事件 private static void copeWithAccept(SelectionKey selectionKey) throws IOException { //可以使用的channel SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.configureBlocking(false); socketChannel.write(ByteBuffer.wrap("Hello Serve,I am client".getBytes())); //注册一个读取的事件 socketChannel.register(selector,SelectionKey.OP_READ); } //处理读事件 private static void copeWithRead(SelectionKey selectionKey) throws IOException { //这里拿到的通道就是上面注册(copeWithAccept)的要读的通道 SocketChannel socketChannel= (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer=ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); System.out.println("client receive:"+new String(byteBuffer.array())); } }
零拷贝
当我们要操作一个文件发送给其他服务器的时候,有三次拷贝
- 把文件拷贝到内核空间
- 然后从内核空间拷贝到用户空间
- 然后再从用户空间把这个数据拷贝到内核空间,通过网卡发送到其他的服务器上去
零拷贝的意思是,我们不经过用户空间,直接从磁盘的内核空间进行发送。后面我们聊Netty的时候会聊到这。
实现方式:
- 把内核空间和用户空间映射在一起(MMAP)
- 使用现成的API