zoukankan      html  css  js  c++  java
  • 【T】并行调度

    
    
     /*
      * Parallel scheduling.
      *
      * Split the pending work by satellite x date: every OrderParamSatellite is expanded by
      * getParamMapList(...) into parameter maps, and all of them are collected in paramMapList.
      */
     List<Map<String, Object>> paramMapList = new ArrayList<Map<String, Object>>();
     Iterator<OrderParamSatellite> satelliteIter = paramSatellites.iterator();
     while (satelliteIter.hasNext()) {
         OrderParamSatellite paramSatellite = satelliteIter.next();
         paramMapList.addAll(this.getParamMapList(paramSatellite));
     }

     // Walk the task list page by page; the page size (the step of the index) is the number of
     // free slots currently reported by HostServerQueue.
     // NOTE(review): when freeSize() returns 0 the index does not advance and the loop simply
     // re-polls without pausing; a negative value makes getSubListPage return null.
     // Confirm the contract of freeSize() before relying on this loop.
     int fsize = HostServerQueue.getInstance().freeSize();
     for (int i = 0; i < paramMapList.size(); i = i + fsize) {
         List<Map<String, Object>> tl = BXexample.getSubListPage(paramMapList, i, fsize);

         // Run the current page in parallel, one callback invocation per parameter map.
         BXexample.BXfunction(tl, new ExectueCallBack() {
             @SuppressWarnings("unchecked") // the elements handed in are the maps of paramMapList
             public void doExectue(Object executor) throws Exception {
                 ExecuteOrderBTask((Map<String, Object>) executor);
             }
         });

         // Re-read the number of free slots so the next page matches the current capacity.
         fsize = HostServerQueue.getInstance().freeSize();
     }
    
    
    
     1 package com.zlg.cobarclient;
     2 
     3 import java.util.ArrayList;  
     4 import java.util.Collection;  
     5 import java.util.Iterator;  
     6 import java.util.List;  
     7 import java.util.Map;  
     8 import java.util.concurrent.BlockingQueue;  
     9 import java.util.concurrent.CountDownLatch;  
    10 import java.util.concurrent.ExecutorService;  
    11 import java.util.concurrent.LinkedBlockingQueue;  
    12 import java.util.concurrent.ThreadFactory;  
    13 import java.util.concurrent.ThreadPoolExecutor;  
    14 import java.util.concurrent.TimeUnit;  
    15   
    16 import org.apache.commons.lang.exception.ExceptionUtils;  
    17 import org.springframework.dao.ConcurrencyFailureException;  
    18 
    19 public class BXexample {
    20     private static ExecutorService createCustomExecutorService(int poolSize, final String method) {  
    21         int coreSize = Runtime.getRuntime().availableProcessors();//返回系统CUP数量  
    22         if (poolSize < coreSize) {  
    23             coreSize = poolSize;  
    24         }  
    25         ThreadFactory tf = new ThreadFactory() {  
    26             public Thread newThread(Runnable r) {  
    27                 Thread t = new Thread(r, "thread created at BXexample method [" + method + "]");  
    28                 t.setDaemon(true);  
    29                 return t;  
    30             }  
    31         };  
    32         BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>();  
    33         final ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, poolSize, 60,  
    34                 TimeUnit.SECONDS, queueToUse, tf, new ThreadPoolExecutor.CallerRunsPolicy());  
    35 
    36         return executor;  
    37  }  
    38 
    39 public static <T> List<T> getSubListPage(List<T> list, int skip,int pageSize) {  
    40     if (list == null || list.isEmpty()) {  
    41         return null;  
    42     }  
    43     int startIndex = skip;  
    44     int endIndex = skip + pageSize;  
    45     if (startIndex > endIndex || startIndex > list.size()) {  
    46         return null;  
    47     }  
    48     if (endIndex > list.size()) {  
    49         endIndex = list.size();  
    50     }  
    51     return list.subList(startIndex, endIndex);  
    52 }  
    53   
    54   
    55 public static void BXfunction(Collection<?> paramCollection,final ExectueCallBack ecb){  
    56     //构建执行器  
    57     ExecutorService executor = createCustomExecutorService(Runtime.getRuntime().availableProcessors(), "batchExecuteProjection");  
    58     try {  
    59         //监视器  
    60         final CountDownLatch latch = new CountDownLatch(paramCollection.size());  
    61         final StringBuffer exceptionStaktrace = new StringBuffer();  
    62         Iterator<?> iter = paramCollection.iterator();  
    63         while (iter.hasNext()) {  
    64             final Object entity = iter.next();  
    65             Runnable task = new Runnable() {  
    66                 public void run() {  
    67                     try {  
    68                         ecb.doExectue(entity);  
    69                     } catch (Throwable t) {  
    70                         exceptionStaktrace.append(ExceptionUtils.getFullStackTrace(t));  
    71                     } finally {  
    72                         latch.countDown();  
    73                     }  
    74                 }  
    75             };  
    76             executor.execute(task);//并行调度  
    77         }  
    78 
    79         try {  
    80             latch.await();//监视器等待所有线程执行完毕  
    81         } catch (InterruptedException e) {  
    82             //调度异常  
    83             throw new ConcurrencyFailureException(  
    84                     "unexpected interruption when re-arranging parameter collection into sub-collections ",e);  
    85         }  
    86         if (exceptionStaktrace.length() > 0) {  
    87             //业务异常  
    88             throw new ConcurrencyFailureException(  
    89                     "unpected exception when re-arranging parameter collection, check previous log for details.
    "+ exceptionStaktrace);  
    90         }  
    91           
    92           
    93     } finally {  
    94         executor.shutdown();//执行器关闭  
    95     }  
    96 }
    97 }
    1 package com.zlg.cobarclient;
    2 
    /**
     * Callback that performs the work for a single item of a batch.
     */
    public interface ExectueCallBack {

        /**
         * Processes one work item.
         *
         * @param executor the work item to process (despite its name, this is the element
         *                 handed to the callback, not an executor)
         * @throws Exception if processing the item fails
         */
        void doExectue(Object executor) throws Exception;
    }
     1 package com.zlg.cobarclient;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 
     6 
     7 public class Hello {
     8     public static void main(String[] args) {
     9         List<String> paramCollection  = new ArrayList<String>();  
    10         paramCollection.add("9");  
    11         paramCollection.add("2");  
    12         paramCollection.add("18");  
    13         paramCollection.add("7");  
    14         paramCollection.add("6");  
    15         paramCollection.add("1");  
    16         paramCollection.add("3");  
    17         paramCollection.add("4");  
    18            paramCollection.add("14");  
    19         paramCollection.add("13");  
    20           
    21         int freesize = 3;//当前处理能力  
    22           
    23         for(int i=0;i<paramCollection.size();i=i+freesize){  
    24               
    25             List<String> tl = BXexample.getSubListPage(paramCollection, i, freesize);  
    26               
    27             BXexample.BXfunction(tl,new ExectueCallBack() {  
    28                 public void doExectue(Object executor) throws Exception {  
    29                     int k = Integer.parseInt((String)executor);  
    30       
    31                     for(int i=0;i<k*10000000;i++){  
    32                         //执行循环  
    33                     }  
    34                     System.out.println(k+":hello world");  
    35                 }  
    36             });  
    37         }
    38     }
    39 }
  • 相关阅读:
    Struts2配置文件讲解
    分布式与集群的区别
    ANDROID中FRAGMENT的两种创建方式
    一个让echarts中国地图包含省市轮廓的技巧
    图解Spark API
    对NP问题的一点感想
    laravel框架容器管理的一些要点
    RedisRepository分享和纠错
    javascript中对数据文本格式化的思考
    LazyMan深入解析和实现
  • 原文地址:https://www.cnblogs.com/flydkPocketMagic/p/6288237.html
Copyright © 2011-2022 走看看