zoukankan      html  css  js  c++  java
  • Java设计模式——观察者模式的灵活应用

    灵感来源于一个猪队友给我的题目

     看到这个,我抓住的关键字是:任何子任务失败,要通知所有子任务执行取消逻辑。

     这不就是消息广播吗?观察者模式!

    干活

    首先是收听者

    package com.example.broadcast;
    
    /**
     * 每个节点即是广播者,也是收听者
     */
    public interface Listener {
    
        /**
         * 设置调度中心
         */
        void setCenter(DispatchCenter center);
    
        /**
         * 主动通知其它收听者
         */
        void notice(String msg);
    
        /**
         * 自己收到通知的处理逻辑
         * @param msg
         */
        void whenReceived(String msg);
    
        /**
         * 收听者标志:唯一
         * @return
         */
        String identify();
    
    }

    然后是调度中心

    package com.example.broadcast;
    
    /**
     * 调度中心
     */
    public interface DispatchCenter {
    
        /**
         * 广播
         * @param own 广播的时候,要排除自己
         * @param msg 广播消息
         */
        void broadcast(String own, String msg);
    
        /**
         * 添加收听者
         * @param listener
         */
        void addListener(Listener listener);
    
    }

    调度中心实现

    package com.example.broadcast;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    
    public class DispatchCenterImpl implements DispatchCenter {
    
        private static final Map<String, Listener> MAP = new ConcurrentHashMap<>();
    
        @Override
        public void broadcast(String own, String msg) {
            MAP.forEach((k,v) -> {
                // 不用给自己发通知
                if (!k.equals(own)){
                    v.whenReceived(msg);
                }
            });
        }
    
        @Override
        public void addListener(Listener listener) {
            listener.setCenter(this);
            MAP.put(listener.identify(), listener);
        }
    }

    剩下三个收听者

    package com.example.broadcast;
    
    import java.util.UUID;
    
    public class ListenerA implements Listener {
    
        private DispatchCenter center;
        private String identify;
    
        public ListenerA() {
            identify = UUID.randomUUID().toString();
        }
    
        @Override
        public void setCenter(DispatchCenter center) {
            this.center = center;
        }
    
        @Override
        public void notice(String msg) {
            center.broadcast(identify, msg);
        }
    
        @Override
        public void whenReceived(String msg) {
            System.out.println(this.getClass().getName() + "收到消息:" + msg);
        }
    
        @Override
        public String identify() {
            return identify;
        }
    }

    B和C除了类名不一样,其他都一样,不再赘述。目录如下

    测试

    package com.example.broadcast;
    
    
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Main {
    
    
        public static void main(String[] args) {
            DispatchCenter center = new DispatchCenterImpl();
            ListenerA listenerA = new ListenerA();
            ListenerB listenerB = new ListenerB();
            ListenerC listenerC = new ListenerC();
            center.addListener(listenerA);
            center.addListener(listenerB);
            center.addListener(listenerC);
    
            ExecutorService executorService = Executors.newFixedThreadPool(3);
    
            // A触发1条事件
            executorService.submit(() -> {
                int i = 1;
                while (i > 0){
                    listenerA.notice(listenerA.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
                    i--;
                }
            });
            // B触发2条事件
            executorService.submit(() -> {
                int i = 2;
                while (i > 0){
                    listenerB.notice(listenerB.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
                    i--;
                }
            });
            // C触发3条事件
            executorService.submit(() -> {
                int i = 3;
                while (i > 0){
                    listenerC.notice(listenerC.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
                    i--;
                }
            });
    
            executorService.shutdown();
    
        }
    
    
    }

    输出:

     流程图

     当其中的B节点,发生了错误,除了把自己处理好之外

    1. 向调度中心发送广播请求,并携带需要的消息

    2. 调度中心遍历收听者,挨个通知(执行)每一个收听者接受消息的逻辑

    关于停止任务

    因为题目要求,【快速取消】所有子任务

    关于线程停止的方法也有很多:

    1. 优雅退出run方法

    2. 暴力stop

    3. run方法抛出异常

    如果说要求,A异常了,B和C收到消息之后,线程立即停止,不能有一点迟疑,说实话我还没想到该怎么做。因为你要知道,实际上的任务的run方法内部,不太可能是个while循环,人家可能就是个顺序执行,所以停止标志位的方式,并不适用。

    我先写个按照标志位停止的“玩具”。

    修改三个收听者代码和测试类

    package com.example.broadcast;
    
    import lombok.SneakyThrows;
    
    import java.util.Random;
    import java.util.UUID;
    
    public class ListenerA implements Listener,Runnable {
    
        private DispatchCenter center;
        private String identify;
    
        public ListenerA() {
            identify = UUID.randomUUID().toString();
        }
    
        @Override
        public void setCenter(DispatchCenter center) {
            this.center = center;
        }
    
        @Override
        public void notice(String msg) {
            center.broadcast(identify, msg);
        }
    
        @Override
        public void whenReceived(String msg) {
            System.out.println(this.getClass().getName() + "收到消息:" + msg);
        }
    
        @Override
        public String identify() {
            return identify;
        }
    
        @SneakyThrows
        @Override
        public void run() {
            // 5秒之后,模拟发生异常
            Thread.sleep(5000);
            notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
            System.out.println(this.getClass().getName() + "程序异常,并已经传播了消息...");
        }
    }
    package com.example.broadcast;
    
    import lombok.SneakyThrows;
    
    import java.util.UUID;
    
    public class ListenerB implements Listener,Runnable {
    
        private DispatchCenter center;
        private String identify;
        private volatile Boolean stopFlag = false;
    
        public ListenerB() {
            identify = UUID.randomUUID().toString();
        }
    
        @Override
        public void setCenter(DispatchCenter center) {
            this.center = center;
        }
    
        @Override
        public void notice(String msg) {
            center.broadcast(identify, msg);
        }
    
        @Override
        public void whenReceived(String msg) {
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
            // 停止当前线程
            stopFlag = true;
        }
    
        @Override
        public String identify() {
            return identify;
        }
    
        @SneakyThrows
        @Override
        public void run() {
            while (!stopFlag){
                Thread.sleep(1000);
                System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务");
            }
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B Dead");
        }
    }
    package com.example.broadcast;
    
    import lombok.SneakyThrows;
    
    import java.util.UUID;
    
    public class ListenerC implements Listener,Runnable {
    
        private DispatchCenter center;
        private String identify;
        private volatile Boolean stopFlag = false;
    
        public ListenerC() {
            identify = UUID.randomUUID().toString();
        }
    
        @Override
        public void setCenter(DispatchCenter center) {
            this.center = center;
        }
    
        @Override
        public void notice(String msg) {
            center.broadcast(identify, msg);
        }
    
        @Override
        public void whenReceived(String msg) {
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
            // 停止当前线程
            stopFlag = true;
        }
    
        @Override
        public String identify() {
            return identify;
        }
    
        @SneakyThrows
        @Override
        public void run() {
            while (!stopFlag){
                Thread.sleep(1000);
                System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务");
            }
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C Dead");
        }
    }

    测试

    package com.example.broadcast;
    
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Main {
    
    
        public static void main(String[] args) {
            DispatchCenter center = new DispatchCenterImpl();
            ListenerA listenerA = new ListenerA();
            ListenerB listenerB = new ListenerB();
            ListenerC listenerC = new ListenerC();
            center.addListener(listenerA);
            center.addListener(listenerB);
            center.addListener(listenerC);
    
            ExecutorService executorService = Executors.newFixedThreadPool(3);
    
            // A
            executorService.submit(listenerA);
            // B
            executorService.submit(listenerB);
            // C
            executorService.submit(listenerC);
    
            executorService.shutdown();
    
        }
    
    
    }

    再想一想

    这个问题想想并不简单:

    1.这不是单一线程处理异常的情况(如果只是单一线程,自己的异常自己捕获并处理即可)

    2. A出现了异常,B收到了取消任务通知,问题在于, 1)不知道B目前执行到哪里了,没办法让B停下手中的工作。2)如果杀死B线程,那么执行一半的任务,会不会导致什么程序异常,或者脏数据之类的?

    3. 稳妥一点的方法就是,A出现了异常,B收到了通知之后,照常执行任务,只是当收到了异常通知的时候,会在正常逻辑的后面调用一个任务回退方法;而所有任务正常工作,则不会调用这个回退方法。

    4. 这个思路让我想到了分布式事务,是不是有内味了?

    改动收听者接口,增加回滚方法

    package com.example.broadcast;
    
    /**
     * 每个节点即是广播者,也是收听者
     */
    public interface Listener {
    
        /**
         * 设置调度中心
         */
        void setCenter(DispatchCenter center);
    
        /**
         * 主动通知其它收听者
         */
        void notice(String msg);
    
        /**
         * 自己收到通知的处理逻辑
         * @param msg
         */
        void whenReceived(String msg);
    
        /**
         * 收听者标志:唯一
         * @return
         */
        String identify();
    
        /**
         * 发生异常时,任务回退方法
         */
        void rollback();
    
    }

    A

    package com.example.broadcast;
    
    import lombok.SneakyThrows;
    
    import java.util.Random;
    import java.util.UUID;
    
    public class ListenerA implements Listener,Runnable {
    
        private DispatchCenter center;
        private String identify;
    
        public ListenerA() {
            identify = UUID.randomUUID().toString();
        }
    
        @Override
        public void setCenter(DispatchCenter center) {
            this.center = center;
        }
    
        @Override
        public void notice(String msg) {
            center.broadcast(identify, msg);
        }
    
        @Override
        public void whenReceived(String msg) {
            System.out.println(this.getClass().getName() + "收到消息:" + msg);
        }
    
        @Override
        public String identify() {
            return identify;
        }
    
        @Override
        public void rollback() {
            System.out.println(this.getClass().getName() + "任务回退!!!");
        }
    
        @SneakyThrows
        @Override
        public void run() {
            // 5秒之后,模拟发生异常
            Thread.sleep(5000);
            notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
            // A异常,回滚
            rollback();
        }
    }

    B

    package com.example.broadcast;
    
    import lombok.SneakyThrows;
    
    import java.util.UUID;
    
    public class ListenerB implements Listener,Runnable {
    
        private DispatchCenter center;
        private String identify;
        private volatile Boolean rollbackFlag = false;
    
        public ListenerB() {
            identify = UUID.randomUUID().toString();
        }
    
        @Override
        public void setCenter(DispatchCenter center) {
            this.center = center;
        }
    
        @Override
        public void notice(String msg) {
            center.broadcast(identify, msg);
        }
    
        @Override
        public void whenReceived(String msg) {
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
            // 任务需要回滚
            rollbackFlag = true;
        }
    
        @Override
        public String identify() {
            return identify;
        }
    
        @Override
        public void rollback() {
            System.out.println(this.getClass().getName() + "任务回退!!!");
        }
    
        @SneakyThrows
        @Override
        public void run() {
            // 模拟任务耗时,执行6秒的任务
            for (int i = 0; i < 3; i++){
                Thread.sleep(2000);
                System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务");
            }
            if (rollbackFlag){
                rollback();
            }
        }
    }

    C

    package com.example.broadcast;
    
    import lombok.SneakyThrows;
    
    import java.util.UUID;
    
    public class ListenerC implements Listener,Runnable {
    
        private DispatchCenter center;
        private String identify;
        private volatile Boolean rollbackFlag = false;
    
        public ListenerC() {
            identify = UUID.randomUUID().toString();
        }
    
        @Override
        public void setCenter(DispatchCenter center) {
            this.center = center;
        }
    
        @Override
        public void notice(String msg) {
            center.broadcast(identify, msg);
        }
    
        @Override
        public void whenReceived(String msg) {
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
            // 任务需要回滚
            rollbackFlag = true;
        }
    
        @Override
        public String identify() {
            return identify;
        }
    
        @Override
        public void rollback() {
            System.out.println(this.getClass().getName() + "任务回退!!!");
        }
    
        @SneakyThrows
        @Override
        public void run() {
            // 模拟任务耗时,执行9秒的任务
            for (int i = 0; i < 3; i++){
                Thread.sleep(3000);
                System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务");
            }
            if (rollbackFlag){
                rollback();
            }
        }
    }

    测试Main不变,执行

  • 相关阅读:
    轮播插件unsilder 源码解析(二)
    轮播插件unsilder 源码解析(一)---源码解析
    轮播插件unsilder 源码解析(一)---使用
    jquery插件扩展的学习
    基于canvas的陈列订货的分析
    grunt安装和使用教程
    2016订货会项目总结2
    2016工作项目完成总结
    不使用递归法求值
    easy bootstrap模板
  • 原文地址:https://www.cnblogs.com/LUA123/p/14042974.html
Copyright © 2011-2022 走看看