zoukankan      html  css  js  c++  java
  • Netty权威指南之AIO编程

    由JDK1.7提供的NIO2.0新增了异步的套接字通道,它是真正的异步I/O:在执行异步I/O操作时可以传递信号变量,当操作完成后会回调相关的方法。异步I/O也被称为AIO,对应于UNIX网络编程中的事件驱动I/O;它不再需要通过多路复用器(Selector)对注册的通道进行轮询操作就可以实现异步读写。

    package com.hjp.netty.aio;
    
    import java.io.IOException;
    
    /**
     * Entry point of the AIO (NIO.2) time-server example.
     *
     * <p>Creates an {@code AsyncTimeServerHandler} for the chosen port and runs it on a
     * dedicated thread named {@code "AIOServer"}.
     */
    public class TimeServer {

        /** Port used when no argument is given or the argument is not a number. */
        private static final int DEFAULT_PORT = 8080;

        /**
         * Starts the time server.
         *
         * @param args optional; {@code args[0]} is the port to listen on
         * @throws IOException kept in the signature for compatibility with existing callers
         */
        public static void main(String[] args) throws IOException {
            int port = resolvePort(args);
            AsyncTimeServerHandler timeServerHandler = new AsyncTimeServerHandler(port);
            new Thread(timeServerHandler, "AIOServer").start();
        }

        /** Returns the port given in {@code args[0]}, or {@link #DEFAULT_PORT} if it is absent or malformed. */
        private static int resolvePort(String[] args) {
            if (args == null || args.length == 0) {
                return DEFAULT_PORT;
            }
            try {
                return Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Keep the best-effort fallback, but tell the user the argument was ignored.
                System.err.println("Invalid port '" + args[0] + "', using default " + DEFAULT_PORT);
                return DEFAULT_PORT;
            }
        }

    }
    TimeServer
    package com.hjp.netty.aio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Server side of the AIO time-server example: opens and binds an
     * {@link AsynchronousServerSocketChannel}, starts the asynchronous accept loop and then
     * parks the calling thread on a latch until the accept loop reports a failure.
     *
     * Created by JiaPeng on 2017/7/24.
     */
    public class AsyncTimeServerHandler implements Runnable {

        private final int port;

        /** Created by {@link #run()}; counted down by the accept handler when an accept fails. */
        CountDownLatch latch;
        /** Listening channel, or {@code null} when opening or binding failed in the constructor. */
        AsynchronousServerSocketChannel asynchronousServerSocketChannel;

        /**
         * Opens the server channel and binds it to {@code port}.
         *
         * <p>An {@link IOException} is reported on stderr rather than thrown; in that case the
         * channel field is left {@code null} and {@link #run()} returns immediately.
         *
         * @param port TCP port to listen on
         */
        public AsyncTimeServerHandler(int port) {
            this.port = port;
            try {
                asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
                // Bind the listening port.
                asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
                System.out.println("The time server is start in port : " + port);
            } catch (IOException e) {
                e.printStackTrace();
                // Do not keep an opened-but-unbound channel around: accept() on it would throw.
                closeChannel();
                asynchronousServerSocketChannel = null;
            }
        }


        /**
         * Starts accepting connections and blocks until the latch is released or the thread
         * is interrupted, then closes the server channel.
         */
        @Override
        public void run() {
            if (asynchronousServerSocketChannel == null) {
                System.err.println("The time server could not be started on port : " + port);
                return;
            }
            // The latch only keeps this thread (and therefore the demo process) alive while
            // accepts are served on the channel group's threads; a real project would not
            // need a dedicated thread for this.
            latch = new CountDownLatch(1);
            doAccept();
            try {
                latch.await();
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            } finally {
                closeChannel();
            }
        }

        /** Issues one asynchronous accept, passing this handler as the attachment. */
        public void doAccept() {
            asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
        }

        /** Closes the server channel if it was opened, reporting any I/O error on stderr. */
        private void closeChannel() {
            if (asynchronousServerSocketChannel == null) {
                return;
            }
            try {
                asynchronousServerSocketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    AsyncTimeServerHandler
    package com.hjp.netty.aio;
    
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    
    /**
     * Completion callback for accept operations on the server channel held by an
     * {@code AsyncTimeServerHandler}.
     */
    public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {

        /** Capacity, in bytes, of the buffer handed to the first read on a new connection. */
        private static final int READ_BUFFER_CAPACITY = 1024;

        /**
         * Called when a client has connected: re-arms the accept and starts reading from the
         * new connection.
         *
         * @param result     channel of the newly accepted client
         * @param attachment server handler owning the listening channel
         */
        @Override
        public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
            // Each accept() call delivers a single connection to this callback, so accept()
            // has to be issued again here to keep receiving further clients; the calls
            // therefore form a loop: one connection completes, the next accept is queued.
            attachment.asynchronousServerSocketChannel.accept(attachment, this);

            final ByteBuffer inbound = ByteBuffer.allocate(READ_BUFFER_CAPACITY);
            final ReadCompletionHandler reader = new ReadCompletionHandler(result);
            // The buffer doubles as the attachment so the read handler can get at the data.
            result.read(inbound, inbound, reader);
        }

        /**
         * Called when an accept fails: prints the cause and releases the server handler's latch.
         *
         * @param exc        reason the accept failed
         * @param attachment server handler whose latch is counted down
         */
        @Override
        public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
            exc.printStackTrace();
            final AsyncTimeServerHandler server = attachment;
            server.latch.countDown();
        }
    }
    AcceptCompletionHandler
    package com.hjp.netty.aio;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.Date;
    
    /**
     * Completion callback for the read issued on an accepted client connection.
     *
     * <p>Decodes the received bytes as UTF-8, answers {@code "QUERY TIME ORDER"} (case
     * insensitive) with the current date and anything else with {@code "BAD ORDER"}, and
     * closes the connection on end-of-stream or on I/O failure. The handler does not issue
     * another read itself.
     */
    public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

        /** Charset used for both decoding the request and encoding the reply. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

        private final AsynchronousSocketChannel socketChannel;

        /**
         * @param socketChannel client connection the reply is written to
         */
        public ReadCompletionHandler(AsynchronousSocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        /**
         * Handles a finished read.
         *
         * @param result     number of bytes read, or a negative value at end-of-stream
         * @param attachment buffer the bytes were read into (still in write mode)
         */
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (result != null && result < 0) {
                // The peer closed the connection: there is no request to answer.
                closeChannel();
                return;
            }
            attachment.flip();
            byte[] body = new byte[attachment.remaining()];
            attachment.get(body);
            String request = new String(body, UTF_8);
            System.out.println("The time server receive order : " + request);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(request) ? new Date().toString() : "BAD ORDER";
            doWrite(currentTime);
        }

        /** Sends {@code currentTime} to the client as UTF-8, ignoring null or blank replies. */
        private void doWrite(String currentTime) {
            if (currentTime == null || currentTime.trim().isEmpty()) {
                return;
            }
            final ByteBuffer writeBuffer = ByteBuffer.wrap(currentTime.getBytes(UTF_8));
            socketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // A single write may send only part of the buffer; keep going until it is drained.
                    if (attachment.hasRemaining()) {
                        socketChannel.write(attachment, attachment, this);
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    closeChannel();
                }
            });
        }

        /**
         * Handles a failed read by closing the connection.
         *
         * @param exc        reason the read failed
         * @param attachment buffer that was passed to the read
         */
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            closeChannel();
        }

        /** Closes the client connection, reporting any I/O error on stderr. */
        private void closeChannel() {
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    ReadCompletionHandler
    package com.hjp.netty.aio;
    
    /**
     * Entry point of the AIO (NIO.2) time-client example.
     *
     * <p>Creates an {@code AsyncTimeClientHandler} for {@code 127.0.0.1} and the chosen port
     * and runs it on a dedicated thread named {@code "AIOClient"}.
     */
    public class TimeClient {

        /** Port used when no argument is given or the argument is not a number. */
        private static final int DEFAULT_PORT = 8080;

        /**
         * Starts the time client.
         *
         * @param args optional; {@code args[0]} is the server port to connect to
         */
        public static void main(String[] args) {
            int port = resolvePort(args);
            new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIOClient").start();
        }

        /** Returns the port given in {@code args[0]}, or {@link #DEFAULT_PORT} if it is absent or malformed. */
        private static int resolvePort(String[] args) {
            if (args == null || args.length == 0) {
                return DEFAULT_PORT;
            }
            try {
                return Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Keep the best-effort fallback, but tell the user the argument was ignored.
                System.err.println("Invalid port '" + args[0] + "', using default " + DEFAULT_PORT);
                return DEFAULT_PORT;
            }
        }

    }
    TimeClient
    package com.hjp.netty.aio;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Client side of the AIO time-server example.
     *
     * <p>{@link #run()} connects asynchronously, the connect callback sends
     * {@code "QUERY TIME ORDER"}, the write callback issues a read, and the read callback
     * prints the reply. {@code run()} stays blocked on a latch until the exchange has
     * finished or failed, then closes the channel.
     */
    public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

        /** Charset used for both encoding the request and decoding the reply. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

        /** Client channel, or {@code null} when opening it failed in the constructor. */
        private final AsynchronousSocketChannel socketChannel;
        private final String host;
        private final int port;
        /** Released exactly when the exchange is over, successfully or not. */
        private final CountDownLatch latch = new CountDownLatch(1);

        /**
         * Opens the client channel; an {@link IOException} is reported on stderr rather than
         * thrown, in which case {@link #run()} returns immediately.
         *
         * @param host server host name or address
         * @param port server port
         */
        public AsyncTimeClientHandler(String host, int port) {
            this.host = host;
            this.port = port;
            AsynchronousSocketChannel channel = null;
            try {
                channel = AsynchronousSocketChannel.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
            this.socketChannel = channel;
        }

        /** Starts the connect and blocks until the exchange completes, then closes the channel. */
        @Override
        public void run() {
            if (socketChannel == null) {
                System.err.println("The time client could not open a channel to " + host + ":" + port);
                return;
            }
            socketChannel.connect(new InetSocketAddress(host, port), this, this);
            try {
                latch.await();
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            closeChannel();
        }

        /**
         * Connect succeeded: send the request, then read the reply once it is fully written.
         *
         * @param result     always {@code null} for a connect
         * @param attachment this handler, as passed to {@code connect}
         */
        @Override
        public void completed(Void result, AsyncTimeClientHandler attachment) {
            ByteBuffer writeBuffer = ByteBuffer.wrap("QUERY TIME ORDER".getBytes(UTF_8));
            socketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // A single write may send only part of the buffer; keep going until it is drained.
                    if (attachment.hasRemaining()) {
                        socketChannel.write(attachment, attachment, this);
                    } else {
                        readResponse();
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    abort(exc);
                }
            });
        }

        /**
         * Connect failed: report the cause, close the channel and release {@link #run()}.
         *
         * @param exc        reason the connect failed
         * @param attachment this handler, as passed to {@code connect}
         */
        @Override
        public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
            abort(exc);
        }

        /** Issues one read for the server's reply and prints it when it arrives. */
        private void readResponse() {
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    try {
                        if (result != null && result < 0) {
                            System.err.println("Connection closed by the server before a reply arrived");
                            return;
                        }
                        attachment.flip();
                        byte[] bytes = new byte[attachment.remaining()];
                        attachment.get(bytes);
                        System.out.println("Now is " + new String(bytes, UTF_8));
                    } finally {
                        // Always release run(), whatever happened while handling the reply.
                        latch.countDown();
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    abort(exc);
                }
            });
        }

        /** Reports {@code exc}, closes the channel and releases {@link #run()} even if closing fails. */
        private void abort(Throwable exc) {
            try {
                exc.printStackTrace();
                closeChannel();
            } finally {
                latch.countDown();
            }
        }

        /** Closes the client channel, reporting any I/O error on stderr. */
        private void closeChannel() {
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    AsyncTimeClientHandler
  • 相关阅读:
    vscode 自动换行 关闭
    QGIS geojson 矢量操作 稀释 抽稀 压缩 边界抽稀
    vscode 关闭烦人的 tooltip 提示
    sql函数汉字转拼音
    sql 排名函数
    客户端文件下载 download.js
    Oracle 创建 DBLink 的方法
    RDLC分组序号
    sql server中使用函数验证身份证号码是否合法
    oracle导出数据库exp时报错,ORA12154 TNS: 无法解析指定的连接标识符
  • 原文地址:https://www.cnblogs.com/hujiapeng/p/7233760.html
Copyright © 2011-2022 走看看