zoukankan      html  css  js  c++  java
  • 分布式-通信(NIO&BIO&网络模型&零拷贝)

    分布式-通信(NIO&BIO&网络模型&零拷贝)

    前面聊到redis和rabbit,那我们是如何他他们进行通信的呢?所以想聊聊这个问题。在我们分布式架构中,一个重要的点就是通信,客户端和服务端的通信、微服务之间、中间件。而通信直接影响到用户的体验,比如我的服务器只能支持100个用户同时和我通信,而这个时候,有1000个用户,那剩下的900的用户,肯定要等待。所以今天会聊到关于通信的一些东西:BIO、NIO、TCP挥手和握手、零拷贝、以及七层网络模型。

     Java中通信-Socket

     java中提供一种通信模型名为Socket, 我们可以通过Socket去进行一些网络通信。

    服务端

    public class SocketServer  {
        public static void main(String[] args) {
            ServerSocket serverSocket= null;
            try {
                serverSocket = new ServerSocket(8080);
                //这里是阻塞等待
                Socket socket=serverSocket.accept();
                //对客户端进行信息的接受
                BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String readLine = bufferedReader.readLine();
                System.out.println("接受客户端消息:"+readLine);
                BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                bufferedWriter.write("我收到了信息
    ");
                bufferedWriter.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    客户端

    public class SocketClient {
        public static void main(String[] args) {
            try {
                Socket socket= new Socket("localhost",8080);
                //对服务端进行消息的发送
                BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                bufferedWriter.write("我是客户端一,发送消息!
    ");
                bufferedWriter.flush();
                BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String result = bufferedReader.readLine();
                System.out.println("服务端返回消息:"+result);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    socket的简单通信整体流程:

    • 服务端:在服务端建立一个监听(这个时候服务端是一个阻塞状态)
    • 客户端:建立一个连接,这个时候,服务端的阻塞被唤醒,得到一个socket。然后客户端通过OutputStream进行传输
    • 服务端:通过inputStream去获取信息,因为tcp是一个双工的,所以可以通过一个OutputStream去写回到客户端
    • 客户端:通过inputStream获取服务端的返回数据

    网络分层(当他们建立连接的时候,底层涉及到这个点)不管是tcp还是udp他们的传输都会涉及到七层的网络模型

    • 应用层:为我们的应用程序提供服务
    • 表示层:对数据进行格式化转换、加密
    • 会话层:建立、管理、以及维护会话
    • 传输层:建立、管理和维护端到端的连接
    • 网络层:IP选址及路由选择
    • 数据链路层:提供介质访问和链路管理
    • 物理层:底层的物理传输

    负载和网络分层

    • 二层负载是利用的Mac头(数据链路层)、
    • 三层负载是Ip(网络层)我们可以在第三层对ip进行修改进行数据包的路由、
    • 四层负载(传输层)TCP属于这一层,我们利用出传输层中的ip和端口号去进行一个负载均衡的计算(Nginx其实就是基于这一层进行的负载)
    • 七层负载(应用层):根据URL进行负载,就行controller一样

    对于TCP/IP有四层和上面的这七层相对应:为什么叫TCP/IP?因为TCP是传输层,在我们的网络层(IP)层之上

    • 应用层:应用层、表示层、会话层
    • 传输层:传输层
    • 网络层:网络层
    • 网络接口层:数据链路层、物理层

    当客户端发送一个tcp请求会经过一下几个步骤:

    • 发起一个请求,会把tcp头和数据报文放在一起
    • 向下走会增加一个ip头,拼接到上面的数据中
    • 接着拼接Mac头(服务端的网卡地址),当服务端进行签收的时候会去确认是不是和自己Mac地址一样 
    • 这些数据就都变成二进制进行传输

    服务端经过的步骤

    • 接收到二进制数据
    • 向上传输对Mac头进行解析,这一步拿传递过来的Mac地址和当前的Mac地址进行匹配,如果匹配继续传输
    • 上面一层对Ip头进行解析,如果ip是自己则向上传递,不是则转发到别的地址
    • 最上面一层获取tcp头,去匹配服务端的进程,进程中去获取数据报文进行处理。

    问题:我们怎么知道服务端的Mac地址呢?

    那就是ARP协议(网络层),流程为,他将含目标IP地址的ARP请求广播到网络上的所有主机,想符合的主机则会返回Mac信息,从而得知Mac地址。IP地址表示服务器所在的位置,而Mac地址表示的是唯一的身份证明,每个主机都是唯一的(Mac是刻在网卡上的)。

    为什么我们在实际的网络过程中考虑的是Tcp而不是Udp?

    因为TCP网络传输的可靠性:

    • 三次握手
      • 我们的两个节点通信,需要通过三个数据包来确定连接的建立。用服务器A和B举个例子
        • 服务器A给B说:我要和你通信 (第一次握手这一步是确定服务器B是否是可使用状态,如果不可用服务器A会不断重试
        • 服务器B相应:一切正常(第二次握手 这一步是告诉服务器A我这边可以进行连接,防止服务器A不断的重试。
        • 服务器A说:那我们开始通信吧(第三次握手)服务器A收到了服务器B的返回消息,他需要告诉服务器B他收到了消息,以为不排除在服务器A在收到B的相应后挂了的可能性,
    • 流量控制
    • 断开机制(四次挥手)
      • A发送关闭消息给B(第一次挥手这个时候B就知道A没有消息要发送了
      • B发送消息给A说:我已经收到关闭消息了,但是等我通知你后,你再进行关闭(第二次挥手这个时候不直接关闭,因为B可能还有数据没有处理完成
      • B发送消息给A说可以我处理数据完成,可以进行关闭。第三次挥手
      • A给B说我关闭连接(第四次挥手)

     IO阻塞怎么办【BIO】?

    上面我们运行的代码是一个客户端对一个服务端,(服务端在等待获取数据的时候是阻塞状态)但是在真实的场景中,不可能只有一个用户连接服务器,那是不是可以当一个请求过来的时候,开启线程去处理

    服务端

    public class ServerSocketDemo {
    
        static ExecutorService executorService= Executors.newFixedThreadPool(20);
    
        public static void main(String[] args) {
            ServerSocket serverSocket=null;
            try {
                //localhost: 8080
                serverSocket=new ServerSocket(8080);
                while(true) {
                    Socket socket = serverSocket.accept(); //监听客户端连接(连接阻塞)
                    System.out.println(socket.getPort());
                    executorService.execute(new SocketThread(socket)); //异步
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                //TODO
                if(serverSocket!=null){
                    try {
                        serverSocket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    public class SocketThread implements Runnable{
    
        private Socket socket;
    
        public SocketThread(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));//输入流
                String s = bufferedReader.readLine(); //被阻塞了
                String clientStr = s; //读取客户端的一行数据
                System.out.println("接收到客户端的信息:" + clientStr);
                //写回去
                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                bufferedWriter.write("我收到了信息
    ");
                bufferedWriter.flush();
    
                bufferedReader.close();
                bufferedWriter.close();
            }catch (Exception e){
    
            }
        }
    }
    View Code

    客户端

    public class SocketClientDemo1 {
        public static void main(String[] args) {
            try {
                Socket socket=new Socket("localhost",8080);
                BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                bufferedWriter.write("我是客户端1,发送了一个消息
    ");
                bufferedWriter.flush();
                BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//输入流
                String serverLine=bufferedReader.readLine(); //读取服务端返回的数据
                System.out.println("服务端返回的数据:"+serverLine);
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public class SocketClientDemo {
        public static void main(String[] args) {
            try {
                Socket socket=new Socket("localhost",8080);
                BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                bufferedWriter.write("我是客户端,发送了一个消息
    ");
                bufferedWriter.flush();
                BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//输入流
                String serverLine=bufferedReader.readLine(); //读取服务端返回的数据(被阻塞了)
                System.out.println("服务端返回的数据:"+serverLine);
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

     流程如下:

    当客户端传递请求到服务端,首先在阻塞在accept这里,然后开启线程去处理IO请求,这里可会阻塞,那线程的数量如何控制?如果连接数大于线程数,势必会有一些请求丢失,那如何提升连接数?这就出现了非阻塞IO(NIO)

    有两个阻塞的地方IO阻塞、连接阻塞

     非阻塞IO(NIO)

    非阻塞指的是:连接非阻塞、IO非阻塞

    服务端:

    public class NewIoServer1 {
        public static void main(String[] args) {
            try {
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.socket().bind(new InetSocketAddress(8080));
                while (true){
                    SocketChannel socketChannel=serverSocketChannel.accept();
                    if (socketChannel!=null){
                        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                        socketChannel.read(byteBuffer);
                        //读取写到的数据
                        System.out.println(new String(byteBuffer.array()));
                        //翻转管道
                        byteBuffer.flip();
                        socketChannel.write(byteBuffer);
                    }else{
                        Thread.sleep(1000);
                        System.out.println("连接未就绪");
                    }
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    }
    View Code

    客户端

    public class NioClient {
        public static void main(String[] args)  {
            try {
                SocketChannel socketChannel=SocketChannel.open();
                /*socketChannel.configureBlocking(false);*/
                socketChannel.connect(new InetSocketAddress("localhost",8080));
                //如果连接已经建立
                if (socketChannel.isConnectionPending()){
                    //完成连接
                    socketChannel.finishConnect();
                }
                //这里并不意味着连接已经建立
                ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                byteBuffer.put("Hi,I am client".getBytes());
                byteBuffer.flip();
                socketChannel.write(byteBuffer);
                byteBuffer.clear();
                //读取服务端返回的数据 (这里其实是阻塞的)
                // 有一个坑,就是如果上面设置了非阻塞,下面这里在等待服务端返回结果,服务端是不会返回结果的,因为不阻塞的话,这里的连接就已经关闭了
                // 所以想要收到服务端的返回结果就注释上面的configureBlocking
                int read = socketChannel.read(byteBuffer);
                if (read>0){
                    System.out.println("服务端的数据::"+new String(byteBuffer.array()));
                }
                else {
                    System.out.println("服务端没有数据返回");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    }
    View Code

     

     我们可以看出服务端在不断的询问连接,但是采用这种轮询的方式,会不会很消耗性能?所以这里就引出了多路复用机制。

    多路复用【selector】

    多路复用指的是一个线程管理多个通道,那一个线程就是我们的selector,简单来说就是把channel注册到selector上,注册的事件分为,读事件、写事件、连接事件、接收事件、一旦有一个事件通知的话,我们的selector就由阻塞变成非阻塞,这个时候他就拿到对应的通道进行处理,当他处理的时候一定是一个可以执行的通道,不像我们上图展示的那种(出现连接未就绪的情况发生).流程如下:

    代码流程如下:

    【客户端】:注册一个连接时间到他的selector中,客户端的selector发现有一个连接事件,然后就处理这个事件发送消息到服务端,并且注册一个读的事件

    【服务端】:注册一个接受事件,当客户端发送一个消息过来后,服务端的selector就注册一个接受事件,在这个事件中他给客户端发送一个【收到】的消息,并且也注册一个读的事件。他的selector发现了这个读的事件后就开始读取服务端传递过来的数据。

    【客户端】:selector发现了读的事件后,就开始读取服务端传递过来的【收到】的信息。

    服务端:

    public class NIOServer {
        //多路复用
        static Selector selector;
        public static void main(String[] args) {
            try {
                //打开多路复用
                selector= Selector.open();
                ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
                //这是设置非阻塞
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.socket().bind(new InetSocketAddress(8080));
                //把接收事件注册到多路复用器上
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                while (true){
                    //只有事件到达的时候他才会被唤醒,否则是阻塞状态。
                    selector.select();
                    //这里是所有可以使用的channel,下面对这些可以使用的事件进行轮询操作
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey next = iterator.next();
                        // 一旦操作就对把事件进行移除
                        iterator.remove();
                        if (next.isAcceptable()){//写事件
                            copeWithAccept(next);
                        }else if (next.isReadable()){//读事件
                            copeWithRead(next);
                        }
                    }
                }
    
            } catch (IOException  e) {
                e.printStackTrace();
            }
        }
    
        //处理写事件
        private static   void  copeWithAccept(SelectionKey selectionKey){
            //可以使用的channel
            ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
            try {
                //这里就是接受一个连接
                SocketChannel accept = channel.accept();
                //打开非阻塞
                accept.configureBlocking(false);
                //注册一个事件(因为上面写了一些东西,现在读的话也要注册一个读的事件)
                //这个时候上面轮询的时候就发现有一个读的事件准备就绪了
                accept.register(selector,SelectionKey.OP_READ);
                accept.write(ByteBuffer.wrap("Hello,I am server".getBytes()));
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
        //处理读事件
        private  static void  copeWithRead(SelectionKey selectionKey){
            //这里拿到的通道就是上面注册(copeWithAccept)的要读的通道
            SocketChannel socketChannel= (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
            try {
               socketChannel.read(byteBuffer);
                System.out.println(new String(byteBuffer.array()));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

    客户端:

    public class NioClient {
        //多路复用
        static Selector selector;
        public static void main(String[] args)  {
            try {
                //打开多路复用
                selector=Selector.open();
                SocketChannel socketChannel=SocketChannel.open();
                socketChannel.configureBlocking(false);
                socketChannel.connect(new InetSocketAddress("localhost",8080));
                //注册一个连接事件
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
                while (true){
                    //还是当有事件发生的时候才触发
                    selector.select();
                    //一样,轮询所有注册的事件
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey next = iterator.next();
                        iterator.remove();
                        if (next.isConnectable()){//连接事件
                            copeWithAccept(next);
                        }else if (next.isReadable()){//读事件
                            copeWithRead(next);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
        //处理连接事件
        private static   void  copeWithAccept(SelectionKey selectionKey) throws IOException {
            //可以使用的channel
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            if (socketChannel.isConnectionPending()) {
                socketChannel.finishConnect();
            }
            socketChannel.configureBlocking(false);
            socketChannel.write(ByteBuffer.wrap("Hello Serve,I am  client".getBytes()));
            //注册一个读取的事件
            socketChannel.register(selector,SelectionKey.OP_READ);
        }
        //处理读事件
        private  static void  copeWithRead(SelectionKey selectionKey) throws IOException {
            //这里拿到的通道就是上面注册(copeWithAccept)的要读的通道
            SocketChannel socketChannel= (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
            socketChannel.read(byteBuffer);
            System.out.println("client receive:"+new String(byteBuffer.array()));
        }
    }
    View Code

     零拷贝

    当我们要操作一个文件发送给其他服务器的时候,三次拷贝

    • 把文件拷贝到内核空间
    • 然后从内核空间拷贝到用户空间
    • 然后再从用户空间把这个数据拷贝到内核空间,通过网卡发送到其他的服务器上去

    零拷贝的意思是,我们不经过用户空间,直接从磁盘的内核空间进行发送。后面我们聊Netty的时候会聊到这。

    实现方式:

    • 把内核空间和用户空间映射在一起(MMAP)
    • 使用现成的API

  • 相关阅读:
    Hadoop_33_Hadoop HA的搭建
    Hadoop_32_HDFS高可用机制
    Hadoop_31_MapReduce参数优化
    Hadoop_30_MapReduce_多job串联
    Hadoop_29_MapReduce_计数器应用
    Hadoop_28_MapReduce_自定义 inputFormat
    Hadoop_27_MapReduce_运营商原始日志增强(自定义OutputFormat)
    Hadoop_26_MapReduce_Reduce端使用GroupingComparator求同一订单中最大金额的订单
    Hadoop_25_MapReduce实现日志清洗程序
    干货 | 剑指offer系列文章汇总
  • 原文地址:https://www.cnblogs.com/UpGx/p/15139065.html
Copyright © 2011-2022 走看看