zoukankan      html  css  js  c++  java
  • 读书笔记-实战java高并发程序设计

    并发级别: 阻塞、无饥饿、无障碍、无锁、无等待

    阻塞: 一个线程是阻塞的,那么在其他线程释放资源之前,当前线程无法继续执行

      当我们使用synchronized关键字或者重入锁时,我们得到的就是阻塞的线程
    

    无饥饿: 对于非公平锁来说,系统允许高优先级的线程插队。这样有可能导致低优先级线程产生饥饿。

       但如果锁是公平的,按照先来后到的规则,那么饥饿就不会产生,不管新来的线程优先级多高,要想获得资源,就必须乖乖排队,这样所有的线程都有机会执行。
    

    无障碍: 无障碍是一种最弱的非阻塞调度。两个线程如果无障碍地执行,那么不会因为临界区的问题导致一方被挂起。

       换言之,大家都可以大摇大摆地进入临界区了。那么大家一起修改共享数据,把数据改坏了怎么办呢?
       对于无障碍的线程来说,一旦检测到这种情况,它就会立即对自己所做的修改进行回滚,确保数据安全。但如果没有数据竞争发生,那么线程就可以顺利完成自己的工作,走出临界区。
    

    无锁: 在无锁的调用中,一个典型的特点是可能会包含一个无穷循环。在这个循环中,线程会不断尝试修改共享变量。

      如果没有冲突,修改成功,那么程序退出,否则继续尝试修改。但无论如何,无锁的并行总能保证有一个线程是可以胜出的,不至于全军覆没。
      至于临界区中竞争失败的线程,它们必须不断重试,直到自己获胜。如果运气很不好,总是尝试不成功,则会出现类似饥饿的现象,线程会停止。
    

    无等待: 无锁只要求有一个线程可以在有限步内完成操作,而无等待则在无锁的基础上更进一步扩展。它要求所有的线程都必须在有限步内完成,这样就不会引起饥饿问题。

       如果限制这个步骤的上限,还可以进一步分解为有界无等待 
       和线程数无关的无等待等几种,它们之间的区别只是对循环次数的限制不同。一种典型的无等待结构就是RCU(Read Copy Update)。
       它的基本思想是,对数据的读可以不加控制。因此,所有的读线程都是无等待的,它们既 
       不会被锁定等待也不会引起任何冲突。但在写数据的时候,先取得原始数据的副本,接着只修改副本数据(这就是为什么读可以不加控制),修改完成后,在合适的时机回写数据。
    

    重入锁: ReentrantLock

     开发人员必须手动指定何时加锁,何时释放锁。也正因为这样,重入锁对逻辑控制的灵活性要远远优于关键字synchronized
     可重入: 如果同一个线程多次获得锁,那么在释放锁的时候,也必须释放相同次数。如果释放锁的次数多了,那么会得到一个java.lang.IllegalMonitorStateException异常,
     反之,如果释放锁的次数少了,那么相当于线程还持有 这个锁,因此,其他线程也无法进入临界区
     对于关键字synchronized来说,如果一个线程在等待锁,那么结果只有两种情况,要么它获得这把锁继续执行,要么它就保持等待。而使用重入锁,则提供另外一种可能,那就是线程可以被中断。
    统一使用lockInterruptibly()方法。这是一个可以对中断进行响应的锁申请动作,即在等待锁的过程中,可以响应中断
    
    public class IntLock implements Runnable {
        public static ReentrantLock lock1 = new ReentrantLock();
        public static ReentrantLock lock2 = new ReentrantLock();
        int lock;
    
        public IntLock(int lock) {
            this.lock = lock;
        }
    
        @Override
        public void run() {
            try {
                if (lock == 1) {
                    lock1.lockInterruptibly();
                    Thread.sleep(500);
                    lock2.lockInterruptibly();
                } else {
                    lock2.lockInterruptibly();
                    Thread.sleep(500);
                    lock1.lockInterruptibly();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (lock1.isHeldByCurrentThread()) {
                    lock1.unlock();
                }
                if (lock2.isHeldByCurrentThread()) {
                    lock2.unlock();
                }
                System.out.println(Thread.currentThread().getId() + ":线程退出");
            }
    
        }
    
        /**
         * 中断响应lockInterruptibly
         *
         * @param args
         * @throws InterruptedException
         */
        public static void main(String args[]) throws InterruptedException {
            IntLock r1 = new IntLock(1);
            IntLock r2 = new IntLock(2);
    
            Thread thread1 = new Thread(r1);
            Thread thread2 = new Thread(r2);
    
            thread1.start();
            thread2.start();
    
            Thread.sleep(1000);
    
            thread2.interrupt();
    
        }
    }
    

    tryLock()方法进行一次限时的等待

      public class TimeLock implements Runnable {
        public static ReentrantLock lock = new ReentrantLock();
    
        @Override
        public void run() {
            try {
                if (lock.tryLock(5, TimeUnit.SECONDS)) {
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("get lock success");
                    Thread.sleep(6000);
                } else {
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("get lock failed");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }
    
        /**
         * 锁申请等待限时tryLock
         * @param args
         */
        public static void main(String args[]) {
            TimeLock timeLock = new TimeLock();
            Thread thread1 = new Thread(timeLock);
            Thread thread2 = new Thread(timeLock);
    
            thread1.start();
            thread2.start();
        }
    }
    
    

    公平锁

    public static ReentrantLock fairLock = new ReentrantLock(true); //设置true指定锁是公平的,也可以不设置,分别运行观察公平锁与非公平锁间的区别

    非公平锁

    public static ReentrantLock unfairLock = new ReentrantLock();

    synchronize wait和notify搭配

    ReentrantLock和Condition对象搭配

    public class ReenterLockCondition implements Runnable {
        public static ReentrantLock lock = new ReentrantLock();
        public static Condition condition = lock.newCondition();
    
        @Override
        public void run() {
    
            try {
                lock.lock();
                condition.await();
                System.out.println("Thread is going on");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
    
        }
    
        public static void main(String args[]) throws InterruptedException {
            ReenterLockCondition reenterLockCondition = new ReenterLockCondition();
            Thread thread1 = new Thread(reenterLockCondition);
            thread1.start();
            System.out.println("睡眠2秒钟");
            Thread.sleep(2000);
            lock.lock();
            condition.signal();
            lock.unlock();
        }
    }
    

    当线程使用Condition.await()方法时,要求线程持有相关的重入锁,在Condition.await()方法调用后,这个线程会释放这把锁。
    同理,在Condition.signal()方法调用时,也要求线程先获得相关的锁。在signal()方法调用后,系统会从当前Condition对象的等待队列中唤醒一个线程。
    一旦线程被唤醒,它会重新尝试获得与之绑定的重入锁,一旦成功获取,就可以继续执行了。
    因此,在signal()方法调用之后,一般需要释放相关的锁,让给被唤醒的线程,让它可以继续执行。

    允许多个线程同时访问:信号量(Semaphore)

    public class SemapDemo implements Runnable {
        final Semaphore semp = new Semaphore(5);
    
        @Override
        public void run() {
            try {
                semp.acquire();
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getId() + ":d one!");
                semp.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 总共20个线程,系统会以5个线程一组为单位,依次执行并输出
         *
         * @param args
         */
        public static void main(String args[]) {
            ExecutorService executorService = Executors.newFixedThreadPool(20);
            final SemapDemo demo = new SemapDemo();
            for (int i = 0; i < 20; i++) {
                executorService.submit(demo);
            }
        }
    }
    

    ReadWriteLock读写锁

    使用读写锁和使用可重入锁的性能对比

    public class ReadWriteLockDemo {
        private static Lock lock = new ReentrantLock();
        private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        private static Lock readLock = reentrantReadWriteLock.readLock();
        private static Lock writeLock = reentrantReadWriteLock.writeLock();
        private int value;
    
        public Object handleRead(Lock lock) throws InterruptedException {
            try {
                lock.lock();
                Thread.sleep(1000);//模拟读操作
                System.out.println("读操作:" + value);
                return value;
            } finally {
                lock.unlock();
            }
        }
    
        public void handleWrite(Lock lock, int index) throws InterruptedException {
            try {
                lock.lock();
                Thread.sleep(1000);//模拟写操作
                System.out.println("写操作:" + value);
                value = index;
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String args[]) {
            final ReadWriteLockDemo demo = new ReadWriteLockDemo();
    
            Runnable readRunnable = new Runnable() {
                @Override
                public void run() {
                    //分别使用两种锁来运行,性能差别很直观的就体现出来,使用读写锁后读操作可以并行,节省了大量时间
                    try {
                        demo.handleRead(readLock);
                        //demo.handleRead(lock);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
    
            Runnable writeRunnable = new Runnable() {
                @Override
                public void run() {
                    //分别使用两种锁来运行,性能差别很直观的就体现出来
                    try {
                        demo.handleWrite(writeLock, new Random().nextInt(100));
                        //demo.handleWrite(lock, new Random().nextInt(100));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            for (int i = 0; i < 18; i++) {
                new Thread(readRunnable).start();
            }
            for (int i = 18; i < 20; i++) {
                new Thread(writeRunnable).start();
            }
        }
    }
    

    倒计数器:CountDownLatch: 把门锁起来,不让里面的线程跑出来,它可以让某一个线程等待直到倒计数结束,再开始执行

    模拟火箭发射前多个线程的检查工作,检查完成后才点火发射
    public class CountDownLatchDemo implements Runnable {
        static final CountDownLatch end = new CountDownLatch(10);
        static final CountDownLatchDemo demo = new CountDownLatchDemo();
    
        @Override
        public void run() {
    
            try {
                Thread.sleep(new Random().nextInt(3) * 1000);
                System.out.println("check complete");
                end.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String args[]) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 10; i++) {
                executorService.submit(demo);
            }
            //等待检查
            end.await();
            //发射火箭
            System.out.println("Fire!");
            executorService.shutdown();
        }
    }
    

    循环栅栏CyclicBarrier

    通常在私人宅邸的周围就可以围上一圈栅栏,阻止闲杂人等入内
    这里当然就是用来阻止线程继续执行,要求线程在栅栏外等待

    public CyclicBarrier(int parties)
    public CyclicBarrier(int parties, Runnable barrierAction)
    

    parties 是参与线程的个数
    第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务

    只要出现barrier.await();则必须要等到所有线程执行到这个地方才开始往下执行
    public class CyclicBarrierDemo {
        static class TaskThread extends Thread {
    
            CyclicBarrier barrier;
    
            public TaskThread(CyclicBarrier barrier) {
                this.barrier = barrier;
            }
    
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(getName() + " 到达栅栏 A");
                    barrier.await();
                    System.out.println(getName() + " 冲破栅栏 A");
    
                    Thread.sleep(2000);
                    System.out.println(getName() + " 到达栅栏 B");
                    barrier.await();
                    System.out.println(getName() + " 冲破栅栏 B");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            int threadNum = 5;
            CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
    
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 完成最后任务");
                }
            });
    
            for(int i = 0; i < threadNum; i++) {
                new TaskThread(barrier).start();
            }
        }
    }
    

    线程阻塞工具类:LockSupport

    LockSupport是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。

    public class LockSupportDemo {
        public static Object u = new Object();
        static ChangeObjectThread t1 = new ChangeObjectThread("t1");
        static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    
        public static class ChangeObjectThread extends Thread {
            public ChangeObjectThread(String name) {
                super.setName(name);
            }
    
            public void run() {
                synchronized (u) {
                    System.out.println("in " + getName());
                    LockSupport.park();
                }
            }
        }
    
    
        public static void main(String args[]) throws InterruptedException {
            t1.start();
            Thread.sleep(100);
            t2.start();
            LockSupport.unpark(t1);
            LockSupport.unpark(t2);
            t1.join();
            t2.join();
        }
    }
    

    Guava和RateLimiter限流

    线程池

    ①newSingleThreadExecutor
    单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务
    ②newFixedThreadExecutor(n)
    固定数量的线程池,没提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行
    ③newCacheThreadExecutor(推荐使用)
    可缓存线程池,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行。
    ④newScheduleThreadExecutor
    大小无限制的线程池,支持定时和周期性的执行线程
    阿里巴巴Java开发手册,明确指出不允许使用Executors静态工厂构建线程池
    原因如下:
    线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

    • 线程池常用参数:
      corePoolSize:核心线程数量,会一直存在,除非allowCoreThreadTimeOut设置为true
      maximumPoolSize:线程池允许的最大线程池数量
      keepAliveTime:线程数量超过corePoolSize,空闲线程的最大超时时间
      unit:超时时间的单位
      workQueue:工作队列,保存未执行的Runnable 任务
      threadFactory:创建线程的工厂类
      handler:当线程已满,工作队列也满了的时候,会被调用。被用来实现各种拒绝策略

    队列介绍

    直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue。SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用SynchronousQueue,则提交的任务不会被真实地保存,而总是将新任务提交给线程执行,如果没有空闲的进程,则尝试创建新的进程,如果进程数量已经达到最大值,则执行拒绝策略。因此,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略
    有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue类实现。ArrayBlockingQueue类的构造函数必须带一个容量参数,表示该队列的最大容量
    无界的任务队列:无界任务队列可以通过LinkedBlockingQueue类实现
    优先任务队列:它通过PriorityBlockingQueue类实现

    拒绝策略

    AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
    CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降
    DiscardOldestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
    DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,

    自定义拒绝策略
    public class RejectThreadPoolDemo {
        public static class MyTask implements Runnable {
    
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        public static void main(String args[]) throws InterruptedException {
            MyTask myTask = new MyTask();
    
            ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), Executors.defaultThreadFactory()
                    , new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    System.out.println(r.toString() + " is discard");
                }
            });
    
            for (int i = 0; i < 100; i++) {
                executorService.submit(myTask);
                Thread.sleep(10);
            }
        }
    }
    
    将submit改成execute,在报错时会将异常打印出来,submit会直接吞掉异常,如果没有加get()方法,但是如果加了get()方法,就会变成同步的了,并不是并发执行
          public static void main(String args[]) {
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
            for (int i = 0; i < 5; i++) {
                //poolExecutor.submit(new DivTask(100, i));//没有报错提示
                poolExecutor.execute(new DivTask(100, i));//有报错提示
            }
        }    
    

    扩展我们的ThreadPoolExecutor线程池,让它在调度任务之前,先保存一下提交任务线程的堆栈信息

    public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
        
        public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        public void execute(Runnable task) {
            super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
        }
    
        private Runnable wrap(final Runnable task, final Exception clientTrace, String name) {
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } catch (Exception e) {
                        clientTrace.printStackTrace();
                        throw e;
                    }
                }
            };
        }
    
        private Exception clientTrace() {
            return new Exception("Client stack trace");
        }
    }
    

    submit()和execute()的区别

      (1) 接收的参数不一样

      (2) submit()方法有返回值Future,而execute()方法没有返回值

      (3) submit()方法方便处理Exception异常,意思就是,你在task里会抛出checked或者unchecked exception, 而又希望外面的调用者能够感知这些exception并作出及时的处理,  
    用 submit,通过捕获Future.get抛出的异常

    线程池中,无界队列导致的内存飙升问题:

    Executors中FixedThreadPool使用的是LinkedBlockingQueue队列,近乎于无界,队列大小默认为Integer.MAX_VALUE,几乎可以无限制的放任务到队列中,线程池中数量是固定的,当线程池中线程数量达到corePoolSize,不会再创建新的线程,所有任务都会入队到workQueue中,线程从workQueue中获取任务,但这个队列几乎永远不会满,只要队列不满,就不会再去创建新的线程,就跟maximumPoolSize和keepAliveTime没有关系,此时,如果线程池中的线程处理任务的时间特别长,导致无法处理新的任务,队列中的任务就会不断的积压,这个过程,会导致机器的内存使用不停的飙升,极端情况下会导致JVM OOM,系统就挂了。
    线程池如何调优
    (1)首先,根据不同的需求选择线程池,如果需要单线程顺序执行,使用SingleThreadExecutor,如果已知并发压力,使用FixedThreadPool,固定线程数的大小,执行时间小的任务,可以使用CachedThreadPool,创建可缓存的线程池,可以无限扩大线程池,可以灵活回收空闲线程,最多可容纳几万个线程,线程空余60s会被回收,需要后台执行周期任务的,可以使用ScheduledThreadPool,可以延时启动和定时启动线程池,

    (2)如何确认线程池的最大线程数目,分CPU密集型和IO密集型,如果是CPU密集型或计算密集型,因为CPU的利用率高,核心线程数可设置为n(核数)+1,如果是IO密集型,CPU利用率不高,可多给几个线程数,来进行工作,核心线程数可设置为2n(核数)

    CAS算法的过程是:它包含三个参数CAS(V,E,N),其中V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。

  • 相关阅读:
    搭建vue的开发环境
    笔墨录历程
    LockBit病毒oracle数据库恢复xifenfei
    Exception [type: SIGSEGV, Address not mapped to object] [] [ kgegpa()+36]
    ORA00603 ORA01092 ORA600 kcbzib_kcrsds_1
    frm和ibd文件数据库恢复惜分飞
    校验代码为 6054 坏块故障修复
    pip常用命令
    我是pear。
    Visual Studio 2008 Shell Isolated Mode(独立/隔离模式)
  • 原文地址:https://www.cnblogs.com/Baronboy/p/14298018.html
Copyright © 2011-2022 走看看