zoukankan      html  css  js  c++  java
  • JavaAIO编程

    Asynchronous IO: 异步非阻塞的编程方式

    与 NIO 不同,当进行读写操作时,只须直接调用 API 的 read 或 write 方法即可。这两种 方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入 read 方 法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将 write 方法传递的流写入完 毕时,操作系统主动通知应用程序。即可以理解为,read/write 方法都是异步的,完成后会 主动调用回调函数。在 JDK1.7 中,这部分内容被称作 NIO.2,主要在 java.nio.channels 包下 增加了下面四个异步通道: 

    AsynchronousSocketChannel 

    AsynchronousServerSocketChannel 

    AsynchronousFileChannel

    AsynchronousDatagramChannel 

    异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的 I/O 请求都是由 OS 先完成了再通知服务器应用去启动线程进行处理。

    AIO 方式使用于连接数目多且连接比较长(重操作)的架构

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.AsynchronousChannelGroup;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class AIOServer {
    	// 线程池, 提高服务端效率。
    		private ExecutorService service;
    		// 线程组
    		// private AsynchronousChannelGroup group;
    		// 服务端通道, 针对服务器端定义的通道。
    		private AsynchronousServerSocketChannel serverChannel;
    		
    		public AIOServer(int port){
    			init(9999);
    		}
    		
    		private void init(int port){
    			try {
    				System.out.println("server starting at port : " + port + " ...");
    				// 定长线程池
    				service = Executors.newFixedThreadPool(4);
    				/* 使用线程组
    				group = AsynchronousChannelGroup.withThreadPool(service);
    				serverChannel = AsynchronousServerSocketChannel.open(group);
    				*/
    				// 开启服务端通道, 通过静态方法创建的。
    				serverChannel = AsynchronousServerSocketChannel.open();
    				// 绑定监听端口, 服务器启动成功,但是未监听请求。
    				serverChannel.bind(new InetSocketAddress(port));
    				System.out.println("server started.");
    				// 开始监听 
    				// accept(T attachment, CompletionHandler<AsynchronousSocketChannel, ? super T>)
    				// AIO开发中,监听是一个类似递归的监听操作。每次监听到客户端请求后,都需要处理逻辑开启下一次的监听。
    				// 下一次的监听,需要服务器的资源继续支持。
    				serverChannel.accept(this, new AIOServerHandler());
    				try {
    					TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    		
    		public static void main(String[] args) {
    			new AIOServer(9999);
    		}
    
    		public ExecutorService getService() {
    			return service;
    		}
    
    		public void setService(ExecutorService service) {
    			this.service = service;
    		}
    
    		public AsynchronousServerSocketChannel getServerChannel() {
    			return serverChannel;
    		}
    
    		public void setServerChannel(AsynchronousServerSocketChannel serverChannel) {
    			this.serverChannel = serverChannel;
    		}
    		
    	}
    

      

    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.Scanner;
    import java.util.concurrent.ExecutionException;
    
    public class AIOClient {
    	private AsynchronousSocketChannel channel;
    
    	public AIOClient(String host, int port) {
    		init(host, port);
    	}
    
    	private void init(String host, int port) {
    		try {
    			// 开启通道
    			channel = AsynchronousSocketChannel.open();
    			// 发起请求,建立连接。
    			channel.connect(new InetSocketAddress(host, port));
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void write(String line) {
    		try {
    			ByteBuffer buffer = ByteBuffer.allocate(1024);
    			buffer.put(line.getBytes("UTF-8"));
    			buffer.flip();
    			channel.write(buffer);
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void read() {
    		ByteBuffer buffer = ByteBuffer.allocate(1024);
    		try {
    			// read方法是异步方法,OS实现的。get方法是一个阻塞方法,会等待OS处理结束后再返回。
    			channel.read(buffer).get();
    			// channel.read(dst, attachment, handler);
    			buffer.flip();
    			System.out.println("from server : " + new String(buffer.array(), "UTF-8"));
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} catch (ExecutionException e) {
    			e.printStackTrace();
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void doDestory() {
    		if (null != channel) {
    			try {
    				channel.close();
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    	public static void main(String[] args) {
    		AIOClient client = new AIOClient("localhost", 9999);
    		try {
    			System.out.print("enter message send to server > ");
    			Scanner s = new Scanner(System.in);
    			String line = s.nextLine();
    			client.write(line);
    			client.read();
    		} finally {
    			client.doDestory();
    		}
    	}
    
    }
    

      

    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.Scanner;
    
    public class AIOServerHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServer> {
    
    	/**
    	 * 业务处理逻辑, 当请求到来后,监听成功,应该做什么。
    	 * 一定要实现的逻辑: 为下一次客户端请求开启监听。accept方法调用。
    	 * result参数 : 就是和客户端直接建立关联的通道。
    	 *  无论BIO、NIO、AIO中,一旦连接建立,两端是平等的。
    	 *  result中有通道中的所有相关数据。如:OS操作系统准备好的读取数据缓存,或等待返回数据的缓存。
    	 */
    	@Override
    	public void completed(AsynchronousSocketChannel result, AIOServer attachment) {
    		// 处理下一次的客户端请求。类似递归逻辑
    		attachment.getServerChannel().accept(attachment, this);
    		doRead(result);
    	}
    
    	/**
    	 * 异常处理逻辑, 当服务端代码出现异常的时候,做什么事情。
    	 */
    	@Override
    	public void failed(Throwable exc, AIOServer attachment) {
    		exc.printStackTrace();
    	}
    	
    	/**
    	 * 真实项目中,服务器返回的结果应该是根据客户端的请求数据计算得到的。不是等待控制台输入的。
    	 * @param result
    	 */
    	private void doWrite(AsynchronousSocketChannel result){
    		try {
    			ByteBuffer buffer = ByteBuffer.allocate(1024);
    			System.out.print("enter message send to client > ");
    			Scanner s = new Scanner(System.in);
    			String line = s.nextLine();
    			buffer.put(line.getBytes("UTF-8"));
    			// 重点:必须复位,必须复位,必须复位
    			buffer.flip();
    			// write方法是一个异步操作。具体实现由OS实现。 可以增加get方法,实现阻塞,等待OS的写操作结束。
    			result.write(buffer);
    			// result.write(buffer).get(); // 调用get代表服务端线程阻塞,等待写操作完成
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}/* catch (InterruptedException e) {
    			e.printStackTrace();
    		} catch (ExecutionException e) {
    			e.printStackTrace();
    		}*/
    	}
    	
    	private void doRead(final AsynchronousSocketChannel channel){
    		ByteBuffer buffer = ByteBuffer.allocate(1024);
    		/*
    		 * 异步读操作, read(Buffer destination, A attachment, 
    		 *                    CompletionHandler<Integer, ? super A> handler)
    		 * destination - 目的地, 是处理客户端传递数据的中转缓存。 可以不使用。
    		 * attachment - 处理客户端传递数据的对象。 通常使用Buffer处理。
    		 * handler - 处理逻辑
    		 */
    		channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
    
    			/**
    			 * 业务逻辑,读取客户端传输数据
    			 * attachment - 在completed方法执行的时候,OS已经将客户端请求的数据写入到Buffer中了。
    			 *  但是未复位(flip)。 使用前一定要复位。
    			 */
    			@Override
    			public void completed(Integer result, ByteBuffer attachment) {
    				try {
    					System.out.println(attachment.capacity());
    					// 复位
    					attachment.flip();
    					System.out.println("from client : " + new String(attachment.array(), "UTF-8"));
    					doWrite(channel);
    				} catch (UnsupportedEncodingException e) {
    					e.printStackTrace();
    				}
    			}
    
    			@Override
    			public void failed(Throwable exc, ByteBuffer attachment) {
    				exc.printStackTrace();
    			}
    		});
    	}
    
    }
    

      

  • 相关阅读:
    tp5 lock的使用
    array_chunk的用法和php操作大数据
    find_in_set使用
    [沈航软工教学] 前六周3,4班排行榜
    [沈航软工教学] 团队项目地址汇总
    [沈航软工教学] 前五周3,4班排行榜
    [Latex]Travis-CI与Latex构建开源中文PDF
    [沈航软工教学] 前三周3,4班排行榜
    [沈航软工教学] 前三周作业总结
    [沈航软工教学] 学生项目Coding地址汇总
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/12248112.html
Copyright © 2011-2022 走看看