zoukankan      html  css  js  c++  java
  • 读书笔记-java并发编程-核心方法与框架1-5

    • Semaphore的使用
      单词Semaphore[' seməf :(r)]的中文含义是信号、信号系统。此类的主要作用就是限制线程并发的数量,如果不限制线程并发的数量,则CPU的资源很快就被耗尽,每个线程执行的任务是相当缓慢,因为CPU要把时间片分配给不同的线程对象,而且上下文切换也要耗时,最终造成系统运行效率大幅降低,所以限制并发线程的数量还是非常有必要的
    public class Service {
        private Semaphore semaphore = new Semaphore(1);
    
        public void testMethod() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()
                        + " begin timer=" + System.currentTimeMillis());
                Thread.sleep(5000);
                System.out.println(Thread.currentThread().getName()
                        + "   end timer=" + System.currentTimeMillis());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 类Semaphore的构造函数参数permits是许可的意思,代表同一时间内,最多允许多少个线程同时执行acquire()和release()之间的代码
    • 其实还可以传入>1的许可,代表同一时间内,最多允许有x个线程可以执行acquire()和release()之间的代码。
    • 类Semaphore所提供的功能完全就是synchronized关键字的升级版
    • 当Semaphore的数量大于1时将不能保证线程安全
    • Semaphore的数量可以动态增加,当release的数量大于初始化数量时会自动增加:
        public static void main(String[] args) {
            try {
                Semaphore semaphore = new Semaphore(5);
                semaphore.acquire();
                semaphore.acquire();
                semaphore.acquire();
                semaphore.acquire();
                semaphore.acquire();
                System.out.println(semaphore.availablePermits());
                semaphore.release();
                semaphore.release();
                semaphore.release();
                semaphore.release();
                semaphore.release();
                semaphore.release();
                System.out.println(semaphore.availablePermits());
                semaphore.release(4);
                System.out.println(semaphore.availablePermits());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    结果:
    0
    6
    10
    
    
    • 方法acquireUninterruptibly()的作用是使等待进入acquire()方法的线程,不允许被中断。
    • 当调用interrupt方法时,在执行的线程不会被打断,仍然会执行完成
    • availablePermits()返回此Semaphore对象中当前可用的许可数,此方法通常用于调试,因为许可的数量有可能实时在改变,并不是固定的数量。drainPermits()可获取并返回立即可用的所有许可个数,并且将可用许可置0。
    • 方法getQueueLength()的作用是取得等待许可的线程个数。方法hasQueuedThreads()的作用是判断有没有线程在等待这个许可。这两个方法通常都是在判断当前有没有等待许可的线程信息时使用
    public void testMethod() {
    		try {
    			semaphore.acquire();
    			Thread.sleep(1000);
    			System.out.println("还有大约" + semaphore.getQueueLength() + "个线程在等待");
    			System.out.println("是否有线程正在等待信号量呢?" + semaphore.hasQueuedThreads());
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			semaphore.release();
    		}
    	}
    
    • 公平与非公平信号量: 公平信号量是获得锁的顺序与线程启动的顺序有关,但不代表100%地获得信号量,仅仅是在概率上能得到保证。而非公平信号量就是无关的了。
    • tryAcquire(): 无参方法tryAcquire()的作用是尝试地获得1个许可,如果获取不到则返回false,此方法通常与if语句结合使用,其具有无阻塞的特点。无阻塞的特点可以使线程不至于在同步处一直持续等待的状态,如果if语句判断不成立则线程会继续走else语句,程序会继续向下运行
    public class Service {
    
    	private Semaphore semaphore = new Semaphore(1);
    
    	public void testMethod() {
    		if (semaphore.tryAcquire()) {
    			System.out.println("ThreadName=" + Thread.currentThread().getName()
    					+ "首选进入!");
    			for (int i = 0; i < Integer.MAX_VALUE; i++) {
    				String newString = new String();
    				Math.random();
    			}
    			semaphore.release();
    		} else {
    			System.out.println("ThreadName=" + Thread.currentThread().getName()
    					+ "未成功进入!");
    		}
    	}
    }
    
    • 有参方法tryAcquire(int permits)的作用是尝试地获得x个许可,如果获取不到则返回false
    • 有参方法tryAcquire(int long timeout, TimeUnit unit)的作用是在指定的时间内尝试地获得1个许可,如果获取不到则返回false。
    • 有参方法tryAcquire(int permits, long timeout, TimeUnit unit)的作用是在指定的时间内尝试地获得x个许可,如果获取不到则返回false。
    • 使用Semaphore创建字符串池
    • 类Semaphore可以有效地对并发执行任务的线程数量进行限制,这种功能可以应用在pool池技术中,可以设置同时访问pool池中数据的线程数量。本实验的功能是同时有若干个线程可以访问池中的数据,但同时只有一个线程可以取得数据,使用完毕后再放回池中。
    public class ListPool {
    
    	private int poolMaxSize = 5;
    	private int semaphorePermits = 5;
    	private List<String> list = new ArrayList<String>();
    	private Semaphore concurrencySemaphore = new Semaphore(semaphorePermits);
    	private ReentrantLock lock = new ReentrantLock();
    	private Condition condition = lock.newCondition();
    
    	public ListPool() {
    		super();
    		for (int i = 0; i < poolMaxSize; i++) {
    			list.add("高洪岩" + (i + 1));
    		}
    	}
    
    	public String get() {
    		String getString = null;
    		try {
    			concurrencySemaphore.acquire();
    			lock.lock();
    			while (list.size() == 0) {
    				condition.await();
    			}
    			getString = list.remove(0);
    			lock.unlock();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		return getString;
    	}
    
    	public void put(String stringValue) {
    		lock.lock();
    		list.add(stringValue);
    		condition.signalAll();
    		lock.unlock();
    		concurrencySemaphore.release();
    	}
    
    }
    
    • 使用Semaphore实现多生产者/多消费者模式
    public class RepastService {
    
    	volatile private Semaphore setSemaphore = new Semaphore(10);// 厨师
    	volatile private Semaphore getSemaphore = new Semaphore(20);// 就餐者
    	volatile private ReentrantLock lock = new ReentrantLock();
    	volatile private Condition setCondition = lock.newCondition();
    	volatile private Condition getCondition = lock.newCondition();
    	volatile private Object[] producePosition = new Object[4];
    
    	private boolean isEmpty() {
    		boolean isEmpty = true;
    		for (int i = 0; i < producePosition.length; i++) {
    			if (producePosition[i] != null) {
    				isEmpty = false;
    				break;
    			}
    		}
    		if (isEmpty == true) {
    			return true;
    		} else {
    			return false;
    		}
    	}
    
    	private boolean isFull() {
    		boolean isFull = true;
    		for (int i = 0; i < producePosition.length; i++) {
    			if (producePosition[i] == null) {
    				isFull = false;
    				break;
    			}
    		}
    		return isFull;
    	}
    
    	public void set() {
    		try {
    			// System.out.println("set");
    			setSemaphore.acquire();// 允许同时最多有10个厨师进行生产
    			lock.lock();
    			while (isFull()) {
    				// System.out.println("生产者在等待");
    				setCondition.await();
    			}
    			for (int i = 0; i < producePosition.length; i++) {
    				if (producePosition[i] == null) {
    					producePosition[i] = "数据";
    					System.out.println(Thread.currentThread().getName()
    							+ " 生产了 " + producePosition[i]);
    					break;
    				}
    			}
    			getCondition.signalAll();
    			lock.unlock();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			setSemaphore.release();
    		}
    	}
    
    	public void get() {
    		try {
    			// System.out.println("get");
    			getSemaphore.acquire();// 允许同时最多有16个就餐者
    			lock.lock();
    			while (isEmpty()) {
    				// System.out.println("消费者在等待");
    				getCondition.await();
    			}
    			for (int i = 0; i < producePosition.length; i++) {
    				if (producePosition[i] != null) {
    					System.out.println(Thread.currentThread().getName()
    							+ " 消费了 " + producePosition[i]);
    					producePosition[i] = null;
    					break;
    				}
    			}
    			setCondition.signalAll();
    			lock.unlock();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			getSemaphore.release();
    		}
    	}
    
    }
    
    • 类Exchanger的功能可以使2个线程之间传输数据,它比生产者/消费者模式使用的wait/notify要更加方便。
    • 类Exchanger中的exchange()方法具有阻塞的特色,也就是此方法被调用后等待其他线程来取得数据,如果没有其他线程取得数据,则一直阻塞等待。
    • 当调用exchange(V x, long timeout, TimeUnit unit)方法后在指定的时间内没有其他线程获取数据,则出现超时异常

    第二章

    • 类CountDownLatch也是一个同步功能的辅助类,使用效果是给定一个计数,当使用这个CountDownLatch类的线程判断计数不为0时,则呈wait状态,如果为0时则继续运行
    • 实现等待与继续运行的效果分别需要使用await()和countDown()方法来进行。调用await()方法时判断计数是否为0,如果不为0则呈等待状态。其他线程可以调用count-Down()方法将计数减1,当计数减到为0时,呈等待的线程继续运行。而方法getCount()就是获得当前的计数个数。
    public class MyService {
    
    	private CountDownLatch down = new CountDownLatch(1);
    
    	public void testMethod() {
    		try {
    			System.out.println("A");
    			down.await();
    			System.out.println("B");
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void downMethod() {
    		System.out.println("X");
    		down.countDown();
    	}
    
    }
    
    	public static void main(String[] args) throws InterruptedException {
    		MyService service = new MyService();
    		MyThread t = new MyThread(service);
    		t.start();
    		Thread.sleep(2000);
    		service.downMethod();
    	}
    只有执行了downMethod方法后,阻塞得以解除
    
    • 裁判在等全部的运动员到来:多个线程与同步点间阻塞的特性,线程必须都到达同步点后才可以继续向下运行
    public class MyThread extends Thread {
    
    	private CountDownLatch maxRuner;
    
    	public MyThread(CountDownLatch maxRuner) {
    		super();
    		this.maxRuner = maxRuner;
    	}
    
    	@Override
    	public void run() {
    		try {
    			Thread.sleep(20000);
    			maxRuner.countDown();
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    }
    
    public static void main(String[] args) throws InterruptedException {
    		CountDownLatch maxRuner = new CountDownLatch(10);
    		MyThread[] tArray = new MyThread[Integer.parseInt(""
    				+ maxRuner.getCount())];
    		for (int i = 0; i < tArray.length; i++) {
    			tArray[i] = new MyThread(maxRuner);
    			tArray[i].setName("线程" + (i + 1));
    			tArray[i].start();
    		}
    		maxRuner.await();
    		System.out.println("都回来了!");
    	}
    
    • 各就各位准备比赛
    public class MyService {
    
    	private CountDownLatch down = new CountDownLatch(1);
    
    	public void testMethod() {
    		try {
    			System.out.println(Thread.currentThread().getName() + "准备");
    			down.await();
    			System.out.println(Thread.currentThread().getName() + "结束");
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void downMethod() {
    		System.out.println("开始");
    		down.countDown();
    	}
    
    }
    
    public class Run {
    	public static void main(String[] args) throws InterruptedException {
    		MyService service = new MyService();
    		MyThread[] tArray = new MyThread[10];
    		for (int i = 0; i < tArray.length; i++) {
    			tArray[i] = new MyThread(service);
    			tArray[i].setName("线程" + (i + 1));
    			tArray[i].start();
    		}
    		Thread.sleep(2000);
    		service.downMethod();
    	}
    }
    存在的问题:如果有的运动员2秒内没有准备好,仍然会开始
    
    • 改进:
    public class MyThread extends Thread {
    
    	private CountDownLatch comingTag;// 裁判等待所有运动员到来
    	private CountDownLatch waitTag;// 等待裁判说准备开始
    	private CountDownLatch waitRunTag;// 等待起跑
    	private CountDownLatch beginTag;// 起跑
    	private CountDownLatch endTag;// 所有运动员到达终点
    
    	public MyThread(CountDownLatch comingTag, CountDownLatch waitTag,
    			CountDownLatch waitRunTag, CountDownLatch beginTag,
    			CountDownLatch endTag) {
    		super();
    		this.comingTag = comingTag;
    		this.waitTag = waitTag;
    		this.waitRunTag = waitRunTag;
    		this.beginTag = beginTag;
    		this.endTag = endTag;
    	}
    
    	@Override
    	public void run() {
    		try {
    			System.out.println("运动员使用不同交通工具不同速度到达起跑点,正向这头走!");
    			Thread.sleep((int) (Math.random() * 10000));
    			System.out.println(Thread.currentThread().getName() + "到起跑点了!");
    			comingTag.countDown();
    			System.out.println("等待裁判说准备!");
    			waitTag.await();
    			System.out.println("各就各位!准备起跑姿势!");
    			Thread.sleep((int) (Math.random() * 10000));
    			waitRunTag.countDown();
    			beginTag.await();
    			System.out.println(Thread.currentThread().getName()
    					+ " 运行员起跑 并且跑赛过程用时不确定");
    			Thread.sleep((int) (Math.random() * 10000));
    			endTag.countDown();
    			System.out.println(Thread.currentThread().getName() + " 运行员到达终点");
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    
    public static void main(String[] args) {
    		try {
    			CountDownLatch comingTag = new CountDownLatch(10);
    			CountDownLatch waitTag = new CountDownLatch(1);
    			CountDownLatch waitRunTag = new CountDownLatch(10);
    			CountDownLatch beginTag = new CountDownLatch(1);
    			CountDownLatch endTag = new CountDownLatch(10);
    
    			MyThread[] threadArray = new MyThread[10];
    			for (int i = 0; i < threadArray.length; i++) {
    				threadArray[i] = new MyThread(comingTag, waitTag, waitRunTag,
    						beginTag, endTag);
    				threadArray[i].start();
    			}
    			System.out.println("裁判员在等待选手的到来!");
    			comingTag.await();
    			System.out.println("裁判看到所有运动员来了,各就各位前“巡视”用时5秒");
    			Thread.sleep(5000);
    			waitTag.countDown();
    			System.out.println("各就各位!");
    			waitRunTag.await();
    			Thread.sleep(2000);
    			System.out.println("发令枪响起!");
    			beginTag.countDown();
    			endTag.await();
    			System.out.println("所有运动员到达,统计比赛名次!");
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    • 方法await(long timeout, TimeUnit unit)的作用使线程在指定的最大时间单位内进入WAITING状态,如果超过这个时间则自动唤醒,程序继续向下运行。参数timeout是等待的时间,而unit参数是时间的单位
    • 类CyclicBarrier不仅有CountDownLatch所具有的功能,还可以实现屏障等待的功能,也就是阶段性同步,它在使用上的意义在于可以循环地实现线程要一起做任务的目标,而不是像类CountDownLatch一样,仅仅支持一次线程与同步点阻塞的特性
    • 类CyclicBarrier和Semaphore及CountDown-Latch一样,也是一个同步辅助类。它允许一组线程互相等待,直到到达某个公共屏障点(commonbarrier point),这些线程必须实时地互相等待,这种情况下就可以使用CyclicBarrier类来方便地实现这样的功能。CyclicBarrier类的公共屏障点可以重用,所以类的名称中有“cyclic循环”的单词。
      1)CountDownLatch作用:一个线程或者多个线程,等待另外一个线程或多个线程完成某个事情之后才能继续执行。
      2)CyclicBarrier的作用:多个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待,所以对于CyclicBarrier来说,重点是“多个线程之间”任何一个线程没有完成任务,则所有的线程都必须等待
    • 所有线程都到达同步点时再继续运行
    public class MyThread extends Thread {
    
    	private CyclicBarrier cbRef;
    
    	public MyThread(CyclicBarrier cbRef) {
    		super();
    		this.cbRef = cbRef;
    	}
    
    	@Override
    	public void run() {
    		try {
    			Thread.sleep((int) (Math.random() * 1000));
    			System.out.println(Thread.currentThread().getName() + " 到了! "
    					+ System.currentTimeMillis());
    			cbRef.await();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} catch (BrokenBarrierException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    public static void main(String[] args) {
    		CyclicBarrier cbRef = new CyclicBarrier(5, new Runnable() {
    			public void run() {
    				System.out.println("全都到了!");
    			}
    		});
    		MyThread[] threadArray = new MyThread[5];
    		for (int i = 0; i < threadArray.length; i++) {
    			threadArray[i] = new MyThread(cbRef);
    		}
    		for (int i = 0; i < threadArray.length; i++) {
    			threadArray[i].start();
    		}
    	}
    
    
    • 线程个数大于parties数量时分批处理
    public class ThreadA extends Thread {
    
    	private CyclicBarrier cbRef;
    
    	public ThreadA(CyclicBarrier cbRef) {
    		super();
    		this.cbRef = cbRef;
    	}
    
    	@Override
    	public void run() {
    		try {
    			System.out.println(Thread.currentThread().getName() + " begin ="
    					+ System.currentTimeMillis() + " 等待凑齐2个继续运行");
    			cbRef.await();
    			System.out.println(Thread.currentThread().getName() + "   end ="
    					+ System.currentTimeMillis() + " 已经凑齐2个继续运行");
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} catch (BrokenBarrierException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    
    	public static void main(String[] args) throws InterruptedException {
    		CyclicBarrier cbRef = new CyclicBarrier(2, new Runnable() {
    			@Override
    			public void run() {
    				System.out.println("全来了!");
    			}
    		});
    
    		for (int i = 0; i < 4; i++) {
    			ThreadA threadA1 = new ThreadA(cbRef);
    			threadA1.start();
    			Thread.sleep(2000);
    		}
    	}
    
    • 类CyclicBarrier具有屏障重置性
    public class ThreadA extends Thread {
    
    	private CyclicBarrier cbRef;
    
    	public ThreadA(CyclicBarrier cbRef) {
    		super();
    		this.cbRef = cbRef;
    	}
    
    	@Override
    	public void run() {
    		try {
    			cbRef.await();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} catch (BrokenBarrierException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    
    	public static void main(String[] args) throws InterruptedException {
    		CyclicBarrier cbRef = new CyclicBarrier(2);
    
    		ThreadA threadA1 = new ThreadA(cbRef);
    		threadA1.start();
    		Thread.sleep(500);
    		System.out.println(cbRef.getNumberWaiting());
    
    		ThreadA threadA2 = new ThreadA(cbRef);
    		threadA2.start();
    		Thread.sleep(500);
    		System.out.println(cbRef.getNumberWaiting());
    
    		ThreadA threadA3 = new ThreadA(cbRef);
    		threadA3.start();
    		Thread.sleep(500);
    		System.out.println(cbRef.getNumberWaiting());
    
    		ThreadA threadA4 = new ThreadA(cbRef);
    		threadA4.start();
    		Thread.sleep(500);
    		System.out.println(cbRef.getNumberWaiting());
    
    	}
    
    
    • 方法reset()的作用是重置屏障
    public class MyService {
    
    	public CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
    		@Override
    		public void run() {
    			System.out.println("                        彻底结束了 "
    					+ System.currentTimeMillis());
    		}
    	});
    
    	public void testMethod() {
    		try {
    			System.out.println(Thread.currentThread().getName() + " 准备!"
    					+ System.currentTimeMillis());
    			cyclicBarrier.await();
    			System.out.println(Thread.currentThread().getName() + " 结束!"
    					+ System.currentTimeMillis());
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} catch (BrokenBarrierException e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    
    	public static void main(String[] args) throws InterruptedException {
    		MyService service = new MyService();
    		MyThreadA a = new MyThreadA(service);
    		a.setName("A");
    		MyThreadB b = new MyThreadB(service);
    		b.setName("B");
    		// 线程C未实例化
    		a.start();
    		b.start();
    
    		Thread.sleep(2000);
    		service.cyclicBarrier.reset();
    
    	}
    

    第三章 Phaser的使用

    • 方法arriveAndAwaitAdvance()的作用与CountDownLatch类中的await()方法大体一样,通过从方法的名称解释来看,arrive是到达的意思,wait是等待的意思,而advance是前进、促进的意思,所以执行这个方法的作用就是当前线程已经到达屏障,在此等待一段时间,等条件满足后继续向下一个屏障继续执行。
    public class PrintTools {
    
    	public static Phaser phaser;
    
    	public static void methodA() {
    		System.out.println(Thread.currentThread().getName() + " A1 begin="
    				+ System.currentTimeMillis());
    		phaser.arriveAndAwaitAdvance();
    		System.out.println(Thread.currentThread().getName() + " A1   end="
    				+ System.currentTimeMillis());
    
    		System.out.println(Thread.currentThread().getName() + " A2 begin="
    				+ System.currentTimeMillis());
    		phaser.arriveAndAwaitAdvance();
    		System.out.println(Thread.currentThread().getName() + " A2   end="
    				+ System.currentTimeMillis());
    	}
    
    	public static void methodB() {
    		try {
    			System.out.println(Thread.currentThread().getName() + " A1 begin="
    					+ System.currentTimeMillis());
    			Thread.sleep(5000);
    			phaser.arriveAndAwaitAdvance();
    			System.out.println(Thread.currentThread().getName() + " A1   end="
    					+ System.currentTimeMillis());
    
    			System.out.println(Thread.currentThread().getName() + " A2 begin="
    					+ System.currentTimeMillis());
    			Thread.sleep(5000);
    			phaser.arriveAndAwaitAdvance();
    			System.out.println(Thread.currentThread().getName() + " A2   end="
    					+ System.currentTimeMillis());
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    }
    
    public static void main(String[] args) {
    		Phaser phaser = new Phaser(3);
    		PrintTools.phaser = phaser;
    
    		ThreadA a = new ThreadA(phaser);
    		a.setName("A");
    		a.start();
    
    		ThreadB b = new ThreadB(phaser);
    		b.setName("B");
    		b.start();
    
    		ThreadC c = new ThreadC(phaser);
    		c.setName("C");
    		c.start();
    	}
    因为C调用methodB执行时间比较长,所以AB都要等待C执行到arriveAndAwaitAdvance()时才能凑够3个,继续往下执行
    
    • 方法arriveAndDeregister()的作用是使当前线程(运动员)退出比赛,并且使parties值减1
    public class PrintTools {
    
    	public static Phaser phaser;
    
    	public static void methodA() {
    		System.out.println(Thread.currentThread().getName() + " A1 begin="
    				+ System.currentTimeMillis());
    		phaser.arriveAndAwaitAdvance();
    		System.out.println(Thread.currentThread().getName() + " A1   end="
    				+ System.currentTimeMillis());
    
    		System.out.println(Thread.currentThread().getName() + " A2 begin="
    				+ System.currentTimeMillis());
    		phaser.arriveAndAwaitAdvance();
    		System.out.println(Thread.currentThread().getName() + " A2   end="
    				+ System.currentTimeMillis());
    	}
    
    	public static void methodB() {
    		try {
    			System.out.println(Thread.currentThread().getName() + " A1 begin="
    					+ System.currentTimeMillis());
    			Thread.sleep(5000);
    			System.out.println("A:" + phaser.getRegisteredParties());
    			phaser.arriveAndDeregister();
    			System.out.println("B:" + phaser.getRegisteredParties());
    			System.out.println(Thread.currentThread().getName() + " A1   end="
    					+ System.currentTimeMillis());
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    }
    
    

    第四章 Executor与ThreadPoolExecutor的使用

    • Executor与ThreadPoolExecutor的使用
    • 使用Executors类的newCachedThreadPool()方法创建的是无界线程池,可以进行线程自动回收。所谓的“无界线程池”就是池中存放线程个数是理论上的Integer.MAX_VALUE最大值。
    • 使用newCachedThreadPool (ThreadFactory)定制线程工厂
    public class MyThreadFactory implements ThreadFactory {
    
    	public Thread newThread(Runnable r) {
    		Thread thread = new Thread(r);
    		thread.setName("定制池中的线程对象的名称" + Math.random());
    		return thread;
    	}
    }
    
    public static void main(String[] args) {
    		MyThreadFactory threadFactory = new MyThreadFactory();
    		ExecutorService executorService = Executors
    				.newCachedThreadPool(threadFactory);
    		executorService.execute(new Runnable() {
    			public void run() {
    				System.out.println("我在运行" + System.currentTimeMillis() + " "
    						+ Thread.currentThread().getName());
    			}
    		});
    	}
    
    
    • 使用newFixedThreadPool(int)方法创建有界线程池
    • 使用newFixedThreadPool(int, ThreadFactory)定制线程工厂
    public class MyThreadFactory implements ThreadFactory {
    
    	public Thread newThread(Runnable r) {
    		Thread thread = new Thread(r);
    		thread.setName("定制池中的线程对象的名称" + Math.random());
    		return thread;
    	}
    }
    
    public class Run {
    	public static void main(String[] args) {
    		MyThreadFactory threadFactory = new MyThreadFactory();
    		ExecutorService executorService = Executors.newFixedThreadPool(2,
    				threadFactory);
    		Runnable runnable = new Runnable() {
    			public void run() {
    				try {
    					System.out.println("begin 我在运行"
    							+ System.currentTimeMillis() + " "
    							+ Thread.currentThread().getName());
    					Thread.sleep(3000);
    					System.out.println("  end 我在运行"
    							+ System.currentTimeMillis() + " "
    							+ Thread.currentThread().getName());
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		};
    		executorService.execute(runnable);
    		executorService.execute(runnable);
    		executorService.execute(runnable);
    	}
    }
    
    • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, longkeepAliveTime, TimeUnit unit, BlockingQueueworkQueue)
      参数解释如下:
      ❑ corePoolSize:池中所保存的线程数,包括空闲线程,也就是核心池的大小。
      ❑ maximumPoolSize:池中允许的最大线程数。
      ❑ keepAliveTime:当线程数量大于corePoolSize值时,在没有超过指定的时间内是不从线程池中将空闲线程删除的,如果超过此时间单位,则删除。
      ❑ unit:keepAliveTime参数的时间单位。
      ❑ workQueue:执行前用于保持任务的队列。此队列仅保持由execute方法提交的Runnable任务。

    • BlockingQueue只是一个接口,常用的实现类有LinkedBlockingQueue和ArrayBlocking-Queue。用LinkedBlockingQueue的好处在于没有大小限制,优点是队列容量非常大,所以执行execute()不会抛出异常,而线程池中运行的线程数也永远不会超过corePoolSize值,因为其他多余的线程被放入LinkedBlockingQueue队列中,keepAliveTime参数也就没有意义了。

    为了更好地理解这些参数在使用上的一些关系,可以将它们进行详细化的注释:
    1)A代表execute(runnable)欲执行的runnable的数量;
    2)B代表corePoolSize;
    3)C代表maximumPoolSize;
    4)D代表A-B(假设A>=B);
    5)E代表new LinkedBlockingDeque();队列,无构造参数;
    6)F代表SynchronousQueue队列;
    7)G代表keepAliveTime。
    构造方法中5个参数之间都有使用上的关系,
    在使用线程池的过程中大部分会出现如下5种过程:
    如果A<=B,那么马上创建线程运行这个任务,并不放入扩展队列Queue中,其他参数功能忽略;
    如果A>B&&A<=C&&E,则C和G参数忽略,并把D放入E中等待被执行;
    如果A>B&&A<=C&&F,则C和G参数有效,并且马上创建线程运行这些任务,而不把D放入F中,D执行完任务后在指定时间后发生超时时将D进行清除;
    如果A>B&&A>C&&E,则C和G参数忽略,并把D放入E中等待被执行;
    如果A>B&&A>C&&F,则处理C的任务,其他任务则不再处理抛出异常。
    链表队列和同步队列的区别:
    1.当任务数小于核心线程数,两者表现相同
    2.当任务数大于核心线程数,小于最大线程数,链表队列会将多出的任务放到队列,同步队列会创建新的线程执行,不放入队列
    3.当任务数大于最大线程数,链表队列将大于核心线程数的任务放到队列,同步队列只会处理最大线程数的任务,其余任务跑出异常

    • 方法shutdown()和shutdownNow()与返回值

    方法shutdown()的作用是使当前未执行完的线程继续执行,而不再添加新的任务Task,还有shutdown()方法不会阻塞,调用shutdown()方法后,主线程main就马上结束了,而线程池会继续运行直>到所有任务执行完才会停止。如果不调用shutdown()方法,那么线程池会一直保持下去,以便随时执行被添加的新Task任务。方法shutdownNow()的作用是中断所有的任务Task,并且抛出>InterruptedException异常,前提是在Runnable中使用if (Thread.currentThread().isInterrupted() == true)语句来判断当前线程的中断状态,而未执行的线程不再执行,也就是从执行队>列中清除。如果没有if(Thread.currentThread().isInterrupted() == true)语句及抛出异常的代码,则池中正在运行的线程直到执行完毕,而未执行的线程不再执行,也从执行队列中清除。

    • 方法isShutdown()的作用是判断线程池是否已经关闭。

    • 方法set/getRejectedExecutionHandler()

    public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    		System.out.println(((MyRunnable1) r).getUsername() + " 被拒绝执行");
    	}
    }
    
    public static void main(String[] args) throws InterruptedException {
    		MyRunnable1 myRunnable1 = new MyRunnable1("中国1");
    		MyRunnable1 myRunnable2 = new MyRunnable1("中国2");
    		MyRunnable1 myRunnable3 = new MyRunnable1("中国3");
    		MyRunnable1 myRunnable4 = new MyRunnable1("中国4");
    
    		ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 9999L,
    				TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    		pool.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
    		pool.execute(myRunnable1);
    		pool.execute(myRunnable2);
    		pool.execute(myRunnable3);
    		pool.execute(myRunnable4);
    	}
    
    

    线程池中的资源全部被占用的时候,对新添加的Task任务有不同的处理策略,
    在默认的情况下,ThreadPoolExecutor类中有4种不同的处理方式:
    ❑ AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出RejectedExecutionException异常。
    ❑ CallerRunsPolicy:当任务添加到线程池中被拒绝时,会使用调用线程池的Thread线程对象处理被拒绝的任务。
    ❑ DiscardOldestPolicy:当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
    ❑ DiscardPolicy:当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

    • 方法afterExecute()和beforeExecute():在线程池ThreadPoolExecutor类中重写这两个方法可以对线程池中执行的线程对象实现监控
    • 方法getActiveCount()的作用是取得有多少个线程正在执行任务
    • 方法getCompletedTaskCount ()的作用是取得有多少个线程已经执行完任务了
    • 方法getMaximumPoolSize ()的作用是取得构造方法传入的maximumPoolSize参数值
    • 方法getPoolSize ()的作用是取得池中有多少个线程
    • 方法getTaskCount ()的作用是取得有多少个任务发送给了线程池

    第五章 Future和Callable的使用

    接口Callable与线程功能密不可分,但和Runnable的主要区别为:

    1. Callable接口的call()方法可以有返回值,而Runnable接口的run()方法没有返回值。
    2. Callable接口的call()方法可以声明抛出异常,而Runnable接口的run()方法不可以声明抛出异常。
    3. 执行完Callable接口中的任务后,返回值是通过Future接口进行获得的。
    • 方法submit()不仅可以传入Callable对象,也可以传入Runnable对象,说明submit()方法支持有返回值和无返回值的功能。
    • 方法get(long timeout, TimeUnit unit)的作用是在指定的最大时间内等待获得返回值,如果超时会抛出异常
    • 接口RejectedExecutionHandler的主要作用是当线程池关闭后依然有任务要执行时,可以实现一些处理。
    • execute和submit区别: 方法execute()没有返回值,而submit()方法可以有返回值。方法execute()在默认的情况下异常直接抛出,不能捕获,但可以通过自定义Thread-Factory的方式进行捕获,而submit()方法在默认的情况下,可以catch Execution-Exception捕获异常。

    Callable接口与Runnable接口在对比时主要的优点是,Callable接口可以通过Future取得返回值。但需要注意的是,Future接口调用get()方法取得处理的结果值时是阻塞性的,也就是如果调用Future对象的get()方法时,任务尚未执行完成,则调用get()方法时一直阻塞到此任务完成时为止。如果是这样的效果,则前面先执行的任务一旦耗时很多,则后面的任务调用get()方法就呈阻塞状态,也就是排队进行等待,大大影响运行效率。也就是主线程并不能保证首先获得的是最先完成任务的返回值,这就是Future的缺点,影响效率

  • 相关阅读:
    MySQL数据库的优化
    PHP中获取文件扩展名
    PHP实现几种经典算法详解
    Linux服务器上crontab定时执行脚本文件
    LeetCode每日一题(五):加一
    巧妙利用枚举找出数组元素所在区间
    PHP实现几种经典算法详解
    _initialize() 区别 __construct()
    LeetCode每日一题(四):搜索插入位置
    LeetCode每日一题(三):移除元素
  • 原文地址:https://www.cnblogs.com/Baronboy/p/14056735.html
Copyright © 2011-2022 走看看