在BIO编程中,每当有一个客户端连接进来,服务端都必须为它的输入流分配一个单独的线程,阻塞等待数据到达;输出流则可以交给一个线程池统一管理。因此,当有n个客户端连接时,服务端的线程数量至少为n。
下面的例子是NIO中采用Channel+线程池的方式。它的缺点是无法得知哪个SocketChannel有数据到达,只能不断迭代所有的SocketChannel:有数据到达就处理,没有就跳过,效率太低。
public class TimeServer { private BlockingQueue<SocketChannel> idleQueue =new LinkedBlockingQueue<SocketChannel>(); private BlockingQueue<Future<SocketChannel>> workingQueue=new LinkedBlockingQueue<Future<SocketChannel>>(); private ExecutorService executor = Executors.newSingleThreadExecutor(); { new Thread(){ @Override public void run() { try { while (true) { //task1:迭代当前idleQueue中的SocketChannel,提交到线程池中执行任务,并将其移到workingQueue中 for (int i = 0; i < idleQueue.size(); i++) { SocketChannel socketChannel = idleQueue.poll(); if (socketChannel != null) { Future<SocketChannel> result = executor.submit(new TimeServerHandleTask(socketChannel), socketChannel); workingQueue.put(result); } } //task2:迭代当前workingQueue中的SocketChannel,如果任务执行完成,将其移到idleQueue中 for (int i = 0; i < workingQueue.size(); i++) { Future<SocketChannel> future = workingQueue.poll(); if (!future.isDone()){ workingQueue.put(future); continue; } SocketChannel channel = null; channel = future.get(); idleQueue.put(channel); } } } catch (Exception e) { e.printStackTrace(); } } }.start(); } /** * 1、在main线程中,当接受到一个新的连接时,我们将相应的SocketChannel放入idleQueue * * */ public static void main(String[] args) throws IOException, InterruptedException { TimeServer timeServer = new TimeServer(); ServerSocketChannel ssc=ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress(8080)); while (true){ SocketChannel socketChannel = ssc.accept(); if(socketChannel==null){ continue; }else{ socketChannel.configureBlocking(false); timeServer.idleQueue.add(socketChannel); } } } }
/**
 * Probes one non-blocking {@link SocketChannel} for a request and answers it.
 *
 * <p>If no data is pending the task returns immediately. Otherwise it reads exactly as many
 * bytes as {@code "GET CURRENT TIME"} has characters and replies with the current time, or
 * with {@code "BAD_REQUEST"} when the bytes read do not match that request.
 */
public class TimeServerHandleTask implements Runnable {
    /** The only request this server understands. */
    private static final String TIME_REQUEST = "GET CURRENT TIME";

    SocketChannel socketChannel;

    /**
     * @param socketChannel the client connection to probe; expected to be in non-blocking mode
     */
    public TimeServerHandleTask(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    /**
     * Handles at most one request from the channel.
     *
     * @throws RuntimeException wrapping any exception raised while reading or writing;
     *         the channel is closed before the exception is rethrown
     */
    @Override
    public void run() {
        // A channel closed by an earlier run has nothing left to deliver.
        if (!socketChannel.isOpen()) {
            return;
        }
        try {
            ByteBuffer requestBuffer = ByteBuffer.allocate(TIME_REQUEST.length());
            // Non-blocking read: returns at once when nothing has arrived.
            int bytesRead = socketChannel.read(requestBuffer);
            if (bytesRead == 0) {
                // No data received on this channel, nothing to handle.
                return;
            }
            if (bytesRead < 0) {
                // End of stream: the peer closed the connection, so release the socket.
                socketChannel.close();
                return;
            }
            // Part of a request arrived; keep reading until the full fixed-length request is
            // in the buffer. This loop polls the channel and occupies the worker until the
            // remaining bytes arrive.
            while (requestBuffer.hasRemaining()) {
                if (socketChannel.read(requestBuffer) < 0) {
                    // The peer closed mid-request; without this check the loop never ends.
                    socketChannel.close();
                    return;
                }
            }
            String requestStr = new String(requestBuffer.array());
            String reply;
            if (TIME_REQUEST.equals(requestStr)) {
                // toLocaleString() is kept so the response format stays the same for clients.
                reply = Calendar.getInstance().getTime().toLocaleString();
            } else {
                reply = "BAD_REQUEST";
            }
            // Wrap the encoded bytes instead of sizing the buffer by String.length(): a
            // locale-formatted date may contain characters that encode to more than one byte.
            ByteBuffer responseBuffer = ByteBuffer.wrap(reply.getBytes());
            // A non-blocking write may accept only part of the buffer.
            while (responseBuffer.hasRemaining()) {
                socketChannel.write(responseBuffer);
            }
        } catch (Exception e) {
            closeQuietly();
            throw new RuntimeException(e);
        }
    }

    /** Closes the channel, ignoring a failure to do so because the original error is rethrown. */
    private void closeQuietly() {
        try {
            socketChannel.close();
        } catch (java.io.IOException ignored) {
            // The caller is already propagating the primary failure.
        }
    }
}
/**
 * Client that connects to port 8080, sends {@code "GET CURRENT TIME"} every five seconds
 * and prints each response it receives.
 */
public class TimeClient {
    /** Connect timeout in milliseconds. */
    static int connectTimeOut = 3000;
    /** Buffer shared by the request and the response; only used from {@link #main}. */
    static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

