  • Spring Cloud: Hystrix usage examples (7)

     

    Inheritance approach

    HystrixCommand

    public class UserSelectAllCommand extends HystrixCommand<List<User>> {
        private RestTemplate restTemplate;
    
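        /**
         * Sets the group key and command key, which are used for dashboard statistics.
         * Commands that share a group key share one thread pool by default.
         * .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")) allows finer-grained thread-pool partitioning.
         * @param restTemplate the RestTemplate used to call the provider service
         */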
        public  UserSelectAllCommand(RestTemplate restTemplate){
            super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("YouGroupName"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("YouCommandName"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            .withExecutionTimeoutInMilliseconds(10000) // per-command timeout; the global setting had no effect in my setup, so it is set here
    
                    ));
            this.restTemplate=restTemplate;
        }
        @Override
        protected List<User> run() throws Exception {
            return restTemplate.getForObject("http://PROVIDER/user/findAll",List.class);
        }
    }
    @Controller
    @RequestMapping("/UserHystrixCommand")
    public class UserHystrixCommandContorller {
        @Autowired
        RestTemplate restTemplate;
    
        // Synchronous execution
        @RequestMapping("/findAll")
        @ResponseBody
        public List<User> findAll() {
            UserSelectAllCommand userSelectAllCommand = new UserSelectAllCommand(restTemplate);
            return userSelectAllCommand.execute();
        }
    
        // Asynchronous execution
        @RequestMapping("/findAllAsyn")
        @ResponseBody
        public List<User> findAllAsyn() throws ExecutionException, InterruptedException {
            UserSelectAllCommand userSelectAllCommand = new UserSelectAllCommand(restTemplate);
            return userSelectAllCommand.queue().get();
        }
    
    }

    Calling execute() runs the command synchronously; queue() returns a Future and runs it asynchronously.
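
    In Hystrix, execute() is implemented as queue().get(), so the two calls differ only in who does the waiting. The sketch below mirrors that relationship in plain Java without Hystrix; MiniCommand and its thread pool are hypothetical names made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a command: queue() is asynchronous, execute() blocks on it.
final class MiniCommand<T> {
    private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // daemon threads let the JVM exit without an explicit shutdown
        return t;
    });

    private final Callable<T> work;

    MiniCommand(Callable<T> work) {
        this.work = work;
    }

    // Asynchronous: returns immediately with a Future.
    Future<T> queue() {
        return POOL.submit(work);
    }

    // Synchronous: the same call, followed by a blocking get().
    T execute() throws Exception {
        return queue().get();
    }

    public static void main(String[] args) throws Exception {
        Future<String> pending = new MiniCommand<String>(() -> "async").queue();
        String sync = new MiniCommand<String>(() -> "sync").execute();
        System.out.println(sync + " " + pending.get());
    }
}
```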

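    A command can also be run as an Observable, using either of the following:

     Observable observable = userSelectAllCommand.toObservable(); // cold: the request is issued on subscription
     Observable observable = userSelectAllCommand.observe();      // hot: the request is issued immediately

    Subscribe to obtain the result: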

        observable.subscribe(new Subscriber() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable throwable) {
    
                }
    
                @Override
                public void onNext(Object o) {
    
                }
            });
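
    The difference between observe() and toObservable() is when the work starts. The plain-Java analogy below is a sketch, not RxJava: an eagerly started CompletableFuture plays the role of observe(), and a Supplier that starts the work only when asked plays the role of toObservable(). EagerVsLazy and its counter are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class EagerVsLazy {
    // Counts how many times the simulated remote call has been started.
    static final AtomicInteger started = new AtomicInteger();

    static String remoteCall() {
        started.incrementAndGet();
        return "users";
    }

    // Eager (like observe()): the call is started as soon as this method runs.
    static CompletableFuture<String> eager() {
        return CompletableFuture.supplyAsync(EagerVsLazy::remoteCall);
    }

    // Lazy (like toObservable()): nothing runs until the caller invokes get() on the Supplier.
    static Supplier<CompletableFuture<String>> lazy() {
        return () -> CompletableFuture.supplyAsync(EagerVsLazy::remoteCall);
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<String>> cold = lazy();
        System.out.println("started after lazy(): " + started.get());
        System.out.println(cold.get().join());
        System.out.println("started after subscribing: " + started.get());
    }
}
```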

    HystrixObservableCommand

    Unlike HystrixCommand, which returns a single result, HystrixObservableCommand can emit multiple results.

    public class UserSelectObservableCommand extends HystrixObservableCommand<User> {
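        // Set through the constructor; the command is created with new, so Spring would not inject an @Autowired field.
        private final RestTemplate restTemplate;
        private final List<Integer> ids;

        /**
         * Sets the group key and command key, which are used for dashboard statistics.
         * Commands that share a group key share one thread pool by default.
         * .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")) allows finer-grained thread-pool partitioning.
         * @param ids the ids of the users to look up
         * @param restTemplate the RestTemplate used to call the provider service
         */
        public UserSelectObservableCommand(List<Integer> ids, RestTemplate restTemplate) {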
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("YouGroupName"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("YouCommandName"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            .withExecutionTimeoutInMilliseconds(10000) // per-command timeout; the global setting had no effect in my setup, so it is set here
    
                    ));
            this.restTemplate=restTemplate;
            this.ids=ids;
        }
    
        @Override
        protected Observable<User> construct() {
            return Observable.create(new Observable.OnSubscribe<User>() {
                @Override
                public void call(Subscriber<? super User> subscriber) {
                    try{
                        if(!subscriber.isUnsubscribed()){
                            for (Integer id : ids) {
                                MultiValueMap<String, String> map = new LinkedMultiValueMap<String, String>();
                                map.add("id", id.toString());
                                HttpHeaders headers = new HttpHeaders();
                                headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
                                HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<MultiValueMap<String, String>>(map, headers);
                                // One remote call per id; each result is emitted separately
                                User user=restTemplate.postForEntity("http://PROVIDER/user/findById", request,User.class).getBody();
                                subscriber.onNext(user);
                            }
                        }
                        subscriber.onCompleted();
                    }catch (Exception e){
                        e.printStackTrace();
                        subscriber.onError(e);
                    }
    
    
                }
            }).subscribeOn(Schedulers.io());
        }
    
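        /**
         * Fallback logic.
         * Runs on error, timeout, thread-pool rejection, or when the circuit breaker is open.
         * @return an Observable emitting the fallback value
         */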
        @Override
        protected Observable<User> resumeWithFallback() {
            return Observable.create(new Observable.OnSubscribe<User>() {
                @Override
                public void call(Subscriber<? super User> observer) {
                    try {
                        if (!observer.isUnsubscribed()) {
                            User u = new User();
                            observer.onNext(u);
                            observer.onCompleted();
                        }
                    } catch (Exception e) {
                        observer.onError(e);
                    }
                }
            }).subscribeOn(Schedulers.io());
        }
    }
        // Asynchronous execution
        @RequestMapping("/process")
        @ResponseBody
        public void process() throws ExecutionException, InterruptedException {
            UserSelectObservableCommand userSelectObservableCommand = new UserSelectObservableCommand(Arrays.asList(1, 2, 3, 4, 6), restTemplate);
            Observable<User> observable= userSelectObservableCommand.observe();
            observable.subscribe(new Subscriber<User>(){
                List<User> users=new ArrayList<User>();
    
                @Override
                public void onCompleted() {
                    users.stream().forEach(c->{
                        System.out.println(c.getName());
                    });
    
                }
    
                @Override
                public void onError(Throwable throwable) {
    
                }
    
                @Override
                public void onNext(User user) {
                    users.add(user);
                }
            });
    
        }
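
    The contract that construct() follows (zero or more onNext calls, then exactly one of onCompleted or onError) can be sketched without RxJava. In the sketch below, MultiEmit, emitAll and the lookup function are hypothetical stand-ins for the command and the remote call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class MultiEmit {
    // Emits one result per id, then signals completion; stops at the first failure.
    static <T> void emitAll(List<Integer> ids,
                            Function<Integer, T> lookup,
                            Consumer<T> onNext,
                            Runnable onCompleted,
                            Consumer<Throwable> onError) {
        try {
            for (Integer id : ids) {
                onNext.accept(lookup.apply(id)); // one emission per lookup
            }
            onCompleted.run();
        } catch (RuntimeException e) {
            onError.accept(e); // terminal signal: a failed lookup ends the stream without onCompleted
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        MultiEmit.<String>emitAll(Arrays.asList(1, 2, 3),
                id -> "user-" + id,
                received::add,
                () -> System.out.println("completed with " + received),
                error -> System.out.println("failed: " + error));
    }
}
```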

    Annotation approach

    @Component
    public class UserService {
        @Autowired
        RestTemplate restTemplate;
        
        @HystrixCommand(groupKey = "userService",commandKey = "findAll")
        public List<User> findAll(){
            return restTemplate.getForObject("http://PROVIDER/user/findAll",List.class);
        }
    
        @HystrixCommand
        public Future<List<User>> findAllAsyn(){
            return new AsyncResult<List<User>>() {
                @Override
                public List<User> invoke() {
                   return findAll();
                }
            };
        }
    
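        /**
         * ObservableExecutionMode.EAGER corresponds to observe()
         * ObservableExecutionMode.LAZY corresponds to toObservable()
         * ignoreExceptions lists exceptions that should not trigger the fallback
         * @param id the user id
         * @return an Observable emitting the user
         */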
        @HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER,ignoreExceptions = BusinessException.class)
        public Observable<User> findUserByIdA(Integer id){
    
           return Observable.create(new Observable.OnSubscribe<User>(){
    
               @Override
               public void call(Subscriber<? super User> subscriber) {
                   // Only emit if the subscriber has not unsubscribed
                   if(!subscriber.isUnsubscribed()){
                       User user=restTemplate.getForObject("http://PROVIDER/user/findById?id={id}",User.class,id);
                       subscriber.onNext(user);
                       subscriber.onCompleted();
                   }
               }
           });
        }
    
    }
    @Controller
    @RequestMapping("/UserHystrixCommandAnotation")
    public class UserHystrixCommandAnotationContorller {
        @Autowired
        UserService userService;
        // Synchronous execution
        @RequestMapping("/findAll")
        @ResponseBody
        public List<User> findAll(){
            return userService.findAll();
        }
    
        // Asynchronous execution
        @RequestMapping("/findAllAsyn")
        @ResponseBody
        public List<User> findAllAsyn() throws ExecutionException, InterruptedException {
           return userService.findAllAsyn().get();
        }
    
    
    
    }

    Request caching

    Inheritance approach

    public class UserSelectAllCommand extends HystrixCommand<List<User>> {
        private RestTemplate restTemplate;
        public static final HystrixCommandKey hystrixCommandKey = HystrixCommandKey.Factory.asKey("findAll");
    
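        /**
         * Sets the group key and command key, which are used for dashboard statistics.
         * Commands that share a group key share one thread pool by default.
         * .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")) allows finer-grained thread-pool partitioning.
         * @param restTemplate the RestTemplate used to call the provider service
         */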
        public  UserSelectAllCommand(RestTemplate restTemplate){
    
            super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UserService"))
                    .andCommandKey(hystrixCommandKey)
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            .withExecutionTimeoutInMilliseconds(10000) // per-command timeout; the global setting had no effect in my setup, so it is set here
    
                    ));
            this.restTemplate=restTemplate;
        }
        @Override
        protected List<User> run() throws Exception {
            System.out.println("executed....");
            return restTemplate.getForObject("http://PROVIDER/user/findAll",List.class);
        }
    
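        /**
         * Overriding this method is all that is needed to enable request caching.
         * @return the cache key
         */
        @Override
        protected String getCacheKey() {
            return "UserSelectAllCommand"; // the command takes no arguments, so the class name serves as the key
        }
    
        /**
         * Clears the cached entry.
         */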
        public static  void  clearKey(){
            HystrixRequestCache.getInstance(UserSelectAllCommand.hystrixCommandKey,HystrixConcurrencyStrategyDefault.getInstance() ).clear("UserSelectAllCommand");
        }
    
    }
    @Controller
    @RequestMapping("/UserHystrixCommand")
    public class UserHystrixCommandContorller {
        @Autowired
        RestTemplate restTemplate;
    
        // Synchronous execution
        @RequestMapping("/findAll")
        @ResponseBody
        public List<User> findAll() {
            // With caching enabled, a HystrixRequestContext must be initialized first
            HystrixRequestContext.initializeContext();
            UserSelectAllCommand userSelectAllCommand = new UserSelectAllCommand(restTemplate);
            userSelectAllCommand.execute();
            userSelectAllCommand = new UserSelectAllCommand(restTemplate);
            userSelectAllCommand.execute();
            userSelectAllCommand = new UserSelectAllCommand(restTemplate);
            // Clear the cache, so the next execute() calls the provider again
            UserSelectAllCommand.clearKey();
            return userSelectAllCommand.execute();
        }
    
    }

    Note that each HystrixCommand instance can be executed only once; executing the same instance again throws an error.
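
    The one-shot rule can be pictured as a flag that is flipped on first use. The sketch below is plain Java, not Hystrix source; OneShotCommand is a hypothetical name, and the exception type is chosen for illustration only.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class OneShotCommand<T> {
    private final AtomicBoolean used = new AtomicBoolean(false);
    private final Supplier<T> work;

    OneShotCommand(Supplier<T> work) {
        this.work = work;
    }

    T execute() {
        // compareAndSet succeeds only for the first caller; later calls are rejected.
        if (!used.compareAndSet(false, true)) {
            throw new IllegalStateException("This instance can only be executed once; create a new one.");
        }
        return work.get();
    }

    public static void main(String[] args) {
        OneShotCommand<String> command = new OneShotCommand<>(() -> "result");
        System.out.println(command.execute());
        try {
            command.execute();
        } catch (IllegalStateException e) {
            System.out.println("second call rejected: " + e.getMessage());
        }
    }
}
```

    This is why the controller above creates a new UserSelectAllCommand before every execute() call.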

    Annotation approach

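    @CacheResult marks a method whose result is cached. It must be used together with @HystrixCommand. The cache key can be set with cacheKeyMethod or @CacheKey.
    cacheKeyMethod names the method that returns the cache key; its parameters must match those of the target method.
    @CacheRemove marks a method that evicts the cached entry for a given key. commandKey is required and identifies the command whose cache is cleared; the key to evict is given by cacheKeyMethod or @CacheKey.

    cacheKeyMethod: the name of the method that returns the key to evict; its parameters must match those of the target method.

    commandKey: the command whose cache should be cleared.

    @CacheKey marks a parameter as the cache key. It has lower priority than cacheKeyMethod.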
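
    The roles of commandKey and the cache key can be sketched as a two-level map: the command key selects a cache, and the cache key selects an entry in it. This is a plain-Java illustration, not the javanica implementation; RequestCacheSketch and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// One instance stands for the cache of a single request context.
final class RequestCacheSketch {
    private final Map<String, Map<String, Object>> caches = new HashMap<>();

    // Like @CacheResult: run the loader only if (commandKey, cacheKey) has no cached value.
    @SuppressWarnings("unchecked")
    <T> T cached(String commandKey, String cacheKey, Supplier<T> loader) {
        Map<String, Object> cache = caches.computeIfAbsent(commandKey, k -> new HashMap<>());
        return (T) cache.computeIfAbsent(cacheKey, k -> loader.get());
    }

    // Like @CacheRemove: commandKey locates the cache, cacheKey names the entry to drop.
    void remove(String commandKey, String cacheKey) {
        Map<String, Object> cache = caches.get(commandKey);
        if (cache != null) {
            cache.remove(cacheKey);
        }
    }

    public static void main(String[] args) {
        RequestCacheSketch context = new RequestCacheSketch();
        int[] calls = {0};
        Supplier<String> loader = () -> "user-1 (load #" + (++calls[0]) + ")";

        System.out.println(context.cached("findUserById", "1", loader)); // loader runs
        System.out.println(context.cached("findUserById", "1", loader)); // served from the cache
        context.remove("findUserById", "1");
        System.out.println(context.cached("findUserById", "1", loader)); // loader runs again
    }
}
```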

    Ways to specify the cache key

    @Component
    public class UserService {
        @Autowired
        RestTemplate restTemplate;
    
        /**
         * If no cache key is specified, all method arguments are used as the key by default.
         *
         * @param id the user id
         * @return the user
         */
        @CacheResult
        @HystrixCommand(ignoreExceptions = BusinessException.class)
        public User findUserById(Integer id) {
            System.out.println("executed....");
            User user = restTemplate.getForObject("http://PROVIDER/user/findById?id={id}", User.class, id);
            return user;
        }
    
    }
        @RequestMapping("/findById")
        @ResponseBody
        public User findById(Integer id){
            // With caching enabled, a HystrixRequestContext must be initialized first; this can be done once in a servlet filter
            HystrixRequestContext.initializeContext();
            userService.findUserById(id);
            userService.findUserById(id);
            userService.findUserById(id);
            return userService.findUserById(id);
        }

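    Defining the cache key with cacheKeyMethod

        /**
         * Without a cache key the method arguments are used by default; here the key comes from getFindUserByIdKey.
         * @param id the user id
         * @return the user
         */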
        @CacheResult(cacheKeyMethod= "getFindUserByIdKey")
        @HystrixCommand(ignoreExceptions = BusinessException.class)
        public User findUserById(Integer id) {
            System.out.println("executed....");
            User user = restTemplate.getForObject("http://PROVIDER/user/findById?id={id}", User.class, id);
            return user;
        }
    
        /**
         * The parameters must match those of the cached method above.
         * @param id the user id
         * @return the cache key
         */
        public String getFindUserByIdKey(Integer id){
            return  String.valueOf(id);
        }

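    Defining the cache key with @CacheKey

        /**
         * Without a cache key the method arguments are used by default; here @CacheKey selects the id parameter.
         * @param id the user id
         * @return the user
         */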
        @CacheResult
        @HystrixCommand(ignoreExceptions = BusinessException.class)
        public User findUserById(@CacheKey Integer id) {
            System.out.println("executed....");
            User user = restTemplate.getForObject("http://PROVIDER/user/findById?id={id}", User.class, id);
            return user;
        }

    If the parameter is an object, @CacheKey can name one of its properties:

        @CacheResult
        @HystrixCommand(ignoreExceptions = BusinessException.class)
        public User findUserById(@CacheKey("id") User user) { // the value is the name of the property used as the key
            System.out.println("executed....");
            return restTemplate.getForObject("http://PROVIDER/user/findById?id={id}", User.class, user.getId());
        }

    Clearing the cache

        /**
         * CacheRemove.commandKey is required: it locates the cache, and the entry is then removed by key.
         *
         * @param user the user being saved
         * @return true on success
         */
        @CacheRemove(commandKey = "findUserById")
        @HystrixCommand(ignoreExceptions = BusinessException.class)
        public boolean saveEdit(@CacheKey("id") User user) {
            return true;
        }

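    commandKey is required; it identifies the command whose cache entry is removed. If a command does not declare a command key explicitly, its method name is used. The key can be specified in the same ways as for @CacheResult.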

    Request collapsing

    Inheritance approach

    1. Prepare a service that performs the batch query

    @Service
    public class UserService {
        @Autowired
        private RestTemplate restTemplate;
    
        public List<User> findAll(List<Long> ids){
            List<User> users=restTemplate.getForObject("http://PROVIDER/user?ids={1}", List.class, StringUtils.join(ids,","));
            return  users;
        }
    }

    2. Prepare a batch command

    public class UserBatchCommand extends HystrixCommand<List<User>> {
        UserService userService;
        List<Long> ids;
        public UserBatchCommand(UserService userService,List<Long> userIds){
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("userServiceCommand")).andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                    .withExecutionTimeoutInMilliseconds(10000))); // per-command timeout; the global setting had no effect in my setup, so it is set here
            this.userService=userService;
            this.ids=userIds;
        }
    
        @Override
        protected List<User> run() throws Exception {
            return userService.findAll(ids);
        }
    }

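    3. Define the request collapser

    /**
     * First type parameter: the return type of the batch request.
     * Second type parameter: the return type of a single request.
     * Third type parameter: the argument type of a single request.
     */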
    public class UserCollapseCommand extends HystrixCollapser<List<User>,User ,Long> {
        UserService userService;
        Long userId;
    
        /**
         * Returns the argument of this single request.
         * @return the user id
         */
        @Override
        public Long getRequestArgument() {
            return userId;
        }
        public  UserCollapseCommand(UserService userService,Long userId){
            super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapsercommand"))
                    .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
            this.userService=userService;
            this.userId=userId;
        }
    
    
        /**
         * Builds the batch command from the requests collected in one window.
         * @param collection the collapsed requests
         * @return the batch command
         */
        @Override
        protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Long>> collection) {
            List<Long> userIds=new ArrayList<Long>(collection.size());
            userIds.addAll(collection.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
            return new UserBatchCommand(userService,userIds);
        }
    
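        /**
         * Once the batch result is available, splits it and hands each part back to the originating request.
         * Results are matched to requests by position, so the provider must return users in request order.
         * @param users the batch result
         * @param collection the collapsed requests
         */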
        @Override
        protected void mapResponseToRequests(List<User> users, Collection<CollapsedRequest<User, Long>> collection) {
            int count=0;
            ObjectMapper objectMapper=new ObjectMapper();
            for(CollapsedRequest<User,Long> collapsedRequest:collection) {
                User user =objectMapper.convertValue(users.get(count++),User.class);
                collapsedRequest.setResponse(user);
            }
        }
    }

    4. Test

    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest(classes={SpringcloudConsumerApplication.class, hystrixCollapse.hystrixCollapserTest.class})
    public class hystrixCollapserTest {
    
        @Autowired
        UserService userService;
    
        @Test
        public void simpleTest() throws ExecutionException, InterruptedException {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                // Queue every request first so that they fall into the same window...
                List<Future<User>> futures = new ArrayList<Future<User>>();
                for (long id = 0; id < 10; id++) {
                    futures.add(new UserCollapseCommand(userService, id).queue());
                }
                // ...and only then wait for the results. Calling get() inside the loop above
                // would block on each request, so every batch would contain a single request.
                for (Future<User> future : futures) {
                    System.out.print(future.get().getId());
                }
            } finally {
                context.shutdown();
            }
        }
    }

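    Requests that arrive within the same time window are merged into a single batch request. The window length is set with HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100); the example uses 100 ms, while the Hystrix default is 10 ms.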
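
    The windowing mechanism itself can be sketched in plain Java. The class below is a simplified illustration, not how Hystrix implements it: WindowCollapser and its members are hypothetical names, the first request of a window starts a timer, and when the timer fires one batch call serves every request collected so far, with results matched to requests by position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class WindowCollapser<A, R> {
    private static final class Pending<A, R> {
        final A argument;
        final CompletableFuture<R> response = new CompletableFuture<>();
        Pending(A argument) { this.argument = argument; }
    }

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // do not keep the JVM alive for the timer
        return t;
    });
    private final Function<List<A>, List<R>> batchCall;
    private final long windowMillis;
    private List<Pending<A, R>> window = new ArrayList<>();

    WindowCollapser(long windowMillis, Function<List<A>, List<R>> batchCall) {
        this.windowMillis = windowMillis;
        this.batchCall = batchCall;
    }

    // Queue a single request; the first request of a window starts the timer.
    synchronized CompletableFuture<R> submit(A argument) {
        Pending<A, R> pending = new Pending<>(argument);
        window.add(pending);
        if (window.size() == 1) {
            timer.schedule(this::flush, windowMillis, TimeUnit.MILLISECONDS);
        }
        return pending.response;
    }

    // When the window closes: one batch call, then map results back by position.
    private void flush() {
        List<Pending<A, R>> batch;
        synchronized (this) {
            batch = window;
            window = new ArrayList<>();
        }
        try {
            List<A> arguments = new ArrayList<>();
            for (Pending<A, R> p : batch) arguments.add(p.argument);
            List<R> results = batchCall.apply(arguments);
            for (int i = 0; i < batch.size(); i++) batch.get(i).response.complete(results.get(i));
        } catch (RuntimeException e) {
            for (Pending<A, R> p : batch) p.response.completeExceptionally(e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger batchCalls = new AtomicInteger();
        WindowCollapser<Long, String> collapser = new WindowCollapser<>(100, ids -> {
            batchCalls.incrementAndGet();
            List<String> users = new ArrayList<>();
            for (Long id : ids) users.add("user-" + id);
            return users;
        });
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (long id = 0; id < 10; id++) futures.add(collapser.submit(id)); // queue first
        for (CompletableFuture<String> f : futures) System.out.println(f.join()); // then wait
        System.out.println("batch calls: " + batchCalls.get());
    }
}
```

    Note that the requests are queued before any result is awaited; blocking on each result immediately after submitting it would leave every window with a single request.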

    Annotation approach

    @Service
    public class UserSerivice {
        @Autowired
        private RestTemplate restTemplate;
    
        @HystrixCollapser(batchMethod = "findAll",collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds",value="100")})
        public User find(Long id){
            return restTemplate.getForObject("http://PROVIDER/user/{1}", User.class, id);
        }
    
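        /**
         * Deserializing directly to List yields LinkedHashMap elements, so each one is converted manually here.
         * In production, wrap the response in a dedicated result class (for example ProcessResult).
         * @param ids the user ids to look up
         * @return the matching users
         */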
        @HystrixCommand(commandProperties={
                @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "10000")})
        public List<User> findAll(List<Long> ids){
            List<User> users=restTemplate.getForObject("http://PROVIDER/user?ids={1}", List.class, StringUtils.join(ids,","));
            ObjectMapper objectMapper=new ObjectMapper();
            List<User> userList=new ArrayList<User>();
            for (Object obj : users) {
                userList.add(objectMapper.convertValue(obj,User.class));
            }
            return  userList;
        }
    }

    [Figure: collapser schematic, without a collapser vs. with a collapser]

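    A request collapser reduces thread-pool overhead, but the window adds latency: with a 10 ms window, a request that itself takes only 5 ms is not sent until the window has elapsed.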

    Request collapsing is recommended only when more than three requests arrive within one window.
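
    The trade-off can be put into numbers. The helper below is a back-of-the-envelope sketch with hypothetical names; it assumes a request may wait up to the full window before its batch is sent, ignores queueing inside the backend, and reads the figures in the text as a 10 ms window and a 5 ms request.

```java
final class CollapseTradeoff {
    // Worst case for one caller: wait out the whole window, then the batch call itself.
    static long worstCaseLatencyMillis(long windowMillis, long batchCallMillis) {
        return windowMillis + batchCallMillis;
    }

    // Backend calls saved when n requests that arrive in one window become a single batch.
    static int callsSaved(int requestsInWindow) {
        return Math.max(0, requestsInWindow - 1);
    }

    public static void main(String[] args) {
        System.out.println(worstCaseLatencyMillis(10, 5)); // prints 15
        System.out.println(callsSaved(3));                 // prints 2
    }
}
```

    With a single request per window nothing is saved and only the delay remains, which is the reason for the guideline above.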

  • Original article: https://www.cnblogs.com/LQBlog/p/10139853.html