zoukankan      html  css  js  c++  java
  • 多线程源代码学习笔记

    多线程源代码分析

    1. Thread

    Every thread has a name for identification purposes

    一个Thread类的实例代表一个线程,包含了线程的启动,执行,销毁,参数等一系列参数和方法

    JVM可以再运行时调用这个实例的方法

    Thread 继承自Runnable借口, 而Runable接口有一个抽象方法run

    public interface Runnable {
       /**
        * When an object implementing interface <code>Runnable</code> is used
        * to create a thread, starting the thread causes the object's
        * <code>run</code> method to be called in that separately executing
        * thread.
        * <p>
        * The general contract of the method <code>run</code> is that it may
        * take any action whatsoever.
        *
        * @see     java.lang.Thread#run()
        */
       public abstract void run();
    }

    Thread类中的run方法实现

    @Override
    public void run() {
       if (target != null) {
           target.run();
      }
    }
    /* What will be run. */
    private Runnable target;

    这里看到Thread的run方法其实就是调用一个实现了Runnable接口的target的run方法

    而在Thread类的构造函数中调用了init实现初始化

    public Thread(Runnable target) {
       init(null, target, "Thread-" + nextThreadNum(), 0);
    }
    private void init(ThreadGroup g, Runnable target, String name,
                     long stackSize, AccessControlContext acc,
                     boolean inheritThreadLocals) {
      ...
       this.target = target;
      ...
    }

    在init初始化的时候, 将字段target初始化

    所以我们知道, 自定义自己的线程的两种方法是

    1. 写继承Thread类, 重写它的run方法, 这样我们就不用考虑target的问题

    2. 写一个实现Runnable接口的类, 实现run方法,在new Thread()对象的时候把这个类作为参数传进去作为线程对象的target字段,以后这个线程对象执行run方法的时候就可以调用这个target的run方法。

    public synchronized void start() {
       /**
        * This method is not invoked for the main method thread or "system"
        * group threads created/set up by the VM. Any new functionality added
        * to this method in the future may have to also be added to the VM.
        *
        * A zero status value corresponds to state "NEW".
        */
       if (threadStatus != 0)
           throw new IllegalThreadStateException();

       /* Notify the group that this thread is about to be started
        * so that it can be added to the group's list of threads
        * and the group's unstarted count can be decremented. */
       group.add(this);

       boolean started = false;
       try {
           start0();
           started = true;
      } finally {
           try {
               if (!started) {
                   group.threadStartFailed(this);
              }
          } catch (Throwable ignore) {
               /* do nothing. If start0 threw a Throwable then
                 it will be passed up the call stack */
          }
      }
    }

    这里start 方法是void返回类型, 并且调用start0

    private native void start0();

    然后start0是一个native方法,它会开辟线程并调用线程的run方法,如果我们自己在main线程中调用run方法, 是不能实现多线程的

     

    2. Runnable

    class PrimeRun implements Runnable {
            long minPrime;
            PrimeRun(long minPrime) {
                this.minPrime = minPrime;
            }

            public void run() {
                // compute primes larger than minPrime
                . . .
            }
        }
    PrimeRun p = new PrimeRun(143);
        new Thread(p).start();

    jdk里有一句话

    In most cases, the Runnable interface should be used if you are only planning to override the run() method and no other Thread methods.

    意思是只想重写run方法的话可以用Runnable, 但是如果想重写其他的Thread类的方法的话, 就需要继承Thread

     

    3. ThreadFactory

    An object that creates new threads on demand. Using thread factories removes hardwiring of calls to new Thread, enabling applications to use special thread subclasses, priorities, etc.

    例子

     class SimpleThreadFactory implements ThreadFactory {
      public Thread newThread(Runnable r) {
        return new Thread(r);
      }
    }

    创建线程的时候可以省去new Thread()

     

    4. Callable

    使用继承Thread类和实现Runnable接口有一个问题: 不能得到返回值

    JDK1.5引入了一种新的方式 Callable 和 Future

    public interface Callable<V> {
       /**
        * Computes a result, or throws an exception if unable to do so.
        *
        * @return computed result
        * @throws Exception if unable to compute a result
        */
       V call() throws Exception;
    }

    The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

    意思是Callable可以有返回或者返回异常

    5.Future(Interface)

    A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation.

    和Callable相关的一个类是Future类,意思是Future类表示异步的Runnable或者Callable执行结果,它有很多方法可以去对执行做操作,比如检查是否执行成功,取回执行结果等。其实和Future类与Callable的关系和Thread与Runnable的关系很像,一个关心运行时的各种参数,一个关心具体执行的操作。

    Future是一个对异步执行的任务的跟踪类

    The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready.

    只可以用get方法来取回执行结果,如果没执行完,就block

    Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled.

    可以用cancel方法来取消执行,

    public interface Future<V> {
       boolean cancel(boolean mayInterruptIfRunning);

       boolean isCancelled();
       
       boolean isDone();
       
       V get() throws InterruptedException, ExecutionException;
       
       V get(long timeout, TimeUnit unit)
           throws InterruptedException, ExecutionException, TimeoutException;
    }

    可以看到Future类里的几个常见的方法

    6.RunnableFuture(Interface)

    /**
    * A {@link Future} that is {@link Runnable}. Successful execution of
    * the {@code run} method causes completion of the {@code Future}
    * and allows access to its results.
    * @see FutureTask
    * @see Executor
    * @since 1.6
    * @author Doug Lea
    * @param <V> The result type returned by this Future's {@code get} method
    */
    public interface RunnableFuture<V> extends Runnable, Future<V> {
       /**
        * Sets this Future to the result of its computation
        * unless it has been cancelled.
        */
       void run();
    }

    jdk的解释是

    A Future that is Runnable.Successful execution of the run method causes completion of the Future and allows access to its results.

    RunnableFuture是一个可以执行的Future,再Future的基础上扩展出run方法,并提供访问执行结果的入口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

     

    7.FutureTask(Class)

    FutureTask 是RunnableFuture的实现类,既可以作为runnable被线程执行,又可以作为Future得到task的返回值

    事实上,FutureTask是Future接口的一个唯一实现类。(摘自其他博客,暂时没有吃透)

    首先是两个构造方法

    public FutureTask(Callable<V> callable) {
       if (callable == null)
           throw new NullPointerException();
       this.callable = callable;
       this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
       this.callable = Executors.callable(runnable, result);
       this.state = NEW;       // ensure visibility of callable
    }

    再看实现Future的方法

    get 获取结果

    public V get() throws InterruptedException, ExecutionException {
       int s = state;
       if (s <= COMPLETING)
           s = awaitDone(false, 0L);
       return report(s);
    }

    awaitDone return state upon completion 返回状态值

    /**
    * Returns result or throws exception for completed task.
    *
    * @param s completed state value
    */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
       Object x = outcome;
       if (s == NORMAL)
           return (V)x;
       if (s >= CANCELLED)
           throw new CancellationException();
       throw new ExecutionException((Throwable)x);
    }

    可以看到返回的是outcome属性

    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes

     

    FutureTask继承Runnable, 实现了run方法

    public void run() {
       if (state != NEW ||
           !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                        null, Thread.currentThread()))
           return;
       try {
           Callable<V> c = callable;
           if (c != null && state == NEW) {
               V result;
               boolean ran;
               try {
                   result = c.call();
                   ran = true;
              } catch (Throwable ex) {
                   result = null;
                   ran = false;
                   setException(ex);
              }
               if (ran)
                   set(result);
          }
      } finally {
           // runner must be non-null until state is settled to
           // prevent concurrent calls to run()
           runner = null;
           // state must be re-read after nulling runner to prevent
           // leaked interrupts
           int s = state;
           if (s >= INTERRUPTING)
               handlePossibleCancellationInterrupt(s);
      }
    }

    可以看到

    if (ran)
     set(result);

    这个语句是在try,catch之后的,也就是说等调用Callable任务的call方法之后再执行

    protected void set(V v) {
       if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
           outcome = v;
           UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
           finishCompletion();
      }
    }

    在这段代码的第三行,把result传给了FutureTask类的outcome属性,也就是说调用完call方法之后的泛型返回值再赋值给outcome,然后在get方法中,可以得到outcome的值

     

    8.ExecutorService 和 Executor(Interface)

    Callable一般情况下是配合ExecutorService来使用的

    An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

    ExecutorService是一个提供了方法去管理执行结果和产生Future类结果来跟踪一个或多个异步任务的 Executor

    它继承自Executor,所以先来看看什么是Executor

    An object that executes submitted Runnable tasks.

    Executor的对象可以执行实现了Runnable接口的任务

    This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc.

    这个接口可以分离任务的提交和执行计划,这个好理解,实现了Runnable接口的run方法就是定义执行的计划,调用Executor来执行就是提交任务.

    An Executor is normally used instead of explicitly creating threads. For example, rather than invoking new Thread(new(RunnableTask())).start() for each of a set of tasks, you might use:

    以前把实现了Runnable接口的对象作为参数传到Thread类中,然后用Thread的start方法调用,但Executor可以不用显式的创造线程,可以用这种方法

    Executor executor = anExecutor;
    executor.execute(new RunnableTask1());
    executor.execute(new RunnableTask2());
    public interface Executor {

       /**
        * Executes the given command at some time in the future. The command
        * may execute in a new thread, in a pooled thread, or in the calling
        * thread, at the discretion of the {@code Executor} implementation.
        *
        * @param command the runnable task
        * @throws RejectedExecutionException if this task cannot be
        * accepted for execution
        * @throws NullPointerException if command is null
        */
       void execute(Runnable command);
    }

    这个注释上说可以用execute方法来执行这个Runnable的task, 但是如何执行,是在Thread里面,在线程池等,则需要实现这个方法,可以看到jdk给的例子:

    class DirectExecutor implements Executor {
        public void execute(Runnable r) {
            r.run();
        }
    }
    class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) {
            new Thread(r).start();
        }
    }

    The Executor implementations provided in this package implement ExecutorService, which is a more extensive interface. The ThreadPoolExecutor class provides an extensible thread pool implementation. The Executors class provides convenient factory methods for these Executors.

     

    再回到ExecutorService, 它是Exector接口的一个继承接口, 声明了很多关于任务的结束,提交等的方法

    public interface ExecutorService extends Executor {

       void shutdown();

       List<Runnable> shutdownNow();

       boolean isShutdown();

       boolean isTerminated();

       boolean awaitTermination(long timeout, TimeUnit unit)
           throws InterruptedException;

       
       <T> Future<T> submit(Callable<T> task);

       
       <T> Future<T> submit(Runnable task, T result);

       
       Future<?> submit(Runnable task);

     
       <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
           throws InterruptedException;

       
       <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
           throws InterruptedException;

       <T> T invokeAny(Collection<? extends Callable<T>> tasks)
           throws InterruptedException, ExecutionException;

       <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
           throws InterruptedException, ExecutionException, TimeoutException;
    }

    submit方法用来方法提交task

    invoke方法用来执行task

    一般使用

        <T> Future<T> submit(Callable<T> task);

    Future<?> submit(Runnable task);

     

    submit 方法接受参数Callable或者Runnable的对象,返回Future类型

    因为ExecutorService是接口,我们看一下实现类

    Executors.newCachedThreadPool();

    Executors.newfixedthreadpool();

    其实现类要实现Executor的execute方法和ExecutorService的shutdown,submit,invoke等方法

     

    9.Executors

    Factory and utility methods for Executor, ExecutorService and so on and Callable classes defined in this package.

    Executors是一个工具类和一个工厂

    以newFixedThreadPool为例

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>(),
                                     threadFactory);
    }

    返回类型是ExecutorService实际new了一个ThreadPoolExecutor

    public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory) {
       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
    }

    image-20210613202954129

    果不其然ThreadPoolExecutor类继承了ExecutorService类, 所以我们在ThreadPoolExecutor里找一下Executor和ExecutorService两个接口的实现方法

    首先是Executor的方法execute

    void execute(Runnable command);
    public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
       /*
        * Proceed in 3 steps:
        *
        * 1. If fewer than corePoolSize threads are running, try to
        * start a new thread with the given command as its first
        * task. The call to addWorker atomically checks runState and
        * workerCount, and so prevents false alarms that would add
        * threads when it shouldn't, by returning false.
        *
        * 2. If a task can be successfully queued, then we still need
        * to double-check whether we should have added a thread
        * (because existing ones died since last checking) or that
        * the pool shut down since entry into this method. So we
        * recheck state and if necessary roll back the enqueuing if
        * stopped, or start a new thread if there are none.
        *
        * 3. If we cannot queue task, then we try to add a new
        * thread. If it fails, we know we are shut down or saturated
        * and so reject the task.
        */
       int c = ctl.get();
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
               return;
           c = ctl.get();
      }
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           if (! isRunning(recheck) && remove(command))
               reject(command);
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
      }
       else if (!addWorker(command, false))
           reject(command);
    }
    * The main pool control state, ctl, is an atomic integer packing
     workerCount, indicating the effective number of threads

    ctl 表示活跃的线程

    1.如果线程小于corePoolSize, 则开辟线程执行

    2.如果入queue等待

    3.如果不能入queue,reject

    然后再看看submit的实现方法

    查看源码发现ExecutorService的submit方法是在AbstractExecutorService中实现的

    public Future<?> submit(Runnable task) {
       if (task == null) throw new NullPointerException();
       RunnableFuture<Void> ftask = newTaskFor(task, null);
       execute(ftask);
       return ftask;
    }

     

    public <T> Future<T> submit(Runnable task, T result) {
       if (task == null) throw new NullPointerException();
       RunnableFuture<T> ftask = newTaskFor(task, result);
       execute(ftask);
       return ftask;
    }

     

    public <T> Future<T> submit(Callable<T> task) {
       if (task == null) throw new NullPointerException();
       RunnableFuture<T> ftask = newTaskFor(task);
       execute(ftask);
       return ftask;
    }

    可以看到在submit中调用了一个newTaskFor方法

     

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
       return new FutureTask<T>(callable);
    }

    可以看到根据FutureTask的构造方法new了FutureTask的对象

    然后因为FutureTask继承了Runnable和Future,它重写了Runnable的run方法,在run方法中调用task的call方法,然后把call方法的返回值保存到outcome里,之后就可以用get方法来返回结果了。

    public Future<?> submit(Runnable task) {
       if (task == null) throw new NullPointerException();
       RunnableFuture<Void> ftask = newTaskFor(task, null);
       execute(ftask);
       return ftask;
    }

    再将这个FutureTask方法返回, 所以实现了ExectorService的submit方法,即提交一个任务task,并返回一个Future类来绑定这个task的结果等。

    再来看ExecutorService的执行方法invoke是如何被实现的, 在AbstractExecutorService中有:

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
       throws InterruptedException, ExecutionException {
       try {
           return doInvokeAny(tasks, false, 0);
      } catch (TimeoutException cannotHappen) {
           assert false;
           return null;
      }
    }
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                             boolean timed, long nanos)
       throws InterruptedException, ExecutionException, TimeoutException {
       if (tasks == null)
           throw new NullPointerException();
       int ntasks = tasks.size();
       if (ntasks == 0)
           throw new IllegalArgumentException();
       ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
       ExecutorCompletionService<T> ecs =
           new ExecutorCompletionService<T>(this);

       // For efficiency, especially in executors with limited
       // parallelism, check to see if previously submitted tasks are
       // done before submitting more of them. This interleaving
       // plus the exception mechanics account for messiness of main
       // loop.

       try {
           // Record exceptions so that if we fail to obtain any
           // result, we can throw the last exception we got.
           ExecutionException ee = null;
           final long deadline = timed ? System.nanoTime() + nanos : 0L;
           Iterator<? extends Callable<T>> it = tasks.iterator();

           // Start one task for sure; the rest incrementally
           futures.add(ecs.submit(it.next()));
           --ntasks;
           int active = 1;

           for (;;) {
               Future<T> f = ecs.poll();
               if (f == null) {
                   if (ntasks > 0) {
                       --ntasks;
                       futures.add(ecs.submit(it.next()));
                       ++active;
                  }
                   else if (active == 0)
                       break;
                   else if (timed) {
                       f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                       if (f == null)
                           throw new TimeoutException();
                       nanos = deadline - System.nanoTime();
                  }
                   else
                       f = ecs.take();
              }
               if (f != null) {
                   --active;
                   try {
                       return f.get();
                  } catch (ExecutionException eex) {
                       ee = eex;
                  } catch (RuntimeException rex) {
                       ee = new ExecutionException(rex);
                  }
              }
          }

           if (ee == null)
               ee = new ExecutionException();
           throw ee;

      } finally {
           for (int i = 0, size = futures.size(); i < size; i++)
               futures.get(i).cancel(true);
      }
    }

    其实看着这么长 重要的就一句, 执行Callable对象,并将返回的Future放入结果中

    futures.add(ecs.submit(it.next()));
    ExecutorCompletionService<T> ecs =
       new ExecutorCompletionService<T>(this);

     

    A CompletionService that uses a supplied Executor to execute tasks. This class arranges that submitted tasks are, upon completion, placed on a queue accessible using take. The class is lightweight enough to be suitable for transient use when processing groups of tasks.

    这是CompletionService的submit, 可以看到同样将Callable接口转成FutureTask类,然后执行然后返回这个FutureTask

    public Future<V> submit(Callable<V> task) {
       if (task == null) throw new NullPointerException();
       RunnableFuture<V> f = newTaskFor(task);
       executor.execute(new QueueingFuture(f));
       return f;
    }

    关于Callable如何被调用可以看下这篇博客https://www.cnblogs.com/hapjin/p/11407011.html

    总结

    1. Runnable接口

    package com.wzh;

    import java.util.concurrent.*;

    public class ThreadPoolDemo {
       public static void main(String[] args) {
           ExecutorService service1 = Executors.newCachedThreadPool();
           ExecutorService service2 = Executors.newFixedThreadPool(10);
           ExecutorService service3 = Executors.newSingleThreadExecutor();

           ThreadPoolExecutor t = new ThreadPoolExecutor(10, 20, 10,
                   TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
           service1.execute(new MyTask(1));
      }
    }

    class MyTask implements Runnable {
       int i;
       public MyTask(int i) {
           this.i = i;
      }

       @Override
       public void run() {
           System.out.println(Thread.currentThread().getName() + "---> " + i);
      }
    }

    image-20210613202954129image-20210613212107538

    线程池其实就是一个实现了ExecutorService接口的类, ExecutorService接口又对Executor进行了扩展,通过Executors里面的方法生成ExecutorService的实现类ThreadPoolExecutor。

    ThreadPoolExecutor实现类实现了Executor里的execute(Runnable command) 方法,ExecutorService里的submit方法。

    对于实现了Runnable接口的task,当我们使用线程池的时候,直接调用execute就可以了

     

    1. Callable

      public class Test {
       public static void main(String[] args) {
         ExecutorService executor = Executors.newCachedThreadPool();
         Task task = new Task();
         Future<Integer> result = executor.submit(task);
         executor.shutdown();
         
         try{
           Thread.sleep(1000);
        } catch (InterruptedException e1) {
           e1.printStackTrace();
        }
         
         System.out.println(``"主线程在执行任务");
         
         try{
           System.out.println(``"task运行结果"``+result.get());
        } catch (InterruptedException e) {
           e.printStackTrace();
        } catch(ExecutionException e) {
           e.printStackTrace();
        }
         
         System.out.println(``"所有任务执行完毕"``);
      }
      }
      class Task implements Callable<Integer>{
       @Override
       public` `Integer call() throws Exception {
         System.out.println("子线程在进行计算");
         Thread.sleep(3000);
         int sum = 0;
         for (int i=0;i<100;i++)
           sum += i;
         return sum;
      }
      }

    对于Callable接口,用submit来执行。ExecutorService里的submit方法返回一个Future类型。

    RunnableFuture<T> ftask = newTaskFor(task);

    Callable在submit里面被包装成了一个RunnableFuture, FutureTask是RunnableFuture的继承

    ExecutorService executor = Executors.newCachedThreadPool();
       Task task = new Task();
       FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
       executor.submit(futureTask);
       executor.shutdown();

    一种方式是把Callable接口的task直接submit,返回Future

    另一种方式是把这个task包装成FutureTask,再submit,这样的话返回的submit其实也就是返回的就是FutureTask本身,因此可以直接用这个FutureTask对象来get结果

     

    Future接受一个实现Callable的task,然后在callable执行完之后把返回值保存到Future类的outcome

     

    ===================================

    所以:

    对于Runnable, ExecutorService用Executor的execute来执行

    对于Callable, ExecutorService用submit来接受task,并且把它包装成RunnableFuture类,相当于绑定返回结果到Future,并且把task转换成了Runnable接口的task, 然后execute执行

  • 相关阅读:
    超简单的网页选项卡---jQuery
    【java+selenium3】Actions模拟鼠标 (十一)
    【java+selenium3】JavaScript的调用执行 (十)
    【java+selenium3】时间控件 (九)
    【java+selenium3】select 下拉选 (八)
    【java+selenium3】隐式等待+显式等待 (七)
    【java+selenium3】线程休眠方法 (六)
    【java+selenium3】模态框处理(五)
    【java+selenium3】多窗口window切换及句柄handle获取(四)
    【java+selenium3】特殊元素iframe的定位及详解(三)
  • 原文地址:https://www.cnblogs.com/wangzhihang/p/14881767.html
Copyright © 2011-2022 走看看