zoukankan      html  css  js  c++  java
  • 异步责任链

    1、创建抽象记录器类

    public interface IRequestProcessor {
        void process(Request request);
    }

    2、记录器实现类1

    import java.util.concurrent.LinkedBlockingQueue;
    
    public class PrevProcessor extends Thread implements IRequestProcessor {
        // 阻塞队列
        private LinkedBlockingQueue<Request> requests = new LinkedBlockingQueue<>();
    
        private IRequestProcessor nextProcessor;
    
        private volatile boolean isFinish = false;
    
        public PrevProcessor(IRequestProcessor nextProcessor) {
            this.nextProcessor = nextProcessor;
        }
        public PrevProcessor() { }
    
        public void shutdown(){
            isFinish = true;
        }
    
        @Override
        public void run() {
            while (!isFinish){
                try {
                    // 阻塞式获取
                    Request request = requests.take();
                    // 处理逻辑
                    System.out.println("prevProcessor::" + request);
    
                    // 交给下一个责任链
                    if(null != nextProcessor){
                        nextProcessor.process(request);
                    }
    
                    // 线程退出机制
                    shutdown();
                } catch (InterruptedException e) {
                    // 所有和阻塞相关的方法都会抛出 InterruptedException
                    e.printStackTrace();
                }
            }
        }
    
    
        @Override
        public void process(Request request) {
            // 加到队列
            requests.add(request);
        }
    
    
    }

    3、记录器实现类2

    import java.util.concurrent.LinkedBlockingQueue;
    
    public class SaveProcessor extends Thread implements IRequestProcessor {
        // 阻塞队列
        private LinkedBlockingQueue<Request> requests = new LinkedBlockingQueue<>();
    
        private IRequestProcessor nextProcessor;
    
        private volatile boolean isFinish = false;
    
        public SaveProcessor() {    }
    
        public SaveProcessor(IRequestProcessor nextProcessor) {
            this.nextProcessor = nextProcessor;
        }
    
        public void shutdown(){
            isFinish = true;
        }
    
        @Override
        public void run() {
            while (!isFinish){
                try {
                    // 阻塞式获取
                    Request request = requests.take();
                    // 处理逻辑
                    System.out.println("saveProcessor::" + request);
    
                    // 交给下一个责任链
                    if(null != nextProcessor){
                        nextProcessor.process(request);
                    }
    
                    // 线程退出机制
                    shutdown();
                } catch (InterruptedException e) {
                    // 所有和阻塞相关的方法都会抛出 InterruptedException
                    e.printStackTrace();
                }
            }
        }
    
    
        @Override
        public void process(Request request) {
            // 加到队列
            requests.add(request);
        }

    4、测试类

    public class App {
        private static IRequestProcessor processor;
        
        static {
            SaveProcessor saveProcessor = new SaveProcessor();
            saveProcessor.start();
            processor = new PrevProcessor(saveProcessor);
            ((PrevProcessor) processor).start();
        }
        
        public static void main(String[] args) {
            Request request = new Request("aaaa");
            processor.process(request);
        }
    
    }
  • 相关阅读:
    Linux文件管理详解
    Linux用户管理详解
    Error: No suitable device found: no device found for connection "System eth0" 解决方法
    Linux配置网络详解
    万能的数据传输格式XML入门教程
    【万能的数据传输格式XML入门教程】八、Xpath查询语言
    【万能的数据传输格式XML入门教程】七、SimpleXML模型(查询操作)
    h5标签兼容
    vue 常用指令
    echarts3更新
  • 原文地址:https://www.cnblogs.com/zh-ch/p/12196755.html
Copyright © 2011-2022 走看看