1.Exchanger
功能:用于线程间数据的交换
应用场景:1)遗传算法,目前还不是特别理解 2)校对工作,假设A,B线程做同一件任务,可以通过数据校验判断两线程是否正确的工作
例子:是一个简单的校对工作例子
public class TestExchanger { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); ExecutorService es = Executors.newFixedThreadPool(2); //拥有两个线程的线程池 es.execute(new Runnable() { @Override public void run() { String A = "银行流水A"; try { exchanger.exchange(A); } catch (InterruptedException e) { e.printStackTrace(); } } }); es.execute(new Runnable() { @Override public void run() { String B = "银行流水B"; try { String A = exchanger.exchange(B); System.out.println("A和B数据是否一致: " + A.equals(B) + ",A: " + A + ",B: " + B); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
Exchanger类中最重要的一个方法就是exchange(),该方法用于交换信息,并且接受来自另外一个线程的数据,exchange()方法里面还可以加参数。exchange(V x,long timeout,TimeUnit unit)设置一个最大等待时间避免一直等待
注意:信息交换是在同步点(Exchager提供一个同步点)进行交换,而不是两个线程调用了exchange()方法就进行交换。
底层实现:Lock+Condition,暂时还没深入,学习了进行补充
2.Semaphore
功能:控制同时访问特定资源的线程数量
应用场景:流量控制,比如数据库的连接
例子:
public class TestSemaphore { public static void main(String[] args) { // 线程池 ExecutorService exec = Executors.newCachedThreadPool(); //一次只能5个线程同时访问 final Semaphore semp = new Semaphore(5); // 模拟20个客户端访问 for (int index = 0; index < 20; index++) { final int NO = index; Runnable run = new Runnable() { public void run() { try { // 获取许可 semp.acquire(); System.out.println("Accessing: " + NO); Thread.sleep((long) (Math.random() * 10000)); // 访问完后,释放 semp.release(); System.out.println("-----------------"+semp.availablePermits()); } catch (InterruptedException e) { e.printStackTrace(); } } }; exec.execute(run); } // 退出线程池 exec.shutdown(); } }
同时有五个线程可以执行,获取资源后打印语句后随机睡眠,最后释放资源,semp.availablePermits(),可以获得的许可数量,释放一个后将有一个没有被分发的许可证,当有多的许可证时,会采取先到先得的方式分配许可
Semaphore有两个构造函数,Semaphore(int)和Semaphore(int,boolean)。参数中的int表示该信号量拥有的许可数量,boolean表示获取许可的时候是否是公平的,如果是公平的那么,当有多个线程要获取许可时,会按照线程来的先后顺序分配许可,否则,线程获得许可的顺序是不定的
3.CyclicBarrier
功能:控制同时访问特定资源的线程数量
应用场景:希望创建一组任务,并行执行,在进行下一个步骤之前等待,直到所有任务完成
例子:赛马比赛。只有当所有马一起到达栅栏时,才能继续跑下一个栅栏
Horse类:
public class Horse implements Runnable { private static int counter = 0; private final int id = counter++; private int strides = 0; private static Random rand = new Random(47); private static CyclicBarrier barrier; public Horse(CyclicBarrier b) { barrier = b; } public synchronized int getStrides() { return strides; } public void run() { try { while(!Thread.interrupted()) { synchronized(this) { strides += rand.nextInt(3); // Produces 0, 1 or 2 } barrier.await(); //在栅栏处等待 } } catch(InterruptedException e) { // A legitimate way to exit } catch(BrokenBarrierException e) { // This one we want to know about throw new RuntimeException(e); } } public String toString() { return "Horse " + id + " "; } public String tracks() { StringBuilder s = new StringBuilder(); for(int i = 0; i < getStrides(); i++) s.append("*"); s.append(id); return s.toString(); } }
HorseRace类:
public class HorseRace { static final int FINISH_LINE = 75; private List<Horse> horses = new ArrayList<Horse>(); private ExecutorService exec = Executors.newCachedThreadPool(); private CyclicBarrier barrier; public HorseRace(int nHorses, final int pause) { barrier = new CyclicBarrier(nHorses, new Runnable() { public void run() { System.out.println("执行runnable方法的线程: " + Thread.currentThread().getName()); StringBuilder s = new StringBuilder(); for(int i = 0; i < FINISH_LINE; i++) s.append("="); // The fence on the racetrack System.out.println(s); for(Horse horse : horses) System.out.println(horse.tracks()); for(Horse horse : horses) if(horse.getStrides() >= FINISH_LINE) { System.out.println(horse + "won!"); exec.shutdownNow(); return; } try { TimeUnit.MILLISECONDS.sleep(pause); } catch(InterruptedException e) { System.out.println("barrier-action sleep interrupted"); } } }); for(int i = 0; i < nHorses; i++) { Horse horse = new Horse(barrier); horses.add(horse); exec.execute(horse); } } public static void main(String[] args) { int nHorses = 7; int pause = 200; new HorseRace(nHorses, pause); } }
CyclicBarrier有两个构造函数:
public CyclicBarrier(int parties, Runnable barrierAction) {} public CyclicBarrier(int parties) {}
parties代表一次要并行执行的任务,barrierAction当这些线程都到达barries状态时要执行的任务,会选择一个线程去启动这个任务
重载方法await()
public int await() throws InterruptedException, BrokenBarrierException { } public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { }
第一个方法用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务。第二个方法是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务
4.CountDownLatch
功能:允许一个或多个线程等待其他线程完成操作
应用场景:
例子:
public class TestCountDown { private static CountDownLatch c = new CountDownLatch(2); //等待线程的执行数量为2 public static void main(String[] args) throws InterruptedException { new Thread( new Runnable() { @Override public void run() { System.out.println(1); c.countDown(); } } ).start(); new Thread(new Runnable() { @Override public void run() { System.out.println(2); c.countDown(); } }).start(); c.await(); //阻塞当前线程,即main线程等待其他线程完成任务以后才能执行 System.out.println(3); } }