zoukankan      html  css  js  c++  java
  • 责任链异步处理设计模型


    简介

    Advoid coupling the sender of a reuest to its receiver by giving more than one object a chance to handle the request. Chain the receiving objects and pass the request along the chain until an object handles it.

    使多个对象都有机会处理请求,从而避免了请求的发送者和接收者之间的耦合关系。将这些对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理它为止。


    异步处理模型

    当业务逻辑比较复杂且是异步请求时,我们可以使用责任链模型来划分职责,分工明确,减轻业务逻辑复杂度;可以使用异步模型来优化责任链的处理性能。


    1.定义一个业务请求

    import lombok.Data;
    
    /**
     * <p>
     * Request
     * </p>
     *
     * @author: kancy
     * @date: 2019/10/21 9:42
     **/
    @Data
    public class Request {
        private long Id;
        private String name;
    }
    
    

    2.定义责任链接口

    /**
     * <p>
     * ITaskProcesser
     * </p>
     *
     * @author: kancy
     * @date: 2019/10/21 9:40
     **/
    public interface ITaskProcesser {
        void process(Request request);
    }
    

    3.定义责任链异步处理模板

    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * <p>
     * AbstractThreadTaskProcesser
     * </p>
     *
     * @author: kancy
     * @date: 2019/10/21 9:40
     **/
    public abstract class AbstractThreadTaskProcesser extends Thread implements ITaskProcesser {
    
        /**
         * 任务队列
         */
        private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue();
    
        /**
         * 下一个处理器
         */
        private ITaskProcesser nextProcesser;
    
        /**
         * 是否停止线程
         */
        private volatile boolean isStop = false;
    
        /**
         * 线程是否运行
         */
        private boolean isRun = false;
    
        public AbstractThreadTaskProcesser() {
            setThreadTaskName();
        }
    
        public AbstractThreadTaskProcesser(ITaskProcesser nextProcesser) {
            setThreadTaskName();
            this.nextProcesser = nextProcesser;
        }
    
        /**
         * 责任链处理
         * @param request
         */
        @Override
        public void process(Request request) {
            // 添加请求任务
            addRequestToQueue(request);
            // 启动线程
            if (!isRun){
                synchronized (this){
                    if (!isRun){
                        super.start();
                        isRun = true;
                    }
                }
            }
        }
    
        /**
         * 异步线程处理:真正的在处理业务逻辑
         */
        @Override
        public void run() {
            while (!isStop){
                try {
                    // 取出一个任务
                    Request request = queue.take();
                    // 处理当前任务需要做的事情
                    boolean result = doRequest(request);
                    // 处理成功后,将任务请求交给下一个任务处理器
                    if (result && nextProcesser != null){
                        nextProcesser.process(request);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 防止子类开启线程,有父类维护,使用时启动任务线程
         */
        @Override
        public synchronized void start() {
            // 关闭入口
            throw new UnsupportedOperationException("自动启动线程,无需手动开启");
        }
    
        /**
         * 子类处理请求
         * @param request
         * return boolean 是否执行下一个处理器
         */
        protected abstract boolean doRequest(Request request);
    
        /**
         * 停止处理器
         */
        protected void stopProcesser(){
            isStop = true;
        }
    
        /**
         * 获取任务队列
         */
        protected LinkedBlockingQueue<Request> getTaskQueue() {
            return queue;
        }
    
        /**
         * 获取下一个处理器
         * @return
         */
        protected ITaskProcesser getNextProcesser() {
            return nextProcesser;
        }
    
        /**
         * 添加请求到任务队列
         * @param request
         */
        private void addRequestToQueue(Request request){
            queue.add(request);
        }
    
        /**
         * 线程线程名称
         */
        private void setThreadTaskName() {
            setName(String.format("%s_Thread_%s", getClass().getSimpleName(), getId()));
        }
    }
    
    

    4.编写具体任务处理器

    • FirstThreadTaskProcesser

      /**
       * <p>
       * FirstThreadTaskProcesser
       * </p>
       *
       * @author: kancy
       * @date: 2019/10/21 9:40
       **/
      public class FirstThreadTaskProcesser extends AbstractThreadTaskProcesser {
      
          public FirstThreadTaskProcesser(ITaskProcesser nextProcesser) {
              super(nextProcesser);
          }
      
          /**
           * 子类处理请求
           *
           * @param request
           */
          @Override
          protected boolean doRequest(Request request) {
              System.out.println(Thread.currentThread().getName() + " 开始处理!");
              return true;
          }
      }
      
      
    • PrintRequestThreadTaskProcesser

      /**
       * <p>
       * PrintRequestThreadTaskProcesser
       * </p>
       *
       * @author: kancy
       * @date: 2019/10/21 10:06
       **/
      public class PrintRequestThreadTaskProcesser extends AbstractThreadTaskProcesser {
      
          public PrintRequestThreadTaskProcesser(ITaskProcesser nextProcesser) {
              super(nextProcesser);
          }
      
          /**
           * 子类处理请求
           *
           * @param request
           */
          @Override
          protected boolean doRequest(Request request) {
              System.out.println("request: " + request);
              return true;
          }
      }
      
    • BusinessThreadTaskProcesser

      /**
       * <p>
       * BusinessThreadTaskProcesser
       * 针对耗时比较久的逻辑,可以用线程池多线程来优化性能
       * </p>
       *
       * @author: kancy
       * @date: 2019/10/21 12:28
       **/
      public class BusinessThreadTaskProcesser implements ITaskProcesser {
          private final ExecutorService executorService = Executors.newFixedThreadPool(10);
      
          private final ITaskProcesser nextProcesser;
      
          public BusinessThreadTaskProcesser(ITaskProcesser nextProcesser) {
              this.nextProcesser = nextProcesser;
          }
      
          @Override
          public void process(Request request) {
              BusinessRequestHandler handler = new BusinessRequestHandler(request);
              executorService.submit(handler);
              // callNextProcesser(request);
          }
      
          class BusinessRequestHandler implements Runnable{
              private Request request;
      
              public BusinessRequestHandler(Request request) {
                  this.request = request;
              }
      
              @Override
              public void run() {
                  try {
                      TimeUnit.SECONDS.sleep(3);
                      System.out.println(Thread.currentThread().getName() + " 处理成功!");
                      callNextProcesser(request);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      
          private void callNextProcesser(Request request) {
              if (nextProcesser != null){
                  nextProcesser.process(request);
              }
          }
      
      }
      
    • LastThreadTaskProcesser

      /**
       * <p>
       * FirstThreadTaskProcesser
       * </p>
       *
       * @author: kancy
       * @date: 2019/10/21 9:40
       **/
      public class LastThreadTaskProcesser extends AbstractThreadTaskProcesser {
      
          /**
           * 子类处理请求
           *
           * @param request
           */
          @Override
          protected boolean doRequest(Request request) {
              System.out.println(Thread.currentThread().getName() + " 处理完成!");
              return true;
          }
      }
      

    5.编写测试类

    
    import java.util.concurrent.TimeUnit;
    
    /**
     * <p>
     * ChainTests
     * </p>
     *
     * @author: kancy
     * @date: 2019/10/21 9:39
     **/
    public class ChainTests {
    
        public static void main(String[] args) {
            ITaskProcesser lastThreadTaskProcesser = new LastThreadTaskProcesser();
            BusinessThreadTaskProcesser businessThreadTaskProcesser = new BusinessThreadTaskProcesser(lastThreadTaskProcesser);
            ITaskProcesser printRequestThreadTaskProcesser = new PrintRequestThreadTaskProcesser(businessThreadTaskProcesser);
            ITaskProcesser firstThreadTaskProcesser = new FirstThreadTaskProcesser(printRequestThreadTaskProcesser);
    
    
            Thread r1 = new CreateRequestThread(firstThreadTaskProcesser);
            r1.start();
            Thread r2 = new CreateRequestThread(firstThreadTaskProcesser);
            r2.start();
            Thread r3 = new CreateRequestThread(firstThreadTaskProcesser);
            r3.start();
    
        }
    
        static class CreateRequestThread extends Thread {
            ITaskProcesser iTaskProcesser;
    
            public CreateRequestThread(ITaskProcesser iTaskProcesser) {
                this.iTaskProcesser = iTaskProcesser;
            }
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        iTaskProcesser.process(new Request());
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    注:本文只做抛砖引玉,在实际开发过程中,可以自行扩展。

  • 相关阅读:
    Linux NFS服务器的安装与配置
    mysql 批量更新的四种方法
    解决 RHEL 7/ CentOS 7/Fedora 出现Unit iptables.service failed to load
    linux 搭建svn
    MYSQL的慢查询两个方法
    Apache 配置虚拟主机三种方式
    MYSQL explain详解
    php操作memcache的使用【转】
    PHPExcel中open_basedir restriction in effect的解决方法
    微信浏览器禁止app下载链接的两种处理方法
  • 原文地址:https://www.cnblogs.com/kancy/p/11712402.html
Copyright © 2011-2022 走看看