zoukankan      html  css  js  c++  java
  • 并发编程

    1.并发工具的使用及原理【上】

    时长:57min

    计划:2020

    1.并发工具的使用及原理【下】

    时长:1h37min

    计划:4/10/2020 12:48 ---13:20

    3.1.11.CountDownLatch

    6.1.测试代码

    package com.alipay.dsg.web;

    import java.util.concurrent.CountDownLatch;

    /**
     * @ClassName CountDownLatchDemo
     * @Description
    它的作用:等待1个或多个线程执行完以后,才进行后续的事情,可以使用它
     
    * @Author wf
     * @Date 2020/4/9 12:49
     * @Version 1.0
     */
    public class CountDownLatchDemo extends Thread{
        static CountDownLatch countDownLatch = new CountDownLatch(1);
    //    public static void main(String[] args) throws InterruptedException {
    //        CountDownLatch countDownLatch = new CountDownLatch(3);
    //        new Thread(()->{
    //            System.out.println("Thread1");
    //            countDownLatch.countDown();//
    减减操作 3-1 =2
    //        }).start();
    //        new Thread(()->{
    //            System.out.println("Thread2");
    //            countDownLatch.countDown();//
    减减操作 2-1=1
    //        }).start();
    //        new Thread(()->{
    //            System.out.println("Thread3");
    //            countDownLatch.countDown();//
    减减操作 1-1=0
    //            //
    当减到0,释放main线程
    //        }).start();
    //
    //        countDownLatch.await(); //
    阻塞main线程
    //    }
       
    public static void main(String[] args) {
            //阻塞1000个线程
           
    for(int i =0; i<1000;i++){
                new CountDownLatchDemo().start();
            }
            countDownLatch.countDown();
        }

        @Override
        public void run() {
            try {
                countDownLatch.await();//阻塞
           
    } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        }
    }

    使用场景:如使用缓存,初始化加载缓存,当所有缓存预热完成,才进行后续缓存操作。

             计数器。

    6.1.1.实现原理

    countdownLatch.await,countdown()

    内部使用共享锁。

    1.await方法

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    2.AQS.acquireSharedInterruptibly

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    A.NonFairSync.tryAcquireShared

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }

    3.AQS.doAcquireSharedInterruptibly

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//尝试获取锁
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                       
    failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//阻塞
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    3.1.16.并发编程-线程池原理【上】

    时长:1h6min

    学习内容:

      》线程池的基本使用

      》线程池的实现原理分析

      》线程池的使用注意事项

      》Callable/Future使用及原理分析

    8.1.线程池

    需求:  

      我们可以参照,数据库的连接池,进行理解。

      可以理解为一个缓冲区,对象缓存到那里,可以进行获取,使用完后又归还到缓冲区。

    它的核心概念:

      》复用已有资源

      》控制资源总数

    Thread线程理解:

      它是一个异步处理的api.当我们要使用线程时,使用方式如下:

    new Thread().start();

      这里存在的问题,是对象的创建,如果需要创建大量的对象,通过new的方式,

      对资源消耗较大,造成线程资源【线程数】不可控。当线程数超过对应的资源的限制,就会导致大量的上下文切换

    比如,8核心的cpu,同一时刻只能运行8个线程,如果创建80个线程,会导致线程不时地上下文切换,反而降低程序执行性能。

      还有一个问题,可能需要频繁创建和销毁线程对象,浪费资源,影响性能。

      针对,线程存在两个问题:

      》线程资源【线程数】不可控

      》需要频繁创建和销毁线程对象

      

      基于数据库连接池的设计,提出线程"池化"的改进。即产生线程池技术

    8.1.1.线程池的优势

      》限流----线程数量可控---通过参数设置,允许创建的最大线程数

      》 降低频繁创建和销毁线程对象的开销

      》对于任务的响应速度更快----复用已有线程

    8.1.2.java中提供的线程池

    8.1.2.1.创建线程池的工厂类Executors

    常用如下:

    Executors.newSingleThreadExecutor()//只有一个核心线程对象的线程池
    Executors.newFixedThreadPool(3);  //固定线程数的线程池
    Executors.newCachedThreadPool//可以实现动态调整线程数的线程池,可以无限创建线程对象,每一空闲线程,会在60s之后回收
    Executors.newScheduledThreadPool(3)//处理定时任务
    Executors.newWorkStealingPool();//fork/join线程池

    区分不同场景,应该使用哪一种池?

     1.newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    2.newSingleThreadExecutor()

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

    3.newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    A.ThreadPoolExecutor

    keepAliveTim是怎么监控线程,进行回收的?

    public ThreadPoolExecutor(int corePoolSize,//核心线程数
                              int maximumPoolSize,//最大线程数
                              long keepAliveTime,//超时时间
                              TimeUnit unit,    //超时时间单位
                              BlockingQueue<Runnable> workQueue,//阻塞队列
                              ThreadFactory threadFactory,//线程工厂
                              RejectedExecutionHandler handler) {//拒绝策略
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    8.1.3.线程池的实现原理

    8.1.3.1.使用示例如下

    package com.wf.concurrent;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @ClassName ThreadPoolDemo
     * @Description TODO
     * @Author wf
     * @Date 2020/4/28 12:22
     * @Version 1.0
     */
    public class ThreadPoolDemo implements Runnable{
        static ExecutorService service = Executors.newFixedThreadPool(3);
        public static void main(String[] args) {
            for(int i=0; i<100; i++){
                service.execute(new ThreadPoolDemo());
            }
            service.shutdown();
            //正常情况下,循环100次,会创建100个线程对象,可以发现这里只有3个线程实例
            //一次性,只有3个线程进行任务处理,当处理完成后,线程实例,又归还到池中
            //这里有97个任务,未执行,会先放置到任务队列里面
        }
    
        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("当前线程的名称:"+Thread.currentThread().getName());
        }
    }

    8.1.3.2.原理分析

    分析入口:execute方法,如下所示:

    1.ThreadPoolExecutor#execute方法
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    
    int c = ctl.get();//默认线程池中线程数量为0,11100000000000000000000000000000
        if (workerCountOf(c) < corePoolSize) {//0小于核心数3【传参】
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//32位
    
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    private static final int RUNNING    = -1 << COUNT_BITS;
    
    private static final int COUNT_BITS = Integer.SIZE - 3;//29
    
    public static final int SIZE = 32;

    计算:

    Rs=-1<<(32-3)=-1<<29= 11100000000000000000000000000000

    wc = 0

     

    Rs | wc = 11100000000000000000000000000000

     ctl表示含义

    高3位代表当前线程状态,低29位代表当前线程池的线程数量。

     

    默认情况:线程状态为Running,线程数量为0.

     privatestaticint workerCountOf(int c) { return c & CAPACITY; }

    计算:

    C=11100000000000000000000000000000

    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    capacity= 1<<29 -1 =00100000000000000000000000000000 -1= 00011111111111111111111111111111

    C & capacity=11100000000000000000000000000000 & 00011111111111111111111111111111=00000000000000000000000000000000=0

     A.方法逻辑结构图

     

       【1】addWorker方法

     privateboolean addWorker(Runnable firstTask, boolean core) {

        retry:
        for (;;) { //自旋,增加工作线程数
            int c = ctl.get();//11100000000000000000000000000000
            int rs = runStateOf(c);    //取得运行状态,running=c
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&      //rs>=0
                ! (rs == SHUTDOWN &&
                   firstTask == null && //提交任务为null
                   ! workQueue.isEmpty()))     //工作队列非空
                return false;
    
            for (;;) {//内层自旋
                int wc = workerCountOf(c);//获取工作线程数,首次为0
                if (wc >= CAPACITY ||   //数据数超限
                    wc >= (core ? corePoolSize : maximumPoolSize))//core传参true/false
                    return false;
                if (compareAndIncrementWorkerCount(c))//线程数加1
                    break retry;//跳出方法第一行 
                c = ctl.get();  // Re-read ctl 第二次进入,数量为1
                if (runStateOf(c) != rs)       //非运行状态,下一次迭代
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);//把任务放到Worker中
            final Thread t = w.thread; //Worker构造器中new Thread,状态为-1
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;//重入锁
                mainLock.lock();//加锁
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN || //rs<0,表示running状态
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size(); //一个Worker对应一个线程
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;    //Worker保存成功标识
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();   //启动线程,执行runWorker方法
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//添加失败,创建线程失败,回滚
        }
        return workerStarted;
    }

            

      private static int runStateOf(int c) { return c & ~CAPACITY; }

    计算:

    C=11100000000000000000000000000000

    ~capacity=~00011111111111111111111111111111=11100000000000000000000000000000

    C & ~capacity=11100000000000000000000000000000

     private static final int SHUTDOWN = 0 << COUNT_BITS;

    计算:

    0<<29=0

     privateboolean compareAndIncrementWorkerCount(int expect) {

       return ctl.compareAndSet(expect, expect + 1);

    } //原子操作cas,加1

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);  //移除worker
            decrementWorkerCount();//数量减1
            tryTerminate();     //尝试终止
        } finally {
            mainLock.unlock();
        }
    }
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    2.Worker类:
    private final class Worker
        extends AbstractQueuedSynchronizer    //继承AQS,为何
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {//获得锁,状态改为1
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);//释放锁,状态改为0
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    为什么Worker要自己去实现AQS,而不使用重入锁?

     

    在new Worker时,会设置state默认为-1.

           当要释放锁时,状态改为state=0.

           而获得锁时,状态改为state=1.

    因为Worker实现Runnable接口,是一个线程任务。所以当,thread执行run方法时,实际上会执行Worker的run方法,内部调用runWorker方法。

    A.runworker方法:
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//取得第一个任务
        w.firstTask = null;
        w.unlock();//释放锁,允许中断,调用aqs.release(1),设置状态为1
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();//获得锁,防止在shutdown时不终止正在执行的任务。
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//核心逻辑,task是通过execute传递过来的任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    注意

        wc.lock()加锁,一方面是为了支持并发。另一方面,是为了在shutdown时不去终止正在运行的线程。

        我们在使用线程池时,一次执行多个任务,等所有的任务执行完成后,才进行中断线程。

    即执行service.shutdown().

        可以由于任务时间较长,某几个任务还在执行,程序就已经执行到shutdown.

        如果没有wc.lock的作用,就会终止正在执行或还未执行的任务中断掉。这是不合理的。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);//移除已经执行的任务
        } finally {
            mainLock.unlock();
        }
    
        tryTerminate();
    
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
    B.shutdown方法
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//关闭线程安全检查
        advanceRunState(SHUTDOWN);//更新线程状态为shutdown
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;//更新线程状态为shutdown
    
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {//尝试获得锁
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    销毁线程,有两种方式:

    shutdownNow()-------强制关闭。

    shutdown()----友善关闭。

    3.1.17.线程池的实现原理【下】

    时长:1h10min

    计划:2020/4/28 22:42-23:50

    8.2.线程池原理分析【二】

    8.2.1.提交任务创建线程执行任务

    如果有:corePoolSize=3,maxPoolSize=5.【线程池刚启动,还未被预热】

    就会创建:3个核心线程,2个最大线程0

    创建线程,首先会创建Worker实例。

    创建完成之后,会立马执行我们传入的任务。

    如果已经预热。工作线程数小于核心线程数,这个条件不能再满足。这时就需要,把

    任务把放到阻塞队列中,然后去从队列中取任务。然后执行。

    8.2.1.1.runWorker再分析
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//核心逻辑
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
     
    1.从阻塞队列获取任务
      while (task != null || (task = getTask()) != null) {//很关键

        可以获得当前传入的任务,也可以从阻塞队列中去获取。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            //状态不为running,或队列为空,均结束方法
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//线程数减1
                return null;    //null会终止runWorker循环,回收线程
            }
    
            int wc = workerCountOf(c);
    
            //允许回收核心线程标识【可以进行传参设置】,或工作线程大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {//核心逻辑
                Runnable r = timed ?//timed为true,poll超时处理
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();//不允许超时,take获取
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    2.线程什么时候被回收?

        在Thread的应用中,当run方法执行结束后,线程会自动销毁。

    而这里run方法,内部调用runWorker()方法。

        runWorker方法,内部有一个while循环,需要让这个循环结束,才能结束方法。

    8.3.线程池的参数

    8.3.1.相关问题思考

    1.不建议使用Executors.newXXX创建线程池,是为什么?【阿里开发手册中说明】

    原因:

        这种方式,使用默认构造,进行创建。它的参数也是使用默认值,这种不清楚设置参数的含义下,使用这种方式,是存在风险的。

        因为线程的使用,会影响cpu的资源及性能的。

    2.线程池大小的如何设置【常见面试题】

        取决于硬件环境和软件环境。

        硬件环境:主要cpu核心数

        软件环境:线程的执行情况---

    io密集型【线程用于io远程通信,设置更多线程数】,可以设置为cpu核心的2倍。

    cpu密集型【线程主要用于计算,执行要快,要求cpu利用率高,以cpu核心数为准】,设置最大线程数=cpu核心数+1

        在并发编程,书中给出公式:

        (线程等待时间 + 线程cpu时间)/cpu时间* cpu核心数。

    3.线程池的初始化

        线程池是可以进行预热的。使用方式如下:

    4.线程池的关闭

    有两种方式:

    service.shutdown();
    service.shutdownNow();//立马终止

    8.4.Callable/FutureTask原理

    8.4.1.submit与execute的区别

    submit提交任务,可以有两种类型:

    Callable类型和Runnable类型。

    submit可以实现一个带返回值的线程。

    对于异常的处理,execute会抛出异常,而execute不会抛出异常,但会在get时拿到异常。

    8.4.1.1.使用示例
     
    package com.wf.concurrent;
    
    import javax.sound.midi.Soundbank;
    import java.util.concurrent.*;
    
    /**
     * 类名称:FutureDemo
     * 类描述:TODO
     * 创建人:Administrator
     * 创建时间:2020/4/29 0:09
     * Version 1.0
     */
    public class FutureDemo implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("execute call");
            Thread.sleep(5000);
            return "Hello call";
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureDemo futureDemo = new FutureDemo();
            FutureTask futureTask = new FutureTask(futureDemo);
            new Thread(futureTask).start();
            System.out.println(futureTask.get());//阻塞获取结果
            
            //线程池的实现方式
            ExecutorService  executorService = Executors.newFixedThreadPool(3);
            Future<String> futureRes = executorService.submit(new FutureDemo());
            System.out.println(futureRes.get());
            
    
            executorService.shutdown();
    
        }
    
    }
    8.4.2.1.猜想实现原理

    1.阻塞:LockSupport.unpark

     2.使用状态机制【这里使用这种方式】

    8.4.2.2.源码分析

    1.FutureTask类图

    2.FutureTask#run方法

    FutureTask是一个Runnable实现类,它一定存在run方法,代码如下:

    public void run() {
        if (state != NEW ||//设置失败
            !UNSAFE.compareAndSwapObject(this, runnerOffset, 
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;    //通过构造器传过来的task
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();    //调用call
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);    //设置call方法返回结果
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));//runner
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    3.java.util.concurrent.FutureTask#get()
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)    //当前任务未完成,
            s = awaitDone(false, 0L);//阻塞
        return report(s);
    }
    
    private int awaitDone(boolean timed, long nanos)//传参false,0L
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {//自旋
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)    //第一次进入
                q = new WaitNode();    //创建阻塞队列
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {//等待超时时间处理
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);//阻塞
        }
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;    //取得返回值
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
  • 相关阅读:
    递归函数及Java范例
    笔记本的硬盘坏了
    “References to generic type List should be parameterized”
    配置管理软件(configuration management software)介绍
    WinCE文件目录定制及内存调整
    使用Silverlight for Embedded开发绚丽的界面(3)
    wince国际化语言支持
    Eclipse IDE for Java EE Developers 与Eclipse Classic 区别
    WinCE Heartbeat Message的实现
    使用Silverlight for Embedded开发绚丽的界面(2)
  • 原文地址:https://www.cnblogs.com/wfdespace/p/12659104.html
Copyright © 2011-2022 走看看