小量的线程如何同时为大量连接服务呢,答案就是就绪选择。这就好比到餐厅吃饭,每来一桌客人,都有一个服务员专门为你服务,从你到餐厅到结帐走人,这样方式的好处是服务质量好,一对一的服务,VIP啊,可是缺点也很明显,成本高,如果餐厅生意好,同时来100桌客人,就需要100个服务员,那老板发工资的时候得心痛死了,这就是传统的一个连接一个线程的方式。
老板是什么人啊,精着呢。这老板就得捉摸怎么能用10个服务员同时为100桌客人服务呢,老板就发现,服务员在为客人服务的过程中并不是一直都忙着,客人点完菜,上完菜,吃着的这段时间,服务员就闲下来了,可是这个服务员还是被这桌客人占用着,不能为别的客人服务,用华为领导的话说,就是工作不饱满。那怎么把这段闲着的时间利用起来呢。这餐厅老板就想了一个办法,让一个服务员(前台)专门负责收集客人的需求,登记下来,比如有客人进来了、客人点菜了,客人要结帐了,都先记录下来按顺序排好。每个服务员到这里领一个需求,比如点菜,就拿着菜单帮客人点菜去了。点好菜以后,服务员马上回来,领取下一个需求,继续为别人客人服务去了。这种方式服务质量就不如一对一的服务了,当客人数据很多的时候可能需要等待。但好处也很明显,由于在客人正吃饭着的时候服务员不用闲着了,服务员这个时间内可以为其他客人服务了,原来10个服务员最多同时为10桌客人服务,现在可能为50桌,60客人服务了。
这种服务方式跟传统的区别有两个:
1、增加了一个角色,要有一个专门负责收集客人需求的人。NIO里对应的就是Selector。
2、由阻塞服务方式改为非阻塞服务了,客人吃着的时候服务员不用一直侯在客人旁边了。传统的IO操作,比如read(),当没有数据可读的时候,线程一直阻塞被占用,直到数据到来。NIO中没有数据可读时,read()会立即返回0,线程不会阻塞。
NIO中,客户端创建一个连接后,先要将连接注册到Selector,相当于客人进入餐厅后,告诉前台你要用餐,前台会告诉你你的桌号是几号,然后你就可能到那张桌子坐下了,SelectionKey就是桌号。当某一桌需要服务时,前台就记录哪一桌需要什么服务,比如1号桌要点菜,2号桌要结帐,服务员从前台取一条记录,根据记录提供服务,完了再来取下一条。这样服务的时间就被最有效的利用起来了。
内容来自 :http://blog.csdn.net/zhouhl_cn/article/details/6568119
简介
Java世界中的两类IO:IO(性能瓶颈)和NIO以及jdk1.7中要加入的增强版NIO
位置被设为 0,而且容量和上界被设为 10,刚好经过缓冲区能够容纳的最后一个字节。标记最初未定义。容量是固定的,但另外的三个属性可以在使用缓冲区时改变
- public abstract class Buffer {//没有get(),put()方法,但是子类中有
- public final int capacity()
- public final int position()
- public final Buffer position(int newPositio) //定位位置
- public final int limit()
- public final Buffer limit (int newLimit)//定位上界
- public final Buffer mark() //将标记设为当前位置的值
- public final Buffer reset() //将位置设为当前的标记值,如果标记值未定义将抛出异常
- public final Buffer clear() //重置方法,将缓冲区置为填充状态即将position设置为 0。将 limit 设置为与 capacity 相同。
- public final Buffer flip() //翻转方法,将缓冲区填充状态翻转成释放状态即 将 limit 设置为当前 position。将 position 设置为 0。实现细:buffer.limit(buffer.position()).position(0);
- public final Buffer rewind()//不影响上界属性。只是将位置值设回 0
- public final int remaining() //从当前位置到上界还剩余的元素数目
- public final boolean hasRemaining()//是否已经达到缓冲区的上界
- public abstract boolean isReadOnly(); //判断缓冲区是否仅可读,修改只读缓冲区将会抛出异常
- }
public abstract class Buffer {//没有get(),put()方法,但是子类中有
public final int capacity()
public final int position()
public final Buffer position(int newPositio) //定位位置
public final int limit()
public final Buffer limit (int newLimit)//定位上界
public final Buffer mark() //将标记设为当前位置的值
public final Buffer reset() //将位置设为当前的标记值,如果标记值未定义将抛出异常
public final Buffer clear() //重置方法,将缓冲区置为填充状态即将position设置为 0。将 limit 设置为与 capacity 相同。
public final Buffer flip() //翻转方法,将缓冲区填充状态翻转成释放状态即 将 limit 设置为当前 position。将 position 设置为 0。实现细:buffer.limit(buffer.position()).position(0);
public final Buffer rewind()//不影响上界属性。只是将位置值设回 0
public final int remaining() //从当前位置到上界还剩余的元素数目
public final boolean hasRemaining()//是否已经达到缓冲区的上界
public abstract boolean isReadOnly(); //判断缓冲区是否仅可读,修改只读缓冲区将会抛出异常
}
应用实例——复制文件的操作:
2、文件锁定功能
3、网络异步IO
socket 的阻塞模式意味着必须要做完IO 操作(包括错误)才会返回。
非阻塞模式下无论操作是否完成都会立刻返回,需要通过其他方式来判断具体操作是否成功。
•异步IO:
1 等待数据准备
2 将数据从内核拷贝到进程中
A用的是最老式的鱼竿,所以呢,得一直守着,等到鱼上钩了再拉杆;
B的鱼竿有个功能,能够显示是否有鱼上钩,所以呢,B就和旁边的MM聊天,隔会再看看有没有鱼上钩,有的话就迅速拉杆;
C用的鱼竿和B差不多,但他想了一个好办法,就是同时放好几根鱼竿,然后守在旁边,一旦有显示说鱼上钩了,它就将对应的鱼竿拉起来;
D是个有钱人,干脆雇了一个人帮他钓鱼,一旦那个人把鱼钓上来了,就给D发个短信
- public abstract class SelectableChannel extends AbstractChannel implements Channel{
- public abstract SelectionKey register (Selector sel, int ops) throws ClosedChannelException;//将可选择通道注册到选择器上,返回表示二者关系的SelectionKey 对象,如果选择器关闭或者通道为阻塞模式则会抛出异常,第二个参数表示所关心的通道操作,在JDK 1.4中,有四种被定义的可选择操作:读(read),写(write),连接(connect)和接受(accept)。
- public abstract boolean ( ) isRegistered( );//判断是否注册到选择器上
- public abstract SelectionKey key isRegisteredFor (Selector sel);//判断是否注册到特定的选择器上
- public abstract int validOps( );//获取特定的通道所支持的操作集合
- public abstract void configureBlocking (boolean block) throws IOException;
- public abstract boolean isBlocking( );//来配置并检查通道的阻塞模式
- public abstract Object blockingLock( );
- }
public abstract class SelectableChannel extends AbstractChannel implements Channel{
public abstract SelectionKey register (Selector sel, int ops) throws ClosedChannelException;//将可选择通道注册到选择器上,返回表示二者关系的SelectionKey 对象,如果选择器关闭或者通道为阻塞模式则会抛出异常,第二个参数表示所关心的通道操作,在JDK 1.4中,有四种被定义的可选择操作:读(read),写(write),连接(connect)和接受(accept)。
public abstract boolean ( ) isRegistered( );//判断是否注册到选择器上
public abstract SelectionKey key isRegisteredFor (Selector sel);//判断是否注册到特定的选择器上
public abstract int validOps( );//获取特定的通道所支持的操作集合
public abstract void configureBlocking (boolean block) throws IOException;
public abstract boolean isBlocking( );//来配置并检查通道的阻塞模式
public abstract Object blockingLock( );
}
- <pre class=“java” name=“code”>public abstract class Selector{
- public static Selector open() throws IOException;静态工厂方法来实例化Selector 对象
- public abstract boolean isOpen( );
- public abstract void close( ) throws IOException; 释放资源和设置选择键无效
- public abstract SelectionProvider provider( ); SelectionProvider对象用于创建一个Selector对象
- public abstract int select( ) throws IOException;
- public abstract int select (long timeout) throws IOException;是阻塞的,有三种条件可以停止阻塞:1)至少存在一条通道是ready I/O的;2)等待超时;3)被唤醒,如被调用wakeup。返回值即为ready I/O的通道数量
- public abstract int selectNow( ) throws IOException;是非阻塞的,返回值即为ready I/O的通道数量,无ready则返回0
- public abstract void wakeup( );
- public abstract Set keys( ); //返回已注册的键的集合
- public abstract Set selectedKeys( );//返回已选择的键的集合
- }
- </pre>
- <pre></pre>
- <pre></pre>
- public abstract class Selector{
- public static Selector open() throws IOException;静态工厂方法来实例化Selector 对象
- public abstract boolean isOpen( );
- public abstract void close( ) throws IOException; 释放资源和设置选择键无效
- public abstract SelectionProvider provider( ); SelectionProvider对象用于创建一个Selector对象
- public abstract int select( ) throws IOException;
- public abstract int select (long timeout) throws IOException;是阻塞的,有三种条件可以停止阻塞:1)至少存在一条通道是ready I/O的;2)等待超时;3)被唤醒,如被调用wakeup。返回值即为ready I/O的通道数量
- public abstract int selectNow( ) throws IOException;是非阻塞的,返回值即为ready I/O的通道数量,无ready则返回0
- public abstract void wakeup( );
- public abstract Set keys( ); //返回已注册的键的集合
- public abstract Set selectedKeys( );//返回已选择的键的集合
- }
public abstract class Selector{ public static Selector open() throws IOException;静态工厂方法来实例化Selector 对象 public abstract boolean isOpen( ); public abstract void close( ) throws IOException; 释放资源和设置选择键无效 public abstract SelectionProvider provider( ); SelectionProvider对象用于创建一个Selector对象 public abstract int select( ) throws IOException; public abstract int select (long timeout) throws IOException;是阻塞的,有三种条件可以停止阻塞:1)至少存在一条通道是ready I/O的;2)等待超时;3)被唤醒,如被调用wakeup。返回值即为ready I/O的通道数量 public abstract int selectNow( ) throws IOException;是非阻塞的,返回值即为ready I/O的通道数量,无ready则返回0 public abstract void wakeup( ); public abstract Set keys( ); //返回已注册的键的集合 public abstract Set selectedKeys( );//返回已选择的键的集合 }
内容转自http://blog.csdn.net/rogerjava/article/details/7343951
下面给出简单 c/s模式的例子
服务器端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class Server {
private Selector selector;
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);//调整缓存的大小可以看到打印输出的变化
private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);//调整缓存的大小可以看到打印输出的变化
String str;
public void start() throws IOException {
// 打开服务器套接字通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 服务器配置为非阻塞
ssc.configureBlocking(false);
// 进行服务的绑定
ssc.bind(new InetSocketAddress("localhost", 8001));
// 通过open()方法找到Selector
selector = Selector.open();
// 注册到selector,等待连接
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.currentThread().isInterrupted()) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
keyIterator.remove(); //该事件已经处理,可以丢弃
}
}
}
private void write(SelectionKey key) throws IOException, ClosedChannelException {
SocketChannel channel = (SocketChannel) key.channel();
System.out.println("write:"+str);
sendBuffer.clear();
sendBuffer.put(str.getBytes());
sendBuffer.flip();
channel.write(sendBuffer);
channel.register(selector, SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
this.readBuffer.clear();
// readBuffer.flip();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(this.readBuffer);
} catch (IOException e) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
str = new String(readBuffer.array(), 0, numRead);
System.out.println(str);
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = ssc.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("a new client connected "+clientChannel.getRemoteAddress());
}
public static void main(String[] args) throws IOException {
System.out.println("server started...");
new Server().start();
}
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class Client {
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
public void start() throws IOException {
// 打开socket通道
SocketChannel sc = SocketChannel.open();
//设置为非阻塞
sc.configureBlocking(false);
//连接服务器地址和端口
sc.connect(new InetSocketAddress("localhost", 8001));
//打开选择器
Selector selector = Selector.open();
//注册连接服务器socket的动作
sc.register(selector, SelectionKey.OP_CONNECT);
Scanner scanner = new Scanner(System.in);
while (true) {
//选择一组键,其相应的通道已为 I/O 操作准备就绪。
//此方法执行处于阻塞模式的选择操作。
selector.select();
//返回此选择器的已选择键集。
Set<SelectionKey> keys = selector.selectedKeys();
System.out.println("keys=" + keys.size());
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
// 判断此通道上是否正在进行连接操作。
if (key.isConnectable()) {
sc.finishConnect();
sc.register(selector, SelectionKey.OP_WRITE);
System.out.println("server connected...");
break;
} else if (key.isWritable()) { //写数据
System.out.print("please input message:");
String message = scanner.nextLine();
//ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());
writeBuffer.clear();
writeBuffer.put(message.getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
writeBuffer.flip();
sc.write(writeBuffer);
//注册写操作,每个chanel只能注册一个操作,最后注册的一个生效
//如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来
//int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
//使用interest集合
sc.register(selector, SelectionKey.OP_READ);
sc.register(selector, SelectionKey.OP_WRITE);
sc.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()){//读取数据
System.out.print("receive message:");
SocketChannel client = (SocketChannel) key.channel();
//将缓冲区清空以备下次读取
readBuffer.clear();
int num = client.read(readBuffer);
System.out.println(new String(readBuffer.array(),0, num));
//注册读操作,下一次读取
sc.register(selector, SelectionKey.OP_WRITE);
}
}
}
}
public static void main(String[] args) throws IOException {
new Client().start();
}
}