Channel可以理解为数据传输的管道。通道可用于读,写或同时用于读写。
I/O可以分为光宇的两大类区别:File I/O和Stream I/O。
public interface ReadableByteChannel extends Channel{ public int read(ByteBuffer dst) throws IOException; } public interface WritableByteChannel extends Channel{ public int write(ByteBuffer src) throws IOException; }
read和write方法接收的都是一个ByteBuffer参数,其中read方法就是向ByteBuffer中put数据,write方法就是讲ByteBuffer中的数据get出来。两方法均返回已传输的字节数,可能比缓冲区的字节数少。缓冲区的position位置会发生于一传输字节相同数量的前移。若只进行了部分传输,缓冲区可以被重新提交给通道并从上次中断的地方继续传输。该过程重复进行直到缓冲区的hasRemaining()方法返回false。
FileInputStream对象的getChannel()方法获取的FileChannel对象是只读的,在该通道上调用write()方法将抛出NonWritableChannelException异常,因为FileInputStream对象总是以Read-Only的权限打开。
通道可以以多种方式创建。Socket通道有可以直接创建新的socket通道的工厂方法。但是一个FileChannel对象却只能通过在一个打开的RandomAccessFile,FileInputStream或FileOutputStream对象上调用getChannel()方法来获取,不能直接传建一个FileChannel对象。
SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("host", port)); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(url)); DatagramChannel dc = DatagramChannel.open(); RandomAccessFile raf = new RandomAccessFile(""fileName", "r"); FileChannel fc = raf.getChannel();
Socket通道
在通道类中,DatagramChannel和SocketChannel实现定义读和写功能的接口而ServerSocketChannel不实现。ServerSocketChannel负责监听传入的连接和创建新的SocketChannel对象,它本身从不传输数据。
全部NIO中的socket通道类(DatagramChannel, SocketChannel和ServerSocketChannel)在被实例化时都会创建一个对等的BIO中socket对象(Socket, ServerSocket和DatagramSocket)。DatagramChannel,SocketChannel和ServerSocketChannel通道类都定义了socket()方法,可以通过这个方法获取其关联的socket对象。另外每个Socket,ServerSocket和DatagramChannel都定义了getChannel()方法来获取对应的通道。只有通过通道类创建的socket对象,其getChannel方法才能返回对应的通道,若直接new了socket对象,那么getChannel返回的是null。
非阻塞模式
通道可以以阻塞或非阻塞模式运行。非阻塞模式的通道永远不会让调用的线程休眠。请求的操作用么立即完成,要么返回一个结果表明未进行任何操作。
在创建通道时,可以调用configureBlocking(boolean)方法,若参数为true则为阻塞模式,若为false,则为费阻塞模式。isBlocking()方法来判断某个socket通道当前处于哪种模式。使用blockingLock()方法可以阻止通道的阻塞模式被更改,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。拥有此对象的锁的线程才可更改通道的阻塞模式,确保在执行代码的关键部分时socket通道的阻塞模式不会改变以及在不影响其它线程的前提下暂时改变阻塞模式。
Socket socket = null; Object lockObj = serverChannel.blockingLock(); synchronize(lockObj){ boolean prevState = serverChannel.isBlocking(); serverChannel.configureBlocking(false); socket = serverChannel.accept(); serverChannel.configureBlocking(prevState); } if(socket != null){ doSomething(socket); }
ServerSocketChannel
ServerSocketChannel是一个基于通道的socket监听器。用静态的open()工厂方法创建一个新的ServerSocketChannel对象,将会返回一个未绑定的java.net.ServerSocket关联的通道。该对等ServerSocket可以通过在返回的ServerSocketChannel上调用socket()方法来获取。作为ServerSocketChannel的对等体被创建的ServerSocket对象以来通道是实现。这些socket关联的SocketImpl能识别通道。通道不能被封装在随意的socket对象外面。
由于ServerSocketChannel没有bind()方法,因为有必要取出对等的socket并使用它来绑定到一个端口以开始监听连接。同时也使用对等ServerSocket的API来根据需要设置其他的socket选项。
ServerSocketChannel ssc = ServerSocketChannel.open(); ServerSocket serverSocket = ssc.socket(); serverSocket.bind(new InetSocketAddress(1234));
当在ServerSocket上调用accept()方法,那么它总是阻塞并返回一个java.net.Socket对象。若在ServerSocketChannel上调用accept()方法则会返回SocketChannel类型的对象,返回的对象能够在非阻塞模式下运行。若以非阻塞模式被调用,当没有传入连接在等待时,ServerSocketChannel.accpet()会立即返回null。
public class ChannelAccpet{ public static final String GREETING ="Hello I must be going. "; public static void main(String[] args) throws Exception{ int port = 1234; if(args.length > 0) port = Integer.parseInt(args[0]); ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes()); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); while(true){ System.out.println("Waiting for connections"); SocketChannel sc = ssc.accpet(); if(sc == null) Thread.sleep(2000); else{ sc.configureBlocking(false); ByteBuffer allocate =ByteBuffer.allocateDirect(16 *1024); while(sc.read(allocate) > 0){ allocate.flip(); while(buffer.hasRemaining()){ byte b = buffer.get(); System.out.println(b); } allocate.clear(); } System.out.println("Incoming connection from : " + sc.socket().getRemoteSocketAddress()); buffer.rewind(); sc.write(buffer); sc.close } } } }
SocketChannel
Socket和SocketChannel类封装点对点,有序的网络连接(TCP/IP)。SocketChannel扮演客户端发起同一个监听服务器的连接,直到连接成功,它才能收到数据并且只会从连接到的地址接收。每个SocketChannel对象创建时都是同一个对等的java.net.Socket对象串联。open()方法可以创建一个新的SocketChannel对象,而在新创建的SocketChannel上调用socket()方法能返回它对等的Socket对象;在该Socket上调用getChannel()方法则能返回最初的那个SocketChannel。新创建的SocketChannel虽已打开却是未连接的。在一个未连接的SocketChannel对象上尝试一个I/O操作会导致NotYetConnectedExceptin异常。可以通过在通道上直接调用connect()方法或在通道关联的Socket对象上调用connect()来将该socket通道连接。一旦一个socket通道被连接,它将保持连接状态直到关闭。可以通过调用isConnected()方法来测试某个SocketChannel当前是否已连接。
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddrss("host", port)); SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("host", port));
通过在对等Socket对象上调用connect()方法,线程在连接建立好或超时过期之前都将保持阻塞。connect()方法在非阻塞模式下被调用时SockeChannel提供并发链接,它发起对请求地址的连接并立即返回值。若返回值是true,说明连接立即建立;拖链接不能立即建立则返回false且并发地继续连接建立过程。面向流的socket建立连接状态需要一定的时间,因为两个带连接系统之间必须进行包对话以建立流socket所需的装填信息。若某个SocketChannel上当前正由一个并发连接,isConnectPending()方法就会返回true值。调用finishConnect()方法来完成连接的过程,该方法任何时候都可以安全地进行调用。若一个非阻塞模式的SocketChannel对象上调用finishConnect()方法,将可能出现以下情形:
connect()方法尚未被调用。则将产生NoConnectionPendingException异常
连接建立过程正在进行,尚未完成。则什么都不会发生,finishConnect()方法返回false
在非阻塞模式下调用connect()方法中,SocketChannel又被切换回了阻塞模式。则有必要的话,调用线程会则会直到连接建立完成,finishConnect()方法则会返回true值
在初次调用connect()或最后一次调用finishConnect()之后,连接建立过程已经完成。则socketChannel对象的内部状态将被更新到已连接状态,finishConnect()方法会返回true值,然后SocketChannel对象就可以被用来传输数据了
连接已建立,则什么都不会发生,finishConnect()方法返回true值
当通道处于中间的连接等到状态时,只可以调用finishConnect(),isConnectPending()或isConnected()方法。一旦连接建立过程成功完成,isConnected()将返回true。
InetSocketAddress addr = new InetSocketAddress(host, port); SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false); sc.connect(addr); while(!sc.finishConnect()) doSomething(); doSomethingWithChannel(sc); sc.close()
使用非阻塞模式的Channel实现TimeServer,TimeClient
public class TimeServer{ public static ExecutorService executor; static{ executors = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000)); } public static void main(String[] args) throws IOException, InterruptedException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress(8080)); while(true){ SocketChannel socketChannel = ssc.accept(); if(socketChannel == null) continue; else{ socketChannel.configureBlocing(false); executor.sumbit(new TimeServerHandleTask(socketChannel, executor)); } } } } public class TimeServerHandleTask implements Runnable{ SocketChannel socketChannel; ExecutorService executorService; ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); public TimeServerHandleTask(SocketChannel socketChannel, ExecutorService executorService) { this.socketChannel = socketChannel; this.executorService = executorService; } public void run(){ try{ if(socketChannel.read(byteBuffer) > 0){ while(true){ byteBuffer.flip(); if(byteBuffer.remaining() < "GET CURRENT TIME"){ byteBuffer.compact(); socketChannel.read(byteBuffer); continue; } byte[] request = new byte[byteBuffer.remaining()]; byteBuffer.get(request); String requestStr = new String(request); byteBuffer.clear(); if(!"GET CURRENT TIME".equals(requestStr)) socketChannel.write(byteBuffer.put("BAD_REQUEST".getBytes())); else { ByteBuffer byteBuffer = this.byteBuffer,out(Calendar.getInstance().getTime().toLocaleString().getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); } } } TimeServerHandleTask currentTask = new TimeServerHandleTask(socketChannel, executorService); executorServce.submit(currentTask); }catch(Exception e){ e.printStackTrace(); } } }
在结合线程池的时候,编写的任务不能再针对每个client,而是针对每个client的每次请求。因为线程的数量有限,所以要合理的分配,不能让一个线程一直处理一个client的请求。
public class TimeClinet{ static int connectTimeOut = 3000; static ByteBuffer buffer = ByteBuffer.allocateDirect(1024); public static void main(String[] args) throws IOException, InterruptedException{ SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8080)); socketChannel.configureBlocking(false); long start = System.currentTimeMillis(); while(!socketChannel.finishConnect()){ if(System.currentTImeMillis() - start >= connectTimeOut) throw new RuntimeException(""尝试建立连接超过3s"); } while(true){ buffer.put("GET CURRENT TIME".getBytes()); buffer.flip(); socketChannel.write(buffer); buffer.clear(); } Thread.sleep(5000); } }