zoukankan      html  css  js  c++  java
  • hystrix源码之请求合并

    请求合并

      使用HystrixObservableCollapser可以将参数不同,但执行过程相同的调用合并执行。当调用observe、toObservable方法时,会向RequestCollapser提交getRequestArgument方法获取的参数。用户需要实现getRequestArgument方法来设定请求参数,RequestCollapser是执行批量的类。

        public Observable<ResponseType> toObservable(Scheduler observeOn) {
            return Observable.defer(new Func0<Observable<ResponseType>>() {
                @Override
                public Observable<ResponseType> call() {
                    final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get();
                    //是否开启缓存,如果开启缓存并且实现了getCacheKey方法,则首先从HystrixRequestCache获取结果数据
                    if (isRequestCacheEnabled) {
                        HystrixCachedObservable<ResponseType> fromCache = requestCache.get(getCacheKey());
                        if (fromCache != null) {
                            metrics.markResponseFromCache();
                            return fromCache.toObservable();
                        }
                    }
               //获取合并请求对象,提交请求
                    RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
                    Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
                    metrics.markRequestBatched();
            // 是否开启缓存,如果开启缓存并且实现了getCacheKey方法,将结果放到HystrixRequestCache
    if (isRequestCacheEnabled) { HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response); HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache); if (fromCache == null) { return toCache.toObservable(); } else { return fromCache.toObservable(); } } return response; } }); }

    RequestCollapserFactory

      创建RequestCollapser,如果是GLOBAL作用域,会创建一个RequestCollapser,并且使用一个static的ConcurrentHashMap作为一个全局的缓存。如果是REQUEST作用域,会创建一个RequestCollapser,使用HystrixRequestVariableHolder来存储。

    复制代码
    private static ConcurrentHashMap<String, RequestCollapser<?, ?, ?>> globalScopedCollapsers = new ConcurrentHashMap<String, RequestCollapser<?, ?, ?>>();
    private static ConcurrentHashMap<String, HystrixRequestVariableHolder<RequestCollapser<?, ?, ?>>> requestScopedCollapsers = new ConcurrentHashMap<String, HystrixRequestVariableHolder<RequestCollapser<?, ?, ?>>>();
    
    public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {
            if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {
                return getCollapserForUserRequest(commandCollapser);
            } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {
                return getCollapserForGlobalScope(commandCollapser);
            } else {
                logger.warn("Invalid Scope: {}  Defaulting to REQUEST scope.", getScope());
                return getCollapserForUserRequest(commandCollapser);
            }
        }
    复制代码

     RequestCollapser

       内部有一个RequestBatch对象,代表当前的一批请求,当定时器到达一定间隔,会执行该批量请求对象中所有的请求,然后新建一个RequestBatch对象。

    
    

    ...
    private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>();
    ....

    private class CollapsedTask implements TimerListener {
            final Callable<Void> callableWithContextOfParent;
    
            CollapsedTask() {
                // this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
                // so we create the callable now where we can capture the thread context
                callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
                    // the wrapCallable call allows a strategy to capture thread-context if desired
    
                    @Override
                    public Void call() throws Exception {
                        try {
                            // we fetch current so that when multiple threads race
                            // we can do compareAndSet with the expected/new to ensure only one happens
                            RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
                            // 1) it can be null if it got shutdown
                            // 2) we don't execute this batch if it has no requests and let it wait until next tick to be executed
                            if (currentBatch != null && currentBatch.getSize() > 0) {
                                // do execution within context of wrapped Callable
                                createNewBatchAndExecutePreviousIfNeeded(currentBatch);
                            }
                        } catch (Throwable t) {
                            logger.error("Error occurred trying to execute the batch.", t);
                            t.printStackTrace();
                            // ignore error so we don't kill the Timer mainLoop and prevent further items from being scheduled
                        }
                        return null;
                    }
    
                });
            }
    
            @Override
            public void tick() {
                try {
                    callableWithContextOfParent.call();
                } catch (Exception e) {
                    logger.error("Error occurred trying to execute callable inside CollapsedTask from Timer.", e);
                    e.printStackTrace();
                }
            }
    
            @Override
            public int getIntervalTimeInMilliseconds() {
                return properties.timerDelayInMilliseconds().get();
            }
    
        }

       使用HystrixContextCallable是为了batch执行共享父亲HystrixRequestContext。

    @Override
        public K call() throws Exception {
            HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
            try {
                // set the state of this thread to that of its parent
                HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
                // execute actual Callable with the state of the parent
                return actual.call();
            } finally {
                // restore this thread back to its original state
                HystrixRequestContext.setContextOnCurrentThread(existingState);
            }
        }

     RequestBatch

      内部有一个ConcurrentMap来存储请求对象。key为:请求参数对象,value为CollapsedRequest对象。

    private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap =
                new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();

       当执行批量请求方法时,遍历存储的CollapsedRequest对象,通过commandCollapser的createObservableCommand方法创建执行批量请求的命令,通过commandCollapser的mapResponseToRequests方法将执行结果和请求进行匹配。

    try {
                    // shard batches
                    Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
                    // for each shard execute its requests 
                    for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
                        try {
                            // create a new command to handle this batch of requests
                            Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);
    
                            commandCollapser.mapResponseToRequests(o, shardRequests)
    ...

      

  • 相关阅读:
    OGG常用命令
    postgres psql常用命令学习笔记
    oracle DG搭建方式两种总结
    配置rhel系统kdump安装RHEL的debuginfo软件包
    oracle开机自启,监听自启,任意秒crontab
    cx_Oracle.DatabaseError: DPI-1047: 64-bit Oracle Client library cannot be loaded 解决方法
    rhel | centos7上配置python3环境和pip
    shared_pool知识点整理
    记一次性能测试实践3-单接口压测
    我是如何做性能测试-文档收集并深入学习
  • 原文地址:https://www.cnblogs.com/zhangwanhua/p/7851283.html
Copyright © 2011-2022 走看看