zoukankan      html  css  js  c++  java
  • Java IO 与 NIO 服务器&客户端通信小栗子

    本篇包含了入门小栗子以及一些问题的思考

    BIO

    package com.demo.bio;
    
    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Scanner;
    
    /**
     * Blocking-IO chat server: accepts clients on port 9999 and starts one reader thread and
     * one writer thread per client.
     *
     * <p>Known issue (observed by the author): with several clients connected, a client only
     * receives a message after the server operator has typed enough lines. Each client gets
     * its own {@link SocketWriteThread}, and every one of them reads from {@code System.in},
     * so console lines are presumably divided between the writer threads.
     */
    public class Server {

        public static void main(String[] args) throws Exception {
            new Server().startServer();
        }

        /**
         * Listens on port 9999 and serves clients until accepting fails.
         *
         * @throws IOException if the listening socket cannot be opened or {@code accept()} fails
         */
        public void startServer() throws IOException {
            // try-with-resources releases the port when accept() throws.
            try (ServerSocket serverSocket = new ServerSocket(9999)) {
                while (true) {
                    Socket client = serverSocket.accept();
                    try {
                        serve(client);
                    } catch (IOException e) {
                        // A single broken client must not take the whole server down.
                        e.printStackTrace();
                        closeQuietly(client);
                    }
                }
            }
        }

        /** Greets the client and starts its reader and writer threads. */
        private void serve(Socket client) throws IOException {
            System.err.println("Client:" + client.getInetAddress().getHostAddress());
            OutputStream out = client.getOutputStream();
            PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, "UTF-8"), true);
            writer.println("Hello!We are already connected!say 'bye' to close");

            new Thread(new SocketReadThread(client)).start();
            new Thread(new SocketWriteThread(client)).start();
        }

        /** Closes the socket, ignoring a failure to do so. */
        private static void closeQuietly(Socket socket) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing more can be done for this connection.
            }
        }
    }
    
    /**
     * Reader task: prints every line received from one client until the client sends
     * "bye" or the stream ends, then closes the connection.
     */
    class SocketReadThread implements Runnable{

        private final Socket socket;

        public SocketReadThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // The socket is declared as the first resource so that it is closed on every
            // path, including the one where getInputStream() itself throws.
            try (Socket connection = socket;
                 Scanner scanner = new Scanner(connection.getInputStream(), "UTF-8")) {
                while (scanner.hasNextLine()) {
                    String line = scanner.nextLine();
                    System.out.println("Client Msg[" + socket + "]:" + line);
                    if (line.trim().equals("bye")) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Writer task: forwards every line typed on the console to one client until console
     * input ends, the socket is closed, or a write to the client fails.
     */
    class SocketWriteThread implements Runnable{

        private final Socket socket;

        public SocketWriteThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                OutputStream out = socket.getOutputStream();
                PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, "UTF-8"), true);
                // System.in is shared with the rest of the process, so this Scanner is
                // intentionally not closed.
                Scanner scanIn = new Scanner(System.in);
                // hasNextLine() ends the loop at end of console input; calling nextLine()
                // unguarded would throw NoSuchElementException there.
                while (scanIn.hasNextLine()) {
                    writer.println(scanIn.nextLine());
                    // PrintWriter never throws on a failed write; checkError() is the only
                    // way to notice that the peer is gone.
                    if (writer.checkError() || socket.isClosed()) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    package com.demo.bio;
    
    import java.io.*;
    import java.net.Socket;
    import java.util.Scanner;
    
    /**
     * Blocking-IO chat client: connects to 127.0.0.1:9999, prints server messages on a
     * background {@link SocketReceiveThread} and sends every console line to the server.
     * Typing "bye" sends it and then closes the connection.
     */
    public class Client {

        public static void main(String[] args) throws Exception {
            // try-with-resources closes the socket on every exit path, not only after "bye".
            try (Socket socket = new Socket("127.0.0.1", 9999)) {
                OutputStream out = socket.getOutputStream();
                PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, "UTF-8"), true);

                new Thread(new SocketReceiveThread(socket)).start();
                Scanner scanIn = new Scanner(System.in);
                // hasNextLine() stops cleanly at end of console input instead of letting
                // nextLine() throw NoSuchElementException.
                while (!socket.isClosed() && scanIn.hasNextLine()) {
                    String line = scanIn.nextLine();
                    writer.println(line);
                    // checkError() is how a PrintWriter reports that the server is gone.
                    if (line.trim().equals("bye") || writer.checkError()) {
                        break;
                    }
                }
            }
        }


    }
    /**
     * Receiver task for the BIO client: echoes each line sent by the server to standard
     * output and stops after a "bye" line or when the stream ends.
     */
    class SocketReceiveThread implements Runnable{

        private Socket socket;

        public SocketReceiveThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                Scanner serverLines = new Scanner(socket.getInputStream(), "UTF-8");
                while (serverLines.hasNextLine()) {
                    String message = serverLines.nextLine();
                    System.out.println("Server Msg:" + message);
                    // The farewell line is printed first, then ends the loop.
                    if ("bye".equals(message.trim())) {
                        break;
                    }
                }
                // Closing the scanner also closes the underlying input stream.
                serverLines.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

    BIO没什么难的,同步阻塞。上面实现的主要就是服务器和客户端你一句我一句,巴拉巴拉巴拉

    NIO

    我要实现一个客户端与服务器通信的例子,下面是我的第一个版本:

    package com.demo.nio;
    
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    
    /**
     * First (deliberately naive) NIO server: accept, read and console-driven write are all
     * handled on the single selector thread.
     *
     * <p>Problems observed by the author when running it:
     * <ul>
     *   <li>With no client started, the server blocks in {@code selector.select()} and only
     *       proceeds once a client connects.</li>
     *   <li>After a client starts, the server reads and prints the client's message, writes
     *       one reply, then enters the write branch and blocks waiting for console input.
     *       Meanwhile the client has already passed its read branch without reading anything
     *       and is likewise blocked in its write branch. That is why the client does not
     *       see the server's first message.</li>
     *   <li>Client writes: once the user types a line on the client, it is sent, the client
     *       leaves its write branch, starts the next loop iteration, enters the read branch
     *       and only then reads and prints the server's first message.</li>
     *   <li>Server receives: the server still has not seen that client message because it
     *       is blocked in its own write branch; to read it, the operator must first send
     *       something to the client so the server leaves the write branch and loops again.</li>
     * </ul>
     * This explains why each side has to write before it can read.
     */
    public class Server {
    
        // True until the first read has been answered with "Hi!"; one flag for the whole server.
        private boolean isFirst = true;
        private ServerSocketChannel ssc = null;
        private Selector selector = null;
    
        /**
         * Opens a non-blocking server channel bound to the local host address on {@code port},
         * registers it for accept events and then runs {@link #listener(Selector)}, which
         * loops forever, so this constructor does not return normally.
         *
         * @param port TCP port to listen on
         * @throws IOException if opening, binding, registering or the selector loop fails
         */
        public Server(int port) throws IOException {
            ssc = ServerSocketChannel.open();
            selector = Selector.open();
            InetSocketAddress inetAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
            
            ssc.socket().bind(inetAddress);
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            listener(selector);
        }
        
        /**
         * Selector loop: waits for ready keys and handles accept, read and write readiness
         * in that order for every selected key.
         *
         * @param selector selector the server channel is registered with
         * @throws IOException if selecting or any channel operation fails
         */
        private void listener(Selector selector) throws IOException{
            while(true){
                System.out.println("等待客户端连接...");
                selector.select();
                System.out.println("捕获客户端连接...");
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
    
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // Accept event: register the new client for both read and write readiness.
                    if(key.isAcceptable()){
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        channel.accept().configureBlocking(false).register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        //System.out.println(channel.toString() + "-已连接");
                    }
                    // Read event.
                    if(key.isReadable()){
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer bf = ByteBuffer.allocate(1024);
                        channel.read(bf);
                        // Note: the whole 1024-byte backing array is converted, not just the
                        // bytes actually read, and the return value of read() is ignored.
                        System.out.println("来自客户端数据:" + new String(bf.array()));
                        // Only the very first read is answered with a message.
                        if(isFirst){
                            isFirst = false;
                            ByteBuffer bst = ByteBuffer.wrap("Hi!".getBytes());
                            channel.write(bst);
                        }
                    }
                    // Write event: scanner.nextLine() blocks the selector thread until the
                    // operator types a line; this is the flaw described in the class comment.
                    if(key.isWritable()){
                        Scanner scanner = new Scanner(System.in);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String msg = sdf.format(new Date()) + "	" + scanner.nextLine();
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer bst = ByteBuffer.wrap(msg.getBytes());
                        channel.write(bst);
    //                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);// would drop write interest; otherwise write-ready keeps firing
                    }
                    // Selected keys must be removed by hand or they are reported again.
                    iterator.remove();
                    
                }
                
            }
        }
        
        /** Starts the server on port 9999 and prints the stack trace if it fails. */
        public static void main(String[] args) {
            try {
                Server server = new Server(9999);
            } catch (IOException e) {
                // Startup or selector-loop failure: just report it.
                e.printStackTrace();
            }
        }
        
        
    }
    package com.demo.nio;
    
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    
    /**
     * First (deliberately naive) NIO client: read and console-driven write are both handled
     * on the single selector thread, so the blocking console read in the write branch also
     * delays reading from the server.
     */
    public class Client {
    
        
        private SocketChannel sc = null;
        private Selector selector = null;
        
        /**
         * Connects to the local host address on {@code port}, switches the channel to
         * non-blocking mode, registers it with a selector, sends "Hello" and then runs
         * {@link #listener(Selector)}, which loops forever.
         *
         * @param port TCP port of the server
         * @throws IOException if connecting, registering, writing or the selector loop fails
         */
        public Client(int port) throws IOException {
            sc = SocketChannel.open();
            selector = Selector.open();
            // connect() is called before configureBlocking(false), i.e. in blocking mode.
            sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            ByteBuffer bf = ByteBuffer.wrap("Hello".getBytes());
            sc.write(bf);
            
            listener(selector);
        }
        
        /**
         * Selector loop: for every selected key handles read readiness, then write readiness.
         *
         * @param selector selector the client channel is registered with
         * @throws IOException if selecting or any channel operation fails
         */
        private void listener(Selector selector) throws IOException{
            while(true){
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                
                while(iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    if(key.isReadable()){
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer dst = ByteBuffer.allocate(1024);
                        channel.read(dst);
                        // Note: converts the whole 1024-byte backing array, not only the
                        // bytes actually read; the return value of read() is ignored.
                        System.out.println("来自服务器:" + new String(dst.array()));
                    }
                    // scanner.nextLine() blocks this (selector) thread until a line is typed.
                    if(key.isWritable()){
                        Scanner scanner = new Scanner(System.in);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String msg = sdf.format(new Date()) + "	" + scanner.nextLine();
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer bst = ByteBuffer.wrap(msg.getBytes());
                        channel.write(bst);
                    }
                    // Selected keys must be removed by hand or they are reported again.
                    iterator.remove();
                }
                
            }
        }
        
        /** Starts the client against port 9999 and prints the stack trace if it fails. */
        public static void main(String[] args) {
            try {
                Client client = new Client(9999);
            } catch (IOException e) {
                // Connection or selector-loop failure: just report it.
                e.printStackTrace();
            }
        }
        
    }

    上面例子的问题在注释里已经详细描述了,不信可以运行一下。下面是修正版,把写操作放在一个独立的线程里:

    package com.demo.nio;
    
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    
    /**
     * Revised NIO server: console input is moved to a separate {@link DealWrite} thread so
     * the selector loop is no longer blocked while the operator types.
     *
     * <p>Text is encoded and decoded with the platform default charset, matching what the
     * client side of this demo does.
     */
    public class ServerRevision {

        /** True until the first client message has been answered with "Hi!". */
        private boolean isFirst = true;
        private ServerSocketChannel ssc = null;
        private Selector selector = null;

        /**
         * Binds a non-blocking server channel to the local host address on {@code port} and
         * runs the selector loop. The loop never ends, so this constructor does not return
         * normally.
         *
         * @param port TCP port to listen on
         * @throws IOException if opening, binding, selecting or accepting fails
         */
        public ServerRevision(int port) throws IOException {
            ssc = ServerSocketChannel.open();
            selector = Selector.open();
            InetSocketAddress inetAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);

            ssc.socket().bind(inetAddress);
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            listener(selector);
        }

        /**
         * Selector loop: dispatches accept, read and write readiness for each selected key.
         *
         * @param selector selector the server channel is registered with
         * @throws IOException if selecting or accepting fails
         */
        private void listener(Selector selector) throws IOException{
            while (true) {
                System.out.println("等待客户端连接...");
                selector.select();
                System.out.println("捕获客户端连接...");
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // Selected keys must be removed by hand or they are reported again.
                    iterator.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        accept(selector, key);
                        continue;
                    }
                    if (key.isReadable() && !read(key)) {
                        // Client is gone and its key was cancelled; nothing left to do.
                        continue;
                    }
                    if (key.isWritable()) {
                        System.out.println("[服务器]写就绪...");
                        new Thread(new DealWrite(key)).start();
                        // Drop write interest, otherwise write-ready fires on every select().
                        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                    }
                }
            }
        }

        /** Accepts a pending connection and registers it for read and write readiness. */
        private void accept(Selector selector, SelectionKey key) throws IOException {
            SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
            // In non-blocking mode accept() returns null when no connection is pending.
            if (client != null) {
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            }
        }

        /**
         * Reads and prints what the client sent; replies "Hi!" to the first message only.
         *
         * @return {@code false} if the client disconnected or the read failed (the key is
         *         cancelled and the channel closed in that case), {@code true} otherwise
         */
        private boolean read(SelectionKey key) {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer bf = ByteBuffer.allocate(1024);
            try {
                int count = channel.read(bf);
                if (count < 0) {
                    // End of stream: without this the key stays readable and the loop spins.
                    disconnect(key);
                    return false;
                }
                if (count > 0) {
                    // Decode only the bytes actually received, not the whole backing array.
                    System.out.println("来自客户端数据:" + new String(bf.array(), 0, count));
                    if (isFirst) {
                        isFirst = false;
                        channel.write(ByteBuffer.wrap("Hi!".getBytes()));
                    }
                }
                return true;
            } catch (IOException e) {
                // A failing client must not terminate the server loop.
                e.printStackTrace();
                disconnect(key);
                return false;
            }
        }

        /** Cancels the key and closes its channel, ignoring a failure to close. */
        private void disconnect(SelectionKey key) {
            key.cancel();
            try {
                key.channel().close();
            } catch (IOException ignored) {
                // The connection is being discarded anyway.
            }
        }

        /** Starts the server on port 9999 and prints the stack trace if it fails. */
        public static void main(String[] args) {
            try {
                new ServerRevision(9999);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
    
    /**
     * Console-to-channel writer: sends every line typed on the console to the key's
     * {@link SocketChannel}, prefixed with a timestamp and a tab. Stops when console input
     * ends, the channel is closed, or a write fails.
     */
    class DealWrite implements Runnable{

        private SelectionKey key;

        public DealWrite(SelectionKey key) {
            this.key = key;
        }

        @Override
        public void run() {
            SocketChannel channel = (SocketChannel) key.channel();
            // One Scanner for the whole loop: Scanner reads ahead, so creating a new one per
            // iteration discards lines the previous instance had already buffered.
            Scanner scanner = new Scanner(System.in);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // hasNextLine() ends the loop at end of input instead of nextLine() throwing.
            while (channel.isOpen() && scanner.hasNextLine()) {
                // Read first, then stamp, so the timestamp is the time the line was entered.
                String line = scanner.nextLine();
                String msg = sdf.format(new Date()) + "\t" + line;
                ByteBuffer bst = ByteBuffer.wrap(msg.getBytes());
                try {
                    // A non-blocking write may accept only part of the buffer.
                    while (bst.hasRemaining()) {
                        channel.write(bst);
                    }
                } catch (IOException e) {
                    // The connection is unusable; retrying on every line would only repeat this.
                    e.printStackTrace();
                    return;
                }
            }
        }
    }
    package com.demo.nio;
    
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    
    /**
     * Revised NIO client: console input is moved to a separate {@link DealWrite} thread so
     * the selector loop can keep reading server messages while the user types.
     *
     * <p>Text is encoded and decoded with the platform default charset, matching what the
     * server side of this demo does.
     */
    public class ClientRevision {


        private SocketChannel sc = null;
        private Selector selector = null;

        /**
         * Connects to the local host address on {@code port}, sends "Hello" and runs the
         * selector loop until the server closes the connection.
         *
         * @param port TCP port of the server
         * @throws IOException if connecting, registering, writing or selecting fails
         */
        public ClientRevision(int port) throws IOException {
            sc = SocketChannel.open();
            selector = Selector.open();
            // connect() runs while the channel is still in blocking mode, so the connection
            // is fully established when it returns.
            sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            sc.configureBlocking(false);
            // OP_CONNECT is not registered: the channel is already connected and the loop has
            // no connect branch. Leaving it in the interest set after OP_WRITE is dropped may
            // cause select() to wake up without any handled event on some platforms.
            sc.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            ByteBuffer bf = ByteBuffer.wrap("Hello".getBytes());
            sc.write(bf);

            listener(selector);
        }

        /**
         * Selector loop: prints data received from the server and starts the console writer
         * the first time the channel is writable. Returns when the server closes the
         * connection.
         *
         * @param selector selector the client channel is registered with
         * @throws IOException if selecting or a channel operation fails
         */
        private void listener(Selector selector) throws IOException{
            while (true) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // Selected keys must be removed by hand or they are reported again.
                    iterator.remove();
                    if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer dst = ByteBuffer.allocate(1024);
                        int count = channel.read(dst);
                        if (count < 0) {
                            // End of stream: the server closed the connection. Without this
                            // the key stays readable forever and the loop spins.
                            key.cancel();
                            channel.close();
                            return;
                        }
                        if (count > 0) {
                            // Decode only the bytes actually received.
                            System.out.println("来自服务器:" + new String(dst.array(), 0, count));
                        }
                    }
                    if (key.isWritable()) {
                        System.out.println("[客户端]写就绪...");
                        new Thread(new DealWrite(key)).start();
                        // Drop write interest, otherwise write-ready fires on every select().
                        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                    }
                }

            }
        }

        /** Starts the client against port 9999 and prints the stack trace if it fails. */
        public static void main(String[] args) {
            try {
                new ClientRevision(9999);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    目前只是测试了服务器-客户端一对一的通信,不知道一个服务器对多个客户端会出什么bug

    NIO稍微有些复杂,不过核心组件就三个:Selector、Channel、Buffer。NIO是同步非阻塞的。

  • 相关阅读:
    【题解】P2569 [SCOI2010]股票交易
    【题解】P3354 [IOI2005]Riv 河流
    入职阿里蚂蚁三个月有感
    搞懂G1垃圾收集器
    MySql分库分表与分区的区别和思考
    Kafka源码分析及图解原理之Broker端
    Kafka源码分析及图解原理之Producer端
    Oracle GoldenGate mysql To Kafka上车记录
    从动态代理到Spring AOP(中)
    从动态代理到Spring AOP(上)
  • 原文地址:https://www.cnblogs.com/LUA123/p/11389692.html
Copyright © 2011-2022 走看看