zoukankan      html  css  js  c++  java
  • 2013-线程同步的使用

       在实际的项目开发中,很多情况下我们需要用到线程类来控制程序并发进行以提高效率,但同时也会涉及到线程安全问题。

       简单的线程控制我们可以使用Executors类来简单实现,具体可参考

       文章1: Java ScheduledThreadPoolExecutor延迟或周期性执行任务   

       文章2:使用Executors和ThreadPoolExecutor2  使用了队列

       文章3:ForkJoin-任务分拆合并处理器  JDK 7中的新功能,可以将大任务拆分处理,再将结果合并

      

       但是一些涉及业务比较多的线程控制仍然需要传统的方法实现,下面代码的背景:有大量的请求访问,请求参数有很多重复,加入缓存机制,保证线程安全

       我们通过service类和Worker类来实现,具体代码如下:

      

    package itour.cn.fare.gateway.service;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    public class ObeQueryServiceImpl implements ObeQueryService {
    
    	private static Logger log = LoggerFactory.getLogger(ObeQueryServiceImpl.class);
    	private static ObeQueryServiceImpl service = new ObeQueryServiceImpl();
    	private Map<String, Integer> avhTask = new HashMap<String, Integer>();// 线程执行任务表,保证同样的任务只执行一次
    	private Map<String, Object> avhLock = new HashMap<String, Object>();// 锁表,保证同样的请求锁定状态下只进入一个
    
    
    	private void getAvh(String key, String org, String dis, String date, String airCompany) {
    
    		synchronized (this) {
    			if (avhLock.get(key) == null) {// 添加锁,同样的请求只添加一次
    
    				avhLock.put(key, new byte[0]);
    
    				log.debug(String.format("AV create lock:%s  ", key));
    			}
    		}
    
    		synchronized (avhLock.get(key)) {
    
    			// 添加任务,保证同样的请求只执行一次
    			if (avhTask.get(key) == null) {
    
    				avhTask.put(key, new Integer(1));
    
    				ObeAvhWorker worker = new ObeAvhWorker(key, org, dis, date, airCompany);
    
    				log.debug(String.format("AV[%s]  AvWorker Start ", key));
    				worker.start();
    
    			}
    
    			try {
    
    				log.debug(String.format("AV[%s] Waitting AvWorker ", key));
    
    				avhLock.get(key).wait();// 等待run方法执行完
    
    			} catch (InterruptedException e) {
    				log.error(e.getMessage(), e);
    			}
    
    			log.debug(String.format("AV[%s] AvWorker Notified", key));
    
    		}
    	}
    
    
    	public Map<String, Integer> getAvhTask() {
    		return avhTask;
    	}
    
    	public Map<String, Object> getAvhLock() {
    		return avhLock;
    	}
    
    }
    



    package itour.cn.fare.gateway;
    
    import itour.cn.fare.gateway.service.ObeQueryServiceImpl;
    
    import java.io.Serializable;
    import java.util.List;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.travelsky.sbeclient.obe.exceptions.ObeException;
    import com.travelsky.sbeclient.obe.response.AVResponse;
    import com.travelsky.sbeclient.obe.response.AvItem;
    
    public class ObeAvhWorker extends Thread {
    
    	private static Logger log = LoggerFactory.getLogger(ObeAvhWorker.class);
    	private ObeQueryServiceImpl service = ObeQueryServiceImpl.getInstance();
    	private String key;
    	private String org;
    	private String dis;
    	private String date;
    	private String airCompany;
    
    	public ObeAvhWorker(String key, String org, String dis, String date, String airCompany) {
    		this.key = key;
    		this.org = org;
    		this.dis = dis;
    		this.date = date;
    		this.airCompany = airCompany;
    	}
    
    	@Override
    	public void run() {
    		log.debug("ObeAvhWorker Thread Start for " + key);
    		try {
    			worker();
    
    		}catch (Exception e) {
    
    			log.error(e.getMessage(), e);
    		} finally {
    			synchronized (service.getAvhLock().get(key)) {
    				service.getAvhLock().get(key).notifyAll();
    			}
    			service.getAvhTask().remove(key);
    		}
    	}
    
    	private void worker() throws Exception {
    		
    		ObeSession session = new ObeSession();
    		
    		AVResponse avr = session.avh(org, dis, date, airCompany);
    		
    		List<AvItem> avItems = avr.getAvItems();
    		// 存入缓存
    		if (avItems != null && avItems.size() > 0) {
    			if (log.isDebugEnabled())
    				log.info(String.format("Avh[%s] put into cachePool total %d", key, avItems.size()));
    			CachePoolManager.getInstance().putAvCV(key, (Serializable) avItems);
    		}
    	}
    }
    

    需要注意的是:在worker执行完任务的时候一定要记得释放锁和删除任务表中的该任务。


  • 相关阅读:
    配置SecondaryNameNode
    hadoop 根据secondary namenode恢复namenode
    Hadoop如何修改HDFS文件存储块大小
    hadoop1.2.1 datanode 由于权限无法启动 expected: rwxr-xr-x
    CentOS 7 下,如何设置DNS服务器
    Eclipse+pydev环境搭建
    Python numpy
    Leetcode#54 Spiral Matrix
    Leetcode#53 Maximum Subarray
    Leetcode#40 Combination Sum II
  • 原文地址:https://www.cnblogs.com/kuyuyingzi/p/4266315.html
Copyright © 2011-2022 走看看