实现的功能:
运行一个服务端,运行多个客户端。在客户端1,发送消息,其余客户端都能收到客户端1发送的消息。
重点:
1、ByteBuffer在使用时,注意flip()方法的调用,否则读取不到消息。
服务端
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; public class NioServer { public static void main(String[] args) throws Exception{ //创建服务端 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //绑定端口 serverSocketChannel.bind(new InetSocketAddress("localhost",12345)); //创建selector Selector selector = Selector.open(); //在selector中注册服务端的链接事件(注1) serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // //用于存放客户端的链接,用远端的端口,作为唯一标识(由于是本机开启多个客户端进行测试,所以不存在端口冲突问题) // Map<Integer,SocketChannel> clients = new HashMap<>(); List<SocketChannel> clients = new ArrayList<>(); while (true){ //阻塞等待事件的到来 selector.select(); //获取被触发的事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); //遍历触发的事件 while (iterator.hasNext()){ try { //获取事件 SelectionKey event = iterator.next(); //是否可以链接 if(event.isAcceptable()){ //为什么需要强转? 因为在(注1)中,我们注册的是 ServerSocketChannel ,所有需要强转回来。(注2) ServerSocketChannel ssc = (ServerSocketChannel) event.channel(); //获取到链接的socketchannel SocketChannel socketChannel = ssc.accept(); socketChannel.configureBlocking(false); //将获取到的链接,注册读事件到selector中, socketChannel.register(selector,SelectionKey.OP_READ); // //将获取到的客户端,保存起来,用于跟其它客户端进行通信,由于不涉及线程问题,所以使用map足已 // clients.put(((InetSocketAddress)socketChannel.getRemoteAddress()).getPort(),socketChannel); clients.add(socketChannel); }else if(event.isReadable()){ //是否可以读取 //同理(注2) SocketChannel socketChannel = (SocketChannel) event.channel(); //创建socketChannel需要的buffer ByteBuffer byteBuffer = ByteBuffer.allocate(512); String receiveMessage = ""; while (true){ try{ //重置buffer byteBuffer.clear(); int read = socketChannel.read(byteBuffer); if(read <= 0 ){ //当读取到末尾时,跳出循环 break; } receiveMessage += new String(byteBuffer.array(), Charset.forName("UTF-8")); }catch (Exception e){ e.printStackTrace(); break; } } System.out.println("收到的消息为:"+((InetSocketAddress)socketChannel.getRemoteAddress()).getPort()+"---"+receiveMessage); //拼装需要发送的消息 final ByteBuffer otherbf = ByteBuffer.allocate(receiveMessage.length()+10); otherbf.put((((InetSocketAddress)socketChannel.getRemoteAddress()).getPort()+":"+receiveMessage).getBytes()); System.out.println(new String(otherbf.array())); //遍历客户端,发送消息 clients.stream().forEach(sc -> { try { if(((InetSocketAddress)socketChannel.getRemoteAddress()).getPort() == ((InetSocketAddress)sc.getRemoteAddress()).getPort()){ //消息不发给自己 }else{ otherbf.flip(); sc.write(otherbf); } }catch (Exception e){ e.printStackTrace(); } }); } }catch (Exception e){ //添加try是为了程序的健壮 e.printStackTrace(); }finally { //删除已经处理了的事件 iterator.remove(); } } } } }
客户端
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class NioClient { public static void main(String[] args) throws Exception{ //打开socketChannel SocketChannel socketChannel = SocketChannel.open(); //设置为非阻塞 socketChannel.configureBlocking(false); //链接到服务器 socketChannel.connect(new InetSocketAddress("localhost",12345)); //创建Selector Selector selector = Selector.open(); //向Selector注册连接事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); //阻塞等待事件触发 selector.select(); //获取连接事件key Set<SelectionKey> connectEventKey = selector.selectedKeys(); //获取触发的连接事件 SelectionKey connectEvent = connectEventKey.iterator().next(); //删除已经处理了的事件 selector.selectedKeys().clear(); //转换为注册时的channel SocketChannel eventSocketChannel = (SocketChannel) connectEvent.channel(); //向selector注册读事件 eventSocketChannel.register(selector,SelectionKey.OP_READ); new Thread(){ @Override public void run() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); ByteBuffer inputBuffer = ByteBuffer.allocate(1024); //长度需要重新考量 try{ if(socketChannel.finishConnect()){ System.out.println("完成连接。"); } while (true){ String s = reader.readLine(); inputBuffer.clear(); inputBuffer.put(s.getBytes()); inputBuffer.flip(); socketChannel.write(inputBuffer); } }catch (Exception e){ e.printStackTrace(); }finally { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } }.start(); //没有和连接事件合并到一个while里面,是因为压根就不会有两次连接,所以我将连接事件单独出来 while (true){ //阻塞等待事件触发,这次是触发读事件 selector.select(); Set<SelectionKey> readEventKey = selector.selectedKeys(); Iterator<SelectionKey> readIterator = readEventKey.iterator(); SocketChannel readSocketChannel = (SocketChannel) readIterator.next().channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(256); String content = ""; while (true){ byteBuffer.clear(); int read = readSocketChannel.read(byteBuffer); if(read <= 0){ break; } content += new String(byteBuffer.array()); } System.out.println("收到的消息为:"+content); readIterator.remove(); } } }