zoukankan      html  css  js  c++  java
  • 当Parallel遇上了DI

    分析淘宝PDP

    让我们先看个图, Taobao的PDP(Product Detail Page)页.

    打开Chrome Network面板, 让我们来看taobao是怎么加载这个页面数据的. 根据经验, 一般是异步加载的, 要么是XHR,要么就是js(jsonp), 你应该很快可以找到

    还能看到这个接口的性能

    神奇的是, taobao竟然在一次请求中拉下了整个PDP页的完整数据, 而且服务端处理耗时不到125ms

    首先, 这么做有什么好处?

    • 前后端开发对接简单
    • 在一次网络连接中尽可能多的传输数据(数据大小要不影响用户体验, 一般不会超过300kb), 减少建立连接的次数和请求头浪费的流量.

    然后, 这又是怎么做到的呢?

    你可能会说缓存, 但你要知道, 这样一个对电商极为重要的页面, 绝对涉及到了非常多的团队, 比如:

    • 商品团队
    • 卖家团队
    • 评价团队
    • 订单团队
    • 会员团队
    • 优惠团队
    • 问答团队
    • 推荐团队
    • 物流系统
    • etc/等等

    即使每个团队的数据全都是缓存的, 你一个个去拿, 要在125ms内拿完也不容易. 而且作为跟钱相关的页面, 部分数据必须保证绝对实时有效, 能用缓存的地方不多. 怎么办, 如果是你, 你会怎么做? 离线打标? 数据预热? etc..

    此时, 并行调用不失为一种好办法.

    分析一下这个页面, 你会发现, 每一个模块除了属于同一个商品(入参相同), 其实各个模块的数据之间, 并没有依赖性, 完全可以并行去获取.

    并行就没有问题了吗?

    并行获取数据, 可以提高我们的接口性能. 但也会引入一些问题, 如:

    • 依赖的项可能很多, 怎么使代码简洁清晰?
    • 依赖关系很可能是一个有向图, 如果做到有向图中的每个节点都可以并行执行?
    • 异步处理后, 超时怎么处理? 业务代码抛出异常了怎么处理?
    • 依赖关系如果有死循环怎么办?
    • 异步之后, ThreadLocal中的内容怎么处理? 一些基于ThreadLocal实现的Context不work怎么办?
    • 事务被线程隔离了怎么办?
    • 如何监控每一次异步执行, 每个节点的性能?

    下面, 我们来讨论下如何简单易用高效的并行获取数据; 如何解决上述异步问题.

    常见的并行方式

    假如你现在需要用户的基础信息博客列表粉丝列表 3份数据. 哪么你有哪些方式可以并行获取呢?

    Java ThreadPool并行

    最简单原始的办法, 直接使用Java提供了的线程池和Future机制.

    public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        Future<User> userFuture = executorService.submit(() -> {
            try{
                return userService.get(userId);
            }finally {
                countDownLatch.countDown();
            }
        });
        Future<List<Post>> postsFuture = executorService.submit(() -> {
            try{
                return postService.getPosts(userId);
            }finally {
                countDownLatch.countDown();
            }
        });
        Future<List<User>> followersFuture = executorService.submit(() -> {
            try{
                return followService.getFollowers(userId);
            }finally {
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        User user = userFuture.get();
        user.setFollowers(followersFuture.get());
        user.setPosts(postsFuture.get());
        return user;
    }
    

    Spring的异步并行

    我们知道, Spring支持@Async注解, 可以方便的实现异步, 并且支持获取返回值. 参考: https://www.baeldung.com/spring-async#2-methods-with-return-type

    @Async实现的原理实际是在Bean的代理类的方法中, 拦截方法调用, 向taskExecutor Bean中提交Callable任务. 原理跟自己用Java ThreadPool写其实区别不大.

    那么要用Spring Async实现上述功能. 首先需要修改下面3个方法的返回值, 并且修改返回值类型, 并为方法添加 @Async注解

    class UserServiceImpl implements UserService {
        @Async
        public Future<User> get(Long userId) {
            // ... something
        }
    }
    class PostServiceImpl implements PostService {
        @Async
        public Future<List<Post> getPosts(Long userId) {
            // ... something
        }
    }
    class FollowServiceImpl implements FollowService {
        @Async
        public Future<List<User> getFollowers(Long userId) {
            // ... something
        }
    }
    

    并行获取3份用户数据然后聚合, 代码如下:

    public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
        Future<User> userFuture = userService.get(userId);
        Future<List<Post>> postsFuture = postService.getPosts(userId);
        Future<List<User>> followersFuture = followService.getFollowers(userId);
        
        User user = whileGet(userFuture);
        user.setFollowers(whileGet(followersFuture));
        user.setPosts(whileGet(postsFuture));
        return user;
    }
    
    private <T> T whileGet(Future<T> future) throws ExecutionException, InterruptedException {
        while(true) {
            if (future.isDone()) {
                break;
            }
        }
        return future.get();
    }
    

    这里使用自旋去获取异步数据. 当然你也可以像前面那样, 传递一个闭锁(CountDownLatch)到Service中去, 然后让主调线程在一个闭锁上面等待.

    并行结合DI(依赖注入)

    上面2种方式的确能实现功能, 但首先, 它们都很不直观, 而且没有处理前面讲到的异步问题, 一旦出现超时异常ThreadLocal, 代码可能不会按照你预期的方式工作. 那有没有更简单方便可靠的方法呢?

    试想这样一种方式, 如果你需要的数据, 都可以通过方法入参自动并行获取, 然后传递给你, 那是不是很方便? 就像这样:

    @Component
    public class UserAggregate {
        @DataProvider("userWithPosts")
        public User userWithPosts(
                @DataConsumer("user") User user,
                @DataConsumer("posts") List<Post> posts,
                @DataConsumer("followers") List<User> followers) {
            user.setPosts(posts);
            user.setFollowers(followers);
            return user;
        }
    }
    

    这里的@DataConsumer声明了你要异步获取的数据id. @DataProvider声明了这个方法提供数据, 并且id为userWithPosts.

    或者你不想写这样一个Aggregate类, 你不需要复用, 你想直接创建一个"匿名Provider". 那么你可以直接在任何地方像下面这样调用拿结果

    User user = dataBeanAggregateQueryFacade.get(
         Collections.singletonMap("userId", 1L), 
         new Function3<User, List<Post>,List<User>, User>() {
                @Override
                public User apply(@DataConsumer("user") User user, 
                                  @DataConsumer("posts") List<Post> posts,
                                  @DataConsumer("followers") List<User> followers) {
                    user.setPosts(posts);
                    user.setFollowers(followers);
                    return user;
                }
         });
    Assert.notNull(user,"user not null");
    Assert.notNull(user.getPosts(),"user posts not null");
    

    这里的Function3接收4个泛型参数, 最后一个User表示返回值类型, 前3个参数依次对应apply方法的3个入参类型. 项目预定义了Function2-Function5, 支持不超过5个参数, 如果你需要更多参数, 可以编写一个接口(FunctionInterface), 继承MultipleArgumentsFunction接口即可.

    很显然

    • 每一个 @DataConsumer 只会对应一个 @DataProvider .
    • 一个 @DataProvider 可能被多个 @DataConsumer 消费 .
    • 一个 @DataProvider 通过多个 @DataConsumer 依赖上多个 @DataProvider.

    现在, 就有这样一个项目, 实现了上述功能. 只需要在你的方法上, 添加一些注解. 就可以迅速地让你的调用树转为并行.

    项目地址: https://github.com/lvyahui8/spring-boot-data-aggregator

    你不用care底层如何实现. 只有在你有定制化的需求时, 才去关心一些配置参数. 去扩展一些能力.

    实现原理

    1. 在Spring启动之时, 扫描应用中的 @DataProvider@DataConsumer 注解. 分析记录下依赖关系(有向非连通图), 并且记录好@DataProvider和Spring Bean的映射关系.
    2. 当进行查询时, 从已经记录好的依赖关系中拿出依赖树, 使用线程池和闭锁(CountLatchDown), 递归异步调用孩子节点对应的Bean方法, 拿到结果后作为入参注入当前节点 (近似广度优先, 但因为并行的原因, 节点的访问顺序是不确定的).
    3. 在发起递归调用前, 传入进一个map, 用来存放查询参数, 方法中没有@DataConsumer注解的入参, 将从此map中取值.
    4. @DataProvider@DataConsumer 注解可以支持一些参数, 用来控制超时时间异常处理方式是否幂等缓存等等.

    怎么解决并行/异步后引入的新问题

    超时怎么控制 ?

    @DataProvider 注解支持 timeout 参数, 用来控制超时. 实现原理是通过闭锁的超时等待方法.

    java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)
    

    异常怎么处理 ?

    对异常提供两种处理方式: 吞没或者向上层抛出.

    @DataConsumer 注解支持exceptionProcessingMethod 参数, 用来表示这个Consumer想怎么处理Provider抛出的异常.

    当然, 也支持在全局维度配置. 全局配置的优先级低于(<)Consumer配置的优先级.

    依赖关系有死循环怎么办 ?

    Spring Bean初始化, 因为Bean创建和Bean属性赋值分了两步走, 因此可以用所谓的"早期引用"解决循环依赖的问题.

    但如果你循环依赖的Bean, 依赖关系定义在构造函数入参上, 那么是没法解决循环依赖的问题的.

    同理, 我们通过方法入参, 异步注入依赖数据, 在方法入参没有变化的情况下, 也是无法结束死循环的. 因此必须禁止循环依赖.

    那么问题变为了怎么禁止循环依赖. 或者说, 怎么检测有向非联通图中的循环依赖, 两个办法:

    • 带染色的DFS遍历: 节点入栈访问前, 先标记节点状态为"访问中", 之后递归访问孩子节点, 递归完成后, 将节点标记为"访问完成". **如果在DFS递归过程中, 再次访问到"访问中"的节点, 说明有环. **
    • 拓扑排序: 把有向图的节点排成一个序列, 不存在索引号较高的节点指向索引号较低的节点, 表示图存在拓扑排序. 拓扑排序的实现方法是, 先删除入度为0的节点, 并将领接节点的入度 - 1, 直到所有节点都被删除. 很显然, 如果有向图中有环, 那么环里节点的入度不可能为0 , 那么节点不可能删完. 因此, 只要满足节点未删完 && 不存在入度为0的节点, 那么一定有环.

    这里我们用领接表+DFS染色搜索, 来实现环的检查

    private void checkCycle(Map<String,Set<String>> graphAdjMap) {
        Map<String,Integer> visitStatusMap = new HashMap<>(graphAdjMap.size() * 2);
        for (Map.Entry<String, Set<String>> item : graphAdjMap.entrySet()) {
            if (visitStatusMap.containsKey(item.getKey())) {
                continue;
            }
            dfs(graphAdjMap,visitStatusMap,item.getKey());
        }
    }
    
    private void dfs(Map<String,Set<String>> graphAdjMap,Map<String,Integer> visitStatusMap, String node) {
        if (visitStatusMap.containsKey(node)) {
            if(visitStatusMap.get(node) == 1) {
                List<String> relatedNodes = new ArrayList<>();
                for (Map.Entry<String,Integer> item : visitStatusMap.entrySet()) {
                    if (item.getValue() == 1) {
                        relatedNodes.add(item.getKey());
                    }
                }
                throw new IllegalStateException("There are loops in the dependency graph. Related nodes:" + StringUtils.join(relatedNodes));
            }
            return ;
        }
        visitStatusMap.put(node,1);
        log.info("visited:{}", node);
        for (String relateNode : graphAdjMap.get(node)) {
            dfs(graphAdjMap,visitStatusMap,relateNode);
        }
        visitStatusMap.put(node,2);
    }
    

    ThreadLocal怎么处理?

    许多的框架都使用了ThreadLocal来实现Context来保存单次请求中的一些共享数据, Spring也不例外.

    众所周知, ThreadLocal实际是访问Thread中一个特殊Map的入口. ThreadLocal只能访问当前Thread的数据(副本), 如果跨越了线程, 是拿不到到其他ThreadLocalMap的数据的.

    解决方法

    如图

    1. 在当前线程提交异步任务前, 将当前线程ThreadLocal执行的数据"捆绑"到任务实例中
    2. 当任务开始执行时, 从任务实例中取出数据, 恢复到当前异步线程的ThreadLocal中
    3. 当任务结束后, 清理当前异步线程的ThreadLocal.

    这里, 我们先定义一个接口, 来描述这3个动作

    public interface AsyncQueryTaskWrapper {
        /**
         * 任务提交之前执行. 此方法在提交任务的那个线程中执行
         */
        void beforeSubmit();
    
        /**
         * 任务开始执行前执行. 此方法在异步线程中执行
         * @param taskFrom 提交任务的那个线程
         */
        void beforeExecute(Thread taskFrom);
    
        /**
         * 任务执行结束后执行. 此方法在异步线程中执行
         * 注意, 不管用户的方法抛出何种异常, 此方法都会执行.
         * @param taskFrom 提交任务的那个线程
         */
        void afterExecute(Thread taskFrom);
    }
    

    为了让我们定义的3个动作起作用. 我们需要重写一下 java.util.concurrent.Callable#call方法.

    public abstract class AsyncQueryTask<T> implements Callable<T> {
        Thread      taskFromThread;
        AsyncQueryTaskWrapper asyncQueryTaskWrapper;
    
        public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) {
            this.taskFromThread = taskFromThread;
            this.asyncQueryTaskWrapper = asyncQueryTaskWrapper;
        }
    
        @Override
        public T call() throws Exception {
            try {
                if(asyncQueryTaskWrapper != null) {
                    asyncQueryTaskWrapper.beforeExecute(taskFromThread);
                }
                return execute();
            } finally {
                if (asyncQueryTaskWrapper != null) {
                    asyncQueryTaskWrapper.afterExecute(taskFromThread);
                }
            }
        }
    
        /**
         * 提交任务时, 业务方实现这个替代方法
         *
         * @return
         * @throws Exception
         */
        public abstract T  execute() throws Exception;
    }
    

    接下来, 向线程池提交任务时, 不再直接提交Callable匿名类实例, 而是提交AsyncQueryTask实例. 并且在提交前触发 taskWrapper.beforeSubmit();

    AsyncQueryTaskWrapper taskWrapper = new CustomAsyncQueryTaskWrapper();
    // 任务提交前执行动作.
    taskWrapper.beforeSubmit();
    Future<?> future = executorService.submit(new AsyncQueryTask<Object>(Thread.currentThread(),taskWrapper) {
        @Override
        public Object execute() throws Exception {
            try {
                // something to do
            } finally {
                stopDownLatch.countDown();
            }
        }
    });
    
    你要做什么?

    你只需要定义一个类, 实现这个接口, 并将这个类加到配置文件中去.

    @Slf4j
    public class CustomAsyncQueryTaskWrapper implements AsyncQueryTaskWrapper {
        /**
         * "捆绑" 在任务实例中的数据
         */
        private Long tenantId;
        private User user;
    
        @Override
        public void beforeSubmit() {
            /* 提交任务前, 先从当前线程拷贝出ThreadLocal中的数据到任务中 */
            log.info("asyncTask beforeSubmit. threadName: {}",Thread.currentThread().getName());
            this.tenantId = RequestContext.getTenantId();
            this.user = ExampleAppContext.getUser();
        }
    
        @Override
        public void beforeExecute(Thread taskFrom) {
            /* 任务提交后, 执行前, 在异步线程中用数据恢复ThreadLocal(Context) */
            log.info("asyncTask beforeExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
            RequestContext.setTenantId(tenantId);
            ExampleAppContext.setLoggedUser(user);
        }
    
        @Override
        public void afterExecute(Thread taskFrom) {
            /* 任务执行完成后, 清理异步线程中的ThreadLocal(Context) */
            log.info("asyncTask afterExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
            RequestContext.removeTenantId();
            ExampleAppContext.remove();
        }
    }
    

    添加配置使TaskWapper生效.

    io.github.lvyahui8.spring.task-wrapper-class=io.github.lvyahui8.spring.example.wrapper.CustomAsyncQueryTaskWrapper
    

    怎么监控每一次的异步调用?

    解决办法

    我们先把一次查询, 分为以下几个生命周期

    • 查询任务初次提交 (querySubmitted)
    • 某一个Provider节点开始执行前 (queryBefore)
    • 某一个Provider节点执行完成后 (queryAfter)
    • 查询全部完成 (queryFinished)
    • 查询异常 (exceptionHandle)

    转换成接口如下

    public interface AggregateQueryInterceptor {
        /**
         * 查询正常提交, Context已经创建
         *
         * @param aggregationContext 查询上下文
         * @return 返回为true才继续执行
         */
        boolean querySubmitted(AggregationContext aggregationContext) ;
    
        /**
         * 每个Provider方法执行前, 将调用此方法. 存在并发调用
         *
         * @param aggregationContext 查询上下文
         * @param provideDefinition 将被执行的Provider
         */
        void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition);
    
        /**
         * 每个Provider方法执行成功之后, 调用此方法. 存在并发调用
         *
         * @param aggregationContext 查询上下文
         * @param provideDefinition 被执行的Provider
         * @param result 查询结果
         * @return 返回结果, 如不修改不, 请直接返回参数中的result
         */
        Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result);
    
        /**
         * 每个Provider执行时, 如果抛出异常, 将调用此方法. 存在并发调用
         *
         * @param aggregationContext  查询上下文
         * @param provideDefinition 被执行的Provider
         * @param e Provider抛出的异常
         */
        void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e);
    
        /**
         * 一次查询全部完成.
         *
         * @param aggregationContext 查询上下文
         */
        void queryFinished(AggregationContext aggregationContext);
    }
    

    在Spring应用启动之初, 获取所有实现了AggregateQueryInterceptor接口的Bean, 并按照Order注解排序, 作为拦截器链.

    至于拦截器如何执行. 很简单, 在递归提交查询任务时, 插入执行一些钩子(hook)函数即可. 涉及到的代码很多, 就不贴在这里, 感兴趣的可以去github clone代码查看.

    你要做什么?

    你可以实现一个拦截器, 在拦截器中输出日志, 监控节点执行状态(耗时, 出入参), 如下:

    @Component
    @Order(2)
    @Slf4j
    public class SampleAggregateQueryInterceptor implements AggregateQueryInterceptor {
        @Override
        public boolean querySubmitted(AggregationContext aggregationContext) {
            log.info("begin query. root:{}",aggregationContext.getRootProvideDefinition().getMethod().getName());
            return true;
        }
    
        @Override
        public void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition) {
            log.info("query before. provider:{}",provideDefinition.getMethod().getName());
        }
    
        @Override
        public Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result) {
            log.info("query after. provider:{},result:{}",provideDefinition.getMethod().getName(),result.toString());
            return result;
        }
    
        @Override
        public void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e) {
            log.error(e.getMessage());
        }
    
        @Override
        public void queryFinished(AggregationContext aggregationContext) {
            log.info("query finish. root: {}",aggregationContext.getRootProvideDefinition().getMethod().getName());
        }
    }
    

    项目地址

    最后, 再次贴一下项目地址: . spring-boot-data-aggregator

    欢迎拍砖, 欢迎star, 欢迎使用

  • 相关阅读:
    【git】强制覆盖本地代码(与git远程仓库保持一致)
    ffmpeg CLI常用命令
    旧机改造步骤
    macbook air 2012 mid 安装 windows10 双系统遇到错误 no bootable device insert boot disk and press any key
    window、Linux 文本文件转换
    phalcon bug: model的findFirst会自动忽略一些空格
    oss2罗列所有文件
    如何让linux的history命令显示时间记录
    nginx 常用配置
    shell脚本 切换用户
  • 原文地址:https://www.cnblogs.com/lvyahui/p/12169119.html
Copyright © 2011-2022 走看看