zoukankan      html  css  js  c++  java
  • 第五章 基础构建模块

    5.1 同步容器类

      Vector,Hashtable。对所有公有方法都是同步方法,使用synchronized修饰

      5.1.1 同步容器类的问题

      特殊情况还是需要客户端加锁。比如线程A遍历容器元素,线程B对同一个容器删除元素,会出现并发修改异常。解决办法是在客户端封装一层加锁方法。  

    public static Integer getValue(Vector<Integer> vector, int i) {
        synchronized (vector) {
            return vector.get(i);
        }
    }
    
    public static void deleteValue(Vector<Integer> vector, int i) {
        synchronized (vector) {
            vector.remove(i);
        }
    }

      5.1.2 迭代器域ConcurrentModificationException

      产生的原因,迭代器遍历会有校验,当遍历过程中发现集合元素被修改时,抛出异常  

    public static void main(String[] args) {
        List<Integer> list = (List)IntStream.range(1, 11).boxed().collect(Collectors.toList());
        Iterator var2 = list.iterator();
    
        while(var2.hasNext()) {
            Integer i = (Integer)var2.next();
            if (i == 10) {
                list.remove(i);
            }
        }
    }

      其中Iterator#next方法底层调用checkForComodification方法验证两个参数是否相等,一个是modCount,一个是expectedModCount。在迭代器初始化时,这两个参数设置成一样的。当集合修改元素时,modCount++,但是迭代器本身并不感知,expectedModCount无变化,导致下次遍历时验证失败抛出异常。解决办法是使用迭代器本身的remove方法,内里删除完后会重置使两者相等。

      5.1.3 隐藏迭代器

      有些方法隐式调用迭代器遍历,比如打印集合元素,foreach等。

    5.2 并发容器JUC

      同步容器不适合高并发场景使用,对集合的写入读取都会加锁。Java提供了并发容器改善性能。

      5.2.1 ConcurrentHashMap

      见数据结构的ConcurrentHashMap章节

      5.2.3 CopyOnWriteArrayList

      替代同步List,写时复制。每次修改都会加锁,复制原列表,在快照上修改,修改完成后替换引用并释放锁。读取是不用加锁的,读取的是历史版本。

    5.3 阻塞队列和生产者-消费者模式

      阻塞队列提供了可阻塞的put和take方法,支持定时的offer和poll方法。队列可以是有界和无界的,有界队列,当队列满时,put方法会阻塞;当队列空时,take方法会阻塞。

      BlockingQueue有多种实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,PriorityBlockingQueue支持按自定义优先级排序。

      5.3.3 双端队列和工作密取

      双端队列可以从头部和尾部插入和删除。工作密取:每个消费者对应一个各自的队列,从头部开始消费,当有新任务时插入到尾部,当自身的队列消费完毕后,会选择其他队列从尾部元素开始消费。

    5.4 阻塞方法和中断

      阻塞方法:在方法的执行过程中,线程会被阻塞进入等待。每个线程有一个中断标识,想要中断某个线程正在进行的操作可以设置这个标识。这时一种协作机制,是否真正中断是由目标线程自己选择的。当某个方法抛出InterruptedException时,这个方法的调用者必须要处理中断。有两种基本选择:

      1.传递异常:重新抛出InterruptedException异常,交由更上层的调用者解决

      2.恢复中断:有时候不能抛出异常,这个时候需要重新设置线程的中断状态,让上层知道。

      见:https://www.cnblogs.com/walker993/p/14325458.html

    5.5 同步工具类

      根据自身的状态协调线程的运行逻辑。比如阻塞队列、信号量、栅栏、闭锁。

      5.5.1 闭锁

      相当于一扇门,到达结束状态前,门是锁着的,线程无法通过,到达结束状态后,门打开并且不会再关闭。   

    import java.util.concurrent.CountDownLatch;
    
    public class CountDownLatchTest {
        private static CountDownLatch latch = new CountDownLatch(2);
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                System.out.println("线程" + Thread.currentThread().getId() + "初始化线程池完毕");
                latch.countDown();
            }).start();
    
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("线程" + Thread.currentThread().getId() + "初始化数据库连接池完毕");
                latch.countDown();
            }).start();
    
            latch.await();
            System.out.println("启动完成");
        }
    }
    CountDownLatchTest

      5.5.2 FutureTask

      FutureTask是RunnableFuture的实现,RunnableFuture接口继承了Runnable和Future

      1. Runnable,定义了run方法无返回值,线程执行的任务

      2.Future,定义了一些跟踪任务执行的状态,阻塞获取执行结果的方法

      所以说FutureTask可以看成是一个可以阻塞获得结果的任务。可能会问Runnable不是没有返回值吗?那获取什么返回值呢?

      FutureTask有两个构造方法:

      public FutureTask(Runnable runnable, V result)

      和 public FutureTask(Callable<V> callable) 

      其中Callable定义了call方法有返回值,FutureTask可以阻塞获得执行的结果。通过Runnable定义的FutureTask可以指定执行完成的返回结果。

      5.5.3 信号量

      用来控制一个资源的访问量,获得了许可的线程才能访问。

      1.跟锁不同的时,一个线程获取了访问许可,可以由另一个线程中释放,也就是许可跟线程没有绑定关系。

      2.不支持重入  

    public class SemaphoreTest {
        public static void main(String[] args) throws InterruptedException {
            System.out.println("---演示线程无关性---");
            ConnectionPool pool = new ConnectionPool(Lists.newArrayList(1));
            new Thread(pool::borrow).start();
            new Thread(pool::borrow).start();
            Thread.sleep(1000);
            new Thread(() -> pool.release(1)).start();
    
            Thread.sleep(1000);
            System.out.println("---演示不支持重入---");
            ConnectionPool pool2 = new ConnectionPool(Lists.newArrayList(1));
            new Thread(() -> {
                pool2.borrow();
                pool2.borrow();
            }).start();
        }
    
        static class ConnectionPool {
            List<Object> pool;
            Semaphore semaphore;
    
            ConnectionPool(List<Object> conn) {
                this.pool = conn;
                semaphore = new Semaphore(conn.size());
            }
    
            private void borrow() {
                System.out.println("线程" + Thread.currentThread().getId() + "尝试获取了连接");
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                pool.remove(pool.size() - 1);
                System.out.println("线程" + Thread.currentThread().getId() + "获取了连接");
            }
    
            private void release(Object conn) {
                pool.add(conn);
                semaphore.release();
                System.out.println("线程" + Thread.currentThread().getId() + "释放了连接");
            }
        }
    }
    SemaphoreTest

      实验结果:

      

      5.5.4 栅栏  

      类似于闭锁,但是可以重置。

    public class CyclicBarrierTest {
    
        public static void main(String[] args) {
            testWait();
        }
    
        /**
         * 多线程任务等待测试
         * 场景:
         * 两阶段任务,3个线程一起执行,每阶段都需要全部执行完毕,才能开始下阶段
         */
        public static void testWait() {
            int allWorker = 3;
            CyclicBarrier cyclicBarrier = new CyclicBarrier(allWorker);
            new Thread(() -> {
                try {
                    //阶段1
                    Thread.sleep(100);
                    int await = cyclicBarrier.await();//还需要等待的线程数
                    System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段1工作");
                    //尝试重置栅栏
                    tryReset(await, cyclicBarrier);
                    //阶段2
                    Thread.sleep(1000);
                    await = cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段2工作");
    
                } catch (Exception ignore) {
                }
            }, "线程a").start();
    
            new Thread(() -> {
                try {
                    //阶段1
                    Thread.sleep(400);
                    int await = cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段1工作");
                    //尝试重置栅栏
                    tryReset(await, cyclicBarrier);
                    //阶段2
                    Thread.sleep(600);
                    await = cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段2工作");
    
                } catch (Exception ignore) {
                }
            }, "线程b").start();
    
    
            new Thread(() -> {
                try {
                    //阶段1
                    Thread.sleep(300);
                    int await = cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段1工作");
                    //尝试重置栅栏
                    tryReset(await, cyclicBarrier);
                    //阶段2
                    Thread.sleep(100);
                    await = cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段2工作");
    
                } catch (Exception ignore) {
                }
            }, "线程c").start();
        }
    
        /**
         * 当最后一个完成的会还原栅栏
         *
         * @param await         剩余需要等待的线程数
         * @param cyclicBarrier 栅栏
         */
        private static void tryReset(int await, CyclicBarrier cyclicBarrier) {
            if (await == 0) {
                cyclicBarrier.reset();
            }
        }
    }
    CyclicBarrierTest 

      5.5.5 Exchanger

      线程间交换数据

    public class ExchangerTest {
    
        public static void main(String[] args) {
            testDataExchange();
        }
    
        /**
         * 多线程数据交换测试
         * 场景:两个线程交换本地数据
         */
        public static void testDataExchange() {
            Exchanger<Goods> exchanger = new Exchanger<>();
            new Thread(() -> {
                Goods goods = new Goods("线程1-商品1");
                System.out.println("线程1生产了" + goods);
                try {
                    goods = exchanger.exchange(goods);
                } catch (InterruptedException e) {
                    //
                }
                System.out.println("线程1交换完毕,目前商品:" + goods);
    
            }).start();
    
            new Thread(() -> {
                Goods goods = new Goods("线程2-商品2");
                System.out.println("线程2生产了" + goods);
                try {
                    goods = exchanger.exchange(goods);
                } catch (InterruptedException e) {
                    //
                }
                System.out.println("线程2交换完毕,目前商品:" + goods);
            }).start();
    
        }
    
        @AllArgsConstructor
        static class Goods {
            private String name;
    
            @Override
            public String toString() {
                return name;
            }
        }
    }
    ExchangerTest

      

    人生就像蒲公英,看似自由,其实身不由己。
  • 相关阅读:
    谈谈关于MVP模式中V-P交互问题
    Delphi MVC模 2
    Delphi MVC模式 1
    Java长整除问题
    Java中Scanner类的简单用法
    Java中throw和throws的区别
    Java必须掌握的运算符
    Java编程多重循环
    Java实现三种简单的排序
    使用Java向properties存数据
  • 原文地址:https://www.cnblogs.com/walker993/p/14589399.html
Copyright © 2011-2022 走看看