zoukankan      html  css  js  c++  java
  • 05-netty读书笔记

    本文是《Netty、Redis、Zookeeper高并发实战》的读书笔记。

    包含Netty部分的内容。未包含Redis和zookeeper的内容。

    IO

    Socket,文件的IO都是进程缓存区和Kernel缓存区的交换,当内核缓存需要进行真实的物理IO的时候,会发生系统中断(读中断,写中断),此时会发生进程的信息和状态保护。完成物理IO后,还需要回复进程的数据和状态信息。

    4中主要的IO模型

    1、同步阻塞IO(Blocking IO)

    阻塞与非阻塞:kernel IO完成后才能返回用户控件执行用户的操作,否则,用户控件处于“卡死”状态(其实是在等待IO完成。阻塞指的是用户空间的状态。

    同步与异步:是一种用户空间与内核空间的IO发起方式。

    同步IO是指用户空间的线程是主动发起IO请求的一方,内核空间是被动接受方。

    异步IO则反过来,是指系统内核是主动发起IO请求的一方,用户空间的线程是被动接受方。

    传统的IO模型都是同步阻塞IO。在Java中,默认创建的socket都是阻塞的。

    详细解释

    直来直去的。属于过程编程。

    优点:开发简单;阻塞线程会被挂起,基本不会占用CPU的资源。

    2、同步非阻塞IO(Non-blocking IO)

    非阻塞IO:指的是用户空间的线程不需要等待内核IO操作彻底完成,可以立即返回用户空间执行接下来的操作,即处于非阻塞状态。与此同时,内核会立即返回给用户一个状态值。

    非阻塞IO要求socket被设置为NONBLOCKING。

    此处的NIO(同步非阻塞IO)模型不是Java中的NIO(New IO)库。

    详细解释

    NIO中,一旦开始IO系统调用,会出现两种情况:

    • 内核缓冲区中无数据,系统调用会立即返回,返回一个失败的信息
    • 内核缓冲区中有数据,是阻塞的,直到数据从内核缓冲区复制到用户进程缓冲区。完成后,系统调用返回成功,用户进程开始处理用户空间的缓存数据。

    特点

    • 用户程序的线程需要不断的进行IO系统调用,轮询数据是否已经准备好,如果没有准备好,就继续轮询,直到按成IO系统调用为止。
    • 每次发起IO系统调用,在内核等待数据过程中可以立即返回,用户线程不会阻塞,实时性较好
    • 不断的轮询,占用大量的CPU时间,效率低下。
    • 高并发场景下,同步非阻塞IO不可以用

    3、IO多路复用(IO Multiplexing)

    即经典的Reactor反应器设计模式,有时也称为异步阻塞IO,Java中的Selector选择器和Linux的epoll都是这种模型。

    详细解释

    IO多路复用,可以解决同步非阻塞IO中的轮询等待问题。

    引入一种新的系统调用:查询IO的就绪状态。Linux中是select/epoll系统调用。通过该系统调用,一个线程可以见识多个文件描述符,一旦某个描述符就绪(一般是内核缓冲区可读/可写),内容能够将就绪的状态返回给应用程序。随后应用程序根据就绪状态进行相应的IO系统调用。

    在IO多路复用模型中通过select/epoll系统调用,单个应用程序的线程可以不断地轮询成百上千的socket连接,当某个连接有IO就绪的状态,就返回对应的可以执行的读写操作。

    过程

    • 1,选择器注册。讲需要操作的目标socket连接提前注册到select/epoll的选择器中。
    • 2.就绪状态的轮询。通过选择器的查询方法,查询注册过的所有socket连接的就绪状态。通过查询的系统调用,内核会返回一个就绪的socket列表。放任何一个注册过的socket中的数据准备好了,内核缓冲区有数据了,内核会降该socket加入到就绪列表中。
    • 当用户线程调用了select查询方法,那么这个线程就会被阻塞掉
    • 3,用户线程获得了就绪状态的列表后,根据其中的socket连接,发起read系统调用,用户线程阻塞。内核开始复制数据到用户空间缓冲区。
    • 4、复制完成后,内核返回结果,用户线程解除阻塞。用户线程读取到了数据,就继续执行。

    特点

    • IO多路复用模型涉及到两种系统调用:

      • select/epoll的就绪查询
      • IO操作(read.write)
    • 与NIO相似,IO多路复用模型也需要轮询。线程需要不断的进行select/epoll轮询,查找出达到IO操作就绪的socket连接。

    • IO多路复用与NIO有密切关系,对于注册在选择器上的每一个可以查询的socket连接,一般都设置为同步非阻塞模型。

    • 优点:

      • 与一个线程维护一个连接的阻塞IO相比,使用select/epoll的最大优势在于,一个选择器查询线程可以同时处理成千上万的连接。大大减小了系统开销。
    • 缺点:

      • 本质上,select/epoll系统调用是阻塞的,属于同步IO。都需要在读写事件就绪后,由系统调用本身负责读写,也就是说这个读写过程是阻塞的。

    4、异步IO(Asynchronous IO,AIO)

    异步IO,指的是用户空间与内核空间的调用方式反过来。用户空间的线程变成被动接受者,而内核空间成了主动调用者。类似于Java中的回调模式,用户空间的线程向内核空间注册了各种IO事件的回调函数,由内核去主动调用。

    详细解释

    AIO基本流程:用户线程通过系统调用,向内核注册某个IO操作。内核在整个IO操作完成后(数据从网卡复制到用户内存缓冲区),通知用户线程继续执行后续的业务操作。

    举个例子

    • 1、用户线程发起一个read系统调用,然后就去做用户线程接下来的其他事情,用户线程不阻塞
    • 2、内核开始IO操作,完成数据准备,复制到用户内存空间
    • 3、内核给用户线程发生一个信号或者回调用户线程注册的回调接口,告诉用户线程read操作完成了
    • 用户线程读取用户缓冲区的数据,完成后续的业务操作。

    特点

    • 用户线程非阻塞
    • 用户线程需要接收内核IO完成的事件,或者用户线程需要注册一个IO操作完成的回调函数
    • AIO也被称为信号驱动IO
    • 缺点:
      • 需要底层内核支持
      • 用户程序仅需要进行事件的注册与接收,其他事情交给了操作系统

    Netty使用的是IO多路复用模型。Linux底层仍使用的事epoll。

    Java NIO基础详解

    Java NIO

    核心组件

    • Channel(通道)
    • Buffer(缓冲区)
    • Selector(选择器)

    属于IO多路复用模型。

    Old IO(OIO)是面向流的,NIO是面向缓冲区的。

    • OIO是面向字节流或者字符流的,一个个读取,NIO是面向缓冲区的,通过Channel和Buffer进行读取数据,可以随意读取Buffer中任意位置的数据
    • OIO是阻塞的,NIO是非阻塞的

    Channel

    OIO中会有输入流和输出流,而NIO中的Channel可以进行读取与写入。

    Selector

    IO多路复用模型,指的是一个线程或者进程可以同时监视多个文件描述符,一旦其中一个或者多个文件描述符可读或者可写,系统内核就通知该线程。Selector选择器是一个IO事件的查询器。通过选择器,一个线程可以查询多个通道的IO事件的就绪状态。

    首先把通道注册到Selector中,然后通过Selector内部的机制,可以查询注册的通道是否有就绪的IO事件(读/写/连接完成)。

    一个选择器只需要一个线程进行监控。即,可以通过一个线程去管理多个通道。(背后是IO多路复用的支持)

    Buffer

    应用程序与Channel的主要交互操作就是进行数据的读写。通道的读取,就是把数据从Channel中读取到缓冲区,通道的写入,就是把数据从缓冲区写入到Channel中。

    Buffer的本质是一个内存块(数组)。与普通的内存块(数组)不同的地方是,NIO Buffer对象提供了一组更加有效的读取写入的方法(可以交替读写)。

    Buffer类是一个非线程安全的类。

    有8中Buffer。ByteBuffer(使用最多),CharBufferDoubleBufferFloatBufferIntBufferLongBufferShortBufferMappedByteBuffer

    Buffer类

    内部有一个byte[]数组内存块,作为缓冲区。

    三个重要属性:capacity(容量),position(读写位置),limit(读写限制)

    标记属性:mark,可以将当前position临时存入mark中,需要的时候再恢复。

    capacity:指的是写入对象的数量,不是byte[]的大小。比如写入的是IntBuffer,capacity是10,那么只能写入10个对象。

    position是当前指针的位置。写入时候,position从0开始,最大值是limit,limit会被置为capacity大小。

    当需要读取写入的Buffer时候,调用flip(),把写模式转换成读模式,这个时候,limit会被置为position,表示可以被读取的最大值,接着position会被重置为0,即为通道的开头了。

    Buffer类几个重要方法
    • allocate()创建缓冲区
      • intBuffer = IntBuffer.allocate(20);
      • 创建后的intBuffer,处于写入模式,大小为20*4个字节
    • put(),写入到缓冲区,intBuffer.put(8);
    • flip(),翻转模式;写模式翻转到读模式,会清楚mark中的值,每个模式下应该使用自己的mark,否则会混乱啊
    • 读模式返回写模式: intBuffer.clear()或者intBuffer.compact()
    • get()
    • rewind() 倒带,可以重复读,主要调整position的位置,mark被清除。
    • mark()和reset(),mark()讲当前的position放入mark,reset是把mark恢复到position位置。

    使用Buffer类基本步骤:

    • 创建子类实例对象:使用allocate()
    • 调用put
    • 调用flip()
    • 调用get()
    • 调用clear()或者compact()

    Channel类

    一个Channel对于一个底层的文件描述符。还可以对应某个网络传输协议,Java中实现了几个,比如FileChannel(文件IO),SocketChannel(TCP连接),ServerSocketChannel(TCP连接监听),DatagramChannel(对于UDP)。

    FileChannel

    • 专门操作文件的

    • 阻塞模式,不可设置非阻塞模式

    • 文件获取,读取,写入,关闭

      • 获取

        		String scrFile = "D:\ok.txt";
                try {
                    FileInputStream fis = new FileInputStream(scrFile);
                    FileChannel channel = fis.getChannel();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
                try {
                    FileOutputStream fileOutputStream = new FileOutputStream(scrFile);
                    FileChannel channel = fileOutputStream.getChannel();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
        		try {
                    RandomAccessFile rw = new RandomAccessFile(scrFile, "rw");
                    FileChannel channel = rw.getChannel();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
        
      • 读取

        从通道中读取数据,读取的数据是要写入buffer中的,所以:

        channel.read(byteBuffer);是把通道中的数据以ByteBuffer的格式写入到通道中的。
        
      • 写入通道

        buf.flip();处于读模式
        channel.write(buf);这个方法是把buffer中的数据写入通道,此时buffer应该处于读状态。返回值是写入通道的字节数,长度。
        
      • 关闭通道

        channel.close();
        
      • 强制刷新到磁盘

        channel.fore(true);
        

      SocketChannel与ServerSocketChannel

      客户端:socketChannel

      服务端:SocketChannel和ServerSocketChannel

      都支持阻塞和非阻塞模式。

      socketChannel.configureBlocking(false|true);
      

      非阻塞模式下的通道操作是异步的。

      • 获取SocketChannel

        SocketChannel sc = SocketChannel.open();静态获取
        sc.configureBlocking(false);设置非阻塞
        sc.connect(new InetSocketAddress("127.0.0.1",8080));发起连接
        

        由于非阻塞情况下,可能连接还没有建立,connect方法就返回了。

        while(!sc.finishConnect()){
            ....
        }
        
      • 读取与写入

        读取通道的内容,写入到buf中,与上面一致。

      • 关闭通道

        sc.shutdownOutput();
        

    DatagramChannel

    DatagramChannel.open();//创建一个udp通道

    datagramChannel.configureBlocking(false);//设置为非阻塞

    channel.socket().bind(new InetSocketAddress(8090));//绑定upd端口

    读取数据使用receive()方法。channel.receive(buf);返回的事SocketAddress类型

    写入。使用send(buf, new InetSocketAddress(IP,Port));

    关闭。channel.close();

    selector类

    selector的使命是完成IO的多路复用。

    一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO状况。选择器和通道是监控和被监控的关系。

    一个单线程处理一个selector,一个selector可以监控多个Channel。

    Channel和Selector之间的关系是通过注册的方式完成的。

    Channel.register(Selector sel, int ops)方法可以把一个通道实例注册到一个Selector上。第一个参数是Selector实例,第二参数是Selector需要监控的IO事件类型。

    IO事件类型:

    • 可读:SelectionKey.OP_READ
    • 可写:SelectionKey.OP_WRITE
    • 连接:SelectionKey.OP_CONNECT
    • 接收:SelectionKey.OP_ACCEPT

    如果监控的事件是上面四种的组合,可以用“按位或”运算符来实现。

    int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
    

    IO操作不是对通道的IO操作,而是通道的某个IO操作的一种就绪状态,表示通道具备完成某个IO操作的条件。

    比如:某个SocketChannel通道,完成了和对端的握手连接,那么该通道则处于“连接就绪”(OP_CONNECT)的状态。

    通道是否可以监控

    FileChannel不可以监控。能不能被监控(也就是被选择),要看该Channel是否集成SelectableChannel类。

    SelectableChannel类实现了通道的可选择性所需要的的公共方法。

    选择键SelectionKey

    通道和选择器通过注册完成监控与被监控的关系后,就可以选择就绪事件。怎么选择呢?调用选择器的select();方法完成。

    选择键:就是那些被选择器选中的IO事件(这些事件是之前被注册到Selector上的)。选择键集合中包含的都是被选择中的且注册过的IO事件。

    选择键的功能强大:通过SelectionKey可以获得通道的IO事件类型,可以获得发生IO事件所在的通道,还可获得选出该选择键的选择器的实例。

    选择器使用流程

    • 1、获取选择器实例
    • 2、把通道注册到选择器中
    • 3、轮询感兴趣的IO就绪事件(选择键集合)

    1、Selector selector = Selector.open();

    2、将通道注册到选择器中

    //获取通道
    ServerSocketChannel ssc = ServerSocketChannel.open();
    //设置非阻塞
    ssc.configureBlocking(false);
    //绑定链接
    ssc.bind(new InetSocketAddress(port));
    //注册到选择器上,并制定监听事件为“接收连接”事件
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    
    

    3、选择出感兴趣的IO就绪事件

    //轮询,选择感兴趣的IO就绪事件
    while(selector.select() > 0){
        Set selectedKeys = selector.selectedKeys();
        Iterator keyIterator = selectedKeys.iterator();
        while(keyIterator.hasNext()){
            SelectionKey key = keyIterator.next();
            //根据IO事件类型进行业务处理
            if(key.isAcceptable()){
                ...
            }else if(){
                ;;;;
            }
            
            //处理完后移除key
            keyIterator.remove();
        }
    }
    
    

    select()有多个重载版本。

    select();//阻塞调用,一直到至少有一个通道发生了注册的IO事件
    select(long timeout);//指定超时时间,同上
    selectNow();//非阻塞,不管有没有IO事件,都立即返回
    select();//返回值是int,表示发生了IO事件的通道数量。上次select到本次select之间发生的数量。
    
    

    反应器模式(reactor模式)

    反应器模式由两大角色组成:

    • 1、Reactor反应器线程:负责响应IO事件,并且分发到Handlers处理器
    • 2、handlers处理器:非阻塞当然执行业务处理逻辑

    单线程的Reactor

    reactor反应器模式类似于事件驱动模型。

    当事件触发时,事件源会将事件dispatch分发到handler处理器进行处理。这个reactor的角色就类似于dispatch事件分发器。

    多线程的Reactor

    产生多个子反应器,和多个selector。每个线程负责一个选择器的查询和选择。

    handler用到线程池。

    java中的异步回调

    join异步阻塞

    join操作原理:阻塞当前进程,直到准备合并的目标线程执行完成。

    主线程A,B线程,C线程。

    A开始运行--> new B();new C(); --> B.start(); C.start();

    接着A调用了B.join(); 有调用了C.join(); A 继续其他的事情。

    分析:A从开始调用B.jpin();的时候,A就卡主了(阻塞了),需要B完成后,A才会继续执行C.join();这个时候,A又卡主了。C.join();执行完后,A才会继续执行。而且,B.join();C.join();执行是被合并到A线程里的,A却无法知道B,C的执行结果,不能获取B,C的返回值。

    FutureTask类可以。

    FutureTask异步回调之重武器

    先说一下Runnable接口和Callable接口的区别。Runnable接口的run();方法没有返回值,Callable的call()有返回值,返回值是一个泛型。

    FutureTask的构造函数需要传入一个Callable类型的实例,它有一个实例方法get来获取Callable的结果。

    线程只有一个类Thread。那这个Callable的实例怎么执行run方法呢?

    那就是FutureTask类的作用。FutureTask类内部有一个run方法。

      public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;//在FutureTask实例化的时候传入的Callable实例
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();//执行实例的call方法,结果存储到result上
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);//这里把call()的结果设置到FutureTask类的成员outcome中去,可以方便get到
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    
    

    这里有个事,FutureTask的实例的get方法还是阻塞的,要等call执行完才能get到,不然就会一直等待(也可以设置超时时间)。所以这个FutureTask的get方法还是异步阻塞的。谷歌的Guava框架实现了异步非阻塞调用获取线程执行结果。

    Guava

    继承了Java的Future接口。对异步回调机制做了增强

    • 引入新的接口:ListenableFuture。使得Java的Future异步任务在Guava中能背监控和获得非阻塞异步执行的结果.
      • ListenableFuture extends Future
      • 增加一个方法:addListener(Runnable r, Executor e);作用是把FutureCallback的回调工作封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback进行onSuccess或者onFailure处理。
    • 引入新的接口:FutureCallback。独立的新接口。在异步任务完成后,根据结果,完成不同的回调处理,并且可以处理异步结果。
      • 异步任务成功后,回调onSuccess(V result);
      • 异步任务失败后,回调onFailure(Throwable var);

    Guava异步回调流程

    • 1、实现Java的Callable接口,创建异步执行逻辑(如果不需要返回值,那就实现Runnable接口)

    • 2、创建Guava线程池

    • 3、把第一步创建的Callable/Runnable异步执行逻辑的实例通过submit提交给Guava线程池,从而获得ListenableFuture异步任务实例

    • 4、创建FutureCallback回调实例,通过FutureCallback讲回调实例绑定到ListenFuture异步任务上

    package com.stat;
    
    import com.google.common.util.concurrent.*;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class GuavaDemo {
        public static final int SLEEP_GAP = 500;
    
        public static String getCurThreadNmae() {
            return Thread.currentThread().getName();
        }
    
        //业务逻辑烧水
        static class HotWaterJob implements Callable<Boolean> {
            @Override
            public Boolean call() throws Exception {
                System.out.println("shaoshui....1秒");
                Thread.sleep(SLEEP_GAP);
                return true;
            }
        }
    
        static class WashJbo implements Callable<Boolean> {
            @Override
            public Boolean call() throws Exception {
                System.out.println("洗杯子。。。1秒");
                return true;
            }
        }
    
        //新建一个一步业务类型,作为泡茶喝的主线程类
        static class MainJbob implements Runnable {
            boolean waterok = false;
            boolean cupok = false;
            int gap = SLEEP_GAP / 10;
    
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(gap);
                        System.out.println("Doing other things");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (waterok && cupok) {
                        drinkTea(waterok, cupok);//都准备好了,开始泡茶喝
                        System.out.println("喝上了茶了。。。。");
                    }
                }
            }
    
            public void drinkTea(Boolean waterok, Boolean cupok) {
                if (waterok && cupok) {
                    System.out.println("泡茶喝、。。。");
                    this.waterok = false;
                    this.cupok = false;
                } else if (!waterok) {
                    System.out.println("烧水失败");
                } else if (!cupok) {
                    System.out.println("被子未准备ok。");
                }
            }
        }
    
        public static void main(String[] args) {
            final MainJbob mainJbob = new MainJbob();
            Thread mainThread = new Thread(mainJbob);
            mainThread.setName("主线程");
            mainThread.start();
    
            HotWaterJob hotWaterJob = new HotWaterJob();
            WashJbo washJbo = new WashJbo();
            //创建java线程池
            ExecutorService jPool = Executors.newFixedThreadPool(10);
            //包装java线程池,狗仔Guava线程池
            ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
            ListenableFuture<Boolean> hotFuture = gPool.submit(hotWaterJob);
            Futures.addCallback(hotFuture, new FutureCallback<Boolean>() {
                @Override
                public void onSuccess(Boolean aBoolean) {
                    if (aBoolean) {
                        mainJbob.waterok = true;
                    }
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    System.out.println("烧水失败了。。。。。。。1111");
                }
            });
            ListenableFuture<Boolean> washFuture = gPool.submit(washJbo);
            Futures.addCallback(washFuture, new FutureCallback<Boolean>() {
                @Override
                public void onSuccess(Boolean aBoolean) {
                    if (aBoolean) {
                        mainJbob.cupok = true;
                    }
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    System.out.println("洗杯子失败111111111111111");
                }
            });
    
        }
    }
    
    
    

    Netty

  • 相关阅读:
    007_排序_多重排序
    Locust 运行模式
    Locust介绍
    Locust环境搭建及报错解决
    8-02全局变量与输出语句
    8-01变量
    7-15ALL、 ANY、SOME子查询
    7-14 EXISTS子查询
    7-13IN和NOT IN 子查询
    7-12简单子查询
  • 原文地址:https://www.cnblogs.com/dhu121/p/14798592.html
Copyright © 2011-2022 走看看