zoukankan      html  css  js  c++  java
  • 2 你的第一款Netty应用程序

     

    BIO:同步阻塞IO
    NIO:同步非阻塞IO
    AIO:异步非阻塞IO
    先弄清楚同步、异步,阻塞、非阻塞概念。
    
    io操作分为两部分,发起io请求,和io数据读写阻塞、非阻塞主要是针对线程发起io请求后,是否立即返回来定义的,立即返回称为非阻塞io,否则称为阻塞io。
    
    同步、异步主要针对io数据读写来定义的,读写数据过程中不阻塞线程称为异步io,否则,称为同步io。

    参考文章:https://my.oschina.net/ljhlgj/blog/1811319   这篇文章写得很详细

    同样是监听3333端口,这里结合IO,NIO,对netty进行对比:

    IO处理如下:

    public class IOServer {
        public static void main(String[] args) throws IOException {
            // TODO 服务端处理客户端连接请求
            ServerSocket serverSocket = new ServerSocket(3333);
    
            // 接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理
            new Thread(() -> {
              while (true) {
                try {
                  // 阻塞方法获取新的连接
                  Socket socket = serverSocket.accept();
    
                  // 每一个新的连接都创建一个线程,负责读取数据
                  new Thread(() -> {
                    try {
                      int len;
                      byte[] data = new byte[1024];
                      InputStream inputStream = socket.getInputStream();
                      // 按字节流方式读取数据
                      while ((len = inputStream.read(data)) != -1) {
                        System.out.println(new String(data, 0, len));
                      }
                    } catch (IOException e) {
                    }
                  }).start();
    
                } catch (IOException e) {
                }
    
              }
            }).start();
    
          }
    }

    NIO处理如下:

    public class NIOServer {
        
        public static void main(String[] args) throws IOException {
            // 1. serverSelector负责轮询是否有新的连接,服务端监测到新的连接之后,不再创建一个新的线程,
            // 而是直接将新连接绑定到clientSelector上,这样就不用 IO 模型中 1w 个 while 循环在死等
            Selector serverSelector = Selector.open();
            // 2. clientSelector负责轮询连接是否有数据可读
            Selector clientSelector = Selector.open();
            try {
                // 对应IO编程中服务端启动
                ServerSocketChannel listenerChannel = ServerSocketChannel.open();
                listenerChannel.socket().bind(new InetSocketAddress(3333));
                listenerChannel.configureBlocking(false);
                listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
                while (true) {
                    // 监测是否有新的连接,这里的1指的是阻塞的时间为 1ms
                    if (serverSelector.select(1) > 0) {
                        Set<SelectionKey> set = serverSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();
                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();
                            if (key.isAcceptable()) {
                                try {
                                    // (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
                                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                                    clientChannel.configureBlocking(false);
                                    clientChannel.register(clientSelector, SelectionKey.OP_READ);
                                } finally {
                                    keyIterator.remove();
                                }
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            new Thread(() -> {
                try {
                    while (true) {
                        // (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为 1ms
                        if (clientSelector.select(1) > 0) {
                            Set<SelectionKey> set = clientSelector.selectedKeys();
                            Iterator<SelectionKey> keyIterator = set.iterator();
                            while (keyIterator.hasNext()) {
                                SelectionKey key = keyIterator.next();
                                if (key.isReadable()) {
                                    try {
                                        SocketChannel clientChannel = (SocketChannel) key.channel();
                                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                        // (3) 面向 Buffer
                                        clientChannel.read(byteBuffer);
                                        byteBuffer.flip();
                                        System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());
                                    } finally {
                                        keyIterator.remove();
                                        key.interestOps(SelectionKey.OP_READ);
                                    }
                                }
    
                            }
                        }
                    }
                } catch (IOException ignored) {
                }
            }).start();
        }
    }

    Netty如下:

    public class EchoServer {
      private final int port;
    
      public EchoServer(int port) {
        this.port = port;
      }
    
      public static void main(String[] args)
          throws Exception {
        if (args.length != 1) {
          System.err.println("Usage: " + EchoServer.class.getSimpleName() +
              " <port>"
          );
          return;
        }
        //设置端口值(如果端口参数的格式不正确,则抛出一个NumberFormatException)
        int port = Integer.parseInt(args[0]);
        //调用服务器的 start()方法
        new EchoServer(port).start();
      }
    
      public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //(1) 创建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
          //(2) 创建ServerBootstrap
          ServerBootstrap b = new ServerBootstrap();
          b.group(group)
              //(3) 指定所使用的 NIO 传输 Channel
              .channel(NioServerSocketChannel.class)
              //(4) 使用指定的端口设置套接字地址
              .localAddress(new InetSocketAddress(port))
              //(5) 添加一个EchoServerHandler到于Channel的 ChannelPipeline
              .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                  //EchoServerHandler 被标注为@Shareable,所以我们可以总是使用同样的实例
                  //这里对于所有的客户端连接来说,都会使用同一个 EchoServerHandler,因为其被标注为@Sharable,
                  //这将在后面的章节中讲到。
                  ch.pipeline().addLast(serverHandler);
                }
              });
          //(6) 异步地绑定服务器;调用 sync()方法阻塞等待直到绑定完成
          ChannelFuture f = b.bind().sync();
          System.out.println(EchoServer.class.getName() +
              " started and listening for connections on " + f.channel().localAddress());
          //(7) 获取 Channel 的CloseFuture,并且阻塞当前线程直到它完成
          f.channel().closeFuture().sync();
        } finally {
          //(8) 关闭 EventLoopGroup,释放所有的资源
          group.shutdownGracefully().sync();
        }
      }
    }
    //标示一个ChannelHandler可以被多个 Channel 安全地共享
    @Sharable
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        //将消息记录到控制台
        System.out.println(
            "Server received: " + in.toString(CharsetUtil.UTF_8));
        //将接收到的消息写给发送者,而不冲刷出站消息
        ctx.write(in);
      }
    
      @Override
      public void channelReadComplete(ChannelHandlerContext ctx)
          throws Exception {
        //将未决消息冲刷到远程节点,并且关闭该 Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
            .addListener(ChannelFutureListener.CLOSE);
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx,
                                  Throwable cause) {
        //打印异常栈跟踪
        cause.printStackTrace();
        //关闭该Channel
        ctx.close();
      }
    }

    比较一下:

    BIO的server:

    1、ServerSocket serverSocket = new ServerSocket(3333);//绑定监听端口

    2、Socket socket = serverSocket.accept(); // 阻塞方法获取新的连接,这里是main线程执行的代码,所以把main线程阻塞了,因此是阻塞IO

    3、InputStream inputStream = socket.getInputStream(); //获取输入流,数据从内核写入socket空间

    NIO的server:
    1、新建一个clientSelector和一个serverSelector

    Selector clientSelector = Selector.open();

    Selector serverSelector = Selector.open();

    2、新建一个ServerSocketChannel(类似于BIO里的serverSocket),并注册到serverSelector中。

    ServerSocketChannel listenerChannel = ServerSocketChannel.open(); listenerChannel.socket().bind(new InetSocketAddress(3333));

    listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);

    3、serverSelector中有新连接时,从serverSelector中获取socketChannel(类似BIO的第2步)并组册到clientSelector中

    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
    clientChannel.register(clientSelector, SelectionKey.OP_READ);

    4、clientSelcetor中有新连接时,处理该连接。//此处理方法用不用多线程,代表了是否异步。

    if (clientSelector.select(1) > 0){}

    Netty的server:

    1、ServerBootstrap b = new ServerBootstrap();//(2) 创建ServerBootstrap

    b.group(new NioEventLoopGroup())//这个类似于NIO里的serverSelector
    .channel(NioServerSocketChannel.class)//(3) 类似NIO里的serverSocketChannel

    .localAddress(new InetSocketAddress(port))//(4) 类似NIO里为serverSocketChannel设置监听端口

    .childHandler(new ChannelInitializer<SocketChannel>() {})//(5) 添加一个EchoServerHandler到于Channel的 ChannelPipeline ,类似NIO里的clientSelector

    2、EchoServerHandler 中就是对clientSelector中事件的处理。

  • 相关阅读:
    table
    html <input>
    html基本结构
    Spark join连接
    combineByKey
    scala mkstring
    countByValue
    spark aggregate
    scala flatmap、reduceByKey、groupByKey
    生态圈安装
  • 原文地址:https://www.cnblogs.com/lakeslove/p/13055951.html
Copyright © 2011-2022 走看看