zoukankan      html  css  js  c++  java
  • netty

    BIO

    同步阻塞bio:链接数目较少

    image-20200922101237506

    public static void main(String args[]) throws IOException {
        ExecutorService pool = ThreadPool.getCachedThreadPool();
        ServerSocket socket = new ServerSocket(6666);
        System.out.println("服务器启动....");
        while(true){
            final Socket client = socket.accept();
            System.out.println("this is a client");
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    handler(client);
                }
            });
        }
    }
    public static void handler(Socket socket){
        System.out.print(String.format("线程id: %s 线程name: %s",Thread.currentThread().getId(),Thread.currentThread().getName()));
        byte[] bytes = new byte[1024];
        try (InputStream inputStream = socket.getInputStream()) {
            while(true){
                int read = inputStream.read(bytes);
                if(read!=-1){
                    System.out.print(new String(bytes,0,read));
                }else {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    NIO

    • 同步非阻塞nio:链接数目较多且链接比较短,聊天,弹幕,服务器间通讯

    • 三大组件:

      • channel,

        • 每个channel对应一个buffer
        • 双向,可以返回os底层的情况
        • file channel
        • dataprogram channel
        • file channel
      • buffer,

        • 内存块,底层是数组
        • 数据读取的位置,与bio不同,不使用流,但也可以双向读取(filp方法:反转)
        • 重要属性
          • Capacity:容量大小,不可改变
          • Limit:极限位置标记,可变,默认就是Capacity
          • mark:标记 默认-1
          • Position:下一个要被读或写的位置索引

        buffer的读写都依赖这四个属性,

      • Selector

        • 对应一个线程
        • 对应多个channel
    image-20200922110437831 image-20200922111521881
    String str = "hello world";
    try (FileOutputStream stream = new FileOutputStream("d:/desktop/1.txt")) {
        FileChannel channel = stream.getChannel();
        ByteBuffer buffer1 = ByteBuffer.allocate(1024);
        buffer1.put(str.getBytes());
        buffer1.flip();
        channel.write(buffer1);
        stream.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    

    image-20200922120031552

    文件复制:同一个buffer

    image-20200922120221883

    文件复制:使用通道的transform(),或transto()方法,更方便

    文件直接修改,文件内容不再进入jvm,buffer直接映射到文件

    image-20200922122102802

    通信示例

    public static void server() throws Exception{
        // 创建ServerSocketChannel  ---》ServerSocket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress socketAddress = new InetSocketAddress(7000);
        serverSocketChannel.socket().bind(socketAddress);
        serverSocketChannel.configureBlocking(false);
        // 创建Selector
        Selector selector = Selector.open();
    
        // serverSocketChannel注册到seletcor,事件为OP——ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
        while(true){
            // 没有事件
            if(selector.select(1000) == 0){ // selector 等待1秒钟
                System.out.println("waiting for 1 seconds");
                continue;
            }
            // 有事件
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()){
                // 获取key
                SelectionKey key = keyIterator.next();
                // 获取对应通道
                if(key.isAcceptable()){
                    // 生成socketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    // socketChannel注册到selector,关注事件为OP_read
                    socketChannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));
                }
                if (key.isReadable()){
                    // 反向获取到channel
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 会获取到buffer
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    channel.read(buffer);
                    System.out.println(String.format("当前线程:%s-%s,form客户端:%s",Thread.currentThread().getId(),Thread.currentThread().getName(),new String(buffer.array())));
                }
                // 删除selectionkey,防止重复操作
                keyIterator.remove();
            }
        }
    }
    public static void client() throws Exception{
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        // 未连接上或正在连接
        if(!socketChannel.connect(new InetSocketAddress("127.0.0.1",7000))){
            while(!socketChannel.finishConnect()){
                System.out.println("正在连接,但客户端没有阻塞");
            }
        }
        // 已连接
        String str = "hello world";
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        // 发送数据
        socketChannel.write(buffer);
    }
    

    image-20200922140108426

    image-20200922101211665

    零拷贝:没有cpu copy或copy 的信息很少

    示例:服务器读取文件,发送给客户

    • 传统:用户---》内核---》用户---》内核,三次状态切换,四次copy

      • 硬件--》内核:dma copy
        • 用户态--》内核态
      • 内核---》用户:cpu copy
        • 内核态--》用户态
      • 用户---》socket: cpu copy
        • 用户态---》内核态
      • socket---》硬件协议:dma copy
    • mmap:内存映射,将用户内存映射到内核

      • 还是3次切换,但只有3次copy
      • 硬件--》内核:dma copy
        • 用户态--》内核态
        • 此时内核数据用户共享,但此时程序获取数据会进行状态切换,所以还是3次状态切换
      • 内核---》socket: cpu copy
        • 用户态---》内核态
      • socket---》硬件协议:dma copy

    sendFile:2次切换,3次copy linux2.1

    • 硬件--》内核:dma copy
      • 用户态--》内核态
    • 内核---》socket:cup copy
      • 内核态---》用户态
    • socket---》硬件协议:dma copy

    sendFile:2次切换,2次copy linux2.4

    • 硬件--》内核:dma copy

      • 用户态--》内核态

      • 内核态---》用户态

    • 内核(socket)---》硬件协议:dma copy

    AIO

    异步非阻塞aio:链接数目多,并且链接时间长

    image-20200922153711739

    Netty

    • 用于基于nio的数据传输(大数据,小数据都可)框架
    • 自己用nio写,还是比较偏底层的,比较麻烦
    • 简化nio的开发流程
    • tcp/upd(传输协议)----》nio(基于传输协议的api),netty------》用用协议(http,websocket,ssl等等)
    传统网络I/O服务模型

    image-20200922155132004

    REACTOR模型
    • 基于I/O多路复用
    • 基于线程池线程复用

    image-20200922160119610

    单Rector单线程

    image-20200922160638095

    单Rector多线程

    image-20200922160845862

    主从Reactor

    image-20200922162138699

    Netty
    • BossGroup,负责接收客户端连接

    • WorkGroup,负责读写

    • NioEventLoopGroup,事件循环组

      • NioEventLoop,不断循环的处理任务线程,有一个selector
    • boss nioeventloop

      • 轮训accept事件
      • 处理accept事件,与client建立连接,
      • 生成NioSocketChannel,并注册到某个work的NIOEventLoop的selector
      • 处理任务队列的任务
    • work nioeventloop

      • 轮训read,write事件
      • 处理i/o事件,即read,write,在NiosocketChannel处理
      • 处理队列任务
    • work nioeventloop处理数据会使用pipline

      • pipline包含channel
    // netty 聊天
    public class Server{
    
        public static void main(String[] args){
            // 连个线程组,
            // 处理链接,默认线程数=cup核数*2
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            // 处理业务
            EventLoopGroup workGroup = new NioEventLoopGroup();
            // 服务端启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            try {
                bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)  // 通道类型
                    .option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }); // 给workgroup的eventloop设置处理器
                // 绑定端口,并启动
                ChannelFuture cf = bootstrap.bind("127.0.0.1",6668).sync();
                System.out.println("服务端启动....");
                // 关闭,当有关闭事件时关闭
                cf.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    
    }
    class NettyServerHandler extends ChannelInboundHandlerAdapter {
        // 读取客户端消息
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(ctx.channel().remoteAddress());
            System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
            
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端", CharsetUtil.UTF_8));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
    public class Client {
        public static void main(String[] args) {
            // 事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            // 客户端对象
            Bootstrap bootstrap = new Bootstrap();
            try {
                bootstrap.group(eventExecutors) // 设置线程组
                    .channel(NioSocketChannel.class) // 设置通道实现类
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler()); // 加入处理器
                        }
                    });
                // 启动
                ChannelFuture cf = bootstrap.connect("127.0.0.1", 6668).sync();
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                eventExecutors.shutdownGracefully();
            }
        }
    }
    class NettyClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("ctx"+ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("客户端发来的信息",CharsetUtil.UTF_8));
        }
    
        // 读取客户端消息
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(ctx.channel().remoteAddress());
            System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            super.channelReadComplete(ctx);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    Netty Task

    • 用户自定义普通任务
    • 用户自定义定时任务
    • 非Reactor调用channel:服务器推送信息到客户端
    // 用户自定义的普通任务
     @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(ctx.channel().remoteAddress());
            System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
            // 若此时是耗时任务,客户端都要等待服务器执行完毕,
            // 通过下面方式可以异步执行,就是将任务交给了netty的task
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(ctx.channel().remoteAddress());
                    System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
                }
            });
            // 任务放在scheduleTaskQueue
            ctx.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(ctx.channel().remoteAddress());
                    System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
                }
            },5, TimeUnit.SECONDS);
        }
    

    Netty实现Http协议

    public class Main{
    
        public static void main(String[] args){
            // 连个线程组,
            // 处理链,默认线程数=cup核数*2
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            // 处理业务
            EventLoopGroup workGroup = new NioEventLoopGroup();
            // 服务端启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            try {
                bootstrap.group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)  // 通道类型
                        .option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数
                        .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态
                        .childHandler(new ChannelInitializer<SocketChannel>() {// 给workgroup添加handler
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {// 每个请求pipline与handler是不共享的,
    
                                // 得到管道
                                ChannelPipeline pipeline = ch.pipeline();
                                // http 解码器
                                pipeline.addLast(new HttpServerCodec());
                                // 响应
                                pipeline.addLast(new NettyServerHandler());
    
                            }
                        }); // 给workgroup的eventloop设置处理器
                // 绑定端口,并启动
                ChannelFuture cf = bootstrap.bind("127.0.0.1",8080).sync();
                System.out.println("服务端启动....");
                // 关闭,当有关闭事件时关闭
                cf.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    // SimpleChannelInboundHandler<HttpObject>是ChannelInboundHandlerAdapter的子类
    // 通信数据被封装httpobject
    
    class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
            System.out.println(ctx.channel().pipeline().hashCode()+" "+this.hashCode());
            HttpRequest request = (HttpRequest) msg;
            URI uri = new URI(request.uri());
            if("/favicon.ico".equals(uri.getPath())){
                System.out.println("不作响应");
                return;
            }
            ByteBuf content = Unpooled.copiedBuffer("我是服务器,", CharsetUtil.UTF_16);
            DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK,content);
            defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
            defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
            ctx.writeAndFlush(defaultFullHttpResponse);
        }
    }
    

    image-20200922202021791

    Unpooled类

    // 该对象包含数组,读取的时候不同flip进行翻转,底层维护了readerIndex,writerIndex
    ByteBuf a = Unpooled.buffer(100);
    ByteBuf b = Unpooled.copiedBuffer("helloworld",CharsetUtil.UTF_8);
    

    Netty心跳检测

    // 心跳检测处理器,触发器
    // 3s没读取,发送检测包
    // 5s没有写,发送检测包
    // 7s没有读,也没有写,发送检测包
    // 事件传递到下一个handler处理(自定义)
    pipline.addLast(new IdleStateHandler(3,5,7TimeUnit.SECONDS))
    pipline.addLast(new ChannelInboundHandlerAdapter(){
        public void userEventTriggered(ChannelHandlerContext ctx,Object evt){
            if(evt instance of IdleStateEvent){
                IdleStateEvent event = (IdleStateEvent)evt;
                swicth(event.state()){
                    case READER_IDLE:
                    XXX
                    case WRITER_IDLE:
                    XXX
                    case ALL_IDLE:
                    XXX
                }
            }
        }
    })
    

    Websocket

    image-20201002101425900

    class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            ctx.channel().writeAndFlush(new TextWebSocketFrame("xxxx"));
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            super.handlerAdded(ctx);
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            super.handlerRemoved(ctx);
        }
    }
    
    var socket;
    if(windows.WebSocket){
        socket = new WebSocket("ws://localhost:7000/hello");
        socket.onmessage = function(ev){
            收到消息
        }
        socket.onopen = function(ev){
            连接开启
        }
        socket.onclose = function(ev){
            连接关闭
        }
    }
    
    function send(message){
        if(socket.readyState==WebSocket.OPEN){
    		socket.send(message);
        }
    }
    

    编码解码

    Netty编码解码

    • StringEncoder,StringDecoder
    • ObjectEncoder,ObjectDecoder

    序列化的缺点

    • 无法跨语言
    • 序列化体积大
    • 序列化性能低
    google的Protobuf
    • rpc数据交换
      • http+json(以前)
      • tcp+protobuf (现在)
    • 跨语言,支持很多语言
    • 高性能
    syntax = "protobuf4";// 版本号,
    option optimize_for = SPEED; // 加快解析
    option java_package = "top.deanxxx"; // 指定包名
    option java_outer_classname = "MyData";//文件名,外部类名
    message MyMessage{
    	enum DataType{
    		StudentType = 0;
    		UserType = 1;
    	}
    	DataType data_Type = 1; // MyMessage 的第一个属性
    	oneof dataBody{
    		Stduent s = 2; // MyMessage 的另一个属性
    		User u = 3;// MyMessage 的另一个属性
    	}
    }
    message User{
    	int32 id = 1; // id属性 ,1 表示属性的序号
    	string name = 2;
    }
    message Student{
    	int32 id = 1; // id属性 ,1 表示属性的序号
    	string name = 2;
    }
    // protoc.exe --java_out=. xxx.proto
    
    • 编写proto文件
    • 变为对对应语言
    • 服务端对应语言中的proto编码器(handler),发送
    • 客户端接收,对应解码器(handler)

    数据入栈出栈

    image-20201002134742983

    TCP粘包与拆包

    粘包:间隔时间短的包合并为一个

    拆包:数据过大拆分

    自定义协议+编解码器来解决

    关键是解决服务器每次读取数据长度问题。

    实现RPC

    image-20201002144927179

    // 公共接口
    public interface{
        String hello(String msg);
    }
    
    // provider
    public class HelloService implement HelloService{
        String hello(String msg){
            
        }
    }
    public class ServerBootstrap{
        public static void main(String args){
            
        }
    }
    public class NettyServer{
        // netty初始化
        private static void startserver0(String hostname,int port){
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            try {
                bootstrap.group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)  // 通道类型
                        .option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数
                        .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(new ServerHandler());
                            }
                        }); 
                ChannelFuture cf = bootstrap.bind(hostname,port).sync();
                cf.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    public class ServerHandler{
        channelRead:
        	xxxxx
    }
    
    public class NettyClient{
        private static ExecutorService executor = Executors.newFixedThreadPool(4);
        private static NettyClient client;
        // 编写方法,代理模式
        public  Object getBean(final Class<?>serviceclass,final String providerName){
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass},(proxy,method,args)->{
                // 每调用一次hello,
                if(cliet ==null){
                    initial();
                }
                client.setPara(prividerName+args[0]);
                return executor.submit(client).get()
            })
        }
        
        private static void initial(){
            client = new NettyClient();
             EventLoopGroup eventExecutors = new NioEventLoopGroup();
            // 客户端对象
            Bootstrap bootstrap = new Bootstrap();
            try {
                bootstrap.group(eventExecutors)
                        .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new StringEncoder());
                            }
                        }); 
                // 启动
                ChannelFuture cf = bootstrap.connect("127.0.0.1", 8080).sync();
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                eventExecutors.shutdownGracefully();
            }
        }
        }
    }
    public class ClientHandler extends ChannelInboundHandlerAdapter implements Callable{
        private ChannelHandlerContext ctx;
        private String result;
        private para;
        	channelActivate(ctx){
                ctx = ctx
            }
        	synchronzied channelRead(msg){
                result = msg.toString();
                notify();
            }
        synchronized call(){
            ctx.writeAndFlush(para);
            wait();
            retutn result;
        }
    }
    
  • 相关阅读:
    Android Studio Gradle 添加.so 支持文件
    poj 3270 更换使用
    linux通过使用mail发送电子邮件
    php 上传文件 $_FILES['']['type']的值
    浅谈Base64编码
    expect实现ssh自动登录
    C++ 多源码文件简单组织
    linux下修改hostid
    SQLite/嵌入式数据库
    类内数组声明,“类外”指定大小
  • 原文地址:https://www.cnblogs.com/Dean0731/p/13761767.html
Copyright © 2011-2022 走看看