我们的项目用到了ThreadGroup 把thread放到了threadGroup中,名称统一起来了;
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(16, 32, 5L, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000), new ThreadFactory() { private final ThreadGroup threadGroup = new ThreadGroup("fileTemplateMethodThreadGroup"); private final AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(threadGroup, r, "fileTemplateMethod-thread-pool-" + threadNumber.getAndIncrement()); } }, (r, executor) -> { if (!executor.isShutdown()) { /* 丢弃队列最老的数据 */ if (executor.getQueue().poll() != null) { Cat.logMetricForCount(CatConstant.METRIC_DISCARD_FILE_TASK_COUNT); } executor.execute(r); } });
ThreadGroup 可以把thread的名字统一起来。一起处理catch。
ThreadGroup是Java提供的一种对线程进行分组管理的手段,可以对所有线程以组为单位进行操作,如设置优先级、守护线程等。
线程组也有父子的概念,如下图:
线程组的创建
public class ThreadGroupCreator { public static void main(String[] args) { //获取当前线程的group ThreadGroup currentGroup = Thread.currentThread().getThreadGroup(); //在当前线程执行流中新建一个Group1 ThreadGroup group1 = new ThreadGroup("Group1"); //Group1的父线程,就是main线程所在Group System.out.println(group1.getParent() == currentGroup); //定义Group2, 指定group1为其父线程 ThreadGroup group2 = new ThreadGroup(group1, "Group2"); System.out.println(group2.getParent() == group1); } }
ThreadGroup是位于java.lang包下的一个类,用于统一的线程管理.一旦一个线程加入到一个线程组后,就不能更换线程所在的线程组
将当前线程加入到线程组中
public class ThreadGroupCreator { public static void main(String[] args) { //获取当前线程的group ThreadGroup currentGroup = Thread.currentThread().getThreadGroup(); //在当前线程执行流中新建一个Group1 ThreadGroup group1 = new ThreadGroup("Group1"); //Group1的父线程,就是main线程所在Group System.out.println(group1.getParent() == currentGroup); //定义Group2, 指定group1为其父线程 ThreadGroup group2 = new ThreadGroup(group1, "Group2"); System.out.println(group2.getParent() == group1); } }
将ThreadGroup中活跃的线程引用复制到线程组
Thread[] threads = new Thread[num]; threadGroup.enumerate(threads); for (Thread t : threads) { System.out.println("线程名-" + t.getName()); }
测试源代码如下
public class MyThread implements Runnable { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " -> start"); TimeUnit.SECONDS.sleep(10); //随机发生异常 if (ThreadLocalRandom.current().nextInt(10) > 5) { throw new RuntimeException(Thread.currentThread().getName() + "发生异常"); } System.out.println(Thread.currentThread().getName() + " -> end"); } catch (InterruptedException e) { e.printStackTrace(); } }
public class ThreadGroupTest { public static void main(String[] args) { int num = 10; ThreadGroup threadGroup = new ThreadGroup("test-group") { @Override public void uncaughtException(Thread t, Throwable e) { System.out.println("ThreadGroup捕获到线程异常 - " + e.getMessage()); } }; List<Thread> threadList = new ArrayList<>(); for (int i = 0; i < num; i++) { Thread thread = new Thread(threadGroup, new MyThread(), "threadname-" + i); threadList.add(thread); } System.out.println("运行前线程组中活跃线程数 -> " + threadGroup.activeCount()); System.out.println("开始运行所有线程..."); for (Thread t : threadList) { t.start(); } //获取线程组中所有[活动]线程 Thread[] threads = new Thread[num]; threadGroup.enumerate(threads); for (Thread t : threads) { System.out.println("线程名-" + t.getName()); } System.out.println("所有线程运行后,线程组中活跃线程数-" + threadGroup.activeCount()); //不断的查看线程组中活跃的线程数 Thread thread = new Thread(() -> { int num1; try { while ((num1 = threadGroup.activeCount()) > 0) { System.out.println("当前线程组活跃线程数为 -> " + num1); TimeUnit.SECONDS.sleep(1); } System.out.println("All Thread HAS FINISHED"); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); } }
运行结果如下
运行前线程组中活跃线程数 -> 0
开始运行所有线程...
threadname-0 -> start threadname-1 -> start threadname-2 -> start threadname-3 -> start threadname-4 -> start threadname-5 -> start threadname-6 -> start threadname-7 -> start threadname-8 -> start 线程名-threadname-0 threadname-9 -> start 线程名-threadname-1 线程名-threadname-2 线程名-threadname-3 线程名-threadname-4 线程名-threadname-5 线程名-threadname-6 线程名-threadname-7 线程名-threadname-8 线程名-threadname-9 所有线程运行后,线程组中活跃线程数-10 当前线程组活跃线程数为 -> 10 当前线程组活跃线程数为 -> 10 当前线程组活跃线程数为 -> 10 当前线程组活跃线程数为 -> 10 当前线程组活跃线程数为 -> 10 当前线程组活跃线程数为 -> 10 当前线程组活跃线程数为 -> 10 当前线程组活跃线程数为 -> 10 当前线程组活跃线程数为 -> 10 当前线程组活跃线程数为 -> 10 threadname-5 -> end threadname-8 -> end ThreadGroup捕获到线程异常 - threadname-7发生异常 ThreadGroup捕获到线程异常 - threadname-2发生异常 threadname-4 -> end ThreadGroup捕获到线程异常 - threadname-3发生异常 ThreadGroup捕获到线程异常 - threadname-9发生异常 ThreadGroup捕获到线程异常 - threadname-1发生异常 threadname-6 -> end threadname-0 -> end All Thread HAS FINISHED ---------------------
线程组的基本操作
注意:后添加进线程组的线程,其优先级不能大于线程组的优先级
public class ThreadGroupBasic { public static void main(String[] args) throws InterruptedException { ThreadGroup group = new ThreadGroup("group1"); Thread thread = new Thread(group, () -> { while(true) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "thread"); thread.setDaemon(true); thread.start(); TimeUnit.MILLISECONDS.sleep(1); ThreadGroup mainGroup = Thread.currentThread().getThreadGroup(); //递归获取mainGroup中活跃线程的估计值 System.out.println("activeCount = " + mainGroup.activeCount()); //递归获mainGroup中的活跃子group System.out.println("activeGroupCount = " + mainGroup.activeGroupCount()); //获取group的优先级, 默认为10 System.out.println("getMaxPriority = " + mainGroup.getMaxPriority()); //获取group的名字 System.out.println("getName = " + mainGroup.getName()); //获取group的父group, 如不存在则返回null System.out.println("getParent = " + mainGroup.getParent()); //活跃线程信息全部输出到控制台 mainGroup.list(); System.out.println("----------------------------"); //判断当前group是不是给定group的父线程, 如果两者一样,也会返回true System.out.println("parentOf = " + mainGroup.parentOf(group)); System.out.println("parentOf = " + mainGroup.parentOf(mainGroup)); } }
线程组的Interrupt
public class ThreadGroupInterrupt { public static void main(String[] args) throws InterruptedException { ThreadGroup group = new ThreadGroup("TestGroup"); new Thread(group, () -> { while(true) { try { TimeUnit.MILLISECONDS.sleep(2); } catch (InterruptedException e) { //received interrupt signal and clear quickly System.out.println(Thread.currentThread().isInterrupted()); break; } } System.out.println("t1 will exit"); }, "t1").start(); new Thread(group, () -> { while(true) { try { TimeUnit.MILLISECONDS.sleep(2); } catch (InterruptedException e) { //received interrupt signal and clear quickly System.out.println(Thread.currentThread().isInterrupted()); break; } } System.out.println("t2 will exit"); }, "t2").start(); //make sure all threads start TimeUnit.MILLISECONDS.sleep(2); group.interrupt(); } }
线程组的destroy
public class ThreadGroupDestroy { public static void main(String[] args) { ThreadGroup group = new ThreadGroup("TestGroup"); ThreadGroup mainGroup = Thread.currentThread().getThreadGroup(); //before destroy System.out.println("group.isDestroyed=" + group.isDestroyed()); mainGroup.list(); group.destroy(); //after destroy System.out.println("group.isDestroyed=" + group.isDestroyed()); mainGroup.list(); } }
线程组设置守护线程组
线程组设置为守护线程组,并不会影响其线程是否为守护线程,仅仅表示当它内部没有active的线程的时候,会自动destroy
public class ThreadGroupDaemon { public static void main(String[] args) throws InterruptedException { ThreadGroup group1 = new ThreadGroup("group1"); new Thread(group1, () -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }, "group1-thread1").start(); ThreadGroup group2 = new ThreadGroup("group2"); new Thread(group2, () -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }, "group1-thread2").start(); group2.setDaemon(true); TimeUnit.SECONDS.sleep(3); System.out.println(group1.isDestroyed()); System.out.println(group2.isDestroyed()); } }