zoukankan      html  css  js  c++  java
  • java并行调度框架封装及演示样例

    參考资料:  阿里巴巴开源项目 CobarClient  源代码实现。

    分享作者:闫建忠 

    分享时间:2014年5月7日

    ---------------------------------------------------------------------------------------

    并行调度封装类设计: BXexample.java

    package org.hdht.business.ordermanager.quartzjob;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.commons.lang.exception.ExceptionUtils;
    import org.springframework.dao.ConcurrencyFailureException;
    
    public class BXexample {
    
    	 private static ExecutorService createCustomExecutorService(int poolSize, final String method) {
    	        int coreSize = Runtime.getRuntime().availableProcessors();//返回系统CUP数量
    	        if (poolSize < coreSize) {
    	            coreSize = poolSize;
    	        }
    	        ThreadFactory tf = new ThreadFactory() {
    	            public Thread newThread(Runnable r) {
    	                Thread t = new Thread(r, "thread created at BXexample method [" + method + "]");
    	                t.setDaemon(true);
    	                return t;
    	            }
    	        };
    	        BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>();
    	        final ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, poolSize, 60,
    	        		TimeUnit.SECONDS, queueToUse, tf, new ThreadPoolExecutor.CallerRunsPolicy());
    
    	        return executor;
    	 }
    
    	public static <T> List<T> getSubListPage(List<T> list, int skip,int pageSize) {
    		if (list == null || list.isEmpty()) {
    			return null;
    		}
    		int startIndex = skip;
    		int endIndex = skip + pageSize;
    		if (startIndex > endIndex || startIndex > list.size()) {
    			return null;
    		}
    		if (endIndex > list.size()) {
    			endIndex = list.size();
    		}
    		return list.subList(startIndex, endIndex);
    	}
    	
    	
    	public static void BXfunction(Collection<?> paramCollection,final ExectueCallBack ecb){
    		//构建运行器
    		ExecutorService executor = createCustomExecutorService(Runtime.getRuntime().availableProcessors(), "batchExecuteProjection");
    		try {
    			//监视器
    			final CountDownLatch latch = new CountDownLatch(paramCollection.size());
    			final StringBuffer exceptionStaktrace = new StringBuffer();
    			Iterator<?> iter = paramCollection.iterator();
    			while (iter.hasNext()) {
    				final Object entity = iter.next();
    				Runnable task = new Runnable() {
    					public void run() {
    						try {
    							ecb.doExectue(entity);
    						} catch (Throwable t) {
    							exceptionStaktrace.append(ExceptionUtils.getFullStackTrace(t));
    						} finally {
    							latch.countDown();
    						}
    					}
    				};
    				executor.execute(task);//并行调度
    			}
    
    			try {
    				latch.await();//监视器等待全部线程运行完成
    			} catch (InterruptedException e) {
    				//调度异常
    				throw new ConcurrencyFailureException(
    						"unexpected interruption when re-arranging parameter collection into sub-collections ",e);
    			}
    			if (exceptionStaktrace.length() > 0) {
    				//业务异常
    				throw new ConcurrencyFailureException(
    						"unpected exception when re-arranging parameter collection, check previous log for details.
    "+ exceptionStaktrace);
    			}
    			
    			
    		} finally {
    			executor.shutdown();//运行器关闭
    		}
    	}
    	
    	
    }
    

    回调接口类设计:ExectueCallBack.java

    package org.hdht.business.ordermanager.quartzjob;
    
    public interface ExectueCallBack {
    	void doExectue(Object executor) throws Exception;
    }
    



    演示样例(hello 演示样例)

    	public static void main(String[] args) {
    
    		List<String> paramCollection  = new ArrayList<String>();
    		paramCollection.add("9");
    		paramCollection.add("2");
    		paramCollection.add("18");
    		paramCollection.add("7");
    		paramCollection.add("6");
    		paramCollection.add("1");
    		paramCollection.add("3");
    		paramCollection.add("4");
            paramCollection.add("14");
    		paramCollection.add("13");
    		
    		int freesize = 3;//当前处理能力
    		
    		for(int i=0;i<paramCollection.size();i=i+freesize){
    			
    			List<String> tl = BXexample.getSubListPage(paramCollection, i, freesize);
    			
    			BXexample.BXfunction(tl,new ExectueCallBack() {
    	            public void doExectue(Object executor) throws Exception {
    	            	int k = Integer.parseInt((String)executor);
    
    	            	for(int i=0;i<k*10000000;i++){
    	                    //运行循环
    	            	}
    	            	System.out.println(k+":hello world");
    	            }
    	        });
    			
    		}
    	}



    演示样例(实际业务应用演示样例)

    /**
    		 * 并行调度相关处理
    		 * 
    		 * 按卫星*日期 ,将待处理的任务分解为 卫星+日期 粒度的子任务 加入到paramMapList列表中
    		 */
    		List<Map<String, Object>> paramMapList = new ArrayList<Map<String, Object>>();
    		for (Iterator<OrderParamSatellite> iterator = paramSatellites.iterator(); iterator.hasNext();) {
    			OrderParamSatellite paramSatellite = iterator.next();
    			
    			paramMapList.addAll(this.getParamMapList(paramSatellite));
    		}
    
    
    
    		//依据集群最大处理能力,分页处理任务列表,作为list截取的步长
    		
    		int fsize = HostServerQueue.getInstance().freeSize();
    		for(int i=0;i<paramMapList.size();i=i+fsize){
    			List<Map<String, Object>> tl = BXexample.getSubListPage(paramMapList, i,  fsize);
    			//并行调度
    			BXexample.BXfunction(tl,new ExectueCallBack(){
    	            	public void doExectue(Object executor) throws Exception {
    	            		ExecuteOrderBTask((Map<String, Object>)executor);
    	            	}
    	        });
    			
    			//动态查找空暇节点数量,即集群最大处理能力
    			fsize = HostServerQueue.getInstance().freeSize();
    		}


  • 相关阅读:
    [置顶] windows player,wzplayerV2 for windows
    wzplayer 近期将会支持BlackBerry和WinPhone8
    wzplayerEx for android(真正硬解接口,支持加密的 player)
    ffmpeg for ios 交叉编译 (支持i686 armv7 armv7s) 包含lame支持
    ffmpeg for ios 交叉编译 (支持i686 armv7 armv7s) 包含lame支持
    编译cegcc 0.59.1
    wzplayer 近期将会支持BlackBerry和WinPhone8
    wzplayerEx for android(真正硬解接口,支持加密的 player)
    windows player,wzplayerV2 for windows(20140416)更新
    编译cegcc 0.59.1
  • 原文地址:https://www.cnblogs.com/zfyouxi/p/5342034.html
Copyright © 2011-2022 走看看