zoukankan      html  css  js  c++  java
  • 使用CompletableFuture实现多个异步任务并行完成后合并结果

    业务场景

    需要同时从多个副本数据库中查询数据,并对查询结果进行合并去重处理后返回前端。

    实现过程涉及多数据源切换,这里不作过多讨论。

    编码实现

    实现过程:

    1、定义异步查询数据方法;

    2、通过CompletableFuture的allOf方法对多个异步执行结果进行处理;

    public class CompletableFutureTests {
    
        @Autowired
        private UserDao userDao;
    
        @Test
        public void testSomeTaskAndJoin() throws Exception {
    
            // DynamicDataSourceContextHolder.dataSourceIds根据动态数据源数量
            // 异步执行每个数据源查询方法
            // 返回一个Future集合
            List<CompletableFuture<List<User>>> futures = DynamicDataSourceContextHolder.dataSourceIds.stream()
                    .map(this::queryUsers).collect(Collectors.toList());
    
            // 多个异步执行结果合并到该集合
            List<User> futureUsers = new ArrayList<>();
    
            // 通过allOf对多个异步执行结果进行处理
            CompletableFuture allFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                                    .whenComplete((v, t) -> {
                                        // 所有CompletableFuture执行完成后进行遍历
                                        futures.forEach(future -> {
                                            synchronized (this) {
                                                // 查询结果合并
                                                futureUsers.addAll(future.getNow(null));
                                            }
                                        });
                                    });
    
    
            // 阻塞等待所有CompletableFuture执行完成
            allFuture.get();
            // 对合并后的结果集进行去重处理
            List<User> result = futureUsers.stream().distinct().collect(Collectors.toList());
    
            log.info(result.toString());
    
        }
    
        /**
         * 用户异步查询方法
         * @param datasourceKey 动态数据源Key
         * @return
         */
        public CompletableFuture<List<User>> queryUsers(String datasourceKey) {
    
            // 定义异步查询Future对象
            CompletableFuture<List<User>> queryFuture = CompletableFuture.supplyAsync(() -> {
                // 切换数据源
                DynamicDataSourceContextHolder.setDataSourceRouterKey(datasourceKey);
                // 执行ORM查询方法
                return userDao.selectAll();
            });
    
            // 异步完成执行方法
            queryFuture.whenCompleteAsync(new BiConsumer<List<User>, Throwable>() {
                @Override
                public void accept(List<User> users, Throwable throwable) {
                    // 这里主要记录异步执行结果
                    log.info("数据源[{}]查询完成,查询记录[{}]条", datasourceKey, users.size());
                }
            });
    
            // 返回future对象
            return queryFuture;
        }
    
    }
  • 相关阅读:
    【源码解析】Flink 是如何处理迟到数据
    Flink assignAscendingTimestamps 生成水印的三个重载方法
    【翻译】生成 Timestamps / Watermarks
    【翻译】The Broadcast State Pattern(广播状态)
    基于Broadcast 状态的Flink Etl Demo
    git 更新fork的远程仓库
    Flink 在IDEA执行时的webui
    配置ssh免密,仍需要密码
    第二章 Kubernetes进阶之使用二进制包部署集群
    Kubernetes之Ingress
  • 原文地址:https://www.cnblogs.com/changxy-codest/p/14486492.html
Copyright © 2011-2022 走看看