zoukankan      html  css  js  c++  java
  • Java-BlockingQueue的使用

    每次都是隔很长时间才在博客中写点什么,说自己忙吧,这是给自己的一个借口,其实呢还是懒啊。哎。。。

    最近项目中有个对比的需求,需要从日志文件中获取到参数,然后调用不同的API,进行结果的对比。但是不知用什么方式比较好,于是查了下jdk的手册,发现了BlockingQueue这个好东西。

    关于BlockingQueue的介绍,大家有兴趣的可以自己看下:http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html

    需求呢其实很简单就是将参数放置到Queue中,然后交由下一个策略去消费。刚开始时是通过不同的线程往队列中存放数据,然后返回给下个服务一个BlockingQueue的对象,下一个策略从队列中消费,code如下:

    @SuppressWarnings("rawtypes")
    	@Override
    	public BlockingQueue getTxtLogContent(String path) {
    		File file = new File(path);
    		BufferedReader reader = null;
    		String tempStr = null;
    		final BlockingQueue queue = new LinkedBlockingQueue();
    		try {
    			reader = new BufferedReader(new FileReader(file));
    			while ((tempStr = reader.readLine()) != null) {
    				final InputOutputPrameters parameter = new InputOutputPrameters();
    				String[] list = tempStr.split(";");
    				if (list != null && list.length > 0) {
    					parameter.setInputParamters(list[0]);
    					parameter.setOutputParameters(list[1]);
    				}
    				new Thread(){
    					@SuppressWarnings("unchecked")
    					public void run(){
    						try {	
    							Thread.sleep((long)(Math.random()*100));
    							log.info("开始存入数据!");
    							queue.put(parameter);
    							log.info("已经存入数据,目前队列中有 " + queue.size() +" 个队列!输入参数:"+ parameter.getInputParamters() + ";
    输出参数:" + parameter.getOutputParameters());
    						} catch (Exception e) {
    							log.error("系统异常:" + e);
    						}
    					}
    				}.start();
    			}
    			reader.close();
    		} catch (FileNotFoundException  e) {
    			e.printStackTrace();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}finally {
    			 if (reader != null) {
    				 try {
    					 reader.close();
    				 } catch (Exception e) {
    					 e.printStackTrace();
    				 }
    			 }
    		 }
    		return queue;
    	}
    

      可是在实际运行时,由于日志比较大,下一个策略可能要等1hour或更长的时间才能开始处理,这明显是不符合要求的,于是又优化了下,将BlockingQueue改为全局static的,然后下一个策略可以直接监控这个队列中是否有值,有值就消费,没值就阻塞线程等待或者超时等其他处理。

    改进后的code:

    1、新建一个队列类:

    public class ParameterQueue extends LinkedBlockingQueue<InputOutputPrameters> {
    
    	/** 
    	*@Fields serialVersionUID: 
    	*/
    	private static final long serialVersionUID = 6032356446145302484L;
    	
    	private static BlockingQueue<InputOutputPrameters> queue = new LinkedBlockingQueue<InputOutputPrameters>();
    
    	/**
    	 * @Fields log: 日志记录
    	 */
    	private static final Logger log = LoggerFactory
    			.getLogger(ParameterQueue.class);
    	
    	/** 
    	 * 获取队列中的对象 
    	 * @Method:getParameter
    	 * @Description: 获取队列中的对象 
    	 * @return 获取到的对象信息
    	*/
    	public static InputOutputPrameters getParameter(){
    		InputOutputPrameters result = null;
    		try {
    			result  = (InputOutputPrameters)queue.take();
    		} catch (Exception e) {
    			log.error("获取队列异常,异常信息:" + e);
    		}
    		return result;
    	}
    	
    	/** 
    	 * 获取队列的数量
    	 * @Method:getQueueSize
    	 * @Description: 获取队列的数量 
    	 * @return 数量
    	*/
    	public static Integer getQueueSize() {
    		return queue.size();
    	}
    	
    	/** 
    	 * 放置参数到队列中 
    	 * @Method:putParameter
    	 * @Description: 放置参数到队列中 
    	 * @param parameter 要放置的对象
    	*/
    	public static void putParameter(InputOutputPrameters parameter) {
    		try {
    			queue.put(parameter);
    		} catch (Exception e) {
    			log.error("插入队列异常,异常信息:" + e);
    		}
    	}
    }
    

      2、读取文件时,直接操作该队列,往队列中put值,下一个策略从该队列中get值,put的code如下:

    public void getSource(String path) {
    		try {
    			File file = new File(path);
    			BufferedReader reader = null;
    			String tempStr = null;
    			try {
    				reader = new BufferedReader(new FileReader(file));
    				while ((tempStr = reader.readLine()) != null) {
    					final InputOutputPrameters parameter = new InputOutputPrameters();
    					String[] list = tempStr.split(";");
    					if (list != null && list.length > 0) {
    						parameter.setInputParamters(list[0]);
    						parameter.setOutputParameters(list[1]);
    					}
    					putInQueue(parameter);
    				}
    				reader.close();
    			} catch (FileNotFoundException  e) {
    				e.printStackTrace();
    			} catch (IOException e) {
    				e.printStackTrace();
    			}finally {
    				 if (reader != null) {
    					 try {
    						 reader.close();
    					 } catch (Exception e) {
    						 e.printStackTrace();
    					 }
    				 }
    			 }
    		} catch (Exception e) {
    			log.error("系统异常: " + e);
    		}
    	}
    
    	/** 
    	 * 将参数存放至队列中 
    	 * @Method:putInQueue
    	 * @Description: 将参数存放至队列中 
    	 * @param parameter 要存放的对象
    	*/
    	private void putInQueue(final InputOutputPrameters parameter) {
    		new Thread(){
    			public void run(){
    				try {	
    					Thread.sleep((long)(Math.random()*100));
    					log.info("开始存入数据!");
    					ParameterQueue.putParameter(parameter);
    					log.info("已经存入数据,目前队列中有 " + ParameterQueue.getQueueSize() +" 个队列!输入参数:"+ parameter.getInputParamters() + ";
    输出参数:" + parameter.getOutputParameters());
    				} catch (Exception e) {
    					log.error("系统异常:" + e);
    				}
    			}
    		}.start();
    	}
    

      

    于是这个要求就达到了。记录下这个小需求,方便以后查阅。

    简要说下,BlockingQueue是线程安全的,常用的是ArrayBlockingQueue、LinkedBlockingQueue

    ArrayBlockingQueue需要制定容量,而LinkedBlockingQueue不需要

    同时在消费时,take()是会阻塞线程的,如果是单线程跑时,take()不到时整个线程就卡了

    所以看具体环境需求,是用take还是其他的,我一般用poll,因为可以制定超时时间。

    哎 不知道怎么写了,就这样吧。

  • 相关阅读:
    HDU 4005 The war
    #undef
    [转载] #define new DEBUG_NEW
    [转载]常用正则表达式
    [百科]
    [转载]
    [转载]
    [转载]
    [百科]
    [转载]VC6中的文件后缀
  • 原文地址:https://www.cnblogs.com/jasonjiang/p/4643726.html
Copyright © 2011-2022 走看看