zoukankan      html  css  js  c++  java
  • Hystrix的工作原理

    一、简介

      hystrix经常被我们用于服务的熔断,降级等领域,基于RxJava(一种基于观察者模式的响应式编程框架)实现,具备服务降级、服务熔断、线程与信号隔离、请求缓存、请求合并以及服务监控等强大功能。

    二、基本原理

      当我们需要调用某个方法时(一般是远程调用),通过 Hystrix 将方法调用包裹起来,交由 Hystrix 来完成,从而享受 Hystrix 带来保护。

    Hystrix 提供了两个请求命令:HystrixCommand、HystrixObservableCommand,可以使用这两个对象来包裹待执行的任务。

    HystrixCommand用在依赖服务返回单个操作结果的时候:

      execute():同步执行,从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。

      queue():异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。

    HystrixObservableCommand用在依赖服务返回多个操作结果的时候:

      observe():返回Obervable对象,他代表了操作的多个结果,它是一个Hot Observable。

      toObservable():同样返回Observable对象,也代表了操作多个结果,但它返回的是一个Cold Observable。

    三、基本用法

      hystrix可以使用手动自定义Command、注解、结合feign的方式来实现,手动创建和结合feign的这里就不介绍了,主要看一下注解的实现方式。

    1、开启hystrix,启动类上加上@EnableHystrix注解

    2、在需要降级的方法上使用@HystrixCommand

    public class UserServiceImpl{
       @Autowired    private UserDao userDao;
    @HystrixCommand(fallbackMethod
    = "getUserNameFallBack") @Override public String getUserName(String userId){ int i = 1/0 return userDao.getNameById(userId); } public String getUserNameFallBack(String userId){ return "服务暂时不可用,请稍后再试"; } }

    四、初始化

    从@EnableHystrix注解看起 

    //开启EnableCircuitBreaker
    @EnableCircuitBreaker
    public @interface EnableHystrix {
    }
    //导入EnableCircuitBreakerImportSelector
    @Import(EnableCircuitBreakerImportSelector.class)
    public @interface EnableCircuitBreaker {
    }

    在注解上又开启了一个注解@EnableCircuitBreaker,并导入了一个Selector

    @Order(Ordered.LOWEST_PRECEDENCE - 100)
    public class EnableCircuitBreakerImportSelector
            extends SpringFactoryImportSelector<EnableCircuitBreaker> {
    
        @Override
        protected boolean isEnabled() {
            return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
                    Boolean.class, Boolean.TRUE);
        }
    
    }

    这是hystrix生效的一个关键点,继承了SpringFactoryImportSelector,此类在初始化后,会执行selectImports(AnnotationMetadata metadata)的方法。此方法会根据注解启动的注解(这里指@EnableCircuitBreaker)从spring.factories文件中获取其配置需要初始化@Configuration类,看下关键代码

            List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
                    .loadFactoryNames(this.annotationClass, this.beanClassLoader)));

    看一下spring.factories文件

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
    org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
    org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
    org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
    
    org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
    org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration

    配置类HystrixCircuitBreakerConfiguration

    @Configuration(proxyBeanMethods = false)
    public class HystrixCircuitBreakerConfiguration {
    
        @Bean
        public HystrixCommandAspect hystrixCommandAspect() {
            return new HystrixCommandAspect();
        }
    
    }

    向spring注入了HystrixCommandAspect

    @Aspect
    public class HystrixCommandAspect {
    
        private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
    
        static {
            META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                    .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                    .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                    .build();
        }
        
        //切点是所有使用了HystrixCommand注解的地方
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
        public void hystrixCommandAnnotationPointcut() {
        }
        //切点是所有使用了HystrixCollapser注解的地方
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
        public void hystrixCollapserAnnotationPointcut() {
        }
        
        //环绕通知
        @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
        public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
            Method method = getMethodFromTarget(joinPoint);
            Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
            //两个注解不能同时作用
            if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
                throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                        "annotations at the same time");
            }
            //META_HOLDER_FACTORY_MAP预先初始化了两个工厂类
            //@HystrixCommand:CommandMetaHolderFactory
            //@HystrixCollapser:CollapserMetaHolderFactory
            MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
            //创建,把切点封装进了MetaHolder
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            // 创建HystrixInvokable,只是一个空接口,没有任何方法,只是用来标记具备可执行的能力
            // 具体的执行由实现类来做
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            //执行类型
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                    metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
    
            Object result;
            try {
                if (!metaHolder.isObservable()) {
                    // 利用工具CommandExecutor来执行
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = executeObservable(invokable, executionType, metaHolder);
                }
            } catch (HystrixBadRequestException e) {
                throw e.getCause();
            } catch (HystrixRuntimeException e) {
                throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
            }
            return result;
        }
    
       ···
    
    }

    通过AOP编程,创建方法的代理,决定执行何种逻辑

    创建参数包装类

    public MetaHolder create(final ProceedingJoinPoint joinPoint) {
                //获取方法对象
                Method method = getMethodFromTarget(joinPoint);
                //目标对象
                Object obj = joinPoint.getTarget();
                //方法的参数列表
                Object[] args = joinPoint.getArgs();
                //代理对象
                Object proxy = joinPoint.getThis();
                //调用子类create方法
                return create(proxy, method, obj, args, joinPoint);
            }
            
            public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
                //获取HystrixCommand注解的信息
                HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
                //判断方法的返回值类型Future:异步,Observable:rxjava中的被观察者,其他:同步
                ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
                //获取MetaHolder的builder对象
                MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
                if (isCompileWeaving()) {
                    builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
                }
                //建造者模式,创建MetaHolder包装类
                return builder.defaultCommandKey(method.getName())
                                .hystrixCommand(hystrixCommand)
                                .observableExecutionMode(hystrixCommand.observableExecutionMode())
                                .executionType(executionType)
                                .observable(ExecutionType.OBSERVABLE == executionType)
                                .build();
            }

    创建命令执行器

        public HystrixInvokable create(MetaHolder metaHolder) {
            HystrixInvokable executable;
            //@HystrixCollapser
            if (metaHolder.isCollapserAnnotationPresent()) {
                executable = new CommandCollapser(metaHolder);
            //@HystrixCommand 并且 返回值类型是Observable
            } else if (metaHolder.isObservable()) {
                executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
            } else {
                //其他情况 把HystrixCommandBuilder封装进GenericCommand
                executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
            }
            return executable;
        }
        public GenericCommand(HystrixCommandBuilder builder) {
            super(builder);
        }
        
        protected AbstractHystrixCommand(HystrixCommandBuilder builder) {
            super(builder.getSetterBuilder().build());
            //命令形式 包含需要执行的方法 fallback方法
            this.commandActions = builder.getCommandActions();
            this.collapsedRequests = builder.getCollapsedRequests();
            this.cacheResultInvocationContext = builder.getCacheResultInvocationContext();
            this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext();
            this.ignoreExceptions = builder.getIgnoreExceptions();
            //执行类型 ASYNCHRONOUS SYNCHRONOUS OBSERVABLE
            this.executionType = builder.getExecutionType();
        }
        
        public HystrixCommand.Setter build() throws HystrixPropertyException {
            //分组key:类名,命令key:方法名,线程池key
            HystrixCommand.Setter setter = HystrixCommand.Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                    .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
            if (StringUtils.isNotBlank(threadPoolKey)) {
                setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
            }
            try {
                //初始化线程池的配置
                setter.andThreadPoolPropertiesDefaults(HystrixPropertiesManager.initializeThreadPoolProperties(threadPoolProperties));
            } catch (IllegalArgumentException e) {
                throw new HystrixPropertyException("Failed to set Thread Pool properties. " + getInfo(), e);
            }
            try {
                //初始化命令执行配置
                setter.andCommandPropertiesDefaults(HystrixPropertiesManager.initializeCommandProperties(commandProperties));
            } catch (IllegalArgumentException e) {
                throw new HystrixPropertyException("Failed to set Command properties. " + getInfo(), e);
            }
            return setter;
        }

    执行命令

        public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder)
                                               throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) { case SYNCHRONOUS: { //转换成子接口HystrixExecutable return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()
                                    ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } } public R execute() { try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } }

    HystrixCommand的四种执行方式

     

    测试类

    public class HystrixCommandTest {
    
        private <T> com.netflix.hystrix.HystrixCommand<T> createCommand(T message) {
            com.netflix.hystrix.HystrixCommand.Setter setter = com.netflix.hystrix.HystrixCommand.Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("serviceA"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("methodA"));
            return new com.netflix.hystrix.HystrixCommand<T>(setter) {
                @Override
                protected T run() throws Exception {
                    System.out.println("HystrixCommand执行了!!!" + System.currentTimeMillis());
                    return message;
                }
            };
        }
    
        @Test
        public void test01() {
            com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test01");
            System.out.println(command.execute());
        }
    
    
        @Test
        public void test02() throws ExecutionException, InterruptedException {
            com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test02");
            Future<String> f = command.queue();
            System.out.println("queue之后,command执行:" + System.currentTimeMillis());
            Thread.sleep(1000);
            System.out.println(f.get());
        }
    
        @Test
        public void test03() throws InterruptedException {
            HystrixCommand<String> command = createCommand("this is test03");
            // observe直接执行run方法,称为Hot Observable
            Observable<String> observe = command.observe();
            System.out.println("observe之后,command执行:" + System.currentTimeMillis());
            Thread.sleep(1000);
            observe.subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("通过订阅,获取执行结果:" + s);
                }
            });
        }
    
        @Test
        public void test04() throws InterruptedException {
            HystrixCommand<String> command = createCommand("this is test04");
            // toObservable不直接执行run方法
            Observable<String> observe = command.toObservable();
            System.out.println("未订阅,command不执行:" + System.currentTimeMillis());
            Thread.sleep(1000);
            observe.subscribe();
            System.out.println("订阅后,command执行了" + System.currentTimeMillis());
            Thread.sleep(1000);
        }
    
        @Test
        public void test05() throws InterruptedException, ExecutionException {
            HystrixCommand<String> command = createCommand("this is test05");
            // toObservable不直接执行run方法
            Future<String> f = command.toObservable().toBlocking().toFuture();
            System.out.println("转成future执行:" + System.currentTimeMillis());
            Thread.sleep(1000);
            System.out.println(f.get());
        }
    }

    回到hystrix源码中queue()方法

        public Future<R> queue() {
            // toObservable转换为Observable
            // toBlocking转换为BlockingObservable
            // toFuture转换为Future
            // 完成了Observable的创建和订阅
            final Future<R> delegate = toObservable().toBlocking().toFuture();
            // 代理future,对于cancel操作做特殊处理
            // 因为toObservable().toBlocking().toFuture()返回的future无法通过cancel方法中断执行线程。
            final Future<R> f = new Future<R>() {
    
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    if (delegate.isCancelled()) {
                        return false;
                    }
                    // 如果 execution.isolation.thread.interruptOnFutureCancel = true(默认false)
                    if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                        // 设置标志位
                        interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                    }
                    // 执行目标future的cancel
                    final boolean res = delegate.cancel(interruptOnFutureCancel.get());
                    // 如果command还没执行完成 且 需要中断执行的线程
                    if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                        // 获取执行线程
                        final Thread t = executionThread.get();
                        // 执行线程非当前线程则中断线程
                        if (t != null && !t.equals(Thread.currentThread())) {
                            t.interrupt();
                        }
                    }
    
                    return res;
                }
    
                ```
                
            };
    
            //判断是否执行完成
            if (f.isDone()) {
                try {
                    //获取结果
                    f.get();
                    return f;
                } catch (Exception e) {
                    ···
                }
            }
    
            return f;
        }

    这里用到了RxJava框架的响应式编程,会执行到具体Command(之前封装的GenericCommand)的run方法

    public class GenericCommand extends AbstractHystrixCommand<Object> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);
    
        public GenericCommand(HystrixCommandBuilder builder) {
            super(builder);
        }
    
        //执行目标的方法
        @Override
        protected Object run() throws Exception {
            LOGGER.debug("execute command: {}", getCommandKey().name());
            return process(new Action() {
                @Override
                Object execute() {
                    return getCommandAction().execute(getExecutionType());
                }
            });
        }
        
        //执行fallback方法
        @Override
        protected Object getFallback() {
            final CommandAction commandAction = getFallbackAction();
            if (commandAction != null) {
                try {
                    return process(new Action() {
                        @Override
                        Object execute() {
                            MetaHolder metaHolder = commandAction.getMetaHolder();
                            Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                            return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                        }
                    });
                } catch (Throwable e) {
                    LOGGER.error(FallbackErrorMessageBuilder.create()
                            .append(commandAction, e).build());
                    throw new FallbackInvocationException(unwrapCause(e));
                }
            } else {
                return super.getFallback();
            }
        }
    
    }

    参考链接:https://blog.csdn.net/alex_xfboy/article/details/89844066

    参考链接:https://juejin.cn/column/6960847703521624094

  • 相关阅读:
    使用C++ 实现的 websocket 客户端 (基于easywsclient)
    ant打包报错 JRE version less than 1.8 is not suppored
    离线安装SVN 4.2.3
    maven项目使用oracle11g
    springboot 新工程报错 Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.
    IP与域名绑定
    web项目如果省略端口
    Linux源码安装Python3.7服务
    Linux yum软件包安装、管理与使用
    RPM软件包管理与使用
  • 原文地址:https://www.cnblogs.com/sglx/p/15771838.html
Copyright © 2011-2022 走看看