首先明确Socket通信模型
Socket通信实现步骤
- 创建ServerSocket和Socket
- 打开连接到的Socket的输入/输出流
- 按照协议对Socket进行读/写操作
- 关闭输入输出流,以及Socket
BIO编程流程
服务端:
- 创建ServerSocket实例
- 绑定端口
- 通过accept来监听客户端的连接,有客户端连接会返回socket实例
- 进行读写操作
- 关闭资源
客户端:
- 创建socket实例
- 通过connect并指定服务端的IP+端口连接服务端
- 进行读写操作
- 关闭资源
客户端发送一个消息,服务端回复一个同样的消息的实现
服务端:
public class BIOServer { public static void main(String[] args) throws IOException { ServerSocket ssocket = null; BufferedReader bufferedReader = null; OutputStream outputStream = null; Socket accept = null; try { ssocket = new ServerSocket(); //端口绑定 ssocket.bind(new InetSocketAddress(8888)); System.out.println("服务端启动了!"); //监听 accept = ssocket.accept(); System.out.println("客户端:"+accept.getRemoteSocketAddress()+"连接上了"); while (true) { //读取客户端消息 bufferedReader = new BufferedReader(new InputStreamReader(accept.getInputStream())); String msg = bufferedReader.readLine(); if(msg.equals("exit")) break; System.out.println("客户端" + accept.getRemoteSocketAddress() + "发来消息:" + msg); //给客户端恢复消息 outputStream = accept.getOutputStream(); outputStream.write(("回复:" + msg + " ").getBytes()); } } catch (IOException e) { e.printStackTrace(); }finally { //释放资源 if(ssocket != null) ssocket.close(); if(accept != null) accept.close(); if(bufferedReader != null) bufferedReader.close(); if(outputStream != null) outputStream.close(); System.out.println("服务端关闭"); } } }
客户端:
public class BIOClient { public static void main(String[] args) throws IOException { Socket socket = null; OutputStream outputStream = null; BufferedReader bufferedReader = null; try { //绑定端口和地址 socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1",8888)); System.out.println("连接服务端成功!"); while (true) { //给客户端发消息 outputStream = socket.getOutputStream(); Scanner scanner = new Scanner(System.in); System.out.println("请输入消息:"); String msg = scanner.nextLine(); if(msg.equals("exit"))break; outputStream.write((msg+ " ").getBytes()); //刷出缓存 outputStream.flush(); //读取客户端消息 bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.println("服务端" + bufferedReader.readLine()); } } catch (IOException e) { e.printStackTrace(); }finally { //释放资源 if(socket != null) socket.close(); if(bufferedReader != null) bufferedReader.close(); if(outputStream != null) outputStream.close(); System.out.println("客户端端关闭"); } } }
BIO是同步阻塞模型,其阻塞体现在:
- accept():阻塞接收客户端的连接
- read() /write()
- connect():和服务端建立连接(三次握手),连接的过程中connect会阻塞
BIO的多个客户端的处理(多线程):
客户端无需改变,服务端如下:
public class TServer { public static void main(String[] args) throws IOException { ServerSocket ssocket = null; Socket accept = null; try { ssocket = new ServerSocket(); //端口绑定 ssocket.bind(new InetSocketAddress(8888)); System.out.println("服务端启动了!"); while (true) { //监听,等待多个客户端连接 accept = ssocket.accept(); System.out.println("客户端:"+accept.getRemoteSocketAddress()+"连接上了"); //将Socket实例交给子线程处理 new TServer().new ServerHanlder(accept).start(); } } catch (IOException e) { e.printStackTrace(); }finally { //释放资源 if(ssocket != null) ssocket.close(); if(accept != null) accept.close(); System.out.println("服务端关闭"); } } class ServerHanlder extends Thread{ private Socket socket; private BufferedReader bufferedReader = null; private OutputStream outputStream = null; public ServerHanlder(Socket socket){ this.socket = socket; } @Override public void run() { //读取客户端消息 try { while (true) { bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String msg = bufferedReader.readLine(); //程序结束条件 if("exit".equals(msg)) break; System.out.println("客户端" + socket.getRemoteSocketAddress() + "发来消息:" + msg); //给客户端恢复消息 outputStream = socket.getOutputStream(); outputStream.write(("回复:" + msg + " ").getBytes()); } } catch (IOException e) { e.printStackTrace(); }finally { if (bufferedReader != null) { try { bufferedReader.close(); } catch (IOException e) { e.printStackTrace(); } } if(outputStream != null) { try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("客户端:"+socket.getRemoteSocketAddress()+"关闭了"); } } } }
在多个用户的连接情况下,为了达到服务端处理的并发量,就需要对每一个用户的连接要分配一个新线程来处理,但是线程资源时有限的,不能无限制的创建出新的资源,那么对于高并发的支持也就大打折扣了。
NIO编程流程
服务端:
- 实例化通道:ServerSocketChannel
- 绑定端口:通过ServerSocketChannel实例调用bindI()方法绑定端口
- 将ServerSocketChannel设置为非阻塞
- 实例化选择器(IO复用器)Selector
- 将ServerSocketChannel注册给选择器,并且关注accept事件
- 监听事件是否完成,selector.select,如果事件未完成则一直阻塞直到事件完成
- 获取已完成事件的集合并遍历,判断是否是accept事件,是,则调用accept方法,获取SocketChannel通道
- 设置SocketChannel为非阻塞,并将SocketChannel注册到选择器Selector,并关注read事件
- 监听事件是否完成,若有事件完成,则判断是否是read读事件
- 通过SocketChannel通道读取数据(Buffer中),读完数据循环事件监听,即步骤6
- 关闭资源:ServerSocketChannel,SocketChannel,Selector
客户端:
- 实例化通道:SocketChannel
- 设置SocketChannel为非阻塞
- 实例化复用器:Selector
- 连接服务器connect()(该方法不会阻塞直接返回结果,返回为Boolean,是否连接成功)
- 若返回为false,则将SocketChannel注册到复用器中,并监听connect可读事件
- 监听复用器事件是否完成(Selector.select),判断完成集合中是否有可连接事件,将可连接事件完成(channel.finishConnet())
- 给服务端发送消息,channel.write()操作
- 关闭资源:selector、SocketChannel
服务端:
public class NIOServer { public static void main(String[] args) { ServerSocketChannel serverSocketChannel = null; try { //创建ServerSocketChannel通道实例 serverSocketChannel = ServerSocketChannel.open(); //绑定端口 serverSocketChannel.bind(new InetSocketAddress(9998)); System.out.println("服务端启动了"); //将serverSocketChannel设置为非阻塞 configureBlocking设置阻塞非阻塞 false:非阻塞 true:阻塞 serverSocketChannel.configureBlocking(false); //创建selector选择器 Selector selector = Selector.open(); //将通道serverSocketChannel注册到选择器selector,关注可接受事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //等待监听结果,调用选择器的select阻塞等待,直到有事件发生才返回 while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); //是否是可接受事件 if (selectionKey.isAcceptable()) { System.out.println("可接受事件"); //有新用户连接 ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel(); //接受客户端的连接,通过accept(不在阻塞)接受一个SocketChannel通道 SocketChannel socketChannel = serverSocketChannel1.accept(); //设置socketChannel为非阻塞 socketChannel.configureBlocking(false); //将socketChannel注册到选择器selector选择器,关注可读事件 socketChannel.register(selector, SelectionKey.OP_READ); } //是否是可读事件 if (selectionKey.isReadable()) { System.out.println("可读事件"); //获取SocketChannel通道 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //创建Buffer ByteBuffer buffer = allocate(100); //进行读取操作 socketChannel.read(buffer); //进行读写模式的切换 buffer.flip(); //将数据从Buffer中读取 byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg = new String(bytes, 0, bytes.length); //打印结果 System.out.println("客户端:"+socketChannel.getRemoteAddress()+msg); if(msg.equals("")){ socketChannel.close(); } } } } } catch (IOException e) { e.printStackTrace(); } finally { //关闭资源 try { if (serverSocketChannel != null) { serverSocketChannel.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
客户端:
public class NIOClient { public static void main(String[] args) { SocketChannel socketChannel = null; try { //创建SocketChannel通道 socketChannel = SocketChannel.open(); //设置socketChannel为非阻塞 socketChannel.configureBlocking(false); //创建Selector选择器 Selector selector = Selector.open(); //主动的进行连接,connect操作不在会阻塞,会直接返回,如果连接成功返回true ,连接还未完成返回false if (!socketChannel.connect(new InetSocketAddress("127.0.0.1",9998))) { //当前连接操作未完成 //将SocketChannel注册到选择器,并关注可连接事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); //等待连接完成 selector.select(); Iterator <SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); //是否是可连接事件 if (selectionKey.isConnectable()) { //可连接事件完成 SocketChannel channel = (SocketChannel) selectionKey.channel(); //连接操作完成 channel.finishConnect(); } } } //连接成功,给服务端发送消息 ByteBuffer buffer = ByteBuffer.allocate(100); //将发送的数据写到Buffer中 buffer.put("hello ".getBytes()); //读写模式的切换 buffer.flip(); socketChannel.write(buffer); //关闭资源 selector.close(); } catch (IOException e) { e.printStackTrace(); } finally { if (socketChannel != null) { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
客户端发送一个消息,服务端回复一个同样的消息的实现
服务端:
public class NIOServer { public static void main(String[] args) { ServerSocketChannel serverSocketChannel = null; try { //创建ServerSocketChannel通道实例 serverSocketChannel = ServerSocketChannel.open(); //绑定端口 serverSocketChannel.bind(new InetSocketAddress(9998)); System.out.println("服务端启动了"); //将serverSocketChannel设置为非阻塞 configureBlocking设置阻塞非阻塞 false:非阻塞 true:阻塞 serverSocketChannel.configureBlocking(false); //创建selector选择器 Selector selector = Selector.open(); //将通道serverSocketChannel注册到选择器selector,关注可接受事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //等待监听结果,调用选择器的select阻塞等待,直到有事件发生才返回 while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); //是否是可接受事件 if (selectionKey.isAcceptable()) { System.out.println("可接受事件"); //有新用户连接 ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel(); //接受客户端的连接,通过accept(不在阻塞)接受一个SocketChannel通道 SocketChannel socketChannel = serverSocketChannel1.accept(); //设置socketChannel为非阻塞 socketChannel.configureBlocking(false); //将socketChannel注册到选择器selector选择器,关注可读事件 socketChannel.register(selector, SelectionKey.OP_READ); } //是否是可读事件 if (selectionKey.isReadable()) { System.out.println("可读事件"); //获取SocketChannel通道 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //创建Buffer ByteBuffer buffer = allocate(100); //进行读取操作 socketChannel.read(buffer); //进行读写模式的切换 buffer.flip(); //将数据从Buffer中读取 byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg = new String(bytes, 0, bytes.length); //给客户端回复消息 buffer.clear(); buffer.put(("服务端回复"+msg).getBytes()); buffer.flip(); socketChannel.write(buffer); //打印结果 System.out.println("客户端:"+socketChannel.getRemoteAddress()+msg); if(msg.equals("") || "exit".equals(msg)){ selectionKey.cancel(); socketChannel.close(); } } } } } catch (IOException e) { e.printStackTrace(); } finally { //关闭资源 try { if (serverSocketChannel != null) { serverSocketChannel.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
客户端:
public class NIOClient { public static void main(String[] args) { SocketChannel socketChannel = null; try { //创建SocketChannel通道 socketChannel = SocketChannel.open(); //设置socketChannel为非阻塞 socketChannel.configureBlocking(false); //创建Selector选择器 Selector selector = Selector.open(); //主动的进行连接,connect操作不在会阻塞,会直接返回,如果连接成功返回true ,连接还未完成返回false if (!socketChannel.connect(new InetSocketAddress("127.0.0.1",9998))) { //当前连接操作未完成 //将SocketChannel注册到选择器,并关注可连接事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); //等待连接完成 selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); //是否是可连接事件 if (selectionKey.isConnectable()) { //可连接事件完成 SocketChannel channel = (SocketChannel) selectionKey.channel(); //连接操作完成 channel.finishConnect(); } } } //注册读事件 socketChannel.register(selector,SelectionKey.OP_READ); Scanner scanner = new Scanner(System.in); //连接成功,给服务端发送消息 ByteBuffer buffer = ByteBuffer.allocate(100); while (scanner.hasNext()){ String msg = scanner.nextLine(); //将发送的数据写到Buffer中 buffer.put((msg+" ").getBytes()); //读写模式的切换 buffer.flip(); socketChannel.write(buffer); //关注服务端的读事件 selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); //判断是否是可读事件 if(selectionKey.isReadable()){ SocketChannel channel = (SocketChannel) selectionKey.channel(); //读操作 buffer.clear(); channel.read(buffer); //读写模式切换 buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String s = new String(bytes); System.out.println(s); } } if("".equals(msg) || "exit".equals(msg)) break; buffer.clear(); //每次使用需要清空,循环使用 } //关闭资源 selector.close(); } catch (IOException e) { e.printStackTrace(); } finally { if (socketChannel != null) { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
NIO+多线程
NIO中一个selector可以关注多个用户的连接(即一个线程可以同时处理多个用户的通信),为了并发用户量能够处理更多,可以使用NIO+多线程的形式来处理;其主要处理思路是主线程接收来自客户端的连接(accept),子线程处理用户的IO操作,主线程接收到客户端连接Socketchannel通道,将SocketChannel交给子线程。并且应该在主线程和子线程中使用各自的Selector,假如主线程和子线程共用一个选择器,当处理来自客户端的请求是一个连接请求时,子线程恰好获取到,就会造成混乱。
服务端:
public class MTNIOServer { public static void main(String[] args) { ServerSocketChannel serverSocketChannel = null; try { //创建ServerSocketChannel通道实例 serverSocketChannel = ServerSocketChannel.open(); //绑定端口 serverSocketChannel.bind(new InetSocketAddress(9998)); System.out.println("服务端启动了"); //将serverSocketChannel设置为非阻塞 configureBlocking设置阻塞非阻塞 false:非阻塞 true:阻塞 serverSocketChannel.configureBlocking(false); //创建selector选择器 Selector selector = Selector.open(); //将通道serverSocketChannel注册到选择器selector,关注可接受事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //子线程以线程池的形式提供 ExecutorService executorService = Executors.newFixedThreadPool(10); //等待监听结果,调用选择器的select阻塞等待,直到有事件发生才返回 while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); //是否是可接受事件 if (selectionKey.isAcceptable()) { System.out.println("可接受事件"); //有新用户连接 ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel(); //获取新用户channel SocketChannel socketChannel = serverSocketChannel1.accept(); System.out.println(Thread.currentThread().getName()+":客户端:"+socketChannel.getRemoteAddress()+" 连接上了"); //将通道交给子线程 executorService.submit(new NIOServerHandler(socketChannel)); } } } } catch (Exception e) { } } } public class NIOServerHandler implements Runnable { //通过主线程将socketChannel获取到 private SocketChannel socketChannel; //创建selector实例 private Selector selector = null ; public NIOServerHandler(SocketChannel socketChannel) { this.socketChannel = socketChannel; try { //在一个子线程中只需要创建一个selector实例 if (selector == null) selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { try { //将socketChannel设置为非阻塞 socketChannel.configureBlocking(false); //将socketChannel注册到选择器中,并且关注可读事件 socketChannel.register(selector, SelectionKey.OP_READ); while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); //是否是可读事件 if (selectionKey.isReadable()) { //获取SocketChannel通道 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //创建Buffer ByteBuffer buffer = ByteBuffer.allocate(100); //进行读取操作 socketChannel.read(buffer); //进行读写模式的切换 buffer.flip(); //将数据从Buffer中读取 byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg = new String(bytes, 0, bytes.length); //给客户端回复消息 buffer.clear(); buffer.put(("客户端回复:"+msg).getBytes()); //读写模式切换 buffer.flip(); //回复消息 socketChannel.write(buffer); //打印结果 System.out.println(Thread.currentThread().getName()+"客户端:"+socketChannel.getRemoteAddress()+" 消息:"+msg); if ("".equals(msg)|| "exit".equals(msg)){ System.out.println(Thread.currentThread().getName()+"客户端:"+socketChannel.getRemoteAddress()+" 下线"); //当前注册的感兴趣事件取消 selectionKey.cancel(); //关闭通道 socketChannel.close(); } } } } } catch (IOException e) { e.printStackTrace(); } } }
NIO相比于BIO,一个线程可以处理多个socket,NIO+多线程提高网络服务性能,降低线程的数量。在服务器端线程过多地影响:
- Java中创建线程,最终会映射到本地操作系统上的进程所对应的线程,Linux为例,fork是重量级的操作,系统开销大
- 多线程需要CPU调度,会有上下文的切换,线程过多时,上下文切换的时间会趋近于或大于线程本身业务执行的时间,CPU就存在浪费,降低了系统的性能
- 线程创建需要开辟线程私有的内空间,线程过多的话,为线程运行准备的内存就会占去很大一部分,真正用来分配还给业务的内存就大大减少,系统运行不可靠
- 线程过多的业务中,线程阻塞,等待网络事件的发生,如果一瞬间客户端的请求量比较大,系统会瞬间唤醒很多数量的线程,造成系统内存和CPU使用率居高不小,造成系统负载过高