zoukankan      html  css  js  c++  java
  • java使用默认线程池踩过的坑(二)

    云智慧(北京)科技有限公司  陈鑫

     

    是的,一个线程不能够启动两次。那么它是怎么判断的呢?

    public synchronized void start() {

            /**

             * A zero status valuecorresponds to state "NEW".    0对应的是state NEW

             */

            if (threadStatus!= 0)    //如果不是NEW state,就直接抛出异常!

                throw newIllegalThreadStateException();

            group.add(this);

            boolean started = false;

            try {

                start0();    // 启动线程的native方法

                started = true;

            } finally {

                try {

                   if (!started) {

                       group.threadStartFailed(this);

                    }

                } catch(Throwable ignore) {

                }

            }

        }

    恩,只有是NEW状态才能够调用native方法启动一个线程。好吧,到这里了,就普及也自补一下jvm里的线程状态:

    所有的线程状态::

    l  NEW —— 还没有启动过

    l  RUNNABLE  —— 正在jvm上运行着

    l  BLOCKED  —— 正在等待锁/信号量被释放

    l  WAITING  —— 等待其他某个线程的某个特定动作

    l  TIMED_WAITING —— A thread that iswaiting for another thread to perform an action for up to a specified waitingtime is in this state.

    l  TERMINATED —— 退出,停止

    线程在某个时间点上只可能存在一种状态,这些状态是jvm里的,并不反映操作系统线程的状态。查一下Thread的API,没有对其状态进行修改的API。那么这条路是不通的吗?

    仔细考虑一下……

    如果把任务做成Runnable实现类,然后在把这个实现类丢进线程池调度器之前,利用此Runnable构造一个Thread,是不是这个Thread对象就能够控制这个runnable对象,进而控制在线程池中运行着的task了呢?非也!让我们看看Thread和ThreadPoolExecutor对Runnable的处理吧。

     Thread

        /* What will berun. */

    private Runnabletarget;

    结合上面的start()方法,很容易猜出,start0()会把target弄成一个线程来进行运行。

     ThreadPoolExecutor

    public void execute(Runnable command){

            if (command== null)

                thrownew NullPointerException();

            int c =ctl.get();

            if(workerCountOf(c) < corePoolSize) {

                if (addWorker(command, true))

                   return;

                c =ctl.get();

            }

            if(isRunning(c) && workQueue.offer(command)) {

                intrecheck = ctl.get();

                if (!isRunning(recheck) && remove(command))

                   reject(command);

                else if(workerCountOf(recheck) == 0)

                   addWorker(null, false);

            }

            else if (!addWorker(command, false))

               reject(command);

    }

     

    private boolean addWorker(RunnablefirstTask, boolean core) {

            …

            booleanworkerStarted = false;

            booleanworkerAdded = false;

            Worker w =null;

            try {

                finalReentrantLock mainLock = this.mainLock;

                w = newWorker(firstTask);

                finalThread t = w.thread;

                if (t!= null) {

                   mainLock.lock();

                    try{

                       int c = ctl.get();

                       int rs = runStateOf(c);

     

                       if (rs < SHUTDOWN ||

                           (rs == SHUTDOWN && firstTask == null)) {

                           if (t.isAlive()) // precheck that t is startable

                                throw newIllegalThreadStateException();

    workers.add(w);

                           int s = workers.size();

                           if (s > largestPoolSize)

                                largestPoolSize =s;

                           workerAdded = true;

                       }

                    }finally {

                       mainLock.unlock();

                    }

                    if(workerAdded) {

    t.start();

                       workerStarted = true;

                    }

                }

            } finally {

                if (!workerStarted)

                   addWorkerFailed(w);

            }

            return workerStarted;

        }

    那么Worker又是怎样的呢?

     Worker

    private final class Worker

            extendsAbstractQueuedSynchronizer

            implementsRunnable

        {

            finalThread thread;

            RunnablefirstTask;

            volatilelong completedTasks;

           Worker(Runnable firstTask) {

               setState(-1); //调用runWorker之前不可以interrupt

               this.firstTask = firstTask;

               this.thread = getThreadFactory().newThread(this);

            }

            public voidrun() {

               runWorker(this);

            }

               ……   

               …….

            voidinterruptIfStarted() {

                Threadt;

                if(getState() >= 0 && (t = thread) != null &&!t.isInterrupted()) {

                    try{

    t.interrupt();

                    }catch (SecurityException ignore) {

                    }

                }

            }

        }

    可见worker里既包装了Runnable对象——task,又包装了一个Thread对象——以自己作为初始化参数,因为worker也是Runnable对象。然后对外提供了运行与停止接口,run()和interruptIfStarted()。回顾上面使用Thread的例子不禁有了新的领悟,我们把一个Thread对象交给ThreadPoolExecutor执行后,实际的调用是对Thread(FileTask())对象,我们暂时称之为workerWrapper。那么我们在池外进行FileTask.interrupt()操作影响的是FileTask对象,而不是workerWrapper。所以可能上面对于start()方法二次调用不是特别适当。更恰当的应该是在fileTask.interrupt()的时候就跑出异常,因为从来没有对fileTask对象执行过start()方法,这时候去interrupt就会出现错误。具体如下图:

                                

    分析到此,我们已经明确除了调用ThreadPoolExecutor了的interruptWorkers()方法别无其他途径操作这些worker了。

    private void interruptWorkers() {

            finalReentrantLock mainLock = this.mainLock;

           mainLock.lock();

            try {

                for(Worker w : workers)

    w.interruptIfStarted();

            } finally {

               mainLock.unlock();

            }

    }

  • 相关阅读:
    X 如何理解关系型数据库的常见设计范式?
    X 使用DMV,诊断和调优DB性能。
    X MSSQL-并发控制-2-Isolation msql 的各种隔离级别 sqlserver
    X SQL Server AG集群启动不起来的临时自救大招
    X 搭建非域AlwaysOn win2016+SQL2016
    X 从0开始搭建SQL Server AlwaysOn 第四篇(配置异地机房节点)
    X 从0开始搭建SQL Server AlwaysOn 第三篇(配置AlwaysOn)
    X 从0开始搭建SQL Server AlwaysOn 第二篇(配置故障转移集群)
    Python Cookie Session和分页
    Python django应用之corsheaders[跨域设置]
  • 原文地址:https://www.cnblogs.com/amy26/p/4629613.html
Copyright © 2011-2022 走看看