最近看了《netty in action》,关于netty的线程模型不太理解,于是学习了一下java nio的知识,利用java nio写个简单的服务器便于理解。
java nio有3个重要的概念, Channels ,Buffers ,Selectors。通过他们我们可以用单个的线程监听多个数据通道。
java nio可以进行阻塞的io操作,也可以进行非阻塞的io操作,我们更多是用非阻塞式的操作。
参考文章
测试工具
USR-TCP232
SocketTool2
服务器端使用两个线程,一个线程负责 accept 连接,另一个线程负责处理 接收到的数据
accept代码
package me.zingon.nioserver;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
/**
* Created by zhidi on 2018-6-29.
*/
public class SocketAccepter implements Runnable{
private Integer port = 9999;
private ServerSocketChannel server=null;
//接受到的连接
private BlockingQueue<SocketChannel> queue=null;
Selector acceptSelector=Selector.open();
public SocketAccepter(Integer port, BlockingQueue<SocketChannel> queue) throws IOException {
this.port = port;
this.queue = queue;
}
@Override
public void run() {
try {
this.server = ServerSocketChannel.open();
//配置为非阻塞
this.server.configureBlocking(false);
//注册accept事件
this.server.register(acceptSelector, SelectionKey.OP_ACCEPT);
this.server.bind(new InetSocketAddress(port));
System.out.println("服务器在 "+port +"端口启动");
} catch (IOException e) {
e.printStackTrace();
}
try {
SocketChannel socketChannel=this.server.accept();
} catch (IOException e) {
e.printStackTrace();
}
while (true){
int count = 0;
try {
count = acceptSelector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
if(count ==0 ){
continue;
}
Iterator<SelectionKey> iterator = acceptSelector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
try {
//接受连接
SocketChannel sc = ssc.accept();
queue.add(sc);
System.out.println("服务器接收连接 :" + sc);
} catch (IOException e) {
e.printStackTrace();
}
iterator.remove();
}
}
}
}
处理线程
package me.zingon.nioserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
/**
* Created by zhidi on 2018-6-29.
*/
public class SocketLoop implements Runnable {
//已接受连接
private BlockingQueue<SocketChannel> queue;
//读selector
private Selector readSelector=Selector.open();
//写selector
private Selector writeSelector=Selector.open();
private ByteBuffer readBuf=ByteBuffer.allocate(1024*64);
private Queue<String> msgQueue=new LinkedList<>();
public SocketLoop(BlockingQueue<SocketChannel> queue) throws IOException {
this.queue = queue;
}
@Override
public void run() {
System.out.println("服务器处理线程启动");
while (true){
//处理已接收连接
registerNewChannel();
try {
readFromChannels();
writeToChannels();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//给socketChannel注册读/写时间
private void registerNewChannel(){
SocketChannel sc=null;
while( (sc = queue.poll()) != null){
try {
sc.configureBlocking(false);
sc.register(readSelector, SelectionKey.OP_READ);
sc.register(writeSelector,SelectionKey.OP_WRITE);
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//从读就绪的channel中读取数据,添加到msgQueue中
private void readFromChannels() throws IOException {
int count = readSelector.selectNow();
if(count == 0){
return;
}
Iterator<SelectionKey> iterator = readSelector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if(!key.isValid()){
continue;
}
SocketChannel channel = (SocketChannel) key.channel();
try {
channel.read(readBuf);
readBuf.flip();
StringBuilder sb=new StringBuilder();
while(readBuf.hasRemaining()) {
sb.append((char)readBuf.get());
}
readBuf.compact();
System.out.println(sb.toString());
msgQueue.add(sb.toString());
} catch (IOException e) {
e.printStackTrace();
key.cancel();
channel.socket().close();
channel.close();
}
iterator.remove();
}
}
//当写就绪并且msgQueue不为空时,将msgQueue中的数据发送给所有写就绪channel
private void writeToChannels() throws IOException {
int count = writeSelector.selectNow();
if(count == 0){
return;
}
Iterator<SelectionKey> iterator = writeSelector.selectedKeys().iterator();
String msg=msgQueue.poll();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if(!key.isValid()){
continue;
}
if(msg == null){
iterator.remove();
return;
}
SocketChannel channel = (SocketChannel) key.channel();
byte[] asd=(Thread.currentThread().getName()+msg).getBytes();
ByteBuffer bf=ByteBuffer.allocate(asd.length);
bf.put(asd);
bf.flip();
while(bf.hasRemaining()) {
try {
channel.write(bf);
} catch (IOException e) {
key.cancel();
channel.socket().close();
channel.close();
e.printStackTrace();
}
}
bf.clear();
iterator.remove();
}
}
}