zoukankan      html  css  js  c++  java
  • JAVA进阶-多线程(2)

    堵塞队列:
    1)BlockingQueue该接口提供了:
    add()/remove() 假设当队列没有数据,从队列中取数据;或者队列中数据已满,
    向队列中加入数据;则会抛出异常.
    put()/take() 假设当队列没有数据,从队列中取数据;或者队列中数据已满,
    向队列中加入数据;则会形成堵塞.
    offer()/poll() 会给调用者返回特殊的值,开发人员能够通过这些值做对应的处理
    同一时候还提供了超时版本号. 
    2)接口实现
    ArrayBlockingQueue>由数组实现的有界队列,默认情况下没有指定公平策略(也就是
    一般的FIFO先进先出策略),假设不启动策略,会导致共享资源被贪婪的线程长时间占有,
    而无法获取资源的线程可能死掉,这样的情况称为饿死;
    LinkedBlockingQueue>将最大的容量变为可选,默认的容量为整型最大值,也就是不存在
    生产者生产增加队列时产生堵塞的情况.该队列一般在要求较低的情况下使用.
    PriorityBlockingQueue>无界队列,由线程对象的优先级决定获取cpu操作时间,同一时候,
    开发着也能够提供自己的比較器,比方同样扩展同样优先级的线程.
    DelayedQueue>是用类似栈维护的特殊的优先级队列.
    1.检索前指定时间内保持驻留在队列中.
    2.依照驻留时间排序,最长驻留时间位于底部.
    3.仅仅同意检索过期后的对象,当队列中没有过期对象.poll返回null,peek
     则获取栈顶的对象.
    SynchronousQueue>实现了每一个插入操作都必须等待相应的移除操作;队列始终为空,
    当,发现队列有东西,就会有相应的消费着瞬间消费这些东西;
    TransferQueue>该接口扩展了BlockingQueue.而且LinkedTransferQueue提供了
    接口的详细实现;该接口扩展了BlockingQueue的put方法为transfer(),该方法
    为超时的非堵塞调用.同一时候,该接口提供了获取等待消费者的数量检測.
    ---------------

    /**
     *		
     *
     * 	@author Lean  @date:2014-9-28  
     */
    public class StockExchange {
    
    	public static void main(String[] args) {
    		BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>();
    		
    		Saller saller=new Saller(queue);
    		Buyer buyer=new Buyer(queue);
    		Thread[] sallerThreads=new Thread[20];
    		Thread[] buyerThreads=new Thread[20];
    		for (int i = 0; i <sallerThreads.length; i++) {
    			sallerThreads[i]=new Thread(saller);
    			sallerThreads[i].start();
    			buyerThreads[i]=new Thread(buyer);
    			buyerThreads[i].start();
    		}
    		try {
    			Thread.sleep(20);
    		} catch (InterruptedException e) {
    		}
    		System.out.println("all thread interrupt!");
    		for (Thread thread : sallerThreads) {
    			thread.interrupt();
    		}
    		for (Thread thread : buyerThreads) {
    			thread.interrupt();
    		}
    	}
    	
    	static class Saller implements Runnable{
    
    		private BlockingQueue<Integer> mQueue;
    		private boolean shutDownRequest;
    		
    		public Saller(BlockingQueue<Integer> queue){
    			mQueue=queue;
    		}
    		
    		@Override
    		public void run() {
    			while (shutDownRequest==false) {
    				int quantity=(int)(Math.random()*100);
    				try {
    					mQueue.put(quantity);
    //					System.out.println("saller order by Thread:"+Thread.currentThread().getName()+"  quantity:"+quantity);
    				} catch (InterruptedException e) {
    					shutDownRequest=true;
    				}
    			}
    		}
    		
    	}
    	
    	static class Buyer implements Runnable{
    		
    		private BlockingQueue<Integer> mQueue;
    		private boolean shutDownRequest;
    		
    		public Buyer(BlockingQueue<Integer> queue){
    			mQueue=queue;
    		}
    		
    		@Override
    		public void run() {
    			while (shutDownRequest==false) {
    				try {
    					System.out.println("buyer order by Thread:"+Thread.currentThread().getName()+"  quantity:"+mQueue.take());
    				} catch (InterruptedException e) {
    					shutDownRequest=true;
    				}
    			}
    		}
    		
    	}
    	
    }
    
    ---------------

    ---------------

    /**
     *		
     * 	@author Lean  @date:2014-9-28  
     */
    public class LuckyNumberGenerator {
    	
    	public static void main(String[] args) {
    		TransferQueue<String> queue=new LinkedTransferQueue<String>();
    		Thread producerThread=new Thread(new Producer(queue));
    		producerThread.setDaemon(true);
    		producerThread.start();
    		
    		for (int i = 0; i < 20; i++) {
    			Thread comsumerThread=new Thread(new Comsumer(queue));
    			comsumerThread.setDaemon(true);
    			comsumerThread.start();
    			try {
    				Thread.sleep(2000);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		System.out.println(Thread.currentThread().getThreadGroup().activeCount());
    	}
    	
    	static class Producer implements Runnable{
    
    		private TransferQueue<String> mQueue;
    		
    		public Producer(TransferQueue<String> queue){
    			this.mQueue=queue;
    		}
    		
    		public String product(){
    			return "your lucky number is: "+((int)(Math.random()*100));
    		}
    		
    		@Override
    		public void run() {
    			while (true) {
    				try {
    					if (mQueue.hasWaitingConsumer()) {
    							mQueue.put(product());
    					}
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    		
    	}
    	
    	static class Comsumer implements Runnable{
    
    		private TransferQueue<String> mQueue;
    		
    		public Comsumer(TransferQueue<String> queue){
    			this.mQueue=queue;
    		}
    		
    		@Override
    		public void run() {
    			try {
    				System.out.println(mQueue.take());
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		
    	}
    	
    	
    	
    }
    
    ---------------


    同步器:

    1)信号量Semaphore
    >指定代理个数,在某一时间内,查看当前是否有代理处理事情,
    处理完事件,释放代理;

    /**
     * 		
     * 
     * @author Lean
     */
    public class Bank {
    
    	private static final int COUNT=100;
    	private static final Semaphore semaphore=new Semaphore(2,true);
    	
    	public static void main(String[] args) {
    		for (int i = 0; i < COUNT; i++) {
    			final int count=i;
    			new Thread(){
    				@Override
    				public void run() {
    					try {
    						if (semaphore.tryAcquire(10, TimeUnit.MILLISECONDS)) {
    							try {
    								Teller.getService(count);
    							}finally{
    								semaphore.release();
    							}
    						}
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				};
    			}.start();
    		}
    	}
    	
    	static class Teller{
    		public static void getService(int i){
    			System.out.println("serving:"+i);
    			try {
    				Thread.sleep((long)(Math.random()*10));
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    	
    
    }
    


    2)屏障CyclicBarrier>指多个线程到达某个点后停止运行(调用CyclicBarrier对象的
    wawit()方法)当多个任务(到达构造參数的指定的个数)达到指定的位置后,运行CyclicBarrier构造參数的Runnable;

    /**
     *		屏障(会合点)
     *		sample:计算平方和
     * 	@author Lean  @date:2014-9-29  
     */
    public class CalculateSum {
    	
    	public static final int COUNT=3;
    	public static int[] tempArray=new int[COUNT];
    	
    	public static void main(String[] args) {
    		CyclicBarrier barrier=new CyclicBarrier(COUNT,new Runnable() {
    			
    			@Override
    			public void run() {
    				int sum=0;
    				for (int i = 0; i < COUNT; i++) {
    					sum=sum+tempArray[i];
    				}
    				System.out.println("the result is:"+sum);
    			}
    		});
    		for (int i = 0; i <COUNT; i++) {
    			new Thread(new Square(i,barrier)).start();
    		}
    		System.out.println("caculate now...");
    	}
    	
    	static class Square implements Runnable{
    
    		private int initSize;
    		private CyclicBarrier barrier;
    		
    		public Square(int initSize,CyclicBarrier barrier){
    			this.initSize=initSize;
    			this.barrier=barrier;
    		}
    		
    		@Override
    		public void run() {
    			int result=initSize*initSize;
    			tempArray[initSize]=result;
    			try {
    				barrier.await();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			} catch (BrokenBarrierException e) {
    				e.printStackTrace();
    			}
    		}
    		
    		
    	}
    	
    	
    }
    


    3)倒计数闭锁CountDownLatch>构造CountDownLatch的时候指定倒数个数,调用await()会使其后面的代码堵塞

    调用countDown(),倒数-1,当倒数为0时,运行CountDownLatch对象await()后的代码.相比于CyclicBarrier,

    CountDownLatch提供了手动控制屏蔽,比較灵活


    /**
     *
     * 	@author Lean  @date:2014-9-29  
     */
    public class EnhancedStockExchange {
    	
    	public static void main(String[] args) {
    		BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>();
    		CountDownLatch startLatch=new CountDownLatch(1);
    		final CountDownLatch stopLatch=new CountDownLatch(200);
    		Producer producer=new Producer(startLatch, stopLatch, queue);
    		Saller saller=new Saller(startLatch, stopLatch, queue);
    		Thread[] sellerThreads=new Thread[100];
    		for (int i = 0; i < sellerThreads.length; i++) {
    			sellerThreads[i]=new Thread(saller);
    			sellerThreads[i].start();
    		}
    		Thread[] producerThreads=new Thread[100];
    		for (int i = 0; i < producerThreads.length; i++) {
    			producerThreads[i]=new Thread(producer);
    			producerThreads[i].start();
    		}
    		//倒数闭锁,当前倒数为1,运行例如以下函数,倒数0;
    		startLatch.countDown();
    		
    		new Thread(new Runnable() {
    			
    			@Override
    			public void run() {
    				try {
    					//运行await(),暂停直至倒数器为0
    					stopLatch.await();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    				System.out.println("all thread countdown!");
    			}
    		}).start();
    		
    		
    		try {
    			Thread.sleep(20);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println("Terminating...");
    		//运行interrupt(),运行while语句后的mStopLatch.countDown();倒数为1
    		for (Thread thread : sellerThreads) {
    			thread.interrupt();
    		}
    		for (Thread thread : producerThreads) {
    			thread.interrupt();
    		}
    		//倒数为0,运行run()方法内await()后的代码;
    		stopLatch.countDown();
    	}
    	
    	
    	static class Producer implements Runnable{
    		
    		public CountDownLatch mStartLatch;
    		public CountDownLatch mStopLatch;
    		private BlockingQueue<Integer> mQueue;
    		private boolean shutDownRequest;
    		
    		public Producer(CountDownLatch startLatch,CountDownLatch stopLatch,BlockingQueue<Integer> queue){
    			mStartLatch=startLatch;
    			mStopLatch=stopLatch;
    			mQueue=queue;
    		}
    		
    		@Override
    		public void run() {
    			try {
    				mStartLatch.await();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			while (shutDownRequest==false) {
    				try {
    					mQueue.put((int)(Math.random()*(100)));
    				} catch (InterruptedException e) {
    					shutDownRequest=true;
    				}
    			}
    			mStopLatch.countDown();
    		}
    		
    	}
    	
    	static class Saller implements Runnable{
    		
    		public CountDownLatch mStartLatch;
    		public CountDownLatch mStopLatch;
    		private BlockingQueue<Integer> mQueue;
    		private boolean shutDownRequest;
    		
    		public Saller(CountDownLatch startLatch,CountDownLatch stopLatch,BlockingQueue<Integer> queue){
    			mStartLatch=startLatch;
    			mStopLatch=stopLatch;
    			mQueue=queue;
    		}
    		
    		@Override
    		public void run() {
    			try {
    				mStartLatch.await();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			while (shutDownRequest==false) {
    				try {
    					System.out.println("saller comsume: "+mQueue.take());
    				} catch (InterruptedException e) {
    					shutDownRequest=true;
    				}
    			}
    			mStopLatch.countDown();
    		}
    		
    	}
    	
    }
    




    4)移相器Phaser>实现屏障一样的功能,相比于屏障和倒计数闭锁,Phaser实例manager
    提供了可伸缩的等待数目.

    在执行的过程中,动态添加拦截数可调用manager.register();当调用manager.arriveAndDeregister()时,当前全部

    等待线程继续运行;在线程运行中,可调用manager.arriveAndAwaitAdvance();

    等待其它线程;同一时候我们能够调用manager.getArrivedParties()查看等待线程数;

    /**
     *
     * 	@author Lean  @date:2014-9-29  
     */
    public class HorseRace {
    
    	private final int NUMBER_OF_HORSE=12;
    	private static final int INIT_PARTIES=1;
    	private static final Phaser manager=new Phaser(INIT_PARTIES);
    	
    	public static void main(String[] args) {
    		//检查准备就绪的马匹数量
    		Thread raceMonitor=new Thread(new RaceMonitor());
    		raceMonitor.setDaemon(true);
    		raceMonitor.start();
    		
    		new HorseRace().managerRace();
    		
    	}
    	
    
    	private void managerRace() {
    		ArrayList<Horse> horses=new ArrayList<HorseRace.Horse>();
    		for (int i = 0; i < NUMBER_OF_HORSE; i++) {
    			horses.add(new Horse());
    		}
    		runRace(horses);
    	}
    
    	private void runRace(Iterable<Horse> horses) {
    		for (final Horse horse : horses) {
    			manager.register();
    			new Thread(){
    				@Override
    				public void run() {
    					try {
    						Thread.sleep((new Random()).nextInt(1000));
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    					manager.arriveAndAwaitAdvance();
    					horse.run();
    				};
    			}.start();
    		}
    		try {
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		manager.arriveAndDeregister();
    	}
    
    
    
    
    	private static class RaceMonitor implements Runnable{
    
    		@Override
    		public void run() {
    			while (true) {
    //				System.out.println("number of horses to run:"+HorseRace.manager.getArrivedParties());
    				try {
    					Thread.sleep(1);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    		
    	}
    	
    	private static class Horse implements Runnable{
    		
    		private static final AtomicInteger idSource=new AtomicInteger();
    		private final int id=idSource.incrementAndGet();
    		
    		@Override
    		public void run() {
    			System.out.println(toString()+" is running");
    		}
    
    		@Override
    		public String toString() {
    			return "Horse [id=" + id + "]";
    		}
    		
    	}
    	
    }
    

    5)交换器Exchanger<T>
    类型T为两个线程交换的对象,在某些同样操作的批量编程中,当中一类线程
    负责生产对象,还有一类编程负责消耗对象,对于线程间共享数据,前面介绍了锁
    的定义,当我们使用JAVA提供的Exchanger<T>传输对象,不须要锁的概念.
    buffers=ProductExchange.exchanger.exchange(buffers, 1000,TimeUnit.MILLISECONDS);
    该对象的exchange方法參数传递了该线程其它线程的数据,并返回了其它线程返回的数据

    /**
     *
     * 	@author Lean  @date:2014-9-29  
     */
    public class ProductExchange {
    	
    	public static Exchanger<ArrayList<Integer>> exchanger=new Exchanger<ArrayList<Integer>>();
    	
    	public static void main(String[] args) {
    		Thread producerThread=new Thread(new Producer());
    		Thread comsumeThread=new Thread(new Comsume());
    		producerThread.start();
    		comsumeThread.start();
    		try {
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    
    		producerThread.interrupt();
    		comsumeThread.interrupt();
    
    	}
    	
    	private static class Producer implements Runnable{
    		
    		private static ArrayList<Integer> buffers=new ArrayList<Integer>();
    		private boolean okToRun=true;
    		
    		@Override
    		public void run() {
    			while (okToRun) {
    				try {
    					if (buffers.isEmpty()) {
    						for (int i = 0; i <10; i++) {
    							buffers.add((int)(Math.random()*100));
    						}
    						Thread.sleep(200);
    						for (int i : buffers) {
    							System.out.print(i+" ,");
    						}
    						System.out.println("");
    						buffers=ProductExchange.exchanger.exchange(buffers, 1000,TimeUnit.MILLISECONDS);
    					}
    				} catch (InterruptedException e) {
    					okToRun=false;
    				} catch (TimeoutException e) {
    					System.out.println("produce time out!");
    				}
    			}
    		}
    		
    	}
    	
    	private static class Comsume implements Runnable{
    		
    		private static ArrayList<Integer> buffers=new ArrayList<Integer>();
    		private boolean okToRun=true;
    		
    		@Override
    		public void run() {
    			while (okToRun) {
    				try {
    					if (buffers.isEmpty()) {
    						buffers=ProductExchange.exchanger.exchange(buffers);
    						for (int i : buffers) {
    							System.out.print(i+" ,");
    						}
    						System.out.println("");
    						Thread.sleep(200);
    						buffers.clear();
    					}
    				} catch (InterruptedException e) {
    					okToRun=false;
    				}
    			}
    		}
    		
    	}
    	
    }
    

  • 相关阅读:
    mongodb协议透传
    [转]PyInstaller2的信息文件Version的生成
    [转]使用PyInstaller2将Python脚本转化为可执行文件(中使用部分)
    Cache应用(sql依赖缓存)
    关于Cookie与Session的疑问解答
    ADO.NET Entity Framework
    WPF中的画笔功能,实现直实线、弯实线、直虚线、弯虚线
    Singleton模式之多线程
    控件回发系列一(IPostBackEventHandler)
    使用VS2010创建EntityDataModel出错
  • 原文地址:https://www.cnblogs.com/liguangsunls/p/7264368.html
Copyright © 2011-2022 走看看