zoukankan      html  css  js  c++  java
  • 并发编程 —— 自己写一个异步回调 API

    1. 前言

    在并发编程中,异步回调的效率不言而喻,在业务开发中,如果由阻塞的任务需要执行,必然要使用异步线程。并且,如果我们想在异步执行之后,根据他的结果执行一些动作。

    JDK 8 之前的 Future 只能解决上面需求的一半问题,即异步执行,返回一个 Future,需要程序员调用 get 方法等待,或者使用 isDone 轮询。

    效率不高。

    JDK 8 新出的 CompletableFuture API 可以解决这个问题。但他的 API, 说实话,不太好用。

    我们只想要一个简单的 API,能实现我们的回调功能。

    我需要 3 个功能:

    1. 能通过 get 之类的方法返回结果。
    2. 能设置监听器进行回调。
    3. 可以在业务线程中设置成功或者失败。

    楼主写一个简单的例子,借鉴了 Netty 的异步 API,希望能起到抛砖引玉的作用。

    2. 设计

    根据我们的需求:
    第一,我们需要一个类,拥有 get 方法和 addListener 方法。
    第二,我们需要一个类,能够回调我们设置的监听器。
    第三,我们需要一个类,能够在业务线程中设置成功或者失败。

    3. 初步实现

    设计一个监听器接口:

    /**
     * 监听器
     * @author stateis0 
     */
    public interface MyListener {
      /**
       * 子类需要重写此方法,在异步任务完成之后会回调此方法。
       * @param promise 异步结果占位符。
       */
      void operationComplete(MyPromise promise);
    }
    

    设计一个异步占位符,类似 Future:

    /**
     * 异步执行结果占位符
     *
     * @author stateis0
     */
    public class MyPromise {
    
      /** 监听器集合*/
      List<MyListener> listeners = new ArrayList<MyListener>();
    
      /** 是否成功*/
      boolean success;
    
      /** 执行结果**/
      Object result;
    
      /** 设置事变计数器**/
      int failCount;
    
      /**
       * 设置成功,并通知所有监听器。
       * @param result 结果
       * @return 是否成功
       */
      public boolean setSuccess(Object result) {
        if (success) {
          return false;
        }
    
        success = true;
        this.result = result;
    
        signalListeners();
        return true;
      }
    
      /**
       * 通知所有监听器,回调监听器方法。
       */
      private void signalListeners() {
        for (MyListener l : listeners) {
          l.operationComplete(this);
        }
      }
    
      /**
       * 设置失败
       * @param e 异常对象
       * @return 设置是否成功
       */
      public boolean setFail(Exception e) {
        if (failCount > 0) {
          return false;
        }
        ++failCount;
        result = e;
        signalListeners();
        return true;
      }
    
      /**
       * 是否成功执行
       */
      public boolean isSuccess() {
        return success;
      }
    
      /**
       * 添加监听器
       * @param myListener 监听器
       */
      public void addListener(MyListener myListener) {
        listeners.add(myListener);
      }
    
      /**
       * 删除监听器
       * @param myListener 监听器
       */
      public void removeListener(MyListener myListener) {
        listeners.remove(myListener);
      }
    
      /**
       * 获取执行结果
       */
      public Object get() {
        return result;
      }
    }
    

    我们希望使用线程池执行此类任务,所以需要一个自定义的 Runnable,而在这个 Runnable 中,我们需要做一些简单的手脚:

    /**
     * 一个任务类,通过重写 doWork 方法执行任务
     * @param <V> 返回值类型
     * @author stateis0 
     */
    public abstract class MyRunnable<V> implements Runnable {
    
      final MyPromise myPromise;
    
      protected MyRunnable(MyPromise myPromise) {
        this.myPromise = myPromise;
      }
    
      @Override
      public void run() {
        try {
          V v = doWork();
          myPromise.setSuccess(v);
        } catch (Exception e) {
          myPromise.setFail(e);
        }
      }
    
      /**
       * 子类需要重写此方法。并返回值,这个值由 Promise 的 get 方法返回。
       */
      public abstract V doWork();
    }
    

    4. 写个 Demo 测试一下

    /**
     * @author stateis0
     */
    public class MyDemo {
    
      public static void main(String[] args) {
    
        // 占位对象
        final MyPromise myPromise = new MyPromise();
    
        final Dto dto = new Dto();
    
        // 线程池
        Executor executor = Executors.newFixedThreadPool(1);
    
        // 异步执行任务,
        executor.execute(new MyRunnable<String>(myPromise) {
          @Override
          public String doWork() {
            return dto.doSomething();
          }
        });
    
        // 添加一个监听器
        myPromise.addListener(new MyListener() {
          // 当任务完成后,就执行此方法。
          @Override
          public void operationComplete(MyPromise promise) {
            // 获取结果
            String result;
            // 如果任务成功执行了
            if (promise.isSuccess()) {
              // 获取结果并打印
              result = (String) promise.get();
              System.out.println("operationComplete ----> " + result);
            }
            // 如果失败了, 打印异常堆栈
            else {
              ((Exception) promise.get()).printStackTrace();
            }
          }
        });
      }
    
    }
    
    class Dto {
    
      public String doSomething() {
        System.out.println("doSomething");
    //    throw new RuntimeException("cxs");
        return "result is success";
      }
    }
    

    执行结果:

    doSomething
    operationComplete ----> result is success
    

    符合我们的预期。我们希望在业务对象 Dto 的 doSomething 成功返回之后,回调监听器的 operationComplete 方法。如果失败,打印异常堆栈。

    当然,整体代码比较简单,仅仅只是抛砖引玉。

    实际上,如果直接向 Callable 或者 Runnable 传入一个业务对象,当 call 方法或者 run 方法执行完毕,就可以根据执行结果执行我们的业务对象的方法了。这样就是一个最简单直接的异步回调。

    只是这样过于耦合。

    异步任务和业务的任务耦合在了一起,并且不能添加多个监听器,也无法使用 promise 的 setSuccess 功能和 setFail 功能,这两个功能可以在业务线程中设置成功或者失败,灵活性更高。

    关于异步,我们可以多看看 Netty 的 API 设计,易懂好用。

  • 相关阅读:
    Python3入门基础--str常用方法
    大学jsp实验4include,forword
    大学jsp实验3include指令的使用
    初识MFC----运行时类信息机制
    状态栏
    工具栏
    菜单栏
    程序启动画面
    字符串的截取
    字符串相关类
  • 原文地址:https://www.cnblogs.com/stateis0/p/9062055.html
Copyright © 2011-2022 走看看