zoukankan      html  css  js  c++  java
  • 关于DeferredResult的思考

    使用SpringBoot搭建web程序,里面内置了tomcat,一般都不会关心内部实现机制,上来就可以写程序,并且可以跑起来。但是是思考了每次的请求是如何工作的。
    简单的来讲就是tomcat是将每次请求都将封装成一个Servlet,该Servlet来运行完业务逻辑代码,然后再有tomcat将信息返回给调用方。每个Servlet是同步的。即在该servlet的业务逻辑做完了然后才释放掉该Servlet。
    但是servlet3提供了一个异步的机制,即每次请求过来之后,可以先释放该请求,但是会保存一些信息。业务逻辑由程序其他线程来处理,处理完成后将其值设置到DeferredResult里面。然后再由容器将返回值返回给前端。
    这样做的好处:实现出现请求与业务IO分开,程序能够处理更多的请求。
    网上可以找到其他例子来学习DeferredResult是如何运行的,即在请求内部开启一个线程来处理

    @GetMapping
    public DeferredResult<String> queryDevice(){
    	DeferredResult<String> def = new DeferredResult<>();
    	new Thread(()->{
    		//处理业务逻辑
    		def.setResult("处理后的结果");
    	}).start();
    	return def;
    }
    

    这样很好理解,但是不能这样做,为什么,因为每一次线程的创建销毁是消耗资源的,这样频繁的创建和销毁非常影响性能。这个时候,可以提使用线程池来处理,对是可以这样做的。是的,可以这样做,但是需要考虑到,在某一时刻,可能会产生几千个线程,这样是非常多的,如果加上tomcat创建的Servlet线程数,那确实挺消耗资源的。
    上面已经有了一个可行的方案,这里提供我的一个思考,该思考是Java8新特性之后常用到的一个。

    下面有三个类:

    public abstract class Actor {
    
      public enum ActorType {
        ITC,  /* 立刻消费. */
    
        BLOCKING;  /* 阻塞.*/
      }
      /**
       * actor类型.
       */
      public ActorType type;
      /**
       * actor名.
       */
      public String name;
      public Actor(ActorType type) {
        this.type = type;
        this.name = this.getClass().getSimpleName();
      }
      /**
       * 任务消费
       */
      public void future(Consumer<Void> c) {
        if (this.type.ordinal() == ActorType.BLOCKING.ordinal()) {//阻塞
          ((ActorBlocking) this).push(c);
          return;
        } else {
          Misc.exeConsumer(c, null);
        }
      }
    }
    
    public class ActorBlocking extends Actor {
    
      /**
       * 等待处理的Consumer.
       */
      private ConcurrentLinkedQueue<Consumer<Void>> cs = new ConcurrentLinkedQueue<>();
      /**
       * 拥有线程的个数
       */
      private int tc = 1;
    
      /**
       * cs的size
       */
      private AtomicInteger size = new AtomicInteger(0);
    
      /**
       * 线程忙?
       */
      public volatile boolean busy = false;
    
      public ActorBlocking() {
        super(ActorType.BLOCKING);
      }
    
      /**
       * 添加任务.
       */
      public void push(Consumer<Void> c) {
        this.cs.add(c);
        this.size.incrementAndGet();
        synchronized (this) {//通知线程消费信息
          this.notify();
        }
      }
    
      /**
       * 线程忙?
       */
      public boolean isBusy() {
        return this.busy;
      }
    
      /**
       * 队列尺寸.
       */
      public int size() {
        return this.size.get();
      }
    
      public int getTc() {
        return tc;
      }
    
      public void setTc(int tc) {
        this.tc = tc < 1 ? 1 : tc;
      }
    
      /**
       * 启动线程
       */
      protected void start() {
        ActorBlocking ab = this;
        ExecutorService ex = Executors.newFixedThreadPool(this.tc);//创建线程池
        for (int i = 0; i < tc; i++) {
          ex.execute(() -> {
            while (true) {
              ab.run();
            }
          });
        }
      }
    
      /**
       * 抢占式消费任务
       */
      private void run() {
        Consumer<Void> c = this.cs.poll();
        if (c == null) {
          synchronized (this) {
            try {
              this.wait();
            } catch (InterruptedException e) {
            }
          }
          c = this.cs.poll();
        }
        if (c != null) /* 抢占式. */ {
          this.size.decrementAndGet();
          this.busy = true;
          Misc.exeConsumer(c, null);
          this.busy = false;
        }
      }
    }
    
    @Component
    public class AppActorBlocking extends ActorBlocking {
       //可以设置CPU*2的 	
      private int threadSize = 4;
    
      public AppActorBlocking() {
        this.setTc(threadSize);//设置线程数量
        this.start();
      }
    }
    

    该方法是工具Misc类总的方法:

     /**
       * 执行Consumer并将异常化解在内部.
       */
      public static final <T> boolean exeConsumer(Consumer<T> c, T t) {
        try {
          c.accept(t);
          return true;
        } catch (Exception e) {
          if (logger.isWarnEnabled()) {
            logger.warn("{}", Misc.trace(new Throwable()));
          }
          if (logger.isWarnEnabled()) {
            logger.warn("t: {}, e: {}", t, Misc.trace(e));
          }
          return false;
        }
      }
    

    如何调用:

    @Autowired
    public AppActorBlocking appBlocking;
    
    public void method(){
    	appBlocking.future(v->{
    		//处理逻辑代码
    	});
    }
    

    上面的代码理解是所有的业务逻辑都是一个个Task,每一次请求过来,那么我就将业务逻辑代码生成一个Task,放入到队列中,然后由线程去取其中的任务来消费。
    这里仅仅是换了一个思路,不是由线程池去创建线程来处理,而是创建几个线程,然后抢占式的去消费任务,而过来的每次请求,都会放入到队列中。

    DeferredResult的异步处理能够提升一些服务器的性能,处理更多的连接数,但是一个WEB程序,处理连接数还与内置默认的tomcat相关(SpringBoot下还有其他容器),即tomcat默认的处理最大连接数为200,除了最大连接数,还有一个tomcat的最大处理线程数,如果该处设置小了,那么并发也一定会小,在设置这些之外,需要设置一个等待队列的大小,总有一些请求是不能被处理的,但又不能拒绝掉,否则用户体验特别不好,那么就进入到等待队列中,等tomcat有空闲的线程再来处理等待队列中的线程。

    至于什么时候用到该DeferredResult,如果是访问量不大的程序,如管理系统,没必要使用到这个,毕竟没有访问量,反而增大了开发量,但是如果做了很好的封装,那么就没关系了,这个就考量各自程序员的水平了。

  • 相关阅读:
    为什么要用do-while(0)?
    网络字节序&大小端存储
    sql语句w3school教程
    C++编码规范
    std::deque双端队列介绍
    gdb基本操作
    gdb调试多线程
    数据库基础
    删除vector所有元素
    stl迭代器失效
  • 原文地址:https://www.cnblogs.com/skyice/p/10080612.html
Copyright © 2011-2022 走看看