zoukankan      html  css  js  c++  java
  • 多线程设计模式——Read-Write Lock模式和Future模式分析

    本文内所有实现的代码均附在文末,有需要可以参考。(好奇宝宝们可以粘贴下来跑一下

    多线程程序评价标准

    • 安全性:

      ​ 安全性就是不损坏对象。也就是保证对象内部的字段的值与预期相同。

    • 生存性:

      ​ 生存性是指无论什么时候,必要的处理都一定能够执行。失去生存性最典型的例子就是“死锁”。

    • 可复用性:

      ​ 指类能够重复利用。若类能够作为组件从正常运行的软件里分割出来,说明这个类有很高的复用性。

    • 性能:

      ​ 指能够快速、大批量地执行处理。主要影响因素有:吞吐量、响应性、容量等。

    这里还要再区分一下这四条。前两条是程序正常运行的必要条件;后两条是程序提高质量的必要条件。

    任何模式都有一个相同的“中心思想”

    安全性和生存性是基础,是所有模式都必须保证的;可复用性和性能是目的,是所有模式诞生的意义。”

    上面这句话是我们每一个使用设计模式的人,甚至是自己编写代码的人所应该牢记在心的。

    在接下来分析的两个模式中,我会用实际的设计模式的例子来帮助大家理解上面这句话的含义。

    Read-Write Lock 模式

    RW-Lock模式特点

    • 在执行读取操作之前,线程必须获取用于读取的锁。
    • 在执行写入操作之前,线程必须获取用于写入的锁。
    • 多个线程可以同时读取,但是读取时,不可以写入。
    • 至多有一个线程正在写入,此时其他线程不可以读取或写入。

    一般来说,执行互斥处理(也是必要的)会降低程序性能(这里的互斥处理指使用synchronized关键字)。但是通过这个模式,将针对写入的互斥处理和读取的互斥处理分开考虑,则可以提高程序性能。(具体性能提升效果请见下文“性能对比”一节)

    冲突总结

    多线程读写时总共有4种情况,会发生冲突的有三种。下面给出冲突表格:

    读取 写入
    读取 无冲突 读和写的冲突 RW Conflict
    写入 读和写的冲突 RW Conflict 写和写的冲突 WW Conflict

    手搓RW Lock模式代码

    这部分内容是为了帮助大家更好地理解RW Lock的实现原理和过程,实际操作中我们不必编写这么多代码来实现读写锁。但是在这里强烈建议认真阅读此部分,了解原理后对使用JAVA自带包或是实现特殊需求都会大有裨益!

    类图

    Data类中的buffer字段是读写的信息。ReaderThread类是读取的线程,WriterThread是写入的线程。Data类中还保有一个ReadWriteLock类的实例,它是这个模式的主角,起到保护读写的作用。

    Data类


    第一行红线处,lock是一个ReadWriteLock类的实例,起到保护读写的作用。

    第二、三行红线处,分别是readLock方法和readUnlock方法,夹在中间的是doRead方法(进行读取的方法)。

    第四、五行红线处,分别是writeLock方法和writeUnlock方法,夹在中间的是doWrite方法(进行写入的方法)。

    Data类中还有用于模拟耗时的方法,即假定写入操作耗时比读取长(符合通常程序的情况)。

    这里提到的”夹在中间“的说法,其实是另一种设计模式——“Before/After模式”。由于它的使用有一些坑点,我这里先“中断”一下,简单讲一下“Before/After模式”。

    P.S. Before/After模式

    前置处理(此模式中为获取锁)
        try{
            实际的操作(有return也会执行finally语句块中的内容)
        } finally {
            后置处理(此模式中为释放锁)
        }
    

    以上代码为Before/After模式的基本框架。

    此模式使用有两点要特别注意!!!

    • try语句后面一定要跟着finally语句块!finally语句块的含义是:只要进入了try语句块,就一定会在最后执行一次finally语句块内的代码,即使try语句块内有return语句也会执行。在这个模式中,使用finally语句就保证了,获取的锁在最后一定会被释放掉,避免"死锁"发生。

    • 前置处理的语句一定要放在try语句块外面!这一点可能会有很多人不理解,放在里面还是外面有什么区别?回答是:在绝大多数情况下,确实没有区别。 但是当线程被interrupt时,程序就有可能出现过多调用readUnlock和writeUnlock方法的风险。假如现在程序正在lock.readLock()中进行wait,此时该线程被interrupt,那么程序会抛出InterruptedException异常,并退出readLock方法。这时readingReaders字段并不会递增。

      从readLock方法退出的线程回跳到finally语句块,执行lock.readUnlock()。在这个方法中,,之前未递增的readingReaders字段会执行递减操作,该字段的值会与我们预期不同(变得比正常要小)。这就很有可能引发难以察觉的bug。

      (上面两段中出现的方法名和字段不知道没关系,它们都在下面即将介绍的ReadWriteLock类中,建议大家看完下面的ReadWriteLock类的介绍再回来理解一下这部分,很重要!!很容易出bug!!!

    ReadWriteLock类

    该类中保存有四个私有字段,前三个字段的含义很好理解,见图片中的代码注释。

    在这里,特别强调preferWriter字段!这是保证程序运行结果达到预期的重要一环,其含义和用法需要大家好好理解。这个preferWriter代表的含义是读取和写入两者之间的优先级关系。当preferWriter字段为true时,代表写入优先;为false时,代表读取优先。那么这个读取或写入的优先又是如何通过这一个布尔值实现的呢?这里就体现出了ReadWriteLock类的设计巧妙之处。

    我们看readLock方法中的守护模式(while+wait)的守护条件(while成立的条件)。(见上图中第二行红线)这行代码的含义是如果有正在写入的线程(数据正在被写入)或是写入优先并且有正在等待写入的线程,那么读取的线程就要wait。这里,preferWriter字段发挥了它关键的作用。

    再看readUnlock方法中对preferWriter字段的操作(第三行红线)。这里的含义是,在读取锁释放时,就把preferWriter字段置为true。因为读取锁释放时,一定表示已经进行完一次读取操作了,此时应该把优先权让给写入操作,所以将preferWriter置为true。

    同理,writeUnlock方法中对preferWriter字段的操作(第四行红线)也即代表进行完一次写入操作后,要把优先权交给读取操作,即把preferWriter字段置为false。

    这就像两个人却只有一个水瓶,一个人喝完一口水之后就要把水瓶交给对方,不然就会出现渴死的现象。

    那么如果把ReadWriteLock类中的preferWriter字段去掉,程序运行起来会是什么样子呢?如下:

    读取线程比写入线程多,而且读取操作耗时短,所以读取线程会一直抢占锁,导致写入线程无法写入。这就是程序“渴死”的样子了。(大家有兴趣可以把文末代码粘贴下来,把preferWriter字段去掉自己跑一下

    正确运行结果

    正确的运行结果应该是读取一段时间就写入一次,这样不断循环。所以读取的内容应该不断变化。结果见下图:

    适用场合

    • 读取操作繁重时

      ​ 即read操作很耗费时间。这种情况下,使用这种模式比Single Thread Execution模式(使用synchrnized关键字)更适合。反之,Single Thread Execution模式性能更好。

    • 读取频率比写入频率高时

      ​ 该模式的优点在于Reader角色之间不会发生冲突,这样可以避免阻塞而耗费时间。但若写入频率很高,则Writer角色会频繁打断Reader角色的读取工作,导致性能提升不会很明显。

    “逻辑锁”vs“物理锁”

    大家肯定都很熟悉通过synchronized关键字来进行线程同步控制,因为synchronized关键字可以获取实例的锁。但是这里synchronized关键字所获取的锁是JVM为每一个实例提供的一个物理锁。每个实例只有一个物理锁,无论如何编写程序,也无法改变这个物理锁的运行。

    我们这个Read Write Lock模式中所提供的“用于写入的锁”和“用于读取的锁”都是逻辑锁。这个锁不是JVM所规定的结构,而是编程人员自己实现的一种逻辑结构。这就是所谓的逻辑锁。我们可以通过控制ReadWriteLock类来控制逻辑锁的运行。

    那么这二者的关系是什么呢?其实,ReadWriteLock类提供的两个逻辑锁的实现,都是依靠ReadWriteLock实例持有的物理锁完成的。

    而此处我们也来解释一下上节中所说的,读取不繁重时,使用我们自己所构建的逻辑锁就会导致比使用synchronized关键字(物理锁)多很多逻辑操作,这样多出来的逻辑操作所耗费的时间也许会大于线程被阻塞的时间。这样就会导致本模式反而会比Single Thread Execution性能差。

    性能对比

    示例代码中一共有6个读取线程,两个写入线程。在本节性能对比中,我让每个读取线程进行20次读取后就输出运行时间然后终止。以下两张图分别为使用Read-Write Lock模式耗时和使用synchronized关键字耗时。

    Read-Write Lock模式:

    synchronized关键字:

    从以上两图输出的时间可以看出,在每个线程读取20次的情况下,使用Read-Write Lock模式可以比synchronized关键字节省三分之二(7秒钟左右)的时间。这在大量读取的程序中,会给程序性能带来极大的提升!!!(当然对于OO第二单元电梯作业来说,由于读写频率差异不大而且读取并不繁琐,所以在电梯程序中使用Read-Write Lock模式性能提升并不明显。不过谁又能说得准以后会不会用到呢?)

    “中心思想”分析

    • 正常运行的必要条件

      ​ 本模式中,通过ReadWriteLock类中的两个获取锁和两个释放锁的方法来模拟了synchronized关键字获取实例的锁和释放实例的锁这两个过程,从而在逻辑上保证了本模式在线程安全方面与synchronized关键字保护的方法完全相同。因此在安全性和生存性两方面,本模式很好地完成了。

    • 提升性能的必要条件

      ​ 本模式中,通过找到读取和写入交汇的四种情况中的读读无冲突的情况,并且实现读取锁和写入锁的分离,实现了多线程同时读取的效果,以此来提高频繁读取或是“重读取”的程序的性能。
      ​ 同时,我们不难发现,关于多线程同步控制的代码都封装在ReadWriteLock类中,其他部分直接调用即可,无需进行同步控制,提高了可复用性。

    Future 模式

    Future模式特点

    我从本模式中先提取出两个最关键的核心代码展示一下。

    Data data = host.request(10, ‘A’);

    host.request方法是启动一个新线程来执行请求。但是在这行代码中该方法的返回值,不是新线程执行得到的最后结果,这个data只是一张“提货单”、“预约券”

    先返回“提货单”的意义在于这个返回值可以立即得到,不用等待请求处理线程返回最后结果。在“做蛋糕”的期间,我们可以做一些别的和“蛋糕”无关的事情,等到“蛋糕做好了”我们再回去取“蛋糕”。

    data.getContent();

    上面这句代码就是线程“取蛋糕”的动作。这个方法的返回值是真正的“能吃的蛋糕”。

    手搓Future模式代码

    类图

    Main类发出请求给Host类,Host类接收到请求后立刻制造一个FutureData类的实例当作提货券返回给Main类,同时Host类立刻启动一个新线程来处理请求(假设此处请求处理需要花费相当长时间),最后处理结果得到RealData类(蛋糕)。

    Main类

    Main类中,向Host类发出了三个请求。之后Main线程就去做别的工作了,我们这里用sleep(2000)来模拟。做完别的工作之后,Main线程输出请求的结果。

    Host类

    第一个红线处,通过Future这个FutureData类的实例(共享对象),将Main线程(买蛋糕的人)和realdata(蛋糕)建立起了,超越“时空”的联系。

    为什么说“时空”呢?我自己的理解这个模式,就是在主线程得到提货券后,主线程不管在何时何地(这里的空间是抽象空间,也即主线程不在处理请求线程的”线程空间"内)都可以在结果计算出来后即时获取结果。

    第二个红线处,使用了一个不太常用的语法模式——匿名内部类。读者不必对这个语法熟练掌握,只需要知道在示例程序里,这个类新建了一个处理请求的线程实例并让新的线程运行起来去处理请求即可。(count和c变量前面都加上final关键字是匿名内部类的要求,了解即可)

    说到这里,对于每个新的请求都启动一个新的线程来处理是另一个多线程设计模式——Thread-Per-Message模式。这个模式较为简单,感兴趣的读者可以自行学习了解一下,这里不再赘述了。

    FutureData类

    第一个红线处,这里设计的又是一个新的多线程设计模式——Balk模式。Balk模式的“中心思想”是不要我就走了。即当有多个线程时,其中一个线程已经完成了请求,那么别的线程来要完成请求时,这个模式就通过if条件告诉线程:“我已经完成我的请求了,不用你再来工作了,你可以走了。”,通过return将线程返回回去。

    第二、三个红线处,即在请求处理线程完成“蛋糕”的交付之后(this.realdata = realdata;),将ready字段置true,表明“蛋糕”已经随时可以取走了。然后通知所有等待线程。

    第四、五个红线处,使用守护模式,以没有ready作为守护条件,即如果“蛋糕”还没有做好,“取蛋糕”的线程就要wait。否则通过getContent方法返回回去。

    RealData类

    第一个红线处,这个String字段在本示例程序中代表“蛋糕”。

    第二个红线处,示例程序中用sleep来模拟耗时很长的请求处理过程。

    运行结果

    通过结果输出来看,在主线程执行其他工作的时候,与此同时请求正在被处理,这样极大地提高了处理效率。

    模式分析

    • 提高吞吐量

      ​ 单核CPU中,纯计算过程是无法提高吞吐量的。其他情况均可。

    • 异步方法调用

      ​ 通过Thread-Per-Message模式通过新建线程,模拟实现了异步。但是Thread-Per-Message模式无法接收返回值。

    • “准备”和“使用”返回值的分离

      ​ 为了解决Thread-Per-Message模式无法接收返回值的尴尬局面,Future模式横空出世。Future模式通过将准备返回值(返回提货券)和使用返回值(调用getContent方法)分离,即解决了异步调用无法接收返回值的问题,又提高了性能。

    与生产者-消费者模式有区别吗?

    答案是有。

    生产者-消费者模式大家都很熟悉,通过一个tray来将生产者生产产品(有的人将其对应为本模式的请求处理过程)和消费者使用产品(有的人将其对应为本模式的使用返回值过程)分离开来。目前来看,没有什么区别。

    但是,我们仔细想一想,Future模式通过一张提货券将“生产者“和”消费者“建立起来一对一的独一无二的联系。也就是说我有这个”蛋糕”的提货券,我只能取我这个自己的“蛋糕”,而不能取“蛋糕店”里做好的别人的“蛋糕”。说到这里,相信大家都已经发现本模式与生产者-消费者模式最大的区别了吧。

    模式拓展

    • 不让主线程久等的Future角色

      ​ 在示例程序中,如果FutureData的getContent方法被调用时,RealData类的实例还没有创建完成,则要主线程wait创建完成,有时这也会对主线程的效率造成损失。

      ​ 所以,为了避免这种情况的发生,我们可以将守护模式换成Balk模式,即主线程来“取蛋糕”时,若“蛋糕”还没做好,就让主线程返回,再等一会儿。这样主线程可以继续进行其他工作,过一定时间后再回来“取蛋糕”。

    • 会发生变化的Future角色

      ​ 通常情况下,返回值只会被设置到Future角色中一次。但是在有时需要不断反复设置返回值时,可以考虑给Future角色赋予“当前返回值”,即这个返回值会不断随时间而改变。

      ​ 例如:在通过网络获取图像数据时,可以在最开始获取图像的长和宽,接着获取模糊图像数据,在获取清晰图像数据。此时,这个不断变化的Future角色可能会大有用处。

    模式思考

    在课上,老师提示我,是否可以用简单的方法实现主动返回值的Future模式。

    目前,我只想到使用回调模式,在Future模式返回值设置好后,通过Host类回调主线程。不过,使用这种方式会导致Main类里多出很多与多线程同步处理相关的代码,导致Main类变的臃肿,而且整个模式可复用性也会降低。

    我在想出好的解决办法之后会及时更新本文,向大家展示。同时也欢迎各位读者有好的解决办法在评论区留言。

    Future模式“中心思想”

    • 正常运行必要条件

      本模式类似生产者-消费者的逻辑,将处理与请求分离,分离的同时建立起超越“时空”的联系,保证了最后结果传输的准确性。

    • 提高性能必要条件

      通过将“准备”返回值和“使用”返回值分离,将主线程从漫长的请求处理过程解放出来,让主线程在请求处理期间,可以做别的工作,提高性能。

    伟大的Concurrent包!

    RW Lock模式

    JAVA提供了java.util.concurrent.locks包来提供读写锁的实现。这个包里的ReentrantReadWriteLock类实现了ReadWriteLock接口。这个包的实现原理即为上述手搓RW-Lock模式代码所讲解的原理和实现。具体使用方法很简单,在理解原理之后使用很简单,就不多赘述了。

    Future模式

    JAVA提供了java.util.concurrent.Future接口相当于本模式中的Future角色。其中java.util.concurrent.FutureTask类是实现了Future接口的标准类,主要有get(获取返回值)、set(设置返回值)、cancel(中断请求处理运行)和setException(设置异常)四个方法。

    其原理和上述Future模式的手搓代码原理完全一致,相信大家完全理解上述讲解后,对这些concurrent包的使用一定会更加得心应手!!

    示例程序代码

    • RW Lock模式
    public class Main {
        public static void main(String[] args) {
            Data data = new Data(10);
            Thread Reader1 = new ReaderThread(data);
            Reader1.start();
            Thread Reader2 = new ReaderThread(data);
            Reader2.start();
            Thread Reader3 = new ReaderThread(data);
            Reader3.start();
            Thread Reader4 = new ReaderThread(data);
            Reader4.start();
            Thread Reader5 = new ReaderThread(data);
            Reader5.start();
            Thread Reader6 = new ReaderThread(data);
            Reader6.start();
            Thread Writer1 = new WriterThread(data, "ABCDEFGHIJKLMNOPQTSTUVWXYZ");
            Writer1.start();
            Thread Writer2 = new WriterThread(data, "abcdefghijklmnopqrstuvwxyz");
            Writer2.start();
            Scanner input = new Scanner(System.in);
            String end  = input.nextLine();
            while (end.equals("")) { end  = input.nextLine(); }
            Reader1.interrupt();
            Reader2.interrupt();
            Reader3.interrupt();
            Reader4.interrupt();
            Reader5.interrupt();
            Reader6.interrupt();
            Writer1.interrupt();
            Writer2.interrupt();
        }
    }
    
    
    public class Data {
        private final char[] buffer;
        private ReadWriteLock lock = new ReadWriteLock();
        public Data(int size) {
            this.buffer = new char[size];
            for (int i = 0; i < buffer.length; i++) {
                buffer[i] = '*';
            }
        }
        public synchronized char[] read() throws InterruptedException {
            lock.readLock();
            try {
                return doRead();
            } finally {
                lock.readUnlock();
            }
        }
        public synchronized void write(char c) throws InterruptedException {
            lock.writeLock();
            try {
                doWrite(c);
            } finally {
                lock.writeUnlock();
            }
        }
        private char[] doRead() {
            char[] newbuf = new char[buffer.length];
            for (int i = 0; i < buffer.length; i++) {
                newbuf[i] = buffer[i];
            }
            slowly();
            return newbuf;
        }
        private void doWrite(char c) {
            for (int i = 0; i < buffer.length; i++) {
                buffer[i] = c;
                slowly();
            }
        }
        private void slowly() {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
            }
        }
    }
    
    //此处为性能测试代码(即执行20次读取,并统计时间)
    public class ReaderThread extends Thread {
        private final Data data;
        public ReaderThread(Data data) {
            this.data = data;
        }
        public void run() {
            try {
                long begin = System.currentTimeMillis();
                for (int i = 0; i < 20; i++) {
                    char[] readbuf = data.read();
                    System.out.println(Thread.currentThread().getName() + " reads " + String.valueOf(readbuf));
                }
                long time = System.currentTimeMillis() - begin;
                System.out.println(Thread.currentThread().getName() + ":time = " + time);
            } catch (InterruptedException e) {
            }
        }
    }
    
    
    import java.util.Random;
    
    public class WriterThread extends Thread {
        private static final Random random = new Random();
        private final Data data;
        private final String filler;
        private int index = 0;
        public WriterThread(Data data, String filler) {
            this.data = data;
            this.filler = filler;
        }
        public void run() {
            try {
                while (true) {
                    char c = nextchar();
                    data.write(c);
                    Thread.sleep(random.nextInt(3000));
                }
            } catch (InterruptedException e) {
            }
        }
        private char nextchar() {
            char c = filler.charAt(index);
            index++;
            if (index >= filler.length()) {
                index = 0;
            }
            return c;
        }
    }
    
    
    public final class ReadWriteLock {
        private int readingReaders = 0; // (A)…实际正在读取中的线程个数
        private int waitingWriters = 0; // (B)…正在等待写入的线程个数
        private int writingWriters = 0; // (C)…实际正在写入中的线程个数
        private boolean preferWriter = true; // 若写入优先,则为true
    
        public synchronized void readLock() throws InterruptedException {
            while (writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
                wait();
            }
            readingReaders++;                       // (A) 实际正在读取的线程个数加1
        }
    
        public synchronized void readUnlock() {
            readingReaders--;                       // (A) 实际正在读取的线程个数减1
            preferWriter = true;
            notifyAll();
        }
    
        public synchronized void writeLock() throws InterruptedException {
            waitingWriters++;                       // (B) 正在等待写入的线程个数加1
            try {
                while (readingReaders > 0 || writingWriters > 0) {
                    wait();
                }
            } finally {
                waitingWriters--;                   // (B) 正在等待写入的线程个数减1
            }
            writingWriters++;                       // (C) 实际正在写入的线程个数加1
        }
    
        public synchronized void writeUnlock() {
            writingWriters--;                       // (C) 实际正在写入的线程个数减1
            preferWriter = false;
            notifyAll();
        }
    }
    
    • Future模式
    public class Main {
        public static void main(String[] args) {
            System.out.println("main BEGIN");
            Host host = new Host();
            Data data1 = host.request(10, 'A');
            Data data2 = host.request(20, 'B');
            Data data3 = host.request(30, 'C');
    
            System.out.println("main otherJob BEGIN");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println("main otherJob END");
    
            System.out.println("data1 = " + data1.getContent());
            System.out.println("data2 = " + data2.getContent());
            System.out.println("data3 = " + data3.getContent());
            System.out.println("main END");
        }
    }
    
    
    public class Host {
        public Data request(final int count, final char c) {
            System.out.println("    request(" + count + ", " + c + ") BEGIN");
    
            // (1) 创建FutureData的实例
            final FutureData future = new FutureData();
    
            // (2) 启动一个新线程,用于创建RealData的实例
            new Thread() {
                public void run() {
                    RealData realdata = new RealData(count, c);
                    future.setRealData(realdata);
                }
            }.start();
    
            System.out.println("    request(" + count + ", " + c + ") END");
    
            // (3) 返回FutureData的实例
            return future;
        }
    }
    
    
    public interface Data {
        public abstract String getContent();
    }
    
    
    public class RealData implements Data {
        private final String content;
        public RealData(int count, char c) {
            System.out.println("        making RealData(" + count + ", " + c + ") BEGIN");
            char[] buffer = new char[count];
            for (int i = 0; i < count; i++) {
                buffer[i] = c;
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                }
            }
            System.out.println("        making RealData(" + count + ", " + c + ") END");
            this.content = new String(buffer);
        }
        public String getContent() {
            return content;
        }
    }
    
    
    public class FutureData implements Data {
        private RealData realdata = null;
        private boolean ready = false;
        public synchronized void setRealData(RealData realdata) {
            if (ready) {
                return;     // balk
            }
            this.realdata = realdata;
            this.ready = true;
            notifyAll();
        }
        public synchronized String getContent() {
            while (!ready) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }
            return realdata.getContent();
        }
    }
    

    参考资料:《图解JAVA多线程设计模式》

  • 相关阅读:
    OpenCL、CUDA
    最小和最廉价的超级计算机,DIY的
    组装属于您自己的Tesla个人超级计算机
    多处理器系统
    开源项目Spark简介
    基于Cassandra的日志和分布式小文件存储系统【1】
    网络广告js备忘【2】
    网络广告js备忘【1】
    成功产品的意外
    Cassandra HBase和MongoDb性能比较
  • 原文地址:https://www.cnblogs.com/rains-in-your-eyes/p/12618817.html
Copyright © 2011-2022 走看看