zoukankan      html  css  js  c++  java
  • java 网络1.为什么要非阻塞式?

    先看看多种java通信方式的特点

    1.阻塞式

       服务器收到了一个连接,当通信完毕后释放连接,接收新的连接

    2.阻塞式+线程池

       可利用java库类中现成的线程池,做起来比较简单

    为每个用户分配一个线程

    缺点:虽然可以通过线程池限制线程数防止服务器效率过低,但是线程调度毕竟要消耗资源

    3.非阻塞式 用A,B方式

       A 用1个线程

       当用一个线程时,采用轮询的方式:查看连接就绪、可读就绪、可写就绪这3个事件中哪一个已经就绪,就处理哪一个。例如:如果某个连接的可读事件还没有就绪,线程不会阻塞在这个连接上等待,此时仍然可以接收新的连接。

        对比1(阻塞式)

        不用像阻塞式一样把一个连接的3个事件(连接,可读,可写就绪)处理完了,才能处理下一个连接

        对比2(阻塞式+线程池)

        少了线程调度的开销,但是效率不一定比2(阻塞式+线程池)高。所以有了下面的连接方式

    B 用2个线程

    采用一个线程接收客户连接就绪,一个处理(可读,可写就绪)

        对比 A (用1个线程的非阻塞式)

        当数据处理的时间较大时,现在可以一边处理数据,一边接受新的连接。 

        少了一次对连接就绪的轮询

        对比2(阻塞式+线程池)

        B只用了两个线程,减小了线程调度开销

    以下代码出自 孙卫琴《java 网络编程精解》

    方式一 阻塞式

    package block;

    import java.io.*;

    import java.nio.*;

    import java.nio.channels.*;

    import java.nio.charset.*;

    import java.net.*;

    import java.util.*;

    import java.util.concurrent.*;

    public class EchoServer {

      private int port=8000;

      private ServerSocketChannel serverSocketChannel = null;

      private ExecutorService executorService;

      private static final int POOL_MULTIPLE = 4;

      public EchoServer() throws IOException {

        executorService= Executors.newFixedThreadPool(

         Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);

        serverSocketChannel= ServerSocketChannel.open();

        serverSocketChannel.socket().setReuseAddress(true);

        serverSocketChannel.socket().bind(new InetSocketAddress(port));

        System.out.println("服务器启动");

      }

      public void service() {

        while (true) {

          SocketChannel socketChannel=null;

          try {

            socketChannel = serverSocketChannel.accept();

            executorService.execute(new Handler(socketChannel));

          }catch (IOException e) {

             e.printStackTrace();

          }

        }

      }

      public static void main(String args[])throws IOException {

        new EchoServer().service();

      }

    }

    /**
     * Serves a single client connection: every line received is printed and
     * echoed back with an "echo:" prefix until the client sends "bye" or the
     * stream ends. The channel is closed when the conversation is over.
     */
    class Handler implements Runnable {

      private SocketChannel socketChannel;

      public Handler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
      }

      public void run() {
        handle(socketChannel);
      }

      /**
       * Runs the echo conversation over the given channel and closes the
       * channel afterwards.
       *
       * @param socketChannel the connected client channel
       */
      public void handle(SocketChannel socketChannel) {
        try {
          Socket client = socketChannel.socket();
          String origin = client.getInetAddress() + ":" + client.getPort();
          System.out.println("接收到客户连接,来自: " + origin);

          BufferedReader in = getReader(client);
          PrintWriter out = getWriter(client);

          for (String line = in.readLine(); line != null; line = in.readLine()) {
            System.out.println(line);
            out.println(echo(line));
            if ("bye".equals(line)) {
              break;
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
        } finally {
          if (socketChannel != null) {
            try {
              socketChannel.close();
            } catch (IOException e) {
              e.printStackTrace();
            }
          }
        }
      }

      /** Wraps the socket's output stream in an auto-flushing writer. */
      private PrintWriter getWriter(Socket socket) throws IOException {
        return new PrintWriter(socket.getOutputStream(), true);
      }

      /** Wraps the socket's input stream in a buffered line reader. */
      private BufferedReader getReader(Socket socket) throws IOException {
        return new BufferedReader(new InputStreamReader(socket.getInputStream()));
      }

      /**
       * Builds the reply for one received line.
       *
       * @param msg the line received from the client
       * @return the line prefixed with "echo:"
       */
      public String echo(String msg) {
        return "echo:" + msg;
      }

    }

    2.阻塞式+线程池

    package multithread4;

    import java.io.*;

    import java.net.*;

    import java.util.concurrent.*;

    public class EchoServer {

      private int port=8000;

      private ServerSocket serverSocket;

      private ExecutorService executorService; //线程池

      private final int POOL_SIZE=4;  //单个CPU时线程池中工作线程的数目

      

      private int portForShutdown=8001;  //用于监听关闭服务器命令的端口

      private ServerSocket serverSocketForShutdown;

      private boolean isShutdown=false; //服务器是否已经关闭

      private Thread shutdownThread=new Thread(){   //负责关闭服务器的线程

        public void start(){

          this.setDaemon(true);  //设置为守护线程(也称为后台线程)

          super.start();

        }

        public void run(){

          while (!isShutdown) {

            Socket socketForShutdown=null;

            try {

              socketForShutdown= serverSocketForShutdown.accept();

              BufferedReader br = new BufferedReader(

                                new InputStreamReader(socketForShutdown.getInputStream()));

              String command=br.readLine();

             if(command.equals("shutdown")){

                long beginTime=System.currentTimeMillis(); 

                socketForShutdown.getOutputStream().write("服务器正在关闭\r\n".getBytes());

                isShutdown=true;

                //请求关闭线程池

    //线程池不再接收新的任务,但是会继续执行完工作队列中现有的任务

                executorService.shutdown();  

                

                //等待关闭线程池,每次等待的超时时间为30秒

                while(!executorService.isTerminated())

                  executorService.awaitTermination(30,TimeUnit.SECONDS); 

                

                serverSocket.close(); //关闭与EchoClient客户通信的ServerSocket 

                long endTime=System.currentTimeMillis(); 

                socketForShutdown.getOutputStream().write(("服务器已经关闭,"+

                    "关闭服务器用了"+(endTime-beginTime)+"毫秒\r\n").getBytes());

                socketForShutdown.close();

                serverSocketForShutdown.close();

                

              }else{

                socketForShutdown.getOutputStream().write("错误的命令\r\n".getBytes());

                socketForShutdown.close();

              }  

            }catch (Exception e) {

               e.printStackTrace();

            } 

          } 

        }

      };

      public EchoServer() throws IOException {

        serverSocket = new ServerSocket(port);

        serverSocket.setSoTimeout(60000); //设定等待客户连接的超过时间为60秒

        serverSocketForShutdown = new ServerSocket(portForShutdown);

        //创建线程池

        executorService= Executors.newFixedThreadPool( 

         Runtime.getRuntime().availableProcessors() * POOL_SIZE);

        

        shutdownThread.start(); //启动负责关闭服务器的线程

        System.out.println("服务器启动");

      }

      

      public void service() {

        while (!isShutdown) {

          Socket socket=null;

          try {

            socket = serverSocket.accept();  //可能会抛出SocketTimeoutException和SocketException

            socket.setSoTimeout(60000);  //把等待客户发送数据的超时时间设为60秒          

            executorService.execute(new Handler(socket));  //可能会抛出RejectedExecutionException

          }catch(SocketTimeoutException e){

             //不必处理等待客户连接时出现的超时异常

          }catch(RejectedExecutionException e){

             try{

               if(socket!=null)socket.close();

             }catch(IOException x){}

             return;

          }catch(SocketException e) {

             //如果是由于在执行serverSocket.accept()方法时,

             //ServerSocket被ShutdownThread线程关闭而导致的异常,就退出service()方法

             if(e.getMessage().indexOf("socket closed")!=-1)return;

           }catch(IOException e) {

             e.printStackTrace();

          }

        }

      }

      public static void main(String args[])throws IOException {

        new EchoServer().service();

      }

    }

    /**
     * Task that serves one client socket: each received line is printed and
     * echoed back with an "echo:" prefix until "bye" arrives or the stream
     * ends, after which the socket is closed.
     */
    class Handler implements Runnable {

      private Socket socket;

      public Handler(Socket socket) {
        this.socket = socket;
      }

      /** Wraps the socket's output stream in an auto-flushing writer. */
      private PrintWriter getWriter(Socket socket) throws IOException {
        return new PrintWriter(socket.getOutputStream(), true);
      }

      /** Wraps the socket's input stream in a buffered line reader. */
      private BufferedReader getReader(Socket socket) throws IOException {
        return new BufferedReader(new InputStreamReader(socket.getInputStream()));
      }

      /**
       * Builds the reply for one received line.
       *
       * @param msg the line received from the client
       * @return the line prefixed with "echo:"
       */
      public String echo(String msg) {
        return "echo:" + msg;
      }

      public void run() {
        try {
          String origin = socket.getInetAddress() + ":" + socket.getPort();
          System.out.println("New connection accepted " + origin);

          BufferedReader in = getReader(socket);
          PrintWriter out = getWriter(socket);

          for (String line = in.readLine(); line != null; line = in.readLine()) {
            System.out.println(line);
            out.println(echo(line));
            if ("bye".equals(line)) {
              break;
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
        } finally {
          if (socket != null) {
            try {
              socket.close();
            } catch (IOException e) {
              e.printStackTrace();
            }
          }
        }
      }

    }

    3. A 非阻塞式用1个线程

    package nonblock;

    import java.io.*;

    import java.nio.*;

    import java.nio.channels.*;

    import java.nio.charset.*;

    import java.net.*;

    import java.util.*;

    /**
     * Non-blocking echo server driven by a single thread: one {@code Selector}
     * loop handles accept, read and write readiness for every connection.
     * Each connection owns a 1024-byte buffer (the key attachment) that
     * accumulates received bytes until a complete line can be echoed.
     */
    public class EchoServer{

      private Selector selector = null;

      private ServerSocketChannel serverSocketChannel = null;

      private int port = 8000;

      private Charset charset=Charset.forName("GBK");

      /**
       * Opens the selector and binds the non-blocking listening channel.
       *
       * @throws IOException if the selector or channel cannot be opened or bound
       */
      public EchoServer()throws IOException{
        selector = Selector.open();
        serverSocketChannel= ServerSocketChannel.open();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("服务器启动");
      }

      /**
       * Registers the listening channel and runs the selector loop, dispatching
       * each ready key to accept handling, {@link #receive} and {@link #send}.
       *
       * @throws IOException if registration or selection fails
       */
      public void service() throws IOException{
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT );
        while (selector.select() > 0 ){
          Iterator<SelectionKey> it = selector.selectedKeys().iterator();
          while (it.hasNext()){
             SelectionKey key=null;
             try{
                key = it.next();
                it.remove();

                if (key.isValid() && key.isAcceptable()) {
                  ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                  SocketChannel socketChannel = ssc.accept();
                  // accept() on a non-blocking channel returns null when no
                  // connection is pending.
                  if (socketChannel != null) {
                    System.out.println("接收到客户连接,来自:" +
                                       socketChannel.socket().getInetAddress() +
                                       ":" + socketChannel.socket().getPort());
                    socketChannel.configureBlocking(false);
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    socketChannel.register(selector,
                                           SelectionKey.OP_READ |
                                           SelectionKey.OP_WRITE, buffer);
                  }
                }

                // receive() and send() may cancel the key; the readiness tests
                // throw CancelledKeyException on a cancelled key, so re-check.
                if (key.isValid() && key.isReadable()) {
                    receive(key);
                }
                if (key.isValid() && key.isWritable()) {
                    send(key);
                }
            }catch(IOException e){
               e.printStackTrace();
               try{
                   if(key!=null){
                       key.cancel();
                       key.channel().close();
                   }
               }catch(Exception ex){ex.printStackTrace();}
            }
          }//#while
        }//#while
      }

      /**
       * Echoes the first complete line held in the key's buffer, if any, and
       * removes it from the buffer. Closes the connection after "bye".
       *
       * @param key selection key whose attachment is the connection buffer
       * @throws IOException if writing to or closing the channel fails
       */
      public void send(SelectionKey key)throws IOException{
        ByteBuffer buffer=(ByteBuffer)key.attachment();
        SocketChannel socketChannel=(SocketChannel)key.channel();
        buffer.flip();  // limit = position, position = 0
        String data=decode(buffer);
        // No complete line yet. decode() left position at the end of the data,
        // so the next receive() appends after it.
        if(data.indexOf("\r\n")==-1)return;
        String outputData=data.substring(0,data.indexOf("\n")+1);
        System.out.print(outputData);
        ByteBuffer outputBuffer=encode("echo:"+outputData);
        while(outputBuffer.hasRemaining())
          socketChannel.write(outputBuffer);

        // Re-encode the line to learn how many bytes it occupied, then drop
        // exactly those bytes from the front of the buffer.
        ByteBuffer temp=encode(outputData);
        buffer.position(temp.limit());
        buffer.compact();

        if(outputData.equals("bye\r\n")){
          key.cancel();
          socketChannel.close();
          System.out.println("关闭与客户的连接");
        }
      }

      /**
       * Reads up to 32 bytes from the channel and appends them to the key's
       * buffer. At end of stream, any complete lines still buffered are
       * echoed and the connection is closed; a connection whose pending line
       * no longer fits in the buffer is closed as well.
       *
       * @param key selection key whose attachment is the connection buffer
       * @throws IOException if reading from or closing the channel fails
       */
      public void receive(SelectionKey key)throws IOException{
        ByteBuffer buffer=(ByteBuffer)key.attachment();
        SocketChannel socketChannel=(SocketChannel)key.channel();
        ByteBuffer readBuff= ByteBuffer.allocate(32);
        int count=socketChannel.read(readBuff);
        if(count<0){
          // The peer finished sending. Without this the key would stay
          // readable forever and the channel would never be released.
          while(key.isValid() && hasPendingLine(buffer))
            send(key);
          closeConnection(key);
          return;
        }
        readBuff.flip();
        buffer.limit(buffer.capacity());
        if(readBuff.remaining()>buffer.remaining()){
          // The pending line exceeds the buffer. Drop this connection instead
          // of letting put() throw an unchecked BufferOverflowException that
          // would escape the selector loop.
          closeConnection(key);
          return;
        }
        buffer.put(readBuff);
      }

      /** Reports whether the buffered bytes contain a line that send() would echo. */
      private boolean hasPendingLine(ByteBuffer buffer){
        ByteBuffer view=buffer.duplicate();  // independent position/limit, shared content
        view.flip();
        return decode(view).indexOf("\r\n")!=-1;
      }

      /** Deregisters the key and closes its channel. */
      private void closeConnection(SelectionKey key)throws IOException{
        key.cancel();
        key.channel().close();
      }

      /** Decodes the buffer's remaining bytes using the server charset. */
      public String decode(ByteBuffer buffer){
        CharBuffer charBuffer= charset.decode(buffer);
        return charBuffer.toString();
      }

      /** Encodes the string using the server charset. */
      public ByteBuffer encode(String str){
        return charset.encode(str);
      }

      public static void main(String args[])throws Exception{
        EchoServer server = new EchoServer();
        server.service();
      }

    }

    3. B 非阻塞式用2个线程

    package thread2;

    import java.io.*;

    import java.nio.*;

    import java.nio.channels.*;

    import java.nio.charset.*;

    import java.net.*;

    import java.util.*;

    /**
     * Non-blocking echo server that uses two threads: one blocks in
     * {@link #accept()} to take new connections, the other runs the selector
     * loop in {@link #service()} handling read and write readiness. Each
     * connection owns a 1024-byte buffer (the key attachment) that accumulates
     * received bytes until a complete line can be echoed.
     */
    public class EchoServer{

      private Selector selector = null;

      private ServerSocketChannel serverSocketChannel = null;

      private int port = 8000;

      private Charset charset=Charset.forName("GBK");

      /**
       * Opens the selector and binds the listening channel, which stays in
       * blocking mode.
       *
       * @throws IOException if the selector or channel cannot be opened or bound
       */
      public EchoServer()throws IOException{
        selector = Selector.open();
        serverSocketChannel= ServerSocketChannel.open();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("服务器启动");
      }

      /**
       * Accepts connections and registers each one with the selector for read
       * and write readiness. Returns only once the listening channel is closed.
       */
      public void accept(){
          for(;;){
            SocketChannel socketChannel = null;
            try{
                socketChannel = serverSocketChannel.accept();
                System.out.println("接收到客户连接,来自:" +
                                   socketChannel.socket().getInetAddress() +
                                   ":" + socketChannel.socket().getPort());
                socketChannel.configureBlocking(false);

                ByteBuffer buffer = ByteBuffer.allocate(1024);
                // Holding the gate keeps the service thread from re-entering
                // select() until registration is done; wakeup() makes a select()
                // already in progress return.
                synchronized(gate){
                    selector.wakeup();
                    socketChannel.register(selector,
                                           SelectionKey.OP_READ |
                                           SelectionKey.OP_WRITE, buffer);
                }
            }catch(IOException e){
                e.printStackTrace();
                // Do not leak a connection that was accepted but not registered.
                if(socketChannel!=null){
                    try{
                        socketChannel.close();
                    }catch(IOException ex){ex.printStackTrace();}
                }
                // A closed listening channel can never accept again; stop
                // instead of spinning on the same failure.
                if(!serverSocketChannel.isOpen())return;
            }
          }
      }

      private Object gate=new Object();

      /**
       * Runs the selector loop forever, dispatching each ready key to
       * {@link #receive} and {@link #send}.
       *
       * @throws IOException if selection fails
       */
      public void service() throws IOException{
        for(;;){
          // Wait here while the accept thread is in the middle of registering.
          synchronized(gate){}
          int n = selector.select();

          if(n==0)continue;
          Iterator<SelectionKey> it = selector.selectedKeys().iterator();
          while (it.hasNext()){
            SelectionKey key=null;
            try{
                key = it.next();
                it.remove();
                // receive() and send() may cancel the key; the readiness tests
                // throw CancelledKeyException on a cancelled key, so re-check.
                if (key.isValid() && key.isReadable()) {
                    receive(key);
                }
                if (key.isValid() && key.isWritable()) {
                    send(key);
                }
            }catch(IOException e){
               e.printStackTrace();
               try{
                   if(key!=null){
                       key.cancel();
                       key.channel().close();
                   }
               }catch(Exception ex){ex.printStackTrace();}  // report the cleanup failure itself
            }
          }//#while
        }//#while
      }

      /**
       * Echoes the first complete line held in the key's buffer, if any, and
       * removes it from the buffer. Closes the connection after "bye".
       *
       * @param key selection key whose attachment is the connection buffer
       * @throws IOException if writing to or closing the channel fails
       */
      public void send(SelectionKey key)throws IOException{
        ByteBuffer buffer=(ByteBuffer)key.attachment();
        SocketChannel socketChannel=(SocketChannel)key.channel();
        buffer.flip();  // limit = position, position = 0
        String data=decode(buffer);
        // No complete line yet. decode() left position at the end of the data,
        // so the next receive() appends after it.
        if(data.indexOf("\n")==-1)return;
        String outputData=data.substring(0,data.indexOf("\n")+1);
        System.out.print(outputData);
        ByteBuffer outputBuffer=encode("echo:"+outputData);
        while(outputBuffer.hasRemaining())
          socketChannel.write(outputBuffer);

        // Re-encode the line to learn how many bytes it occupied, then drop
        // exactly those bytes from the front of the buffer.
        ByteBuffer temp=encode(outputData);
        buffer.position(temp.limit());
        buffer.compact();

        if(outputData.equals("bye\r\n")){
          key.cancel();
          socketChannel.close();
          System.out.println("关闭与客户的连接");
        }
      }

      /**
       * Reads up to 32 bytes from the channel and appends them to the key's
       * buffer. At end of stream, any complete lines still buffered are
       * echoed and the connection is closed; a connection whose pending line
       * no longer fits in the buffer is closed as well.
       *
       * @param key selection key whose attachment is the connection buffer
       * @throws IOException if reading from or closing the channel fails
       */
      public void receive(SelectionKey key)throws IOException{
        ByteBuffer buffer=(ByteBuffer)key.attachment();
        SocketChannel socketChannel=(SocketChannel)key.channel();
        ByteBuffer readBuff= ByteBuffer.allocate(32);
        int count=socketChannel.read(readBuff);
        if(count<0){
          // The peer finished sending. Without this the key would stay
          // readable forever and the channel would never be released.
          while(key.isValid() && hasPendingLine(buffer))
            send(key);
          closeConnection(key);
          return;
        }
        readBuff.flip();
        buffer.limit(buffer.capacity());
        if(readBuff.remaining()>buffer.remaining()){
          // The pending line exceeds the buffer. Drop this connection instead
          // of letting put() throw an unchecked BufferOverflowException that
          // would escape the selector loop.
          closeConnection(key);
          return;
        }
        buffer.put(readBuff);
      }

      /** Reports whether the buffered bytes contain a line that send() would echo. */
      private boolean hasPendingLine(ByteBuffer buffer){
        ByteBuffer view=buffer.duplicate();  // independent position/limit, shared content
        view.flip();
        return decode(view).indexOf("\n")!=-1;
      }

      /** Deregisters the key and closes its channel. */
      private void closeConnection(SelectionKey key)throws IOException{
        key.cancel();
        key.channel().close();
      }

      /** Decodes the buffer's remaining bytes using the server charset. */
      public String decode(ByteBuffer buffer){
        CharBuffer charBuffer= charset.decode(buffer);
        return charBuffer.toString();
      }

      /** Encodes the string using the server charset. */
      public ByteBuffer encode(String str){
        return charset.encode(str);
      }

      /** Starts the accept thread, then runs the selector loop on the main thread. */
      public static void main(String args[])throws Exception{
        final EchoServer server = new EchoServer();
        Thread accept=new Thread(){
            public void run(){
                server.accept();
            }
        };
        accept.start();
        server.service();
      }

    }

  • 相关阅读:
    从尾到头打印链表
    剑指offer
    Codeforces Round #345
    算法入门系列之字符串
    【codenet】代码相似度计算框架调研 -- 把内容与形式分开
    【学习笔记--数据结构】合法的出栈序列与栈混洗
    我的博客即将入驻“云栖社区”,诚邀技术同仁一同入驻。
    【PAT L2-001】最短路计数
    【CF689D Friends and Subsequences】二分搜索,区间查询
    【编译原理】语法分析LL(1)分析法的FIRST和FOLLOW集
  • 原文地址:https://www.cnblogs.com/chenying99/p/2728735.html
Copyright © 2011-2022 走看看