zoukankan      html  css  js  c++  java
  • Channel

    在BIO编程中,每一个客户端连接请求过来,对于输入流,必须有单独的线程监听,看是否有数据到达;对于输出流,可以采用一个线程池管理。这样服务端的线程数量至少为n(n为客户端连接数)。

    下面的例子是NIO中采用Channel+线程池的方式。它的缺点是无法知道哪个SocketChannel有数据到达,必须迭代所有的SocketChannel:有数据到达就处理,否则就跳过,效率太低。

    /**
     * Polling time server: every accepted connection is repeatedly handed to a
     * single worker thread, which checks whether a request has arrived.
     *
     * <p>A background dispatcher thread moves connections between two queues:
     * {@code idleQueue} (no task in flight) and {@code workingQueue} (a task has
     * been submitted and its {@link Future} is pending). Both the dispatcher and
     * the accept loop in {@link #main} busy-poll by design; this class exists to
     * demonstrate why polling every channel is inefficient.
     *
     * <p>A task that fails affects only its own connection: the channel is closed
     * and dropped, and the dispatcher keeps serving the remaining connections.
     */
    public class TimeServer {
        /** Connections that currently have no task in flight. */
        private  BlockingQueue<SocketChannel> idleQueue =new LinkedBlockingQueue<SocketChannel>();
        /** Pending tasks; each future yields its connection once the task has finished. */
        private  BlockingQueue<Future<SocketChannel>> workingQueue=new LinkedBlockingQueue<Future<SocketChannel>>();
        private  ExecutorService executor = Executors.newSingleThreadExecutor();

        // Instance initializer: starts the dispatcher thread as soon as a TimeServer is created.
        {
            new Thread(){
                @Override
                public void run() {
                    try {
                        while (true) {
                            submitIdleChannels();
                            recycleFinishedChannels();
                        }
                    } catch (InterruptedException e) {
                        // Interruption is the only way to stop the dispatcher; keep the flag set.
                        Thread.currentThread().interrupt();
                    }
                }
            }.start();
        }

        /**
         * Task 1: submits one handling task for every connection currently in
         * {@code idleQueue} and tracks the resulting future in {@code workingQueue}.
         *
         * @throws InterruptedException if the dispatcher thread is interrupted
         */
        private void submitIdleChannels() throws InterruptedException {
            SocketChannel socketChannel;
            // Drain with poll() so that no entry is skipped while the queue shrinks.
            while ((socketChannel = idleQueue.poll()) != null) {
                workingQueue.put(executor.submit(closeOnFailure(socketChannel), socketChannel));
            }
        }

        /**
         * Task 2: makes one pass over {@code workingQueue}. Finished tasks return
         * their connection to {@code idleQueue}; unfinished ones are re-queued.
         * Connections whose task failed, or that are already closed, are dropped.
         *
         * @throws InterruptedException if the dispatcher thread is interrupted
         */
        private void recycleFinishedChannels() throws InterruptedException {
            // Snapshot the size: entries re-queued during this pass are visited next time.
            int pending = workingQueue.size();
            for (int i = 0; i < pending; i++) {
                Future<SocketChannel> future = workingQueue.poll();
                if (future == null) {
                    break;
                }
                if (!future.isDone()) {
                    workingQueue.put(future);
                    continue;
                }
                try {
                    SocketChannel channel = future.get();
                    if (channel.isOpen()) {
                        idleQueue.put(channel);
                    }
                } catch (InterruptedException e) {
                    throw e;
                } catch (Exception e) {
                    // The task threw; its channel was closed by closeOnFailure, so just report it.
                    e.printStackTrace();
                }
            }
        }

        /**
         * Wraps the handling task for {@code socketChannel} so that the channel is
         * closed when the task throws, instead of leaking an unusable connection.
         */
        private static Runnable closeOnFailure(final SocketChannel socketChannel) {
            final Runnable task = new TimeServerHandleTask(socketChannel);
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } catch (RuntimeException e) {
                        try {
                            socketChannel.close();
                        } catch (IOException ignored) {
                            // Best effort: the connection is being discarded anyway.
                        }
                        throw e;
                    }
                }
            };
        }

        /**
         * Accept loop. Runs on the main thread: each newly accepted connection is
         * switched to non-blocking mode and placed in {@code idleQueue}.
         *
         * @param args unused
         * @throws IOException if the server channel cannot be opened, bound or accepted on
         * @throws InterruptedException declared for callers; not thrown by the current body
         */
        public static void main(String[] args) throws IOException, InterruptedException {
            TimeServer timeServer = new TimeServer();
            ServerSocketChannel ssc=ServerSocketChannel.open();
            try {
                ssc.configureBlocking(false);
                ssc.socket().bind(new InetSocketAddress(8080));
                while (true){
                    // Non-blocking accept: returns null when no connection is pending.
                    SocketChannel socketChannel = ssc.accept();
                    if(socketChannel==null){
                        continue;
                    }
                    socketChannel.configureBlocking(false);
                    timeServer.idleQueue.add(socketChannel);
                }
            } finally {
                ssc.close();
            }
        }
    }
    /**
     * One polling pass over a single non-blocking client connection.
     *
     * <p>If a request has started to arrive, the task reads exactly
     * {@code "GET CURRENT TIME".length()} bytes, then replies with the current
     * time or with {@code BAD_REQUEST}. If nothing has arrived it returns
     * immediately. If the peer has closed the connection, the channel is closed.
     */
    public class TimeServerHandleTask implements Runnable {
         SocketChannel socketChannel;

           /**
            * @param socketChannel the client connection to poll; expected to be in non-blocking mode
            */
           public TimeServerHandleTask(SocketChannel socketChannel) {
              this.socketChannel = socketChannel;
           }

           /**
            * Performs one poll of the connection.
            *
            * @throws RuntimeException wrapping any failure while reading or writing;
            *         the channel is closed before the exception is thrown
            */
           @Override
           public void run() {
              // A connection closed by an earlier pass has nothing left to serve.
              if (!socketChannel.isOpen()) {
                 return;
              }
              try {
                 ByteBuffer requestBuffer = ByteBuffer.allocate("GET CURRENT TIME".length());
                 // Non-blocking read: returns 0 immediately when no data is available.
                 int bytesRead = socketChannel.read(requestBuffer);
                 if (bytesRead == 0) {
                    return;
                 }
                 // A request may arrive in several fragments: keep reading until the
                 // fixed-size request is complete or the peer closes the connection.
                 while (bytesRead >= 0 && requestBuffer.hasRemaining()) {
                    bytesRead = socketChannel.read(requestBuffer);
                 }
                 if (bytesRead < 0) {
                    // End of stream: the peer is gone, release the connection.
                    socketChannel.close();
                    return;
                 }
                 String requestStr = new String(requestBuffer.array());
                 String reply;
                 if ("GET CURRENT TIME".equals(requestStr)) {
                    reply = Calendar.getInstance().getTime().toLocaleString();
                 } else {
                    reply = "BAD_REQUEST";
                 }
                 // Size the buffer from the encoded bytes, not the character count,
                 // so replies containing non-ASCII characters cannot overflow it.
                 ByteBuffer responseBuffer = ByteBuffer.wrap(reply.getBytes());
                 // A non-blocking write may accept only part of the buffer.
                 while (responseBuffer.hasRemaining()) {
                    socketChannel.write(responseBuffer);
                 }
              } catch (Exception e) {
                 try {
                    socketChannel.close();
                 } catch (Exception ignored) {
                    // Best effort: the original failure is the one worth reporting.
                 }
                 throw new RuntimeException(e);
              }
           }

    }
    /**
     * Non-blocking client that asks the time server for the current time every
     * five seconds and prints each reply.
     */
    public class TimeClient {

        // Connect timeout in milliseconds
        static int connectTimeOut = 3000;
        // Maximum time in milliseconds to wait for the reply to one request
        static int responseTimeOut = 3000;
        static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

        /**
         * Connects to port 8080 and runs the request/print loop until the server
         * closes the connection.
         *
         * @param args unused
         * @throws IOException if connecting, writing or reading fails
         * @throws InterruptedException if the thread is interrupted while sleeping
         * @throws RuntimeException if the connection is not established within {@code connectTimeOut}
         */
        public static void main(String[] args) throws IOException, InterruptedException {
            SocketChannel socketChannel = SocketChannel.open();
            try {
                // Switch to non-blocking mode BEFORE connecting; otherwise connect blocks
                // until completion and the timeout loop below can never trigger.
                socketChannel.configureBlocking(false);
                socketChannel.connect(new InetSocketAddress(8080));
                long start = System.currentTimeMillis();
                while (!socketChannel.finishConnect()) {
                    if (System.currentTimeMillis() - start >= connectTimeOut) {
                        throw new RuntimeException("尝试建立连接超过3秒");
                    }
                }
                // Reaching this point means the connection is established.
                while (true) {
                    buffer.clear();
                    buffer.put("GET CURRENT TIME".getBytes());
                    buffer.flip();
                    // A non-blocking write may send only part of the buffer.
                    while (buffer.hasRemaining()) {
                        socketChannel.write(buffer);
                    }
                    buffer.clear();

                    // The reply is not available the instant the request is sent:
                    // poll for it for a bounded time instead of reading only once.
                    long sentAt = System.currentTimeMillis();
                    int bytesRead = socketChannel.read(buffer);
                    while (bytesRead == 0 && System.currentTimeMillis() - sentAt < responseTimeOut) {
                        Thread.sleep(10);
                        bytesRead = socketChannel.read(buffer);
                    }
                    if (bytesRead < 0) {
                        // End of stream: the server closed the connection.
                        System.out.println("connection closed by server");
                        break;
                    }
                    if (bytesRead > 0) {
                        buffer.flip();
                        byte[] response = new byte[buffer.remaining()];
                        buffer.get(response);
                        System.out.println("reveive response:" + new String(response));
                        buffer.clear();
                    }
                    Thread.sleep(5000);
                }
            } finally {
                socketChannel.close();
            }

        }
    }

     使用Selector:只用一个线程池

    /**
     * Selector-based time server: one thread waits for readiness events on all
     * channels and hands readable connections to a thread pool.
     */
    public class TimeServer {

        private static ExecutorService   executor;

        static {
            // CallerRunsPolicy: when the queue is full the selector thread runs the task
            // itself, which throttles the loop instead of crashing it with a rejection.
            executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        /**
         * Runs the selector loop on port 8080.
         *
         * @param args unused
         * @throws IOException if the server channel or selector cannot be set up, or selection fails
         */
        public static void main(String[] args) throws IOException {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(8080));
            ssc.configureBlocking(false);
            Selector selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                selector.select(1000);
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey selectionKey = keyIterator.next();
                    // Always remove the key, valid or not, so it is never processed twice.
                    keyIterator.remove();
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    if (selectionKey.isAcceptable()) {
                        // The key belongs to the ServerSocketChannel.
                        acceptConnection(selector, selectionKey);
                    } else if (selectionKey.isReadable()) {
                        // The key belongs to a client SocketChannel.
                        dispatchRead(selector, selectionKey);
                    }
                }
            }
        }

        /**
         * Accepts one pending connection and registers it for read events only.
         * Write interest is deliberately not registered: a socket is almost always
         * writable, so it would make every {@code select} call return immediately.
         */
        private static void acceptConnection(Selector selector, SelectionKey selectionKey) throws IOException {
            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel = server.accept();
            // Non-blocking accept may return null if the connection is no longer pending.
            if (socketChannel == null) {
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        }

        /**
         * Hands a readable connection to the thread pool. Read interest is switched
         * off while the task runs, so the selector does not report the same channel
         * again and start a second, concurrent task on it; it is switched back on
         * when the task finishes.
         */
        private static void dispatchRead(final Selector selector, final SelectionKey selectionKey) {
            selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_READ);
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        new TimeServerTask(selectionKey).run();
                    } catch (RuntimeException e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
                            // Make a select() already in progress notice the new interest set.
                            selector.wakeup();
                        } catch (IllegalStateException keyCancelled) {
                            // The key was cancelled (e.g. the channel was closed): nothing to re-arm.
                        }
                    }
                }
            });
        }
    }
    /**
     * Serves the data currently readable on one client connection: each chunk
     * read is treated as one request and answered with the current time, or with
     * {@code BAD_REQUEST} if it is not exactly {@code GET CURRENT TIME}.
     *
     * <p>When the peer has closed the connection, or an I/O error occurs, the
     * key is cancelled and the channel is closed.
     */
    public class TimeServerTask implements Runnable{
          private SelectionKey selectionKey;

            /**
             * @param selectionKey key whose channel is the client {@link SocketChannel} to serve
             */
            public TimeServerTask(SelectionKey selectionKey) {
                this.selectionKey = selectionKey;
            }

            @Override
            public void run() {
                SocketChannel channel = (SocketChannel) selectionKey.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                try {
                    int count;
                    while ((count = channel.read(byteBuffer)) > 0) {
                        byteBuffer.flip();
                        byte[] request=new byte[byteBuffer.remaining()];
                        byteBuffer.get(request);
                        String requestStr=new String(request);
                        // Make the buffer empty again for the next read.
                        byteBuffer.clear();

                        String reply;
                        if ("GET CURRENT TIME".equals(requestStr)) {
                            reply = Calendar.getInstance().getTime().toLocaleString();
                        } else {
                            reply = "BAD_REQUEST";
                        }
                        // wrap() yields a buffer that is already positioned for writing,
                        // so both replies are sent as exactly their own bytes.
                        ByteBuffer response = ByteBuffer.wrap(reply.getBytes());
                        // A non-blocking write may accept only part of the buffer.
                        while (response.hasRemaining()) {
                            channel.write(response);
                        }
                    }
                    if (count < 0) {
                        // End of stream: the peer closed the connection.
                        selectionKey.cancel();
                        channel.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    selectionKey.cancel();
                    try {
                        channel.close();
                    } catch (IOException ignored) {
                        // Best effort: the connection is already being discarded.
                    }
                }
            }
    }
  • 相关阅读:
    稳扎稳打Silverlight(13) 2.0交互之鼠标事件和键盘事件
    稳扎稳打Silverlight(17) 2.0数据之详解DataGrid, 绑定数据到ListBox
    再接再厉VS 2008 sp1 + .NET 3.5 sp1(2) Entity Framework(实体框架)之详解 Linq To Entities 之一
    稳扎稳打Silverlight(8) 2.0图形之基类System.Windows.Shapes.Shape
    稳扎稳打Silverlight(11) 2.0动画之ColorAnimation, DoubleAnimation, PointAnimation, 内插关键帧动画
    稳扎稳打Silverlight(21) 2.0通信之WebRequest和WebResponse, 对指定的URI发出请求以及接收响应
    稳扎稳打Silverlight(16) 2.0数据之独立存储(Isolated Storage)
    稳扎稳打Silverlight(9) 2.0画笔之SolidColorBrush, ImageBrush, VideoBrush, LinearGradientBrush, RadialGradientBrush
    稳扎稳打Silverlight(23) 2.0通信之调用WCF的双向通信(Duplex Service)
    游戏人生Silverlight(1) 七彩俄罗斯方块[Silverlight 2.0(c#)]
  • 原文地址:https://www.cnblogs.com/moris5013/p/11350474.html
Copyright © 2011-2022 走看看