zoukankan      html  css  js  c++  java
  • 【Hystrix】实现服务隔离和降级

    一、背景

      在今天,基于SOA的架构已经大行其道。伴随着架构的SOA化,相关联的服务熔断、降级、限流等思想,也在各种技术讲座中频繁出现。
      伴随着业务复杂性的提高,系统的不断拆分,一个面向用户端的API,其内部的RPC调用层层嵌套,调用链条可能会非常长。这会造成以下问题:

    • API接口可用性降低:引用Hystrix官方的一个例子,假设tomcat对外提供的一个application,其内部依赖了30个服务,每个服务的可用性都很高,为99.99%。那整个applicatiion的可用性就是:99.99%的30次方 = 99.7%,即0.3%的失败率。这也就意味着,每1亿个请求,有30万个失败;按时间来算,就是每个月的故障时间超过2小时。

    1.1 服务熔断

      为了解决上述问题,服务熔断的思想被提出来。类似现实世界中的“保险丝“,当某个异常条件被触发,直接熔断整个服务,而不是一直等到此服务超时。 熔断的触发条件可以依据不同的场景有所不同,比如统计一个时间窗口内失败的调用次数。

    1.2 服务降级

      有了熔断,就得有降级。所谓降级,就是当某个服务熔断之后,服务器将不再被调用,此时客户端可以自己准备一个本地的fallback回调,返回一个缺省值。 这样做,虽然服务水平下降,但好歹可用,比直接挂掉要强,当然这也要看适合的业务场景。关于Hystrix中fallback的使用,见官网

    1.3 服务隔离

    1. 雪崩效应:服务雪崩效应产生服务堆积在同一个线程池中,因为在同一个线程池中,所有请求全部到一个服务进行访问,这时候会导致其他服务没有线程接收请求访问,所以就会产生服务雪崩效应。
    2. Tomcat底层:http+线程池,每个线程都是独立的请求。
    3. 当大多数人在使用Tomcat时,多个http服务会共享一个线程池,假设其中一个http服务访问的数据库响应非常慢,这将造成服务响应时间延迟增加,大多数线程阻塞等待数据响应返回,导致整个Tomcat线程池都被该服务占用,甚至拖垮整个Tomcat。因此,如果我们能把不同http服务隔离到不同的线程池,则某个http服务的线程池满了也不会对其他服务造成灾难性故障。这就需要线程隔离或者信号量隔离来实现了。
    4. 服务隔离:每个服务接口互不影响,服务隔离有两种实现方式线程池方式计数器
    5. 作用:服务保护,当服务产生堆积的时候,对服务实现保护功能。(堆积请求:假设默认tomcat最大线程线程池是50。尝试第51一个请求,第51个请求会阻塞。大量请求正在等待,如果堆积请求过多,可能会造成服务器瘫痪。)
    6. 使用线程隔离或信号隔离的目的是为不同的服务分配一定的资源,当自己的资源用完,直接返回失败而不是占用别人的资源。

    1.4 总结

    1. 服务隔离:保证每个服务互不影响,使用信号量线程池方式
    2. 服务降级:当服务不可用的时候,不会被等待,直接返回一个友好的提示
    3. 服务熔断:当服务器达到最大的承受能的之后,直接拒绝访问服务,最会调用服务降级方法,返回友好提示。目的是保证服务不被宕机掉

    二、使用Hystrix实现服务隔离和降级

    2.1 Hytrix 简介

    • Hystrix 是一个微服务关于服务保护的框架,是Netflix开源的一款针对分布式系统的延迟和容错解决框架,目的是用来隔离分布式服务故障。它提供线程和信号量隔离,以减少不同服务之间资源竞争带来的相互影响;提供优雅降级机制;提供熔断机制使得服务可以快速失败,而不是一直阻塞等待服务响应,并能从中快速恢复。Hystrix通过这些机制来阻止级联失败并保证系统弹性、可用。
    • Hystrix的资源隔离策略有两种,分别为:线程池和信号量

    2.2 线程池方式

    1. 使用线程池隔离可以完全隔离第三方应用,请求线程可以快速放回。
    2. 请求线程可以继续接受新的请求,如果出现问题线程池隔离是独立的不会影响其他应用。
    3. 当失败的应用再次变得可用时,线程池将清理并可立即恢复,而不需要一个长时间的恢复。
    4. 独立的线程池提高了并发性

    缺点: 线程池隔离的主要缺点是它们增加计算开销(CPU)。每个命令的执行涉及到排队、调度和上下文切换都是在一个单独的线程上运行的。

    public class OrderHystrixCommand extends HystrixCommand<JSONObject> {
        @Autowired
        private MemberService memberService;
    
        /**
         * @param group
         */
        public OrderHystrixCommand(MemberService memberService) {
            super(setter());
            this.memberService = memberService;
        }
    
        protected JSONObject run() throws Exception {
            JSONObject member = memberService.getMember();
            System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
            return member;
        }
    
        private static Setter setter() {
    
            // 服务分组
            HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("orders");
            // 服务标识
            HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("order");
            // 线程池名称
            HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("order-pool");
            // #####################################################
            // 线程池配置 线程池大小为10,线程存活时间15秒 队列等待的阈值为100,超过100执行拒绝策略
            HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(10)
                    .withKeepAliveTimeMinutes(15).withQueueSizeRejectionThreshold(100);
            // ########################################################
            // 命令属性配置Hystrix 开启超时
            HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
                    // 采用线程池方式实现服务隔离
                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                    // 禁止
                    .withExecutionTimeoutEnabled(false);
            return HystrixCommand.Setter.withGroupKey(groupKey).andCommandKey(commandKey).andThreadPoolKey(threadPoolKey)
                    .andThreadPoolPropertiesDefaults(threadPoolProperties).andCommandPropertiesDefaults(commandProperties);
    
        }
        
    //############   服务降级  ##########
        @Override
        protected JSONObject getFallback() {
            // 如果Hystrix发生熔断,当前服务不可用,直接执行Fallback方法
            System.out.println("系统错误!");
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("code", 500);
            jsonObject.put("msg", "系统错误!");
            return jsonObject;
        }
        }
    
    

    应用场景:

    1. 第三方应用或者接口
    2. 并发量大

    2.3 信号量

    • 计数器方式:底层使用原子计数器,针对于每个服务:都设置自己独立限制阈值,比如设置每个服务接口最多同时只能访问50次,超出缓存队列请求后,自己实现拒绝策略。
    • 使用一个原子计数器(或信号量)来记录当前有多少个线程在运行,当请求进来时先判断计数器的数值,若超过设置的最大线程个数则拒绝该请求,若不超过则通行,这时候计数器+1,请求返回成功后计数器-1。
    • 与线程池隔离最大不同在于执行依赖代码的线程依然是请求线程,信号量的大小可以动态调整, 线程池大小不可以

    代码如下:

    public class OrderHystrixCommand2 extends HystrixCommand<JSONObject> {
        @Autowired
        private MemberService memberService;
    
        /**
         * @param group
         */
        public OrderHystrixCommand2(MemberService memberService) {
            super(setter());
            this.memberService = memberService;
        }
    
        protected JSONObject run() throws Exception {
    
            // Thread.sleep(500);
            // System.out.println("orderIndex线程名称" +
            // Thread.currentThread().getName());
            // System.out.println("success");
            JSONObject member = memberService.getMember();
            System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
            return member;
        }
    
        private static Setter setter() {
            // 服务分组
            HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("order");
            // 命令属性配置 采用信号量模式
            HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                    // 使用一个原子计数器(或信号量)来记录当前有多少个线程在运行,当请求进来时先判断计数
                    // 器的数值,若超过设置的最大线程个数则拒绝该请求,若不超过则通行,这时候计数器+1,请求返 回成功后计数器-1。
                    .withExecutionIsolationSemaphoreMaxConcurrentRequests(50);
            return HystrixCommand.Setter.withGroupKey(groupKey).andCommandPropertiesDefaults(commandProperties);
        }
        
    //############   服务降级  ##########
        @Override
        protected JSONObject getFallback() {
            // 如果Hystrix发生熔断,当前服务不可用,直接执行Fallback方法
            System.out.println("系统错误!");
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("code", 500);
            jsonObject.put("msg", "系统错误!");
            return jsonObject;
        }
        }
    
    

    应用场景:

    1. 内部应用或者中间件(redis)
    2. 并发需求不大

    三、项目搭建

    需求:搭建一套分布式rpc远程通讯案例:比如订单服务调用会员服务实现服务隔离,防止雪崩效应案例

    3.1 订单工程

    1. 引入Maven依赖

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.RELEASE</version>
        </parent>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
            <dependency>
                <groupId>com.netflix.hystrix</groupId>
                <artifactId>hystrix-metrics-event-stream</artifactId>
                <version>1.5.12</version>
            </dependency>
            <dependency>
                <groupId>com.netflix.hystrix</groupId>
                <artifactId>hystrix-javanica</artifactId>
                <version>1.5.12</version>
            </dependency>
    </dependencies>
    

    2. Service

    @Service
    public class MemberService {
    public JSONObject getMember() {
        JSONObject result = HttpClientUtils.httpGet("http://127.0.0.1:8081/member/memberIndex");
        return result;
    }
    }
    

    3.Controller

    @RestController
    @RequestMapping("/order")
    public class OrderController {
        @Autowired
        private MemberService memberService;
    
        /**
         * 订单服务会调用会员服务,底层使用rpc远程调用的方式
         * @return
         * @throws InterruptedException
         */
        @RequestMapping("/orderIndex")
        public Object orderIndex() throws InterruptedException {
            JSONObject member = memberService.getMember();
            //线程池名称+线程池id组合
            System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
            return member;
        }
    
        // 订单服务调用会员服务,解决服务雪崩效应,采用线程池的方式
        @RequestMapping("/orderIndexHystrix")
        public Object orderIndexHystrix() throws InterruptedException {
            return new OrderHystrixCommand(memberService).execute();
        }
        // 订单服务调用会员服务,解决服务雪崩效应,采用信号量的方式
        @RequestMapping("/orderIndexHystrix2")
        public Object orderIndexHystrix2() throws InterruptedException {
            return new OrderHystrixCommand2(memberService).execute();
        }
    
        @RequestMapping("/findOrderIndex")
        public Object findIndex() {
            System.out.println("当前线程:" + Thread.currentThread().getName() + ",findOrderIndex");
            return "findOrderIndex";
        }
    }    
    

    4.配置文件

    server:
      port: 8080
      #线程池中的最大线程数为20
      tomcat:
        max-threads: 20
    

    5.工具类

    public class HttpClientUtils {
        private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日志记录
    
    private static RequestConfig requestConfig = null;
    
    static {
        // 设置请求和传输超时时间
        requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build();
    }
    
    /**
     * post请求传输json参数
     * 
     * @param url
     *            url地址
     * @param json
     *            参数
     * @return
     */
    public static JSONObject httpPost(String url, JSONObject jsonParam) {
        // post请求返回结果
        CloseableHttpClient httpClient = HttpClients.createDefault();
        JSONObject jsonResult = null;
        HttpPost httpPost = new HttpPost(url);
        // 设置请求和传输超时时间
        httpPost.setConfig(requestConfig);
        try {
            if (null != jsonParam) {
                // 解决中文乱码问题
                StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8");
                entity.setContentEncoding("UTF-8");
                entity.setContentType("application/json");
                httpPost.setEntity(entity);
            }
            CloseableHttpResponse result = httpClient.execute(httpPost);
            // 请求发送成功,并得到响应
            if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                String str = "";
                try {
                    // 读取服务器返回过来的json字符串数据
                    str = EntityUtils.toString(result.getEntity(), "utf-8");
                    // 把json字符串转换成json对象
                    jsonResult = JSONObject.parseObject(str);
                } catch (Exception e) {
                    logger.error("post请求提交失败:" + url, e);
                }
            }
        } catch (IOException e) {
            logger.error("post请求提交失败:" + url, e);
        } finally {
            httpPost.releaseConnection();
        }
        return jsonResult;
    }
    
    /**
     * post请求传输String参数 例如:name=Jack&sex=1&type=2
     * Content-type:application/x-www-form-urlencoded
     * 
     * @param url
     *            url地址
     * @param strParam
     *            参数
     * @return
     */
    public static JSONObject httpPost(String url, String strParam) {
        // post请求返回结果
        CloseableHttpClient httpClient = HttpClients.createDefault();
        JSONObject jsonResult = null;
        HttpPost httpPost = new HttpPost(url);
        httpPost.setConfig(requestConfig);
        try {
            if (null != strParam) {
                // 解决中文乱码问题
                StringEntity entity = new StringEntity(strParam, "utf-8");
                entity.setContentEncoding("UTF-8");
                entity.setContentType("application/x-www-form-urlencoded");
                httpPost.setEntity(entity);
            }
            CloseableHttpResponse result = httpClient.execute(httpPost);
            // 请求发送成功,并得到响应
            if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                String str = "";
                try {
                    // 读取服务器返回过来的json字符串数据
                    str = EntityUtils.toString(result.getEntity(), "utf-8");
                    // 把json字符串转换成json对象
                    jsonResult = JSONObject.parseObject(str);
                } catch (Exception e) {
                    logger.error("post请求提交失败:" + url, e);
                }
            }
        } catch (IOException e) {
            logger.error("post请求提交失败:" + url, e);
        } finally {
            httpPost.releaseConnection();
        }
        return jsonResult;
    }
    
    /**
     * 发送get请求
     * 
     * @param url
     *            路径
     * @return
     */
    public static JSONObject httpGet(String url) {
        // get请求返回结果
        JSONObject jsonResult = null;
        CloseableHttpClient client = HttpClients.createDefault();
        // 发送get请求
        HttpGet request = new HttpGet(url);
        request.setConfig(requestConfig);
        try {
            CloseableHttpResponse response = client.execute(request);
    
            // 请求发送成功,并得到响应
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                // 读取服务器返回过来的json字符串数据
                HttpEntity entity = response.getEntity();
                String strResult = EntityUtils.toString(entity, "utf-8");
                // 把json字符串转换成json对象
                jsonResult = JSONObject.parseObject(strResult);
            } else {
                logger.error("get请求提交失败:" + url);
            }
        } catch (IOException e) {
            logger.error("get请求提交失败:" + url, e);
        } finally {
            request.releaseConnection();
        }
        return jsonResult;
    }
    }
    

    3.2 会员工程

    @RestController
    @RequestMapping("/member")
    public class MemberController {
    @RequestMapping("/memberIndex")
    public Object memberIndex() throws InterruptedException {
        Map<String, Object> hashMap = new HashMap<String, Object>();
        hashMap.put("code", 200);
        hashMap.put("msg", "memberIndex");
        Thread.sleep(1500);
        return hashMap;
    }
    }
    

    四、项目源码

    会员工程

    订单工程

  • 相关阅读:
    构建之法阅读笔记02
    4.7-4.13 第八周总结
    构建之法阅读笔记01
    顶会热词统计
    结对作业-四则运算升级版
    3.31-4.5 第七周总结
    大道至简阅读笔记03
    3.23-3.30 第六周总结
    第7周总结
    人月神话阅读笔记之三
  • 原文地址:https://www.cnblogs.com/haoworld/p/hystrix-shi-xian-fu-wu-ge-li-he-jiang-ji.html
Copyright © 2011-2022 走看看