灵感来源于一个猪队友给我的题目
看到这个,我抓住的关键字是:任何子任务失败,要通知所有子任务执行取消逻辑。
这不就是消息广播吗?观察者模式!
干活
首先是收听者
package com.example.broadcast; /** * 每个节点即是广播者,也是收听者 */ public interface Listener { /** * 设置调度中心 */ void setCenter(DispatchCenter center); /** * 主动通知其它收听者 */ void notice(String msg); /** * 自己收到通知的处理逻辑 * @param msg */ void whenReceived(String msg); /** * 收听者标志:唯一 * @return */ String identify(); }
然后是调度中心
package com.example.broadcast; /** * 调度中心 */ public interface DispatchCenter { /** * 广播 * @param own 广播的时候,要排除自己 * @param msg 广播消息 */ void broadcast(String own, String msg); /** * 添加收听者 * @param listener */ void addListener(Listener listener); }
调度中心实现
package com.example.broadcast; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class DispatchCenterImpl implements DispatchCenter { private static final Map<String, Listener> MAP = new ConcurrentHashMap<>(); @Override public void broadcast(String own, String msg) { MAP.forEach((k,v) -> { // 不用给自己发通知 if (!k.equals(own)){ v.whenReceived(msg); } }); } @Override public void addListener(Listener listener) { listener.setCenter(this); MAP.put(listener.identify(), listener); } }
剩下三个收听者
package com.example.broadcast; import java.util.UUID; public class ListenerA implements Listener { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } }
B和C除了类名不一样,其他都一样,不再赘述。目录如下
测试
package com.example.broadcast; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { DispatchCenter center = new DispatchCenterImpl(); ListenerA listenerA = new ListenerA(); ListenerB listenerB = new ListenerB(); ListenerC listenerC = new ListenerC(); center.addListener(listenerA); center.addListener(listenerB); center.addListener(listenerC); ExecutorService executorService = Executors.newFixedThreadPool(3); // A触发1条事件 executorService.submit(() -> { int i = 1; while (i > 0){ listenerA.notice(listenerA.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元"); i--; } }); // B触发2条事件 executorService.submit(() -> { int i = 2; while (i > 0){ listenerB.notice(listenerB.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元"); i--; } }); // C触发3条事件 executorService.submit(() -> { int i = 3; while (i > 0){ listenerC.notice(listenerC.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元"); i--; } }); executorService.shutdown(); } }
输出:
流程图
当其中的B节点,发生了错误,除了把自己处理好之外
1. 向调度中心发送广播请求,并携带需要的消息
2. 调度中心遍历收听者,挨个通知(执行)每一个收听者接受消息的逻辑
关于停止任务
因为题目要求,【快速取消】所有子任务
关于线程停止的方法也有很多:
1. 优雅退出run方法
2. 暴力stop
3. run方法抛出异常
如果说要求,A异常了,B和C收到消息之后,线程立即停止,不能有一点迟疑,说实话我还没想到该怎么做。因为你要知道,实际上的任务的run方法内部,不太可能是个while循环,人家可能就是个顺序执行,所以停止标志位的方式,并不适用。
我先写个按照标志位停止的“玩具”。
修改三个收听者代码和测试类
package com.example.broadcast; import lombok.SneakyThrows; import java.util.Random; import java.util.UUID; public class ListenerA implements Listener,Runnable { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { // 5秒之后,模拟发生异常 Thread.sleep(5000); notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元"); System.out.println(this.getClass().getName() + "程序异常,并已经传播了消息..."); } }
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerB implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean stopFlag = false; public ListenerB() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 停止当前线程 stopFlag = true; } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { while (!stopFlag){ Thread.sleep(1000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务"); } System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B Dead"); } }
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerC implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean stopFlag = false; public ListenerC() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 停止当前线程 stopFlag = true; } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { while (!stopFlag){ Thread.sleep(1000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务"); } System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C Dead"); } }
测试
package com.example.broadcast; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { DispatchCenter center = new DispatchCenterImpl(); ListenerA listenerA = new ListenerA(); ListenerB listenerB = new ListenerB(); ListenerC listenerC = new ListenerC(); center.addListener(listenerA); center.addListener(listenerB); center.addListener(listenerC); ExecutorService executorService = Executors.newFixedThreadPool(3); // A executorService.submit(listenerA); // B executorService.submit(listenerB); // C executorService.submit(listenerC); executorService.shutdown(); } }
再想一想
这个问题想想并不简单:
1.这不是单一线程处理异常的情况(如果只是单一线程,自己的异常自己捕获并处理即可)
2. A出现了异常,B收到了取消任务通知,问题在于, 1)不知道B目前执行到哪里了,没办法让B停下手中的工作。2)如果杀死B线程,那么执行一半的任务,会不会导致什么程序异常,或者脏数据之类的?
3. 稳妥一点的方法就是,A出现了异常,B收到了通知之后,照常执行任务,只是当收到了异常通知的时候,会在正常逻辑的后面调用一个任务回退方法;而所有任务正常工作,则不会调用这个回退方法。
4. 这个思路让我想到了分布式事务,是不是有内味了?
改动收听者接口,增加回滚方法
package com.example.broadcast; /** * 每个节点即是广播者,也是收听者 */ public interface Listener { /** * 设置调度中心 */ void setCenter(DispatchCenter center); /** * 主动通知其它收听者 */ void notice(String msg); /** * 自己收到通知的处理逻辑 * @param msg */ void whenReceived(String msg); /** * 收听者标志:唯一 * @return */ String identify(); /** * 发生异常时,任务回退方法 */ void rollback(); }
A
package com.example.broadcast; import lombok.SneakyThrows; import java.util.Random; import java.util.UUID; public class ListenerA implements Listener,Runnable { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } @Override public void rollback() { System.out.println(this.getClass().getName() + "任务回退!!!"); } @SneakyThrows @Override public void run() { // 5秒之后,模拟发生异常 Thread.sleep(5000); notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元"); // A异常,回滚 rollback(); } }
B
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerB implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean rollbackFlag = false; public ListenerB() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 任务需要回滚 rollbackFlag = true; } @Override public String identify() { return identify; } @Override public void rollback() { System.out.println(this.getClass().getName() + "任务回退!!!"); } @SneakyThrows @Override public void run() { // 模拟任务耗时,执行6秒的任务 for (int i = 0; i < 3; i++){ Thread.sleep(2000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务"); } if (rollbackFlag){ rollback(); } } }
C
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerC implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean rollbackFlag = false; public ListenerC() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 任务需要回滚 rollbackFlag = true; } @Override public String identify() { return identify; } @Override public void rollback() { System.out.println(this.getClass().getName() + "任务回退!!!"); } @SneakyThrows @Override public void run() { // 模拟任务耗时,执行9秒的任务 for (int i = 0; i < 3; i++){ Thread.sleep(3000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务"); } if (rollbackFlag){ rollback(); } } }
测试Main不变,执行