zoukankan      html  css  js  c++  java
  • NIO学习:异步IO实例

    工作模式:

    客户端代码:

    package demos.nio.socketChannel;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.Set;
    
    import org.apache.log4j.Logger;
    
    /**
     * 非阻塞 Socket 客户端
     * 通过一个线程监听管理所有通道
     * 
     */
    public class Client {
        private Logger logger=Logger.getLogger(Client.class);
        /** * 服务器Ip */
        private String ip;
        /** * 服务器端口 */
        private int port;
        /** * 控制是否监听通道事件 */
        private volatile boolean isListenable;
        /** * 缓冲区大小 */
        private final int bufferSize = 1024;
        /** * 选择器每次阻塞监听的最大时间 */
        private final int selectorTime = 1000;
        /** * 创建Selector来管理通道事件 */
        private Selector selector;
    
        public Client(String ip, int port) {
            this.ip = ip;
            this.port = port;
            // 监听器
            try {
                selector = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void send(String msg) {
            send(msg.getBytes());
        }
    
        /**
         * 发送数据
         * 
         * @param data
         */
        public void send(byte[] data) {
            try {
                // 打开一个网络通道
                SocketChannel socketChannel = SocketChannel.open();
                // 设置通道为非阻塞
                socketChannel.configureBlocking(false);
                // 注册管道事件,监听连接成功
                SelectionKey key = socketChannel.register(selector,
                        SelectionKey.OP_CONNECT);
                // 将发送数据附加在SelectionKey上
                key.attach(ByteBuffer.wrap(data));
                // 建立连接
                socketChannel.connect(new InetSocketAddress(ip, port));
                
                //当第一个通道被注册到Selector上时,开启守护线程开始监听通道的事件
                if (!isListenable&&selector.keys().size() == 1) {
                    //开启监听
                    isListenable = true;
                    // 开一个线程监听所有通道的事件
                    Thread thread = new Thread(this.new SelectionTask());
                    thread.start();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 往通道中写入数据
         * 当通道为非阻塞时它都是可写的,所以如果需要写数据,则注册监听写事件即可
         * @param selectionKey
         */
        private void writeData(SelectionKey selectionKey) {
            selectionKey.interestOps(selectionKey.interestOps()
                    | SelectionKey.OP_WRITE);
            selectionKey.selector().wakeup();
        }
    
        public void closeListen() {
            logger.debug("关闭监听");
            this.isListenable = false;
            this.selector.wakeup();
        }
    
        /**
         * 判断是否继续监听
         * 如果selector中没有可监听的通道,则取消监听
         * @return
         */
        private boolean isListen() {
            return this.isListenable && (this.selector.keys().size() > 0);
        }
    
        /**
         * 监听任务
         * 
         * @author root
         * 
         */
        class SelectionTask implements Runnable {
    
            /**
             * 处理监听到的事件
             * 
             * @param selectionKey
             * @throws IOException
             */
            private void handleSelectionKey(SelectionKey selectionKey)
                    throws IOException {
                /** * 缓冲区 */
                ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
                SocketChannel channel = (SocketChannel) selectionKey.channel();
                if (!selectionKey.isValid()) {
                    return;
                }
                if (selectionKey.isConnectable()) {
                    if (!channel.isConnectionPending()) {
                        return;
                    }
                    channel.finishConnect();
                    logger.debug("与服务器连接成功");
                    // 连接成功后开始写数据
                    writeData(selectionKey);
                } else if (selectionKey.isReadable()) {
                    //循环把接受到的数据写入到内存中
                    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                    byteBuffer.clear();
                    while (channel.read(byteBuffer) > 0) {
                        byteBuffer.flip();
                        byte[] b = Arrays.copyOf(byteBuffer.array(), byteBuffer
                                .limit());
                        outputStream.write(b);
                        byteBuffer.clear();
                    }
                    logger.debug("客户端收到信息:"
                            + new String(outputStream.toByteArray()));
                    // 使Selector注销对该Channel的监听
                    selectionKey.cancel();
                } else if (selectionKey.isWritable()) {
                    logger.debug("写出数据");
                    ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                    if (buffer == null) {
                        return;
                    }
                    while (buffer.hasRemaining()) {
                        channel.write(buffer);
                    }
                    selectionKey.interestOps(SelectionKey.OP_READ);
                }
            }
    
            @Override
            public void run() {
                try {
                    // 控制是否监听
                    while (isListen()) {
                        //判断是否监听到了感兴趣的事件
                        if (selector.select(selectorTime) <= 0) {
                            continue;
                        }
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            handleSelectionKey(iterator.next());
                            //处理完selectionKey后需要移除它
                            iterator.remove();
                        }
                    }
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            Client socket = new Client("127.0.0.1", 8686);
            socket.send("hello");
        }
    }
  • 相关阅读:
    编译debian内核
    mini2440 最小根文件系统制作和nfs启动
    mini2440 uboot烧写uImage
    51单片机串口烧写故障
    uboot 2013.01 代码简析(3)第二阶段初始化
    uboot 2013.01 代码简析(2)第一阶段初始化
    uboot 2013.01 代码简析(1)开发板配置
    uboot 2013.01 s3c6400编译失败
    Shiro
    Shiro
  • 原文地址:https://www.cnblogs.com/DajiangDev/p/3935160.html
Copyright © 2011-2022 走看看