zoukankan      html  css  js  c++  java
  • hystrix总结之请求批量执行

      hystrix可以将同一个命令的多次执行合并到一起执行。

    public class HelloWorldCommandCollapser extends HystrixCollapser<List<String>,String,String> {
        private String name;
        public HelloWorldCommandCollapser(String name){
            this.name = name;
        }
        @Override
        public String getRequestArgument() {
            return name;
        }
        @Override
        protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) {
            return new BatchHystrixCommand(collapsedRequests);
        }
        @Override
        protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) {
            int i =0;
            for(CollapsedRequest collapsedRequest:collapsedRequests){
                collapsedRequest.setResponse(batchResponse.get(i));
                i++;
            }
        }
        private class BatchHystrixCommand extends HystrixCommand{
            private Collection<CollapsedRequest<String, String>> collapsedRequests;
            public BatchHystrixCommand(Collection<CollapsedRequest<String, String>> collapsedRequests){
                super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
                this.collapsedRequests =collapsedRequests;
            }
            @Override
            protected Object run() throws Exception {
                List<String> result = new ArrayList<String>();
                for(CollapsedRequest collapsedRequest:collapsedRequests){
                    result.add("helloworld:"+collapsedRequest.getArgument());
                }
                return result;
            }
        }

      方法调用

    HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try{
                String result1 = new HelloWorldCommandCollapser("one").execute();
                String result2 = new HelloWorldCommandCollapser("two").execute();
                String result3 = new HelloWorldCommandCollapser("three").execute();
                String result4 = new HelloWorldCommandCollapser("four").execute();
                String result5 = new HelloWorldCommandCollapser("five").execute();
                String result6 = new HelloWorldCommandCollapser("six").execute();
                System.out.println(result1);
                System.out.println(result2);
                System.out.println(result3);
                System.out.println(result4);
                System.out.println(result5);
                System.out.println(result6);
            }finally {
                context.shutdown();
            }

      继承HystrixCollapser的命令,命令将会被集合到一起,当数量或时间到达设定的触发点时,统一执行。

      getRequestArgument 获取请求参数,命令执行时,实际是将该方法的参数设置到批量执行对象中。

      createCommand 批量执行对象通过该方法获得实际执行批量的命令,并返回结果。

      mapResponseToRequests 批量执行对象获得执行结果后,将结果与请求进行匹配。

      本质原理如下:

      当执行继承HystrixCollapser方法时,命令不会被实际执行,会获取getRequestArgument获得执行参数,添加到批量执行的对象中去。

    public Observable<ResponseType> toObservable(Scheduler observeOn) {
            return Observable.defer(new Func0<Observable<ResponseType>>() {
                @Override
                public Observable<ResponseType> call() {
                   ...
                    RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
                    Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
    ...
                    return response;
                }
            });
        }

      RequestCollapser是批量执行的对象,它有两种作用域,一个是全局范围,一个是一个请求范围内。全局范围通过今天变量实现,一个请求范围通过HystrixRequestVariableHolder实现。  

      当向RequestCollapser添加参数时,当参数到达一定数量时,就会执行批量。

    public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
            ...
            while (true) {
                final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();
                ...final Observable<ResponseType> response;
                if (arg != null) {
                    response = b.offer(arg);
                } else {
                    response = b.offer( (RequestArgumentType) NULL_SENTINEL);
                }
                //如果到达一定数量,respose返回null
                if (response != null) {
                    return response;
                } else {
                    //执行批量
                    createNewBatchAndExecutePreviousIfNeeded(b);
                }
            }
        }

      RequestCollapser内部有一个定时器,每个一定时间就会批量执行并返回结果。  

    private class CollapsedTask implements TimerListener {
            final Callable<Void> callableWithContextOfParent;
            CollapsedTask() {
                callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        ...
                RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
                if (currentBatch != null && currentBatch.getSize() > 0) {
                  createNewBatchAndExecutePreviousIfNeeded(currentBatch);
                }
                ... } }); } @Override public void tick() { ...
            callableWithContextOfParent.call();
           ...
    } @Override public int getIntervalTimeInMilliseconds() { return properties.timerDelayInMilliseconds().get(); } }

      批量执行

    public void executeBatchIfNotAlreadyStarted() {
         ...
                try {
                  Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
                    for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
                        try {
                            Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);//获取批量执行结果
                  //批量执行结果映射到执行请求中
                            commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
                   ...
    }).doOnCompleted(new Action0() {                ... }).subscribe(); } catch (Exception e) { ... } } } catch (Exception e) { ... } finally { batchLock.writeLock().unlock(); } } }
  • 相关阅读:
    VERSIONINFO Resource
    WCF 学习文摘
    hook 学习
    COM 学习
    ActiveX开发
    Word 开发资料集合
    Loops with PL/SQL
    TWain 在 Qt4 中的调用方法
    从 TWAIN 设备中扫描图像
    Qt enum使用总结
  • 原文地址:https://www.cnblogs.com/zhangwanhua/p/8271618.html
Copyright © 2011-2022 走看看