zoukankan      html  css  js  c++  java
  • 使用CompletableFuture实现多个异步任务并行完成后合并结果

    业务场景

    需要同时从多个副本数据库中查询数据,并对查询结果进行合并去重处理后返回前端。

    实现过程涉及多数据源切换,这里不作过多讨论。

    编码实现

    实现过程:

    1、定义异步查询数据方法;

    2、通过CompletableFuture的allOf方法对多个异步执行结果进行处理;

    public class CompletableFutureTests {
    
        @Autowired
        private UserDao userDao;
    
        @Test
        public void testSomeTaskAndJoin() throws Exception {
    
            // DynamicDataSourceContextHolder.dataSourceIds根据动态数据源数量
            // 异步执行每个数据源查询方法
            // 返回一个Future集合
            List<CompletableFuture<List<User>>> futures = DynamicDataSourceContextHolder.dataSourceIds.stream()
                    .map(this::queryUsers).collect(Collectors.toList());
    
            // 多个异步执行结果合并到该集合
            List<User> futureUsers = new ArrayList<>();
    
            // 通过allOf对多个异步执行结果进行处理
            CompletableFuture allFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                                    .whenComplete((v, t) -> {
                                        // 所有CompletableFuture执行完成后进行遍历
                                        futures.forEach(future -> {
                                            synchronized (this) {
                                                // 查询结果合并
                                                futureUsers.addAll(future.getNow(null));
                                            }
                                        });
                                    });
    
    
            // 阻塞等待所有CompletableFuture执行完成
            allFuture.get();
            // 对合并后的结果集进行去重处理
            List<User> result = futureUsers.stream().distinct().collect(Collectors.toList());
    
            log.info(result.toString());
    
        }
    
        /**
         * 用户异步查询方法
         * @param datasourceKey 动态数据源Key
         * @return
         */
        public CompletableFuture<List<User>> queryUsers(String datasourceKey) {
    
            // 定义异步查询Future对象
            CompletableFuture<List<User>> queryFuture = CompletableFuture.supplyAsync(() -> {
                // 切换数据源
                DynamicDataSourceContextHolder.setDataSourceRouterKey(datasourceKey);
                // 执行ORM查询方法
                return userDao.selectAll();
            });
    
            // 异步完成执行方法
            queryFuture.whenCompleteAsync(new BiConsumer<List<User>, Throwable>() {
                @Override
                public void accept(List<User> users, Throwable throwable) {
                    // 这里主要记录异步执行结果
                    log.info("数据源[{}]查询完成,查询记录[{}]条", datasourceKey, users.size());
                }
            });
    
            // 返回future对象
            return queryFuture;
        }
    
    }
  • 相关阅读:
    c# 动态加载工具栏按钮代码
    根据结果集处理工作事务c#源码
    关于升级后药库中报表需要重新设置的问题
    vs2005打开工程后退出
    把照片写入到DataTable
    关于导入最新住院管理后界面控件乱的问题
    django实战2运维日常维护统计
    ip_conntrack_netbios_n 报错
    django_book学习笔记7django常用方法总结
    python模块整理14re模版
  • 原文地址:https://www.cnblogs.com/changxy-codest/p/14486492.html
Copyright © 2011-2022 走看看