zoukankan      html  css  js  c++  java
  • (05)使用DeferredResult多线程异步处理请求

      通常处理HTTP请求时使用同步处理的方式,但有时根据业务场景和性能要求异步处理可能更合适。简单说下概念。

      同步处理:一个HTTP请求进入一个主线程,主线程处理完后给出一个HTTP响应。

      异步处理:一个HTTP请求进入一个主线程,主线程调用一个副线程,副线程处理业务逻辑,当副线程处理完后,主线程把结果返回给给客户端。在副线程处理逻辑的同时,主线程可以空闲出来处理其他请求。因为服务器同时处理的线程数量有限,所以异步处理提升了服务器的吞吐量。

      异步处理请求有Runnable和DeferredResult两种方式,下面举例说一下。

      1、Runnable异步处理REST服务

    @RestController
    public class AsyncController {    
        
        private Logger logger=LoggerFactory.getLogger(getClass());
        
        @RequestMapping("/order")
        public Callable<String> order(){
            logger.info("主线程开始");
            Callable<String> result=new Callable<String>() {
                @Override 
                public String call() throws Exception {
                    logger.info("副线程开始");
                    Thread.sleep(5000);
                    logger.info("副线程结束");
                    return "success"; 
                }
            };
            logger.info("主线程返回");
            return result;
        }
    }

      启动服务,浏览器访问http://localhost/order,页面5秒后返回success,查看日志如下 :

      

      日志中显示了两个线程,主线程返回后过了5秒副线程结束。

      2、DeferredResult异步处理Rest服务

      通过上面的例子可以看到使用Runnable异步处理,副线程由主线程调起。这样有些业务场景不能满足,所以使用DeferredResult。举例如下:

      

      接受下单请求和处理下单逻辑的应用不在一台服务器上。线程1接受HTTP请求,并把该请求放到一个消息队列里。应用2监听消息队列,当监听到消息队列里有下单请求后处理下单逻辑,处理完后把结果再放到这个消息队列里。应用1中线程2监听消息队列,当监听到订单处理结果的消息后,会根据这个消息的结果返回一个HTTP响应给客户端。在这个场景中,线程1和线程2是完全隔离的,谁也不知道谁的存在。

      模拟消息队列:MockQueue.java,setPlaceOrder方法表示队列中处理下单请求,创建一个Thread模拟其它应用环境。

    @Component
    public class MockQueue {
        
        private Logger logger=LoggerFactory.getLogger(getClass());
        
        private String placeOrder;
        private String completeOrder;
        
        public String getPlaceOrder() {
            return placeOrder;
        }
        public void setPlaceOrder(String placeOrder){
            new Thread(()->{
                logger.info("接到下单请求:"+placeOrder);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.completeOrder = placeOrder;
                logger.info("下单请求处理完毕:"+placeOrder);
            }).start();
        }
        public String getCompleteOrder() {
            return completeOrder;
        }
        public void setCompleteOrder(String completeOrder) {
            this.completeOrder = completeOrder;
        }
    }

      在线程1和线程2之间传递DeferredResult对象:DeferredResultHolder.java,Map中key是订单号,value是处理结果。

    @Component
    public class DeferredResultHolder {
    
        private Map<String,DeferredResult<String>> map=new HashMap<String,DeferredResult<String>>();
    
        public Map<String, DeferredResult<String>> getMap() {
            return map;
        }
        public void setMap(Map<String, DeferredResult<String>> map) {
            this.map = map;
        }
    }

      处理请求:AsyncController.java

    @RestController
    public class AsyncController {    
        
        private Logger logger=LoggerFactory.getLogger(getClass());
        
        @Autowired
        private MockQueue mockQueue;
        @Autowired
        private DeferredResultHolder deferredResultHolder;
        
        @RequestMapping("/order")
        public DeferredResult<String> order(){
            logger.info("主线程开始");
            String orderNumber=RandomStringUtils.randomNumeric(8);
            mockQueue.setPlaceOrder(orderNumber);
            DeferredResult<String> result=new DeferredResult<>();
            deferredResultHolder.getMap().put(orderNumber, result);
            logger.info("主线程返回");
            return result;
        } 
    }

      模拟线程2的代码,监听器:QueueListener.java。ContextRefreshedEvent事件是整个Spring容器初始化完毕的事件。

    @Component
    public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {
        
        private Logger logger=LoggerFactory.getLogger(getClass());
        
        @Autowired
        private MockQueue mockQueue;
        @Autowired
        private DeferredResultHolder deferredResultHolder;
        
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            new Thread(()->{//开启一个线程,否则Spring容器阻塞,无法启动
                while(true) {
                    if(StringUtils.isNotBlank(mockQueue.getCompleteOrder())) {
                        String orderNumber=mockQueue.getCompleteOrder();
                        logger.info("返回订单处理结果:"+orderNumber);
                        deferredResultHolder.getMap().get(orderNumber).setResult("place order success");
                        mockQueue.setCompleteOrder(null);
                    }else {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }

      启动容器:请求3次http://localhost/order,页面输出place order success,打印日志如下:

      

  • 相关阅读:
    在Windows中,U盘或者移动硬盘关不掉时,怎么知道是被哪个程序占用了呢?
    选择的文件中包含不支持的格式
    FTO Obesity Variant Circuitry and Adipocyte Browning in Humans
    SNPsnap | 筛选最佳匹配的SNP | 富集分析 | CP loci
    PhastCons | 序列保守性打分
    hg19基因组 | 功能区域 | 位置提取
    投稿SCI杂志 | 如何撰写cover letter | 如何绘制illustrated abstract
    variant的过滤 | filtering and prioritizing genetic variants
    会议录音的处理 | 提高音量 + 降噪 + 自动添加字幕
    小型数据工作站 | 管理和维护 | Jupyter | Rstudio server | Mac & Win10
  • 原文地址:https://www.cnblogs.com/javasl/p/12983131.html
Copyright © 2011-2022 走看看