zoukankan      html  css  js  c++  java
  • Channel

    在BIO编程中,每一个客户端连接请求过来,对于输入流,必须有单独的线程监听,看是否有数据到达,对于输出流,可以采用一个线程池管理,这样服务端的线程数量至少为n

    下面例子是NIO中采用Channel+线程池方式,有个缺点是不知道SocketChannel是否有数据到达了,必须迭代所有的SocketChannel,如果有数据到达,有就处理,否则就跳过,效率太低

    public class TimeServer {
        private  BlockingQueue<SocketChannel> idleQueue =new LinkedBlockingQueue<SocketChannel>();
        private  BlockingQueue<Future<SocketChannel>> workingQueue=new LinkedBlockingQueue<Future<SocketChannel>>();
        private  ExecutorService executor = Executors.newSingleThreadExecutor();
       
         {
            new Thread(){
                @Override
                public void run() {
                try {
                   while (true) {
                            //task1:迭代当前idleQueue中的SocketChannel,提交到线程池中执行任务,并将其移到workingQueue中
                            for (int i = 0; i < idleQueue.size(); i++) {
                                SocketChannel socketChannel = idleQueue.poll();
                                if (socketChannel != null) {
                                    Future<SocketChannel> result = executor.submit(new TimeServerHandleTask(socketChannel), socketChannel);
                                    workingQueue.put(result);
                                }
                            }
                            //task2:迭代当前workingQueue中的SocketChannel,如果任务执行完成,将其移到idleQueue中
                            for (int i = 0; i < workingQueue.size(); i++) {
                                Future<SocketChannel> future = workingQueue.poll();
                                if (!future.isDone()){
                                    workingQueue.put(future);
                                    continue;
                                }
                                 SocketChannel channel  = null;
                                 channel = future.get();
                                 idleQueue.put(channel);
                            }
                        }
                } catch (Exception e) {
                   e.printStackTrace();
                }
                }
            }.start();
        }
         
         /**
          * 1、在main线程中,当接受到一个新的连接时,我们将相应的SocketChannel放入idleQueue
          * 
          * 
          */
         
        public static void main(String[] args) throws IOException, InterruptedException {
            TimeServer timeServer = new TimeServer();
            ServerSocketChannel ssc=ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress(8080));
            while (true){
                SocketChannel socketChannel = ssc.accept();
                if(socketChannel==null){
                    continue;
                }else{
                    socketChannel.configureBlocking(false);
                    timeServer.idleQueue.add(socketChannel);
                }
            }
        }
    }
    public class TimeServerHandleTask implements Runnable {
         SocketChannel socketChannel;
           public TimeServerHandleTask(SocketChannel socketChannel) {
              this.socketChannel = socketChannel;
           }
           @Override
           public void run() {
              try {
                 ByteBuffer requestBuffer = ByteBuffer.allocate("GET CURRENT TIME".length());
                 //尝试读取数据,因为是非阻塞,所以如果没有数据会立即返回。
                 int bytesRead = socketChannel.read(requestBuffer);
                //如果没有读取到数据,说明当前SocketChannel并没有接受到数据,不需要处理
                 if (bytesRead <= 0) {
                    return;
                 }
                 //如果读取到了数据,则需要考虑粘包、解包问题,这个while代码是为了读取一个完整的请求信息"GET CURRENT TIME",
                 while (requestBuffer.hasRemaining()) {
                    socketChannel.read(requestBuffer);
                 }
                 String requestStr = new String(requestBuffer.array());
                 if (!"GET CURRENT TIME".equals(requestStr)) {
                    String bad_request = "BAD_REQUEST";
                    ByteBuffer responseBuffer = ByteBuffer.allocate(bad_request.length());
                        responseBuffer.put(bad_request.getBytes());
                        responseBuffer.flip();
                        socketChannel.write(responseBuffer);
                 } else {
                        String timeStr = Calendar.getInstance().getTime().toLocaleString();
                        ByteBuffer responseBuffer = ByteBuffer.allocate(timeStr.length());
                        responseBuffer.put(timeStr.getBytes());
                        responseBuffer.flip();
                        socketChannel.write(responseBuffer);
                 }
              } catch (Exception e) {
                 throw new RuntimeException(e);
              }
           }
        
    }
    public class TimeClient {
    
        // 连接超时时间
        static int connectTimeOut = 3000;
        static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    
        public static void main(String[] args) throws IOException, InterruptedException {
            SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8080));
            socketChannel.configureBlocking(false);
            long start = System.currentTimeMillis();
            while (!socketChannel.finishConnect()) {
                if (System.currentTimeMillis() - start >= connectTimeOut) {
                    throw new RuntimeException("尝试建立连接超过3秒");
                }
            }
            // 如果走到这一步,说明连接建立成功
            while (true) {
                buffer.put("GET CURRENT TIME".getBytes());
                buffer.flip();
                socketChannel.write(buffer);
                buffer.clear();
                if (socketChannel.read(buffer) > 0) {
                    buffer.flip();
                    byte[] response = new byte[buffer.remaining()];
                    buffer.get(response);
                    System.out.println("reveive response:" + new String(response));
                    buffer.clear();
                }
                Thread.sleep(5000);
            }
    
        }
    }

     Selector   只用一个线程池

    public class TimeServer {
    
        private static ExecutorService   executor;
        
        static {
            executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
        }
    
        public static void main(String[] args) throws IOException {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(8080));
            ssc.configureBlocking(false);
            Selector selector = Selector.open();
            ssc.register(selector, ssc.validOps());
    
            while (true) {
                int readyCount = selector.select(1000);
                if (readyCount == 0) {
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey selectionKey = keyIterator.next();
                    if (selectionKey.isValid()) {
                        // 表示ServerSocketChannel
                        if (selectionKey.isAcceptable()) {
                            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                            SocketChannel socketChannel = server.accept();
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        }
    
                        // 表示SocketChannel
                        if (selectionKey.isReadable()) {
                            executor.submit(new TimeServerTask(selectionKey));
                        }
                        keyIterator.remove();
                    }
                }
            }
        }
    }
    public class TimeServerTask implements Runnable{
          private SelectionKey selectionKey;
          
            public TimeServerTask(SelectionKey selectionKey) {
                this.selectionKey = selectionKey;
            }
         
            @Override
            public void run() {
                SocketChannel channel = (SocketChannel) selectionKey.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                try {
                    int count =0;
                    while ((count = channel.read(byteBuffer)) > 0) {
                        byteBuffer.flip();
                        byte[] request=new byte[byteBuffer.remaining()];
                        byteBuffer.get(request);
                        String requestStr=new String(request);
                        byteBuffer.clear();
                        if (!"GET CURRENT TIME".equals(requestStr)) {
                            channel.write(byteBuffer.put("BAD_REQUEST".getBytes()));
                        } else {
                            byteBuffer.put(Calendar.getInstance().getTime().toLocaleString().getBytes());
                            byteBuffer.flip();
                            channel.write(byteBuffer);
                        }
                    }
         
                } catch (IOException e) {
                    e.printStackTrace();
                    selectionKey.cancel();
                }
            }
    }
  • 相关阅读:
    kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found 解决方案
    查看运行的容器完整命令
    python 生成requirements.txt文件
    解决matplotlib 中文乱码
    django 显示本地图片
    JniLibs和Jni的区别
    Error:No such property: GradleVersion for class: JetGradlePlugin
    Flutter 导包或者其他三方工程的时候出现Flutter 导包错误 Target of URI doesn't exist
    Flutter工程泡在真机上时,xocde提示 codesign 想要访问你的钥匙串中的秘钥
    Flutter 工程报错 Failed to create provisioning profile.
  • 原文地址:https://www.cnblogs.com/moris5013/p/11350474.html
Copyright © 2011-2022 走看看