zoukankan      html  css  js  c++  java
  • 曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?

    曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?

    前言

    在大家的项目中,想必都有那种,启动时候要去其他服务拉一些数据的情况,如果我们启动时,其他服务没启动,按岂不是就起不来了吗,如果这段拉数据的代码,并不是核心业务,那你这就有点说不过去了:不能因为对方没启动,我们也不能启动吧?

    经过一些思考后,我觉得可以这样,启动的时候:

    • 启动一个定时的线程池,让它去执行拉数据的任务,如果任务执行失败,会过一段时间后再次执行
    • 我们希望,一旦某一次执行任务,成功后,就不要再去拉数据了,浪费网络流量和cpu

    我这边可以大概就大家演示下。

    示例代码

    服务端

    随便写了个spring boot服务端,监听本机8082端口。模拟第三方服务

    @RestController
    @Slf4j
    public class BusinessController {
    
    
        @GetMapping("/")
        public String test() {
            return "success";
        }
    }
    
    @SpringBootApplication
    @Slf4j
    public class WebDemoApplicationServer {
    
    	public static void main(String[] args) {
            ConfigurableApplicationContext context = SpringApplication.run(WebDemoApplicationServer.class, args);
        }
    
    }
    

    客户端

    客户端程序,依赖第三方服务,启动时,要去上面的服务端拉数据。

    代码和上面差不多,唯一是在启动时,会执行以下逻辑:

    @Component
    public class InitRunner implements  CommandLineRunner{
       private static final Logger log = LoggerFactory.getLogger(InitRunner.class);
    
        @Autowired
        private RestTemplate restTemplate;
    
        @Override
        public void run(String... args) throws Exception {
            ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
            String s = entity.toString();
            log.info("get data:{}",s);
        }
    }
    

    在上面的服务没启动的时候,这个客户端是起不来的。

    怎么解决呢,很简单。

    方案1

    public class InitRunnerV2 implements CommandLineRunner {
    
        @Autowired
        private RestTemplate restTemplate;
    	
      	// 1
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys"));
    
        @Override
        public void run(String... args)  {
          	//2 
            TestTask task = new TestTask(restTemplate);
          	//3 
            ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
                            0, 10, TimeUnit.SECONDS);
          	// 4
            task.setScheduledFuture(scheduledFuture);
        }
    
    
    }
    
    • 1处,new了一个线程池,ScheduledThreadPoolExecutor类型,可周期执行某个任务

    • 2处,new了一个任务,这个任务会执行我们的拉数据逻辑。

      这个任务的代码如下:

      @Slf4j
      public class TestTask implements Runnable{
          private RestTemplate restTemplate;
      
          private volatile ScheduledFuture<?> scheduledFuture;
      
          public TestTask(RestTemplate restTemplate) {
              this.restTemplate = restTemplate;
          }
      
          ...
      
          public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
              this.scheduledFuture = scheduledFuture;
          }
      }
      

      其实很简单,就是定义了2个字段,一个是RestTemplate,请求数据时要用;另一个是ScheduledFuture<?>类型,这个字段在上面的InitRunnerV2代码的第三处被赋值。

    • 3处,让这个任务循环执行,每10s一次。

    • 4处,给task的 ScheduledFuture 赋值,注意的是,在task中,这个字段我们定义为volatile,保证线程可见。

    下面是任务代码的剖析:

    @Override
        public void run() {
            try {
                ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
                String s = entity.toString();
                log.info("get data:{}",s);
            } catch (Exception e) {
    //            log.error("e:{}",e);
                log.error("error");
                return;
            }
    
            /**
             * 1 有可能任务执行太快,future还没被赋值
             */
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
    
        }
    

    唯一有什么要说的,就是1处,如果成功了,我们就会调用scheduledFuture.cancel(true);,这样,这个scheduled 任务就不会继续执行了,也就达到了我们的目的,经济实惠。

    到此,代码基本就这样了,详细代码见:

    https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/spring-boot-scheduler-future-demo-parent

    不成熟方案2

    因为上面的方案挺简单实用,但感觉没啥干货,于是我想着是否可以自己来实现一个定制的线程池,把这些事情给自动化了。

    希望实现的最终效果如下,给future增加一个回调,需要在任务执行成功时,该回调自动被调用:

    public class InitRunnerV3 implements CommandLineRunner {
    
        @Autowired
        private RestTemplate restTemplate;
    
        CustomScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                new CustomScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys"));
    
        @Override
        public void run(String... args)  {
            // 1
            TestTaskV3 task = new TestTaskV3(restTemplate);
            // 2
            CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
                            0, 10, TimeUnit.SECONDS);
            // 3
            scheduledFuture.setCustomFutureCallBack(new CustomFutureCallBack() {
    
                @Override
                public void onSuccess(CustomScheduledFuture customScheduledFuture) {
                    log.info("onSuccess");
                    // 4
                    customScheduledFuture.cancel(true);
                }
    
                @Override
                public void onException(Throwable throwable) {
                    log.error("e:{}",throwable);
                }
            });
        }
    
    • 1处,执行任务,任务内部如下,去除了设置future的逻辑,和取消的逻辑

      
      @Slf4j
      public class TestTaskV3 implements Runnable{
          private RestTemplate restTemplate;
      
          public TestTaskV3(RestTemplate restTemplate) {
              this.restTemplate = restTemplate;
          }
        
          @Override
          public void run() {
              try {
                  ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
                  String s = entity.toString();
                  log.info("get data:{}",s);
              } catch (Exception e) {
      //            log.error("e:{}",e);
                  log.error("error");
                  throw e;
              }
        
          }
      
      }
      
    • 2处,循环执行任务,这里的scheduled线程池,是我们自定义的,回头再说;获取其返回的future

    • 3处,给future增加回调,在回调中,如果成功,则取消该任务。

                  @Override
                  public void onSuccess(CustomScheduledFuture customScheduledFuture) {
                      log.info("onSuccess");
                      // 4
                      customScheduledFuture.cancel(true);
                  }
      

    寻找扩展点

    这里,afterExecute是个空实现,就是留给子线程池扩展用的:

        protected void afterExecute(Runnable r, Throwable t) { }
    

    那我们可以考虑下,要怎么才能实现我们的目标呢,我们要在这个方法内,通过传进来的Runnable r,获取到下面这个future才能实现目的:

            CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
                            0, 10, TimeUnit.SECONDS);
    
    

    获取到future,就能拿到在future上设置的callback对象,就能调用callback,所以,现在问题是,要在传进来的Runnable中,获取到scheduledFuture

    所以,我们就得包装一下,传进来的runnable,我们定义了如下的Runnable:

    @Data
    public class CustomDecoratedRunnable implements Runnable {
        Runnable runnable;
    
        CustomScheduledFuture customScheduledFuture;
    
        public CustomDecoratedRunnable(Runnable runnable,CustomScheduledFuture customScheduledFuture) {
            this.runnable = runnable;
            this.customScheduledFuture = customScheduledFuture;
        }
    
        @Override
        public void run() {
            this.runnable.run();
        }
    
    
    }
    

    定制线程池

    我们具体看看,我们定制的线程池对象,我们的线程池,直接继承了ScheduledThreadPoolExecutor

    public class CustomScheduledThreadPoolExecutor<V> extends ScheduledThreadPoolExecutor {
    
        public CustomScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
            super(corePoolSize, threadFactory);
        }
      
      	...
    }
    

    scheduleAtFixedRate方法,我们进行了重写:

    @Override
        public CustomScheduledFuture<V> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
            /**
             * 1 
             */
            CustomScheduledFuture customScheduledFuture = new CustomScheduledFuture();
    		// 2 将future设置到task中
            CustomDecoratedRunnable customDecoratedRunnable = new CustomDecoratedRunnable(command,customScheduledFuture);
           // 3
            ScheduledFuture<?> scheduledFuture = super.scheduleAtFixedRate(customDecoratedRunnable,
                    initialDelay, period, unit);
    
            /**
             * 4 将返回的future,设置到我们包装过的future
             */
            customScheduledFuture.setScheduledFuture((RunnableScheduledFuture) scheduledFuture);
    
            return customScheduledFuture;
        }
    
    • 1处,新建一个自定义的future

    • 2处,将自定义的future,设置到上面说的task中

    • 3处,把包装过的task,丢给线程池

    • 4处,返回一个定制的future,这个future,包装了原有的future,同时,支持设置callback

      public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V> {
          /**
           * 其实是下面这种类型:
           * {@link java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
           *
           */
          RunnableScheduledFuture<V> scheduledFuture;
        
      	// 设置callback时,赋值
          CustomFutureCallBack customFutureCallBack;
      
          Runnable runnable;
      }
      

    丢给定制线程池的task

    本来,我以为,丢给线程池什么Runnable对象,在afterExecute就能拿到什么样的Runnable对象,结果:

    发现,传进来的,已经被包装过了,应该是为了支持周期执行。

    所以,没办法,看起来路被堵死了,通过这个传进来的Runnable,也拿不到我们原始的Runnable。

    后边找了半天,找到下面这个点:

    #java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate
    
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            // 1
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
    
    • 1处,会调用decorateTask来包装task,默认实现,就是如下:

          protected <V> RunnableScheduledFuture<V> decorateTask(
              Runnable runnable, RunnableScheduledFuture<V> task) {
              return task;
          }
      

      这里的task,就是前面那个代码里的 ScheduledFutureTask<Void> sft:

              ScheduledFutureTask<Void> sft =
                  new ScheduledFutureTask<Void>(command,
                                                null,
                                                triggerTime(initialDelay, unit),
                                                unit.toNanos(period));
              // 1
              RunnableScheduledFuture<Void> t = decorateTask(command, sft);
      

    所以,我们得想办法重载这个方法:

        @Override
        protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
            CustomScheduledFuture<V> future = new CustomScheduledFuture<>();
            future.setRunnable(runnable);
            future.setScheduledFuture(task);
            return future;
        }
    

    这里,利用CustomScheduledFuture,封装了task和runnable两个对象。

    同时,我们自定义的这个CustomScheduledFuture,也是实现了这个方法的返回值,指定的接口:

    @Data
    public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V> 
    
    
    

    目前为止,经过包装后,在afterExecute处,拿到的Runnable如下:

    afterExecute的逻辑,调用回调

     @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            CustomScheduledFuture future;
            CustomDecoratedRunnable runnable = null;
            if (r instanceof CustomScheduledFuture) {
                future = (CustomScheduledFuture) r;
                // 1
                runnable = (CustomDecoratedRunnable) future.getRunnable();
            }
            // 2
            CustomScheduledFuture customScheduledFuture = runnable.getCustomScheduledFuture();
            // 3
            CustomFutureCallBack customFutureCallBack = customScheduledFuture.getCustomFutureCallBack();
            if (customFutureCallBack != null) {
                if (t != null) {
                    customFutureCallBack.onException(t);
                } else {
                    // 4
                    customFutureCallBack.onSuccess(customScheduledFuture);
                }
            }
    
        }
    
    • 1处,获取runnable
    • 2处,根据runnable,获取我们的future
    • 3处,通过future,获取回调
    • 4处,调用回调

    效果展示

    2020-04-10 09:45:28.068  INFO 14456 --- [           main] No active profile set, falling back to default profiles: default
    2020-04-10 09:45:28.822  INFO 14456 --- [           main] Started WebDemoApplication in 1.153 seconds (JVM running for 1.805)
    2020-04-10 09:45:36.933 ERROR 14456 --- [init-data-from-third-sys-1-thread-1] error
    2020-04-10 09:48:48.975  INFO 14456 --- [init-data-from-third-sys-1-thread-1] onSuccess
    

    可以看到,任务执行失败了,但为啥会调用onSuccess呢;另外,大家可以看到,都是在线程池的线程中执行的。

    为啥会error了,还执行success呢,我发现,即使我在task中抛出了异常,但是上层没捕获。

    我猜测,是因为:

    public interface Runnable {
        /**
         * When an object implementing interface <code>Runnable</code> is used
         * to create a thread, starting the thread causes the object's
         * <code>run</code> method to be called in that separately executing
         * thread.
         * <p>
         * The general contract of the method <code>run</code> is that it may
         * take any action whatsoever.
         *
         * @see     java.lang.Thread#run()
         */
        public abstract void run();
    }
    

    这里没有抛出异常,所以,即使实现的runnable中抛了,上层也不管。

    具体还要验证。

    注意点

    另一个点是,执行失败了,等了10s,并没有再次执行,猜测是我的定制task,导致了周期执行的问题。这个待验证和解决。

    但,一个简单的回调,我们已经实现了。

    总结

    大家使用方案1 就可以了;后面的方案,是折腾着玩的。希望对大家有帮助。
    全部代码都在:

    https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/spring-boot-scheduler-future-demo-parent

  • 相关阅读:
    如何判断 DataRow 中是否存在某列????
    jquery操作table中的tr,td的方法双击dblclick attr parent id原创
    oracle 取当天日期减一天 应该如何写
    走出“搜索引擎营销”三个误区
    解决方案是什么
    强制远程连接 命令
    ORACLE 异常错误处理
    HttpClient是否有默认并发数限制?
    多线程下载程序的功能需求
    STL线程库简介
  • 原文地址:https://www.cnblogs.com/grey-wolf/p/12671471.html
Copyright © 2011-2022 走看看