zoukankan      html  css  js  c++  java
  • 多线程使用注意

    命名

    来源:https://www.cnblogs.com/guozp/p/10344446.html

    • 我们在创建线程池的时候,一定要给线程池名字,如下这种写法,线程是默认直接生成的:

      public static void main(String[] args) {
              ExecutorService executorService = Executors.newFixedThreadPool(3);
              for (int i = 0; i < 10; i++) {
                  final int finalI = i;
                  executorService.execute(new Runnable() {
                      @Override
                      public void run() {
                          System.out.println(Thread.currentThread().getName() + ":" + finalI);
                      }
                  });
              }		
           }  
      

      最后的输出:

      pool-1-thread-3:2
      pool-1-thread-2:1
      pool-1-thread-3:4
      pool-1-thread-1:3
      pool-1-thread-3:6
      pool-1-thread-2:5
      pool-1-thread-3:8
      pool-1-thread-1:7
      pool-1-thread-2:9
      
    • Executors中有默认的线程工厂的实现:

      static class DefaultThreadFactory implements ThreadFactory {
              private static final AtomicInteger poolNumber = new AtomicInteger(1);
              private final ThreadGroup group;
              private final AtomicInteger threadNumber = new AtomicInteger(1);
              private final String namePrefix;
      
              DefaultThreadFactory() {
                  SecurityManager s = System.getSecurityManager();
                  group = (s != null) ? s.getThreadGroup() :
                                        Thread.currentThread().getThreadGroup();
                  namePrefix = "pool-" +
                                poolNumber.getAndIncrement() +
                               "-thread-";
              }
      
              public Thread newThread(Runnable r) {
                  Thread t = new Thread(group, r,
                                        namePrefix + threadNumber.getAndIncrement(),
                                        0);
                  if (t.isDaemon())
                      t.setDaemon(false);
                  if (t.getPriority() != Thread.NORM_PRIORITY)
                      t.setPriority(Thread.NORM_PRIORITY);
                  return t;
              }
          }
      
     
    - 我们可以改造一下
    
    	```
    	 public class NamedThreadFactory implements ThreadFactory {
    	    private final AtomicInteger threadNumber;
    	    private final String name;
    	    private final boolean isDaemon;
    	
    	    public NamedThreadFactory(String name) {
    	        this(name, false);
    	    }
    	
    	    public NamedThreadFactory(String name, boolean daemon) {
    	        this.threadNumber = new AtomicInteger(1);
    	        this.isDaemon = daemon;
    	        this.name = name + "-thread-pool-";
    	    }
    	
    	    public Thread newThread(Runnable r) {
    	        Thread t = new Thread(r, this.name + this.threadNumber.getAndIncrement());
    	        t.setDaemon(this.isDaemon);
    	        if (t.getPriority() != Thread.NORM_PRIORITY){
    	            t.setPriority(Thread.NORM_PRIORITY);
    	        }
    	        return t;
    	    }
    	}
    	```
    
        那我们看下改造之后的输出结果:
    
    	```
    	有名字的线程池-thread-pool-1:0
    	有名字的线程池-thread-pool-3:2
    	有名字的线程池-thread-pool-1:3
    	有名字的线程池-thread-pool-2:1
    	有名字的线程池-thread-pool-1:5
    	有名字的线程池-thread-pool-1:7
    	有名字的线程池-thread-pool-1:8
    	有名字的线程池-thread-pool-3:4
    	有名字的线程池-thread-pool-1:9
    	有名字的线程池-thread-pool-2:6
    	```
    
       这样的话,当我们应用线上出现问题,需要通过jstack查看线程堆栈的时候,就可以知道是哪些线程出现的问题,否则看到的都是统一的命名方式,看到都是清一色的线程,增加排查问题的难度
    
    #### Thread异常处理
    Java中线程执行的任务接口java.lang.Runnable 要求不抛出Checked异常,
    
    

    public interface Runnable {

    public abstract void run();
    

    }

    
    那么如果 run() 方法中抛出了RuntimeException,将会怎么处理了?
    
    线程出现未捕获异常后,JVM将调用Thread中的dispatchUncaughtException方法把异常传递给线程的未捕获异常处理器
    
    

    private void dispatchUncaughtException(Throwable e) {
    getUncaughtExceptionHandler().uncaughtException(this, e);
    }

    public UncaughtExceptionHandler getUncaughtExceptionHandler() {
    return uncaughtExceptionHandler != null ?
    uncaughtExceptionHandler : group;
    }

    
    Thread中存在两个UncaughtExceptionHandler。一个是静态的defaultUncaughtExceptionHandler,另一个是非静态uncaughtExceptionHandler。
    
    

    // null unless explicitly set
    private volatile UncaughtExceptionHandler uncaughtExceptionHandler;

    // null unless explicitly set
    private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;

    
    - defaultUncaughtExceptionHandler:设置一个静态的默认的UncaughtExceptionHandler。来自所有线程中的Exception在抛出并且未捕获的情况下,都会从此路过。进程fork的时候设置的就是这个静态的defaultUncaughtExceptionHandler,管辖范围为整个进程
    - uncaughtExceptionHandler:为单个线程设置一个属于线程自己的uncaughtExceptionHandler,辖范围比较小。
    
    如果没有设置uncaughtExceptionHandler,将使用线程所在的线程组来处理这个未捕获异常。线程组ThreadGroup实现了UncaughtExceptionHandler,所以可以用来处理未捕获异常。ThreadGroup类定义:
    
    

    private ThreadGroup group;

    class ThreadGroup implements Thread.UncaughtExceptionHandler{}

    
    ThreadGroup实现的uncaughtException如下:
    
    

    public void uncaughtException(Thread t, Throwable e) {
    if (parent != null) {
    parent.uncaughtException(t, e);
    } else {
    Thread.UncaughtExceptionHandler ueh =
    Thread.getDefaultUncaughtExceptionHandler();
    if (ueh != null) {
    ueh.uncaughtException(t, e);
    } else if (!(e instanceof ThreadDeath)) {
    System.err.print("Exception in thread ""
    + t.getName() + "" ");
    e.printStackTrace(System.err);
    }
    }
    }

    默认情况下,线程组处理未捕获异常的逻辑是,首先将异常消息通知给父线程组,然后尝试利用一个默认的defaultUncaughtExceptionHandler来处理异常,如果没有默认的异常处理器则将错误信息输出到System.err。也就是JVM提供给我们设置每个线程的具体的未捕获异常处理器,也提供了设置默认异常处理器的方法,通常java.lang.Thread对象运行设置一个默认的异常处理方法:
    		
    

    public static void setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
    SecurityManager sm = System.getSecurityManager();
    if (sm != null) {
    sm.checkPermission(
    new RuntimePermission("setDefaultUncaughtExceptionHandler")
    );
    }

         defaultUncaughtExceptionHandler = eh;
     }
    
    而这个默认的静态全局的异常捕获方法是直接输出异常堆栈。
    当然,我们可以覆盖此默认实现,只需要实现java.lang.Thread.UncaughtExceptionHandler接口即可
    
    

    public interface UncaughtExceptionHandler {

    void uncaughtException(Thread t, Throwable e);
    

    }

    
    
    #### submit异常吞并
    - 我们平时都是通过submit来提交一个Callable,那如果提交的是Runnable呢,为方便起见我们核心的代码都放在一起了
    
    

    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
    }

    protected RunnableFuture newTaskFor(Runnable runnable, T value) {
    return new FutureTask(runnable, value);
    }

    public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; // ensure visibility of callable
    }
    public static Callable callable(Runnable task, T result) {
    if (task == null)
    throw new NullPointerException();
    return new RunnableAdapter(task, result);
    }
    //最终FutureTask中的callable指向的是一个RunnableAdapter,而RunnableAdapter的call方法也是调用了我们传进来的task的run方法,返回的是null
    static final class RunnableAdapter implements Callable {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
    }
    public T call() {
    task.run();
    return result;
    }
    }

    
    - 那从这里我们就知道,我们通过submit传递进去的Runnale,最后在FutureTask的run方法里面调用的callable.call()实质上还是我们传递进去的runnable的run方法,在源码FutureTask的run方法的时候发现,FutureTask中执行任务如果出现异常,是不会抛出来的,必须通过get方法才可以获取到,当然也可以重写afterExecute()这个回调方法,在这个里面来调用get获取异常信息,
    **还是要重点强调下,我们在通过submit执行任务的时候,一定要调用get()方法**
    - 这里我们重写afterExecute()方法,来获取submit(Runnable task)的执行异常:
    
    
    

    protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    //执行的Callable,对应的t一定是Null
    if (t == null && r instanceof Future) {
    try {
    Future future = (Future) r;
    if (future.isDone()){
    // 判断任务是否执行完成
    future.get();
    }
    } catch (CancellationException ce) {
    t = ce;
    } catch (ExecutionException ee) {
    t = ee.getCause();
    } catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
    }
    }
    }

    
    #### CountDownLatch 丢失事件
    - 我们在处理一批任务的时候,往往会把任务进行partition,然后再交给每个线程去处理,那主线程需要等待所有的线程处理完,来统计本次处理的时间,以及其他统计的数据,差不多和下面这段代码类似:
    
    

    public void execute3(){
    List data = new ArrayList(100);
    for (int i = 0; i < 100; i++) {
    data.add(i + 10);
    }

    List<List<Integer>> partition = Lists.partition(data, 20);
    final CountDownLatch countDownLatch = new CountDownLatch(partition.size());
    for (final List<Integer> dataToHandle : partition) {
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    for (Integer i : dataToHandle) {
                        doSomeThing(i);
                    }
                }catch (Exception e){
                   logger.error(e.getMessage(), e);
                }finally {  
                    countDownLatch.countDown();
                }
            }
        });
    }
    
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
    
    logger.info("任务执行结束...");
    

    }

    
    - 之前这么写代码没有出现过问题,直到最近出现问题才发现这么写会导致主线程无法结束的问题。我们看下,虽然在每个任务的finally中进行处理
    - countDownLatch.countDown();但是有一点忽视了,我们在异常那块其实有提到过,如果线程池满了,抛出RejectExecuteException的话,那这次任务的countDownLatch就会被忽视,当然我们这是在主线程里执行,直接会抛出异常导致主线程结束,但是如果和上面提到的在单独的子线程里面去执行这个线程池,那这样的话由于主线程无法捕获到子线程的异常,就会出现主线程无法结束的问题,所以我们在子线程中执行线程池一定要避免这点 即如果在子线程中执行,需要改为下面这样:
    
    

    public void execute3(){
    List data = new ArrayList(100);
    for (int i = 0; i < 100; i++) {
    data.add(i + 10);
    }

    final List<List<Integer>> partition = Lists.partition(data, 20);
    final CountDownLatch countDownLatch = new CountDownLatch(partition.size());
    new Thread(new Runnable() {
        @Override
        public void run() {
            for (final List<Integer> dataToHandle : partition) {
                try {
                    threadPoolExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try{
                                for (Integer i : dataToHandle) {
                                    doSomeThing(i);
                                }
                            }catch (Exception e){
                                logger.error(e.getMessage(), e);
                            }finally {
    
                                countDownLatch.countDown();
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                    logger.error(e.getMessage(), e);
                    //处理完异常之后需要补充countDownLatch事件
                    countDownLatch.countDown();
                }
            }
    
        }
    }).start();
    
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
    
    logger.info("任务执行结束...");
    

    }

    
    来源:https://www.cnblogs.com/guozp/p/10344446.html
  • 相关阅读:
    var、let、const之间的区别
    es5和es6的区别
    [2020CCPC威海G] Caesar Cipher
    [CF1437E] Make It Increasing
    [CF1437C] Chef Monocarp
    [CF1436D] Bandit in a City
    [CF1418D] Trash Problem
    [CF1419E] Decryption
    [CF1420C2] Pokémon Army (hard version)
    [CF1424M] Ancient Language
  • 原文地址:https://www.cnblogs.com/guozp/p/10344446.html
Copyright © 2011-2022 走看看