zoukankan      html  css  js  c++  java
  • 异步框架asyn4j的原理

    启动时调用init方法

    [java] view plain copy
     
    1. public void init(){  
    2.     if (!run){  
    3.       run = true;  
    4.       //工作队列  
    5.       workQueue = newPriorityBlockingQueue(maxCacheWork);  
    6.       //是否存在工作队列满处理类  
    7.       if (this.workQueueFullHandler != null) {  
    8.         //自定义线程池  
    9.         workExecutor = newAsynThreadPoolExecutor(work_thread_num, work_thread_num, 0L,TimeUnit.MILLISECONDS,  
    10.           workQueue, new AsynThreadFactory(),new AsynWorkRejectedExecutionHandler(this.workQueueFullHandler),  
    11.           executeWorkNum);  
    12.       } else {  
    13.         workExecutor = newAsynThreadPoolExecutor(work_thread_num, work_thread_num, 0L,TimeUnit.MILLISECONDS,  
    14.           workQueue, new AsynThreadFactory(),executeWorkNum);  
    15.       }  
    16.       //回调函数工作队列  
    17.       callBackQueue = newLinkedBlockingQueue();  
    18.       //自定义回调函数线程池  
    19.       callBackExecutor = newCallBackThreadPoolExecutor(callback_thread_num, callback_thread_num, 0L,  
    20.         TimeUnit.MILLISECONDS, callBackQueue,new CallBackThreadFactory(),  
    21.         new CallBackRejectedExecutionHandler(),callBackNum);  
    22.       //service启动和关闭处理器  
    23.       if (this.serviceHandler != null) {  
    24.         this.serviceHandler.setServiceStat(0);  
    25.        this.serviceHandler.setAsynService(this);  
    26.         this.serviceHandler.process();  
    27.       }  
    28.       //启动工作队列满处理器  
    29.       if (this.workQueueFullHandler != null) {  
    30.         this.workQueueFullHandler.process();  
    31.       }  
    32.       //程序关闭后的处理  
    33.       Runtime.getRuntime().addShutdownHook(newThread(){  
    34.         public void run()  
    35.         {  
    36.          AsynServiceImpl.this.close(AsynServiceImpl.closeServiceWaitTime);  
    37.         }  
    38.       });  
    39.     }  
    40.   }  

    AsynServiceImpl主要有两个线程池和三个处理器组成

    工作线程池AsynThreadPoolExecutor

    [java] view plain copy
     
    1. public classAsynThreadPoolExecutor  
    2.   extends ThreadPoolExecutor  
    3. {  
    4.   private AtomicLong executeWorkNum;  

    回调函数线程池CallbackThreadPoolExecutor

    [java] view plain copy
     
    1. public classCallBackThreadPoolExecutor  
    2.   extends ThreadPoolExecutor  
    3. {  
    4.   private AtomicLong callBackNum;  

    AsynThreadPoolExecutor的executeWorkNum,统计任务的执行数

    AsynThreadPoolExecutor的callBackNum,统计回调函数的执行数

    ServiceHandler处理器(启动或关闭)

    WorkQueueFullHandler处理器(任务数超过maxCacheWork)

    ErrorAsynWorkHandler处理器(任务执行异常)

    执行时调用addWork方法

      

    [java] view plain copy
     
    1. public void addWork(Object tagerObject, Stringmethod, Object[] params, AsynCallBack asynCallBack, WorkWeight weight, booleancache)  
    2.   {  
    3.     if ((tagerObject == null) || (method ==null)) {  
    4.       throw newIllegalArgumentException("target name is null or  target method name is null");  
    5.     }  
    6.     Object target = null;  
    7.     //如果对象是String  
    8.     if(tagerObject.getClass().isAssignableFrom(String.class)){  
    9.       addWorkWithSpring(tagerObject.toString(),method, params, asynCallBack, weight);  
    10.       return;  
    11.     }  
    12.     //如果对象是class  
    13.     if ((tagerObject instanceof Class)) {  
    14.       String classKey =((Class)tagerObject).getSimpleName();  
    15.       //是否缓存,一个class缓存一个默认的对象,防止重复创建  
    16.          if (cache)  
    17.       {  
    18.         target = targetCacheMap.get(classKey);  
    19.         if (target == null)  
    20.         {  
    21.           target =newObject((Class)tagerObject);  
    22.           targetCacheMap.put(classKey, target);  
    23.         }  
    24.       }  
    25.       else  
    26.       {  
    27.         target = newObject((Class)tagerObject);  
    28.       }  
    29.     }  
    30.     else  
    31.     {  
    32.       target = tagerObject;  
    33.     }  
    34.     if (target == null) {  
    35.       throw newIllegalArgumentException("target object is null");  
    36.     }  
    37.     //封装成异步任务  
    38.     AsynWork anycWork = newAsynWorkEntity(target, method, params, asynCallBack, weight);  
    39.      
    40.     addAsynWork(anycWork);  
    41.   }  
    [java] view plain copy
     
    1. public voidaddAsynWork(AsynWork asynWork){  
    2.     if (!run) {  
    3.       throw new Asyn4jException("asynservice is stop or no start!");  
    4.     }  
    5.     if (asynWork == null) {  
    6.       throw newIllegalArgumentException("asynWork is null");  
    7.     }  
    8.     try<span style="font-family: Arial, Helvetica, sans-serif;">{</span>  
    [java] view plain copy
     
    1.     //通过semaphore来获取许可权限  
    2.     if(this.semaphore.tryAcquire(addWorkWaitTime, TimeUnit.MILLISECONDS))  
    3.     {  
    4.       WorkProcessor workProcessor = newWorkProcessor(asynWork, this);  
    5.        
    6.       workExecutor.execute(workProcessor);  
    7.       totalWork.incrementAndGet();  
    8.     }  
    9.     else{  
    10.       log.warn("work queue is full,addwork to cache queue");  
    11.      this.workQueueFullHandler.addAsynWork(asynWork);  
    12.     }  
    13.   }  
    14.   catch (InterruptedException e)  
    15.   {  
    16.     log.error(e);  
    17.   }  
    18. }  



    通过semaphore来控制最大的访问线程数,maxCacheWork就是semaphore的数量,也是工作队列的数量

    这里的目的是控制工作队列的数量不能超过maxCacheWork(也可以通过用上限的queue来代替)

    如果超过队列maxCacheWork,就用workQueueFullHandler去处理,处理方式同线程池的拒绝策略处理器(

    newAsynWorkRejectedExecutionHandler(this.workQueueFullHandler))不过线程池的拒绝策略没用到(前提是队列的有上限的队列),

    工作任务WorkProcessor(内含AsynWorkEntity)

    [java] view plain copy
     
    1. public void run() {  
    2.   Thread currentThread =Thread.currentThread();  
    3.   if (this.asynWork.getThreadName() != null){  
    4.     setName(currentThread,this.asynWork.getThreadName());  
    5.   }  
    6.   AsynCallBack result = null;  
    7.   try{  
    8.     result = this.asynWork.call();  
    9.     if (result != null) {  
    10.      ApplicationContext.getCallBackExecutor().execute(result);  
    11.     }  
    12.   }  
    13.   catch (Throwable throwable){  
    14.     if(this.applicationContext.getErrorAsynWorkHandler() != null) {  
    15.      this.applicationContext.getErrorAsynWorkHandler().addErrorWork(this.asynWork,throwable);  
    16.     }  
    17.   }  
    18.   finally{  
    19.    this.applicationContext.getSemaphore().release();  
    20.   }  
    21. }  



    首先修改线程名称,然后调用asynWork的call方法,如果有回调函数,就执行,如果有异常就执行ErrorAsynWorkHandler,最后Semaphore释放一个许可

    AsynWorkEntity

    [java] view plain copy
     
    1. public AsynCallBack call()  
    2.   throws Exception {  
    3.   if (this.target == null) {  
    4.     throw new RuntimeException("targetobject is null");  
    5.   }  
    6.   Class clazz = this.target.getClass();  
    7.    
    8.   String methodKey =MethodUtil.getClassMethodKey(clazz, this.params, this.method);  
    9.    
    10.   Method targetMethod =(Method)methodCacheMap.get(methodKey);  
    11.   if (targetMethod == null){  
    12.     targetMethod =MethodUtil.getTargetMethod(clazz, this.params, this.method);  
    13.     if (targetMethod != null) {  
    14.       methodCacheMap.put(methodKey,targetMethod);  
    15.     }  
    16.   }  
    17.   if (targetMethod == null) {  
    18.     throw newIllegalArgumentException("target method is null");  
    19.   }  
    20.   Object result =targetMethod.invoke(this.target, this.params);  
    21.   if (this.anycResult != null) {  
    22.     this.anycResult.setInokeResult(result);  
    23.   }  
    24.   return this.anycResult;  
    25. }  



    通过反射技术调用用具体方法,也用缓存技术,anycResult就是你定义的回调函数,没有就返回null

    关闭时调用close方法

    [java] view plain copy
     
      1. publicvoid close(long waitTime){  
      2.   if (run){  
      3.     run = false;  
      4.     try {  
      5.       workExecutor.awaitTermination(waitTime,TimeUnit.MILLISECONDS);  
      6.        
      7.       callBackExecutor.awaitTermination(0L,TimeUnit.MILLISECONDS);  
      8.     }  
      9.     catch (InterruptedException e)  
      10.     {  
      11.       log.error(e);  
      12.     }  
      13.     //关闭工作线程和回调函数线程  
      14.     workExecutor.shutdown();  
      15.     callBackExecutor.shutdown();  
      16.     //service关闭处理器  
      17.     if (this.serviceHandler != null)  
      18.     {  
      19.      this.serviceHandler.setAsynWorkQueue(workQueue);  
      20.       this.serviceHandler.setCallBackQueue(callBackQueue);  
      21.       this.serviceHandler.setServiceStat(1);  
      22.       this.serviceHandler.process();  
      23.     }  
      24.   }  
      25. }  
  • 相关阅读:
    android焦点
    URI和URL的区别比较与理解
    Android Bundle类
    repo命令
    ubuntu adb找不到设备
    【python】-网络编程
    【python】-反射
    【python】-类的特殊成员方法
    【python】-7-面向对象的进阶
    【python】-多态
  • 原文地址:https://www.cnblogs.com/felixzh/p/6030568.html
Copyright © 2011-2022 走看看