zoukankan      html  css  js  c++  java
  • java语言导学(5版)--第12章并发之二

    1不可变对象

    概念:(immutable)对象创建后,状态不可更改。不可变对象在并发程序中尤其有用,因状态不可变,不会被线程干扰,也不会出现不一致状态。

    书中通过实例是可变的类,并从此类衍生出一个不可变实例。

    SynchronizedRGB类是表示颜色的类,每一个对象代表一种颜色,使用三个整形数表示颜色的三基色,字符串表示颜色名称。

    public class SynchronizedRGB {
        // Values must be between 0 and 255.
        private int red;
        private int green;
        private int blue;
        private String name;
    
        private void check(int red,
                           int green,
                           int blue) {
            if (red < 0 || red > 255
                || green < 0 || green > 255
                || blue < 0 || blue > 255) {
                throw new IllegalArgumentException();
            }
        }
    
        public SynchronizedRGB(int red,
                               int green,
                               int blue,
                               String name) {
            check(red, green, blue);
            this.red = red;
            this.green = green;
            this.blue = blue;
            this.name = name;
        }
    
        public void set(int red,
                        int green,
                        int blue,
                        String name) {
            check(red, green, blue);
            synchronized (this) {
                this.red = red;
                this.green = green;
                this.blue = blue;
                this.name = name;
            }
        }
    
        public synchronized int getRGB() {
            return ((red << 16) | (green << 8) | blue);
        }
    
        public synchronized String getName() {
            return name;
        }
    
        public synchronized void invert() {
            red = 255 - red;
            green = 255 - green;
            blue = 255 - blue;
            name = "Inverse of " + name;
        }
    }

    为避免不一致状态,须小心使用SynchronizedRGB,如某线程执行如下代码:

    SynchronizedRGB color =
        new SynchronizedRGB(0, 0, 0, "Pitch Black");
    ...
    int myColorInt = color.getRGB();      //Statement 1
    String myColorName = color.getName(); //Statement 2

    如果在statement1和statement2间,另一个线程执行了color.set方法,则myColorInt的值与myColorName值不匹配。可通过如下方法避免:

    synchronized (color) {
        int myColorInt = color.getRGB();
        String myColorName = color.getName();
    } 

     2高级并发对象

    目前为止,之前的教程都是重点讲述了最初作为 Java 平台一部分的低级别 API。这些API 对于非常基本的任务来说已经足够,但是对于更高级的任务就需要更高级的 API。特别是针对充分利用了当今多处理器和多核系统的大规模并发应用程序。 本章,我们将着眼于 Java 5.0 新增的一些高级并发特征。大多数功能已经在新的java.util.concurrent 包中实现。Java 集合框架中也定义了新的并发数据结构。

    1)锁对象,支持可简化许多并发应用程序的锁用法

    2)执行器,定义运行和管理线程的高级api。适用于大型应用程序的线程池管理。

    3)并发集合,简化大型数据集的管理,减少同步需求。

    4)原子变量,拥有最小化同步特性,避免内存一致性问题。

    5)ThreadLockRandom,提供多线程中的伪随机数生成。

     2.1锁对象

    同步代码依赖于简单的可重入锁(synchronized),但有很多限制。java.util.concurrent.locks包中的最基本接口:Lock接口。Lock对象和同步代码的外部锁类似,每次只能一个线程拥有一个Lock对象,同样支持

    wait和notify机制。Lock对象对比内部锁最大优势是可从获取锁的尝试中退出(当锁当前不可用或超时,tryLock方法退出;当另一个线程的这个锁获得前发送中断,则lockInterruptibly方法会退出。

    (进一步了解?)

    让我们使用 Lock 对象来解决我们在活跃度中见到的死锁问题。Alphonse 和 Gaston 已经把自己训练成能注意到朋友何时要鞠躬。我们通过要求 Friend 对象在双方鞠躬前必须先获得锁来模拟这次改善。下面是改善后模型的源代码 Safelock :

    public class Safelock {
        static class Friend {
            private final String name;
            private final Lock lock = new ReentrantLock();
    
            public Friend(String name) {
                this.name = name;
            }
    
            public String getName() {
                return this.name;
            }
    
            public boolean impendingBow(Friend bower) {
                Boolean myLock = false;
                Boolean yourLock = false;
                try {
                    myLock = lock.tryLock();
                    yourLock = bower.lock.tryLock();
                } finally {
                    if (!(myLock && yourLock)) {
                        if (myLock) {
                            lock.unlock();
                        }
                        if (yourLock) {
                            bower.lock.unlock();
                        }
                    }
                }
                return myLock && yourLock;
            }
    
            public void bow(Friend bower) {
                if (impendingBow(bower)) {
                    try {
                        System.out.format("%s: %s has" + " bowed to me!%n", this.name, bower.getName());
                        bower.bowBack(this);
                    } finally {
                        lock.unlock();
                        bower.lock.unlock();
                    }
                } else {
                    System.out.format(
                            "%s: %s started" + " to bow to me, but saw that" + " I was already bowing to" + " him.%n",
                            this.name, bower.getName());
                }
            }
    
            public void bowBack(Friend bower) {
                System.out.format("%s: %s has" + " bowed back to me!%n", this.name, bower.getName());
            }
        }
    
        static class BowLoop implements Runnable {
            private Friend bower;
            private Friend bowee;
    
            public BowLoop(Friend bower, Friend bowee) {
                this.bower = bower;
                this.bowee = bowee;
            }
    
            public void run() {
                Random random = new Random();
                for (;;) {
                    try {
                        Thread.sleep(random.nextInt(10));
                    } catch (InterruptedException e) {
                    }
                    bowee.bow(bower);
                }
            }
        }
    
        public static void main(String[] args) {
            final Friend alphonse = new Friend("Alphonse");
            final Friend gaston = new Friend("Gaston");
            new Thread(new BowLoop(alphonse, gaston)).start();
            new Thread(new BowLoop(gaston, alphonse)).start();
        }
    }

    2.2执行器

    前面例子中,新线程完成的任务(如runnable对象定义的)和这个线程本身(如一个线程对象所定义的)间有很强联系。这种方式不适合大型应用。大型应用中,把线程管理和应用的其他部分的创建分开,封装

    这些功能的对象即执行器(executor)

    1)执行器接口

    java.util.concurrent包中定义了三种执行器接口:

    1.1)Executor,支持运行新任务的简单接口

    1.2)ExecutorService,Executor的子接口,为帮助管理单个任务和执行器自己的生命周期添加了某些功能

    1.3)ScheduleExecutorService,ExecutorService的子接口,支持任务的定时执行等。

    引用执行器对象的变量会以这三种接口中的一种形式声明,没有执行器类这种类型。

    2)Executor接口

    接口只有一个 execute 方法,用来替代通常创建(启动)线程的方法。例如:r 是一个 Runnable 对象,e 是一个 Executor 对象。可以使用

    e.execute(r);

    代替

    (new Thread(r)).start();

    但 execute 方法没有定义具体的实现方式。对于不同的 Executor 实现,execute 方法可能是创建一个新线程并立即启动,但更有可能是使用已有的工作线程运行r,或者将 r放入到队列中等待可用的工作线程。(我们将在线程池一节中描述工作线程。)

    3)ExecutorService 接口

    在execute方法之外添加了一个submit方法。

    3.1)submit 方法除了和 execute 方法一样可以接受 Runnable 对象作为参数,还可以接受 Callable 对象作为参数。使用 Callable对象可以能使任务返还执行的结果。

    3.2)通过 submit 方法返回的 Future 对象可以读取 Callable 任务的执行结果,或是管理 Callable 任务和 Runnable 任务的状态。 ExecutorService 也提供了批量运行 Callable 任务的方法。

    3.3)最后,ExecutorService 还提供了一些关闭执行器的方法。如果需要支持即时关闭,执行器所执行的任务需要正确处理中断。

    4)ScheduleExecutorService 接口

    ScheduledExecutorService 扩展 ExecutorService接口并添加了 schedule 方法。

    4.1)调用 schedule 方法可以在指定的延时后执行一个Runnable 或者 Callable 任务。

    4.2)ScheduledExecutorService 接口还定义了按照指定时间间隔定期执行任务的 scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法。

    5)线程池

    大部分执行器实现都使用由工作者线程(worker threads)组成的线程池(thread pools),这种类型的线程独立于它执行的Runnable和callable任务存在,并且常用来执行多个任务。

    使用工作线程可以使创建线程的开销最小化。在大规模并发应用中,创建大量的 Thread 对象会占用占用大量系统内存,分配和回收这些对象会产生很大的开销。

    5.1)一种最常见的线程池是固定大小的线程池(fixed thread pool)。这种线程池始终有一定数量的线程在运行,如果一个线程由于某种原因终止运行了,线程池会自动创建一个新的线程来代替它。需要执行的任务通过一个内部队列提交给线程,当没有更多的工作线程可以用来执行任务时,队列保存额外的任务。

    使用固定大小的线程池一个很重要的好处是可以实现优雅退化(degrade gracefully)。例如一个 Web 服务器,每一个 HTTP 请求都是由一个单独的线程来处理的,如果为每一个 HTTP 都创建一个新线程,那么当系统的开销超出其能力时,会突然地对所有请求都停止响应。如果限制 Web 服务器可以创建的线程数量,那么它就不必立即处理所有收到的请求,而是在有能力处理请求时才处理。

    5.2) 创建一个使用线程池的执行器最简单的方法是调用 java.util.concurrent.ExecutorsnewFixedThreadPool 方法。Executors 类还提供了下列一下方法:

    • newCachedThreadPool 方法创建了一个可扩展的线程池。适合用来启动很多短任务的应用程序。
    • newSingleThreadExecutor 方法创建了每次执行一个任务的执行器。
    • 还有一些 ScheduledExecutorService 执行器创建的工厂方法。

    如果上面的方法都不满足需要,可以尝试
    java.util.concurrent.ThreadPoolExecutor 或者java.util.concurrent.ScheduledThreadPoolExecutor

    (关于四种方法区别待整理?)

    6)fork/join

    6.1) 该框架是 JDK 7 中引入的并发框架。fork/join 框架是 ExecutorService 接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。

    6.2) fork/join 框架会将任务分发给线程池中的工作线程。fork/join 框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。

    6.3) fork/join 框架的核心是 ForkJoinPool 类,它是对 AbstractExecutorService 类的扩展。ForkJoinPool 实现了工作窃取算法,并可以执行 ForkJoinTask 任务。

    6.3.1)基本用法:

    第一步编写执行部分工作的代码,类似于:
    if (当前这个任务工作量足够小)
        直接完成这个任务
    else
        将这个任务或这部分工作分解成两个部分
        分别触发(invoke)这两个子任务的执行,并等待结果

    将该代码封装称一个ForkJoinTask子类,通常情况下会使用一种更为具体的的类型,或者是 RecursiveTask(会返回一个结果),或者是 RecursiveAction。 当你的 ForkJoinTask 子类准备好了,创建一个代表所有需要完成工作的对象,然后将其作为参数传递给一个ForkJoinPool 实例的 invoke() 方法即可。

    6.3.2) 模糊操作

    想要了解 fork/join 框架的基本工作原理,接下来的这个例子会有所帮助。假设你想要模糊一张图片。原始的 source 图片由一个整数的数组表示,每个整数表示一个像素点的颜色数值。与 source 图片相同,模糊之后的 destination 图片也由一个整数数组表示。 对图片的模糊操作是通过对 source 数组中的每一个像素点进行处理完成的。处理的过程是这样的:将每个像素点的色值取出,与周围像素的色值(红、黄、蓝三个组成部分)放在一起取平均值,得到的结果被放入 destination 数组。因为一张图片会由一个很大的数组来表示,这个流程会花费一段较长的时间。如果使用 fork/join 框架来实现这个模糊算法,你就能够借助多处理器系统的并行处理能力。下面是上述算法结合 fork/join 框架的一种简单实现:

    public class ForkBlur extends RecursiveAction {
        private int[] mSource;
        private int mStart;
        private int mLength;
        private int[] mDestination;
    
        // Processing window size; should be odd.
        private int mBlurWidth = 15;
    
        public ForkBlur(int[] src, int start, int length, int[] dst) {
            mSource = src;
            mStart = start;
            mLength = length;
            mDestination = dst;
        }
    
        protected void computeDirectly() {
            int sidePixels = (mBlurWidth - 1) / 2;
            for (int index = mStart; index < mStart + mLength; index++) {
                // Calculate average.
                float rt = 0, gt = 0, bt = 0;
                for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                    int mindex = Math.min(Math.max(mi + index, 0),
                                        mSource.length - 1);
                    int pixel = mSource[mindex];
                    rt += (float)((pixel & 0x00ff0000) >> 16)
                          / mBlurWidth;
                    gt += (float)((pixel & 0x0000ff00) >>  8)
                          / mBlurWidth;
                    bt += (float)((pixel & 0x000000ff) >>  0)
                          / mBlurWidth;
                }
    
                // Reassemble destination pixel.
                int dpixel = (0xff000000     ) |
                       (((int)rt) << 16) |
                       (((int)gt) <<  8) |
                       (((int)bt) <<  0);
                mDestination[index] = dpixel;
            }
        }
    
      ...

    接下来你需要实现父类中的 compute() 方法,它会直接执行模糊处理,或者将当前的工作拆分成两个更小的任务。数组的长度可以作为一个简单的阀值来判断任务是应该直接完成还是应该被拆分。

    protected static int sThreshold = 100000;
    
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }
    
        int split = mLength / 2;
    
        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                  new ForkBlur(mSource, mStart + split, mLength - split,
                               mDestination));
    }

    如果前面这个方法是在一个 RecursiveAction 的子类中,那么设置任务在ForkJoinPool 中执行就再直观不过了。通常会包含以下一些步骤:

    1. 创建一个表示所有需要完成工作的任务。

      // source image pixels are in src
      // destination image pixels are in dst
      ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

    2. 创建将要用来执行任务的 ForkJoinPool。

      ForkJoinPool pool = new ForkJoinPool();

    3. 执行任务。

      pool.invoke(fb);

     6.3.3)标准实现

    除了能够使用 fork/join 框架来实现能够在多处理系统中被并行执行的定制化算法(如前文中的 ForkBlur.java 例子),在 Java SE 中一些比较常用的功能点也已经使用 fork/join 框架来实现了。在 Java SE 8 中,java.util.Arrays 类的一系列parallelSort() 方法就使用了 fork/join 来实现。这些方法与 sort() 方法很类似,但是通过使用 fork/join框 架,借助了并发来完成相关工作。在多处理器系统中,对大数组的并行排序会比串行排序更快。这些方法究竟是如何运用 fork/join 框架并不在本教程的讨论范围内。想要了解更多的信息,请参见 Java API 文档。 其他采用了 fork/join 框架的方法还包括java.util.streams包中的一些方法,此包是作为 Java SE 8 发行版中 Project Lambda 的一部分。想要了解更多信息,请参见 Lambda 表达式一节。

    2.3并发集合

    并发集合简化了大型数据集合管理,且极大的减少了同步的需求。

    java.util.concurrent 包囊括了 Java 集合框架的一些附加类。它们也最容易按照集合类所提供的接口来进行分类:

    • BlockingQueue 定义了一个先进先出的数据结构,当你尝试往满队列中添加元素,或者从空队列中获取元素时,将会阻塞或者超时。
    • ConcurrentMapjava.util.Map 的子接口,定义了一些有用的原子操作。移除或者替换键值对的操作只有当 key 存在时才能进行,而新增操作只有当 key 不存在时。使这些操作原子化,可以避免同步。ConcurrentMap 的标准实现是 ConcurrentHashMap,它是 HashMap 的并发模式。
    • ConcurrentNavigableMap 是 ConcurrentMap 的子接口,支持近似匹配。ConcurrentNavigableMap 的标准实现是 ConcurrentSkipListMap,它是 TreeMap 的并发模式。

    所有这些集合,通过在集合里新增对象和访问或移除对象的操作之间,定义一个happens-before 的关系,来帮助程序员避免内存一致性错误。

    2.4原子变量

    java.util.concurrent.atomic 包定义了对单一变量进行原子操作的类。所有的类都提供了 get 和 set 方法,可以使用它们像读写 volatile 变量一样读写原子类。就是说,同一变量上的一个 set 操作对于任意后续的 get 操作存在 happens-before 关系。原子的 compareAndSet 方法也有内存一致性特点,就像应用到整型原子变量中的简单原子算法。

    为了看看这个包如何使用,让我们返回到最初用于演示线程干扰的 Counter 类:

    class Counter {
        private int c = 0;
    
        public void increment() {
            c++;
        }
    
        public void decrement() {
            c--;
        }
    
        public int value() {
            return c;
        }
    }

    使用同步是一种使 Counter 类变得线程安全的方法,如 SynchronizedCounter:

    class SynchronizedCounter {
        private int c = 0;
    
        public synchronized void increment() {
            c++;
        }
    
        public synchronized void decrement() {
            c--;
        }
    
        public synchronized int value() {
            return c;
        }
    }

    对于这个简单的类,同步是一种可接受的解决方案。但是对于更复杂的类,我们可能想要避免不必要同步所带来的活跃度影响。将 int 替换为 AtomicInteger 允许我们在不进行同步的情况下阻止线程干扰,如 AtomicCounter:

    import java.util.concurrent.atomic.AtomicInteger;
    
    class AtomicCounter {
        private AtomicInteger c = new AtomicInteger(0);
    
        public void increment() {
            c.incrementAndGet();
        }
    
        public void decrement() {
            c.decrementAndGet();
        }
    
        public int value() {
            return c.get();
        }
    }

    2.5并发随机数

    并发随机数(JDK7)提供了高效的多线程生成伪随机数的方法。

    在 JDK7 中,java.util.concurrent 包含了一个相当便利的类 ThreadLocalRandom,可以在当应用程序期望在多个线程或 ForkJoinTasks 中使用随机数时使用。

    对于并发访问,使用 TheadLocalRandom 代替 Math.random() 可以减少竞争,从而获得更好的性能。

    你只需调用 ThreadLocalRandom.current(), 然后调用它的其中一个方法去获取一个随机数即可。下面是一个例子:

    int r = ThreadLocalRandom.current() .nextInt(4, 77);

    参考:

    编程要点:https://github.com/waylau/essential-java

    文中内容参考:https://blog.csdn.net/kkkloveyou/article/details/50561269

  • 相关阅读:
    微信小程序学习随笔
    SqlServer索引假脱机的解决
    web服务器出现大量CLOSE_WAIT连接的前因后果
    SqlServer和mysql字段拼接方法
    使用beego创建员工加班调休系统
    在c#程序中初步使用redis
    使用golang实现批量发送面试邀请邮件
    记c# rabbitmq的使用
    项目中使用mongodb的尝试
    手机集成支付宝支付功能的注意事项
  • 原文地址:https://www.cnblogs.com/cslj2013/p/9153349.html
Copyright © 2011-2022 走看看