    /** How long to wait for the response to one request, in milliseconds. */
    private static final int RESPONSE_TIMEOUT_MILLIS = 3000;
    /** Pause between two polls of the non-blocking channel, in milliseconds. */
    private static final int POLL_INTERVAL_MILLIS = 10;

    /**
     * Runs the request loop until the server closes the connection.
     *
     * @param args unused
     * @throws IOException if connecting, writing or reading fails
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws RuntimeException if the connection is not established within {@code connectTimeOut}
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            // Switch to non-blocking mode BEFORE connecting; otherwise the connect blocks
            // and the timeout loop below never has anything to wait for.
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress(8080));
            long start = System.currentTimeMillis();
            while (!socketChannel.finishConnect()) {
                if (System.currentTimeMillis() - start >= connectTimeOut) {
                    throw new RuntimeException("尝试建立连接超过3秒");
                }
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
            // Reaching this point means the connection is established.
            while (true) {
                buffer.clear();
                buffer.put("GET CURRENT TIME".getBytes());
                buffer.flip();
                // A non-blocking write may accept only part of the buffer.
                while (buffer.hasRemaining()) {
                    socketChannel.write(buffer);
                }
                buffer.clear();
                int received = awaitResponse(socketChannel);
                if (received < 0) {
                    // End of stream: the server closed the connection.
                    break;
                }
                if (received > 0) {
                    buffer.flip();
                    byte[] response = new byte[buffer.remaining()];
                    buffer.get(response);
                    System.out.println("reveive response:" + new String(response));
                    buffer.clear();
                }
                Thread.sleep(5000);
            }
        } finally {
            socketChannel.close();
        }
    }

    /**
     * Polls the channel until some response bytes arrive, the stream ends, or the response
     * timeout elapses. Reading once right after the write would normally find nothing,
     * because the server has not answered yet.
     *
     * @param channel the connected, non-blocking channel to read from
     * @return the result of the last read: positive when data was read into {@code buffer},
     *         {@code -1} at end of stream, {@code 0} when the timeout elapsed without data
     */
    private static int awaitResponse(SocketChannel channel) throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + RESPONSE_TIMEOUT_MILLIS;
        int received = channel.read(buffer);
        while (received == 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            received = channel.read(buffer);
        }
        return received;
    }
}
Selector方式:由一个线程通过Selector监听所有Channel的就绪事件,只有当某个SocketChannel可读时,才把读取任务提交给线程池,因此只需要一个线程池。
public class TimeServer { private static ExecutorService executor; static { executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000)); } public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(8080)); ssc.configureBlocking(false); Selector selector = Selector.open(); ssc.register(selector, ssc.validOps()); while (true) { int readyCount = selector.select(1000); if (readyCount == 0) { continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey selectionKey = keyIterator.next(); if (selectionKey.isValid()) { // 表示ServerSocketChannel if (selectionKey.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = server.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } // 表示SocketChannel if (selectionKey.isReadable()) { executor.submit(new TimeServerTask(selectionKey)); } keyIterator.remove(); } } } } }
/**
 * Worker task for the selector-based server: reads whatever is pending on the channel of a
 * {@link SelectionKey} and answers each chunk read.
 *
 * <p>Each successful read is treated as one request; the task does not reassemble a request
 * that arrives split across several reads.
 */
public class TimeServerTask implements Runnable {
    private SelectionKey selectionKey;

    /**
     * @param selectionKey key whose channel is a {@link SocketChannel} with data to read
     */
    public TimeServerTask(SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
    }

    /**
     * Reads until no more data is pending, replying with the current time to
     * {@code "GET CURRENT TIME"} and with {@code "BAD_REQUEST"} to anything else.
     * The connection is closed on end of stream and on any I/O error.
     */
    @Override
    public void run() {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
        try {
            int count;
            while ((count = channel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                byte[] request = new byte[byteBuffer.remaining()];
                byteBuffer.get(request);
                String requestStr = new String(request);
                byteBuffer.clear();

                String reply;
                if ("GET CURRENT TIME".equals(requestStr)) {
                    reply = Calendar.getInstance().getTime().toLocaleString();
                } else {
                    reply = "BAD_REQUEST";
                }
                byteBuffer.put(reply.getBytes());
                // flip() is required on both branches: writing an unflipped buffer sends the
                // unused remainder of the buffer instead of the bytes that were just put.
                byteBuffer.flip();
                // A non-blocking write may accept only part of the buffer.
                while (byteBuffer.hasRemaining()) {
                    channel.write(byteBuffer);
                }
                // Make room for the next read; otherwise it finds a full buffer and returns 0.
                byteBuffer.clear();
            }
            if (count < 0) {
                // End of stream: the client closed the connection.
                closeConnection(channel);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeConnection(channel);
        }
    }

    /** Cancels the key and closes the channel so the socket is released. */
    private void closeConnection(SocketChannel channel) {
        selectionKey.cancel();
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}