zoukankan      html  css  js  c++  java
  • 分布式开发

    概念

    网络编程知识

    1. DNS域名系统(属于应用层):用于将域名解析成IP(单域名解析多IP可实现负载均衡)
    2. IP网际互连协议:用于实现广域网中的数据传输(ip是根据网络拓扑分配的,解决我在哪的问题)
    3. MAC:用于实现局域网中的数据传输(mac是由网卡制造商分配的,解决我是谁的问题)
    4. ARP地址解析协议:映射Mac地址与IP地址的协议 - 核心①映射缓存 ②ARP请求广播(用于构建映射缓存)
    5. TCP协议:三次握手和四次挥手
    6. SSL安全套接字协议:加密 ①客户端hello ②服务端hello+证书+验证性加密数据 ③客户端验证 + 密钥交换 + 验证性加密数据 ④服务端验证
    7. HTTP协议(应用层协议):

    Socket 和 IO多路复用

    Socket编程 - BIO(同步阻塞IO):每一个连接对应一个线程,执行Socket的读写操作均会阻塞线程
    SocketChannel编程 - NIO(同步非阻塞IO):轮询 - 每一个请求对应一个线程,线程会阻塞在selector.select(),当出现活跃请求时,需轮询查找活跃请求并处理,O(n) 同步IO(需等待IO完成)
    AsynchronousSocketChannel编程 - AIO(异步非阻塞IO):回调 - 每一个有效请求对应一个线程,当出现活跃请求时,直接调用请求对应的回调方法实现处理,O(1) 异步IO(Future实现)

    select: 当事件发生,但不知道具体是哪个流,通过轮询查找,时间复杂度O(N)
    poll: 作用等同于select,链表结构存储(无限大),通过轮询查找, 时间复杂度O(N)
    epoll: 事件驱动,callback方式,时间复杂度O(1) – 采用mmap共享内存实现内核-用户空间的数据传递

    demo

    BIO

    核心组件:①ServerSocket建立服务器监听,②Socket代表一个连接
    阻塞IO,socket调用读写方法时,会进入阻塞,直到数据到达

    public class ServerSocketDemo {
        private static ExecutorService executors = Executors.newFixedThreadPool(4);
    
        public static void main(String[] args) {
            // 服务端开启Socket监听
            try (ServerSocket serverSocket = new ServerSocket(8080);){
                while(true){
                    // 当请求到达时,新建线程处理对应socket
                    Socket socket = serverSocket.accept();
                    executors.execute(()->ServerSocketHandler.handler(socket));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class ServerSocketHandler {
        public static void handler(Socket socket){
            // server中socket流的获取与client中流的获取顺序需相反(否则可能资源死等)
            try (InputStream is = socket.getInputStream();
                 ObjectInputStream ois = new ObjectInputStream(is);
    
                 OutputStream os = socket.getOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(os);)
            {
                // 读操作(在数据未到达之前将阻塞)
                User msg = (User) ois.readObject();
                System.out.println(Thread.currentThread().getName() + " - 读入:" + msg);
    
                // 写操作
                oos.writeObject(msg);
                oos.flush();
            } catch (IOException | ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class ClientSocketDemo {
        public static void main(String[] args) {
            // 客户端直接采用Socket建立连接
            try(Socket socket = new Socket("localhost",8080);){
                ClientSocketHandler.handler(socket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class ClientSocketHandler {
        public static void handler(Socket socket){
            // server中socket流的获取与client中流的获取顺序需相反(否则可能资源死等)
            try (OutputStream os = socket.getOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(os);
    
                 InputStream is = socket.getInputStream();
                 ObjectInputStream ois = new ObjectInputStream(is);
            ){
                // 写操作 - 执行操作也需相反。。。
                oos.writeObject(new User("kiqi",18));
                oos.flush();
    
                // 读操作
                User msg = (User) ois.readObject();
                System.out.println("客户端 - 读入:" + msg);
    
                Thread.sleep(1000);
            } catch (IOException | ClassNotFoundException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    NIO

    核心组件:①selector,用于判断是否存在活跃事件 ②ServerSocketChannel,用于连接事件通道 ③SocketChannel,用于读写事件通道 ④register(selector, SelectionKey.OP_ACCEPT);
    非阻塞IO,将socketChannel与相应事件注册到selector,当出现活跃事件时,selector.select()将解除阻塞,通过轮询获取并处理已经到达的事件(只能做到存在活跃事件,不知道具体是哪一个)

    public class ServerSocketChannelDemo {
        private static Selector selector;
    
        public static void main(String[] args) {
            try {
                // 开启selector功能(底层由操作系统提供支持 select,poll或epoll)
                selector = Selector.open();
                
                // ServerSocketChannel,开启服务端口监听
                ServerSocketChannel channel = ServerSocketChannel.open();
                channel.configureBlocking(false);
                channel.socket().bind(new InetSocketAddress(8080));
    
                // 将ServerSocketChannel与对应事件绑定到selector,当事件发生时,selector.select()将接触阻塞
                channel.register(selector, SelectionKey.OP_ACCEPT);
                while (true){
                    // 阻塞,直到出现活跃事件
                    selector.select();
                    
                    // 无法获知具体事件,需轮询判断
                    Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeySet.iterator();
                    while (iterator.hasNext()){
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        // 链接事件
                        if(selectionKey.isAcceptable()){
                            handleAccept(selectionKey);
                        // 读事件
                        }else if(selectionKey.isReadable()){
                            handleRead(selectionKey);
                            ((SocketChannel)selectionKey.channel()).write(ByteBuffer.wrap("server_msg - 啥玩意啊,这么难用".getBytes()));
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static void handleAccept(SelectionKey selectionKey) {
            // 处理连接事件,并将该连接通道的读事件注册到Selector
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            try {
                SocketChannel channel = serverSocketChannel.accept();
                channel.configureBlocking(false);
                channel.register(selector,SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static void handleRead(SelectionKey selectionKey) {
            // 处理读事件
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            try {
                channel.read(byteBuffer);
                System.out.println("Server - 嘛玩意啊,超级难用 - " + new String(byteBuffer.array()));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class ClientSocketChannelDemo {
        private static Selector selector;
    
        public static void main(String[] args) {
            try {
                // 开启selector功能(底层由操作系统提供支持 select,poll或epoll)
                selector = Selector.open();
    
                // SocketChannel,建立连接通道
                SocketChannel channel = SocketChannel.open();
                channel.configureBlocking(false);
                channel.connect(new InetSocketAddress("localhost",8080));
    
                // 将连接事件注册到selector
                channel.register(selector, SelectionKey.OP_CONNECT);
                while (true) {
                    // 阻塞,直到连接事件成功
                    selector.select();
                    Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeySet.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        // 连接就绪,处理连接事件
                        if (selectionKey.isConnectable()) {
                            handleConnect(selectionKey);
                            ((SocketChannel)selectionKey.channel()).write(ByteBuffer.wrap("server_msg - 啥玩意啊,这么难用".getBytes()));
                        // 处理读事件
                        } else if (selectionKey.isReadable()) {
                            handleRead(selectionKey);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static void handleConnect(SelectionKey selectionKey) {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            try {
                // 未调用channel.finishConnect()的连接,处于连接中,还无法进行读写
                if(channel.isConnectionPending()){
                    channel.finishConnect();
                }
                channel.configureBlocking(false);
                // 注册读事件
                channel.register(selector,SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static void handleRead(SelectionKey selectionKey) {
            // 处理读事件
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            try {
                channel.read(byteBuffer);
                System.out.println("Client - 嘛玩意啊,超级难用 - " + new String(byteBuffer.array()));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    AIO

    核心组件:①AsynchronousServerSocketChannel,用于建立服务器监听。 ②channel.accept(),接受连接, ③AsynchronousSocketChannel,用于处理连接的读写事件
    ④Future,用于异步等待IO完成 ⑤CompletionHandler接口,当IO完成后,执行的回调接口方法
    异步非阻塞式IO,①提供了回调式处理,当活跃事件发生时,直接回调接口方法O(1) ②Future<?>异步IO(内部建立了内核空间与用户空间的内存)。

    // AsynchronousSocketChannel代表一个连接,通过连接完成读写事件处理
    // 方式一:CompletionHandler接口回调
    public class AysnServerSocketChannelDemo {
        public static void main(String[] args) throws InterruptedException {
            try {
                // 服务端建立端口监听
                AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
                channel.bind(new InetSocketAddress(8080));
    
                // 接受连接事件,当事件发生并且执行完IO后,会回调CompletionHandler接口
                channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                    @Override
                    public void completed(AsynchronousSocketChannel result, Object attachment) {
                        // 需再次开启接受连接事件
                        channel.accept(null, this);
    
                        // 处理已连接通道
                        handle(result);
                    }
    
                    // 失败时执行逻辑
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        exc.printStackTrace();
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
            Thread.sleep(100000);
        }
        
        public static void handle(AsynchronousSocketChannel channel) {
            try {
                final ByteBuffer buffer = ByteBuffer.allocate(1024);
                final long timeout = 10L;
                // 读事件,完成IO后回调CompletionHandler接口方法
                channel.read(buffer, timeout, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("read:" + result);
                        if (result == -1) {
                            try {
                                channel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            return;
                        }
                        buffer.flip();
                        System.out.println("received message:" + Charset.forName("UTF-8").decode(buffer));
                        buffer.clear();
                        channel.read(buffer, timeout, TimeUnit.SECONDS, null, this);
                    }
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        exc.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    // 方式二:Future<?>.get()异步方法等待结果
    public class AsynClientSocketChannelDemo {
    
        private static AsynchronousSocketChannel client;
    
        public static void main(String[] args) throws InterruptedException {
            try {
                // 创建连接通道并建立连接
                client = AsynchronousSocketChannel.open();
                Future<?> future = client.connect(new InetSocketAddress("localhost",8080));
                // get()方法,等待连接完成
                future.get();
                
                // 执行读写时间
                client.write(ByteBuffer.wrap("???".getBytes()));
            } catch (IOException | InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            Thread.sleep(100000);
        }
    }
    

    欢迎疑问、期待评论、感谢指点 -- kiqi,愿同您为友

    -- 星河有灿灿,愿与之辉

  • 相关阅读:
    IO模型
    Java NIO概述
    消息系统避免分布式事务
    JVM调优总结
    设计模式的六大原则
    Java 内存区域与内存溢出
    windows go安装
    ZooKeeper原理及使用
    再谈HashMap
    Html5 播放实时音频流
  • 原文地址:https://www.cnblogs.com/kiqi/p/14428146.html
Copyright © 2011-2022 走看看