zoukankan      html  css  js  c++  java
  • 从BIO到Netty

    想要进行节点与节点间、客户端与服务器端、进程与进程间的通信,需要通过网络IO进行。JAVA通过Socket封装TCP/IP网络协议,进行网络之间的数据传输。

    1.首先来回顾一下ISO七层模型和TCP/IP四层协议

     

    2.了解一下JAVA通过Socket进行网络通信的流程

    本地的进程通过PID标识唯一进程号,网络中通过协议+ip地址+端口号标识一个进程号,如:http://10.10.10.10:8080,网络间的进程被唯一标识后,可以进行网络通信。Socket将TCP/IP复杂的操作,封装为简单的接口供应用层使用,实现网络通信。就像文件IO的“打开—读写—关闭”一样,网络间的通信也变成客户端、服务器端可以给自己的”文件“写入内容,供对方读取,通信结束时关闭“文件”。

    Socket通信流程:

    服务端accept()方法会产生阻塞,等待客户端连接。

    3.JAVA中的ServerSocket、Socket类

    ServerSocket类:                                                        Socket类:

       

    4.简单的传统BIO通信例子(同步阻塞一问一答式)

    服务端代码

    package service;
    
    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class BIOServerTest {
    
    
        public static void main(String[] args) throws IOException {
            new BIOServerTest().bind();
        }
    
        public void bind() throws IOException {
            ServerSocket server = null;
            try {
                server = new ServerSocket(8000);
                Socket socket = null;
                while (true) {
                    socket = server.accept();
                    new Thread(new BIOServerHandler(socket)).start();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (server != null) {
                    server.close();
                }
            }
        }
    
        public class BIOServerHandler implements Runnable {
            private Socket socket;
    
            public BIOServerHandler(Socket socket) {
                this.socket = socket;
            }
    
            public void run() {
                BufferedReader in = null;
                PrintWriter out = null;
                try {
                    in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
                    out = new PrintWriter(this.socket.getOutputStream(), true);
                    String body = null;
                    while (true) {
                        body = in.readLine();
                        if (body == null) {
                            break;
                        }
                        System.out.println("client request " + body);
                        out.println("ok");
                    }
                } catch (Exception e) {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                    if (out != null) {
                        out.close();
                    }
                    if (socket!=null){
                        try {
                            socket.close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    View Code

    客户端代码

    package service;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
    
    public class BIOClientTest {
        public static void main(String[] args) {
            Socket socket = null;
            BufferedReader in = null;
            PrintWriter out = null;
            try {
                socket = new Socket("127.0.0.1", 8000);
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                out = new PrintWriter(socket.getOutputStream(), true);
                out.println("hello");
                String resp = in.readLine();
                System.out.println("server response " + resp);
            } catch (Exception e) {
    
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (out != null) {
                    out.close();
                }
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    }
    View Code

    传统的同步阻塞BIO(Blocking IO)的处理过程中,每有一个客户端的请求,服务端就需要启动一个线程与之对接,监听数据传输。成千上百的客户端访问,在服务端就需要启动成千上百个线程,每一个线程在监听不到数据的时候,又不能得到充分的利用,这种模型显然无法满足高性能、高并发的场景。

    5.NIO

    JDK1.4引入一个新的库NIO,有人称之为New IO,有人称之为Non-blocking IO,它出现的目的就是要解决阻塞问题,实现非阻塞网络通信,我们就暂且称之为非阻塞IO吧。

    在上面的BIO中,数据是面向字节传输,NIO则是面向块传输的,就好比吃米饭,一粒一粒吃,肯定没有一勺一勺吃有感觉,又快又爽,此时它便是在java中提供的高速的、面向块的IO。

    下面来了解NIO传输的相关概念:

    缓冲区buffer

    在面向流的IO中,是将数据写入或读取至Stream对象中(InputStream/OutputStream)。在NIO中,服务端和客户端都是通过访问缓冲区读取、写入来操作数据。每一种java基本类型都对应一种缓冲区如:ByteBuffer、IntBuffer等。每一个Buffer类都是Buffer接口的子类。

    通道Channel

    Channel是一个通道,网络数据通过Channel读取和写入。Channel不像流,写入要用InputStream,读取要用OutputStream,Channel是双向的,所以既可以用来读,也可以用来写。

    多路复用器Selector

    多路复用器提供选择已经就绪的任务的能力,相当于雇佣一个服务员来对接多个用餐的客户,Selector会不断轮询注册在其上的Channel,如果某个Channel发生读写事件,这个Channel便处于就绪状态,会被Selector轮询处来,通过SelectorKey获取就绪的Channel后,进行后续的IO操作。在BIO中客户端每有一个客户请求,服务端就需要一个服务员来对接,一个顾客,一个服务员。而NIO用多路复用器之后,就可以一个服务端的线程,服务多个客户端请求,实现高可用。

    关键步骤:

    (1)创建ServerSocketChannel,配置为非阻塞模式
    (2)绑定监听,配置TCP参数
    (3)创建独立IO线程,用于轮询多路复用器Selector
    (4)创建Selector,将(1)中的serverSocketChannel注册在selector上,并监听selectorKey.ACCEPT
    (5)启动IO线程,循环执行Selector.select()方法,轮询就绪的Channel
    (6)轮询到就绪的channel时,如果是新的客户端接入,调用ServerSocketChannel.accept()方法,接受新的客户端。
    (7)将新接入的客户端SocketChannel设为非阻塞模式,配置TCP参数
    (8)SocketChannel注册到Selector,监听OP_READ操作位
    (9)如果轮询到Channel为OP_READ,则读取数据
    (10)如果为OP_WRITE,说明数据还在发送。
    View Code

     NIO原生接口复杂,不易编写,不好维护,感兴趣的同学可以去了解下。

    6.Netty

    Netty是NIO框架之一,接口简洁,入门简单,有好的稳定性、健壮性、可扩展性,已经得到了较为广泛的使用。

    Netty 提供基于NIO的Server/Client网络应用框架,服务端启动后,就开始等待浏览器(httpRequest请求)或者客户端的请求(服务器绑定端口监听)。浏览器输入ip:port后,server得到请求,经过serverHandler处理后,返回给浏览器或者客户端。
    7.Netty的使用

    a.maven工程添加netty依赖

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.33.Final</version>
            </dependency>
    View Code

    b.基于netty的服务端开发:启动,添加线程组,添加过滤器,绑定端口,监听,释放资源

    package service;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class NettyServerTest {
        public static void main(String[] args) {
            new NettyServerTest().bind();
        }
        public void bind() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).
                    channel(NioServerSocketChannel.class).
                    option(ChannelOption.SO_BACKLOG, 1024).
                    childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });
            try {
                ChannelFuture f = bootstrap.bind(8000).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public class ServerHandler extends ChannelHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf byteBuffer = (ByteBuf) msg;
                byte[] req = new byte[byteBuffer.readableBytes()];
                byteBuffer.readBytes(req);
                String body = new String(req, "UTF-8");
                System.out.println("request body:" + body);
                ByteBuf resp = Unpooled.copiedBuffer(new String("ok").getBytes());
                ctx.write(resp);
            }
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                ctx.flush();
            }
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                super.exceptionCaught(ctx, cause);
            }
        }
    }
    View Code

    c.基于netty的客户端开发:启动,添加线程组,添加过滤器,建立连接,监听,释放资源

    package service;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyClientTest {
        public static void main(String[] args) {
            new NettyClientTest().connect();
        }
    
        public void connect() {
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ClientHandler());
                        }
                    });
            try {
                ChannelFuture f = bootstrap.connect("127.0.0.0", 8000).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        private class ClientHandler extends ChannelHandlerAdapter {
            ByteBuf message;
    
            public ClientHandler() {
                byte[] req = "hello".getBytes();
                message = Unpooled.buffer(req.length);
                message.writeBytes(req);
            }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                ctx.writeAndFlush(message);
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf byteBuf = (ByteBuf) msg;
                byte[] resp = new byte[byteBuf.readableBytes()];
                byteBuf.readBytes(resp);
                String body = new String(resp, "UTF-8");
                System.out.println("server response " + body);
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                super.exceptionCaught(ctx, cause);
            }
        }
    }
    View Code
  • 相关阅读:
    asp.net留言板项目源代码下载
    HoverTree项目添加了查看留言列表功能
    HoverTree开源项目已经实现管理员登录
    HoverTree项目已经实现分层
    c# 连接Mysql数据库
    单行文字滚动就用myslider
    C#播放MP3源代码
    PHP 判断是否为 AJAX 请求
    c# TCP Socket通讯基础
    javascript类型注意事项
  • 原文地址:https://www.cnblogs.com/zhxdxf/p/10340791.html
Copyright © 2011-2022 走看看