zoukankan      html  css  js  c++  java
  • 多线程异步调度任务

    业务场景:Controller接受一个参数Object,调到业务层,把Object加入队列,成功就返回,同时开启线程做异步的消息发送。

    目录结构:

    CreatThreadPool.java

     1 public class CreatThreadPool {
     2 
     3     private ThreadPoolExecutor pool;
     4 
     5     public CreateThreadPool(long maxAlive, int maxSize, int minSize, int queueSize) {
     6         pool = new ThreadPoolExecutor(minSize, maxSize, maxAlive, TimeUnit.SECONDS, new LinkedBlockingQueue(queueSize));
     7     }
     8 
     9     public Future<String> submit(Runnable task) {
    10         return pool.submit(task, null);
    11     }
    12 
    13     @ManagedAttribute
    14     public String getPoolStatus() {
    15         JSONObject json = new JSONObject();
    16         json.put("activeCount", pool.getActiveCount());
    17         json.put("taskCount", pool.getTaskCount());
    18         json.put("completedTaskCount", pool.getCompletedTaskCount());
    19         return json.toJSONString();
    20     }
    21 }

    CreatThread.java

     1 public class CreatThread implements Runnable{
     2 
     3     private Object obj;
     4     
     5     private ApplicationContext context;
     6 
     7     @Override
     8     public void run() {
     9 
    10         try {
    11             sendMessage();
    12         } catch (Exception e) {
    13             e.getMessage();
    14         }
    15     }
    16 
    17     public void sendMessage() {
    18           //因为此处是线程类所以spring的bean加载不进来,需要使用ApplicationContext来获取
    19         MQPublisher mQPublisher=(MQPublisher) context.getBean(MQPublisher.class);
    20 
    21         Object obj=(MQLinkDTO) context.getBean(Object.class);
    22         //此处是为了把对象转换为JSON格式,所以用了阿里的fastjson
    23         JSONObject json=(JSONObject) JSON.toJSON(noticeDTO);
    24         
    25         mQPublisher.sendMessage(mQLinkDTO.getName(), mQLinkDTO.getKey(),json);
    26     }
    27 
    28     public void setObject(Object obj) {
    29         this.obj = obj;
    30     }
    31     
    32     public void setContext(ApplicationContext context) {
    33         this.context = context;
    34     }
    35 }

    Disptch.java

     1 public class Dispatch implements Runnable,ApplicationContextAware{
     2 
     3     
     4     private LinkedBlockingQueue<Object> queue;
     5     
     6     private ApplicationContext context;
     7     //注入一个空的线程池
     8     @Autowired
     9     private CreatThreadPool pool;
    10     
    11     public Dispatcher(int queueSize) {
    12         //初始化队列
    13         queue=new LinkedBlockingQueue<>(queueSize);
    14     }
    15 
    16     @Override
    17     public void run() {
    18         while (true) {
    19             Object obj = null;
    20             try {
    21                 obj = queue.take();
    22                 
    23                 if (obj != null) {
    24                //创建线程,把object传过去,并且把context传过去,把线程放到线程池,就绪
    25                     CreatThread thread = new CreatThread();
    26                     thread.setObject(obj);
    27                     thread.setContext(context);
    28                     pool.submit(thread);
    29                 }
    30             } catch (Exception e) {
    31                 e.printStackTrace();
    32             }
    33         }
    34         
    35     }
    36 
    37     public boolean offer(Object obj) {
    38         //将obj加入队列,成功返回true
    39         boolean result = queue.offer(obj);
    40         return result;
    41     }
    42 
    43     @Override
    44     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    45        //获取上下文对象
    46         this.context=applicationContext;
    47     }
    48 }

    LoadedConfiguration.java

    @Configuration
    public class LoadedConfiguration {
    
    //读取配置文件,生产Pool、Dispatcher、MQLinkDTO的bean
        @Bean(name="CreatThreadPool")
        public CreatThreadPool loadThreadPool() {
            return new CreatThreadPool(
                    Long.parseLong((String) System.getProperty("threadpool.maxAliveTime")),
                    Integer.parseInt((String) System.getProperty("maxSize")),
                    Integer.parseInt((String) System.getProperty("threadpool.minSize")),
                    Integer.parseInt((String) System.getProperty("threadpool.queueSize")));
        }
    
        @Bean(name="Dispatcher")
        public CreatDispatcher creatDispatcher() {
            Dispatcher dispatcher = new Dispatcher(Integer.parseInt((String) System.getProperty("blockingqueue.size")));
            Thread thread = new Thread(creatDispatcher);
            thread.setName("create.dispatcher");
            thread.start();
            return dispatcher;
        }
        
        @Bean(name="MQLinkDTO")
        public MQLinkDTO creatMQLinkDTO() {
            MQLinkDTO mQLinkDTO = new MQLinkDTO(System.getProperty("topic_Name"),System.getProperty("topic_Key"));
            return mQLinkDTO;
        }
    }

    Service.java

     1 @Service
     2  public class Service {
     3       
     4       @Autowired
     5       private Dispatcher dispatcher;
     6       
     7       public boolean sendMQ(Object obj){
     8           return dispatcher.offer(obj);
     9       }
    10  }

    Controller.java

     1 @RestController
     2 public class Controller {
     3 
     4     @Autowired
     5     private Service service;
     6     
     7     @ApiOperation(value = "获取消息提醒", notes = "获取消息提醒")
     8     @RequestMapping(value = "/get", method = RequestMethod.POST)
     9     public Result<NoticeDTO> get(@RequestBody Object obj){
    10         if(service.sendMQ(obj)){
    11             return new Result<Object>(obj, ResponseCode.SUCCESS);
    12         }else{
    13                     return new Result<Object>(obj, ResponseCode.Fail);
    14                 }    
    15     }
    16 }

     顺序:JVM首先加载LoadedConfiguration.calss 生成Bean,然后客户端发送请求,经过controller->service->dispatch(调度线程,加入线程池等待就绪)

    tip:

    1.线程中不支持使用spring的依赖注入

    2.在线程中想要获取其他的Bean,需要用到ApplicationContext对象。context.getBean(object.class)

    3.做异步处理所以不需要对传入参数加锁,反之需要使用synchronized (obj) {} 对obj加锁,并且使用wait阻塞线程,等待唤醒。

    4.使用队列(LinkedBlockingQueue)时,使用offer方法判断是否加入队列,并且在取对象时(take)记得判空。

    5.JSON.toJSONString方法可以转Object为Json串,但是会产生转义字符,所以要加入到JSONObject中。

  • 相关阅读:
    IMP本质上是一个通用的函数指针
    yourphp常用标签
    yourphp目录结构
    HTTP与HTTPS的区别
    https和http有什么区别
    如何把浏览器不信任的网址设置为可信任的网点
    ico图标在谷歌浏览器中如何显示?
    Yourphp是一款完全开源免费的.核心采用了Thinkphp框架
    No input file specified的解决方法apache伪静态
    一个服务器的吞吐率
  • 原文地址:https://www.cnblogs.com/wangzun/p/7170273.html
Copyright © 2011-2022 走看看