package com.bjsxt.height.concurrent019;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demonstrates {@link CyclicBarrier}: three runners each prepare for a random
 * amount of time and then wait at a shared barrier; none of them proceeds
 * until all three have arrived.
 */
public class UseCyclicBarrier {

    /**
     * A participant that sleeps for a random 0-4 seconds, announces that it is
     * ready, waits at the barrier and, once released, announces that it goes.
     */
    static class Runner implements Runnable {
        private final CyclicBarrier barrier;
        private final String name;

        /**
         * @param barrier the barrier shared by all runners
         * @param name    the label printed in this runner's messages
         */
        public Runner(CyclicBarrier barrier, String name) {
            this.barrier = barrier;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000 * (new Random()).nextInt(5));
                System.out.println(name + " 准备OK.");
                barrier.await();
            } catch (InterruptedException e) {
                // Restore the interrupt status for the pool thread and do not
                // report "Go!!": this runner never passed the barrier.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            } catch (BrokenBarrierException e) {
                // Another participant failed, so the barrier was never tripped.
                e.printStackTrace();
                return;
            }
            System.out.println(name + " Go!!");
        }
    }

    /**
     * Starts three runners on a three-thread pool and lets them meet at a
     * three-party barrier.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // The barrier trips once 3 parties have called await().
        CyclicBarrier barrier = new CyclicBarrier(3);
        // The pool needs at least as many threads as barrier parties; with
        // fewer, the waiting runners would occupy every thread and the
        // barrier could never be tripped.
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // Submit the Runnable directly; wrapping it in a Thread that is never
        // started only adds a useless object.
        executor.submit(new Runner(barrier, "zhangsan"));
        executor.submit(new Runner(barrier, "lisi"));
        executor.submit(new Runner(barrier, "wangwu"));

        executor.shutdown();
    }
}
结果:
分析:只有当3个线程都调用了await()、全部到达屏障之后,它们才会同时被放行并继续向下运行(打印“Go!!”);在此之前,先到达的线程会一直阻塞等待。
package com.bjsxt.height.concurrent019;

import java.util.concurrent.CountDownLatch;

/**
 * Demonstrates {@link CountDownLatch}: thread t1 blocks in {@code await()}
 * until two initializer threads have each called {@code countDown()}.
 */
public class UseCountDownLatch {

    /**
     * Builds the task run by an initializer thread: announce the start, sleep
     * to simulate the initialization work, announce completion and count the
     * latch down by one.
     *
     * @param label  thread label used in the printed messages (e.g. "t2")
     * @param millis how long the simulated initialization takes
     * @param latch  the latch to count down when initialization is finished
     * @return the runnable to hand to a {@link Thread}
     */
    private static Runnable initializer(final String label, final long millis,
                                        final CountDownLatch latch) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(label + "线程进行初始化操作...");
                    Thread.sleep(millis);
                    System.out.println(label + "线程初始化完毕,通知t1线程继续...");
                    latch.countDown();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        };
    }

    /**
     * Starts the waiting thread t1 and the two initializer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Count of 2: one countDown() from each of the two initializers.
        final CountDownLatch countDown = new CountDownLatch(2);

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("进入线程t1" + "等待其他线程处理完成...");
                    countDown.await();
                    System.out.println("t1线程继续执行...");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }, "t1");

        Thread t2 = new Thread(initializer("t2", 3000, countDown));
        Thread t3 = new Thread(initializer("t3", 4000, countDown));

        t1.start();
        t2.start();
        t3.start();
    }
}
结果:
分析:每次调用countDown(),计数值减1;减到0时,阻塞在await()上的线程(这里是t1)才会继续运行。上面new CountDownLatch(2)将计数初始值设为2,因此需要t2、t3各调用一次countDown()。
package com.bjsxt.height.concurrent019;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * Demonstrates {@link FutureTask}: two slow tasks are handed to a thread pool
 * while the main thread does other work, then the main thread blocks in
 * {@code get()} until each result is available.
 */
public class UseFuture implements Callable<String> {

    private final String para;

    /**
     * @param para the value that the simulated business logic processes
     */
    public UseFuture(String para) {
        this.para = para;
    }

    /**
     * The "real" business logic, which may be slow; here it is simulated by a
     * three-second sleep.
     *
     * @return the parameter followed by the suffix "处理完成"
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public String call() throws Exception {
        // Simulate a time-consuming operation.
        Thread.sleep(3000);
        String result = this.para + "处理完成";
        return result;
    }

    /**
     * Main driver: submits two tasks, does some other work, then prints both
     * results.
     *
     * @param args unused
     * @throws Exception if a task fails or the main thread is interrupted
     *                   while waiting for a result
     */
    public static void main(String[] args) throws Exception {
        String queryStr = "query";
        // Wrap the Callable that carries the business logic in a FutureTask.
        FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
        FutureTask<String> future1 = new FutureTask<String>(new UseFuture(queryStr));
        // Fixed-size pool with 2 threads, so both tasks run concurrently.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Submitting starts the tasks; call() runs on the pool threads.
            executor.submit(future);
            executor.submit(future1);
            System.out.println("请求完毕");
            try {
                // The main thread is free to do other work in the meantime.
                System.out.println("模拟处理实际业务逻辑...");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            // get() blocks until call() has finished.
            System.out.println("数据:" + future.get());
            System.out.println("数据:" + future1.get());
            System.out.println("--------------------------");
        } finally {
            // Always shut the pool down: its threads are non-daemon, so a
            // failure in get() would otherwise keep the JVM alive.
            executor.shutdown();
        }
    }
}
结果:
分析:future.get()会将主线程阻塞,等待线程处理得到结果后,主线程才会继续执行。
package com.bjsxt.height.concurrent019;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Demonstrates {@link Semaphore} as a concurrency limiter: twenty tasks are
 * submitted, but at most five hold a permit at the same time.
 */
public class UseSemaphore {

    /**
     * Submits twenty simulated client requests guarded by a five-permit
     * semaphore, then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Thread pool that grows on demand.
        ExecutorService exec = Executors.newCachedThreadPool();
        // Only 5 threads may be inside the guarded section at once.
        final Semaphore semp = new Semaphore(5);

        // Simulate 20 clients.
        for (int index = 0; index < 20; index++) {
            final int NO = index;
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        // Acquire a permit; blocks while all 5 are taken.
                        semp.acquire();
                    } catch (InterruptedException e) {
                        // No permit was obtained, so there is nothing to release.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        System.out.println("Accessing: " + NO);
                        // Simulate the actual business logic.
                        Thread.sleep((long) (Math.random() * 10000));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        // Release in finally so an interrupted task cannot
                        // leak its permit.
                        semp.release();
                    }
                }
            };
            exec.execute(run);
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Stop accepting new tasks; already submitted tasks still run.
        exec.shutdown();
    }
}
结果:
分析:这就是Java层面限流的一种实现:Semaphore(5)保证任意时刻最多只有5个线程同时持有许可并执行业务逻辑,其余线程在acquire()处等待。
package com.bjsxt.height.lock020;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demonstrates {@link ReentrantLock} used as an explicit replacement for
 * {@code synchronized}: two methods guard their bodies with the same lock.
 */
public class UseReentrantLock {

    private final Lock lock = new ReentrantLock();

    /** Holds the lock while simulating 1 second of work, then 1 more second. */
    public void method1() {
        runLocked("method1", 1000);
    }

    /** Holds the lock while simulating 2 seconds of work, then 1 more second. */
    public void method2() {
        runLocked("method2", 2000);
    }

    /**
     * Acquires the lock, prints an "enter" message, sleeps, prints an "exit"
     * message, sleeps one more second and releases the lock.
     *
     * @param methodName name of the calling method, used in the messages
     * @param workMillis how long to sleep between the two messages
     */
    private void runLocked(String methodName, long workMillis) {
        // Acquire outside the try block: if lock() itself fails, the finally
        // clause must not call unlock() on a lock this thread does not hold.
        lock.lock();
        try {
            String threadName = Thread.currentThread().getName();
            System.out.println("当前线程:" + threadName + "进入" + methodName + "..");
            Thread.sleep(workMillis);
            System.out.println("当前线程:" + threadName + "退出" + methodName + "..");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Runs method1 and then method2 on a single thread named "t1".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final UseReentrantLock ur = new UseReentrantLock();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                ur.method1();
                ur.method2();
            }
        }, "t1");

        t1.start();
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
结果:
分析:重入锁基本与synchronized相似,可以代替之。
锁同样引入了Condition来实现类似wait与notify的功能:await()对应Object.wait(),signal()/signalAll()对应Object.notify()/notifyAll(),并且调用前必须先持有对应的锁。
package com.bjsxt.height.lock020;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demonstrates {@link Condition} as the {@link Lock}-based counterpart of
 * {@code Object.wait()} / {@code Object.notify()}: one thread waits on a
 * condition and another thread signals it.
 */
public class UseCondition {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    /**
     * Set to true by {@link #method2()} before signalling; guarded by
     * {@code lock}. Waiting on this flag rather than on the bare signal means
     * a signal sent before the waiter arrives is not lost, and a spurious
     * wakeup does not let the waiter continue early.
     */
    private boolean signalled = false;

    /**
     * Waiter: takes the lock, sleeps 3 seconds, then waits on the condition
     * (releasing the lock while waiting) until {@link #method2()} has
     * signalled.
     */
    public void method1() {
        // Acquire outside the try block so that finally only unlocks a lock
        // that is actually held.
        lock.lock();
        try {
            System.out.println("当前线程:" + Thread.currentThread().getName() + "进入等待状态..");
            Thread.sleep(3000);
            System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..");
            // Counterpart of Object.wait(); always re-check the predicate.
            while (!signalled) {
                condition.await();
            }
            System.out.println("当前线程:" + Thread.currentThread().getName() + "继续执行...");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Signaller: takes the lock, sleeps 3 seconds, then wakes the waiter.
     */
    public void method2() {
        lock.lock();
        try {
            System.out.println("当前线程:" + Thread.currentThread().getName() + "进入..");
            Thread.sleep(3000);
            System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..");
            signalled = true;
            // Counterpart of Object.notify().
            condition.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Starts the waiter as thread "t1" and the signaller as thread "t2".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final UseCondition uc = new UseCondition();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                uc.method1();
            }
        }, "t1");
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                uc.method2();
            }
        }, "t2");

        // Start order does not guarantee which thread takes the lock first;
        // the signalled flag keeps the demo correct either way.
        t1.start();
        t2.start();
    }
}
结果:
package com.bjsxt.height.lock020;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demonstrates several {@link Condition} objects created from one
 * {@link ReentrantLock}: m1 and m2 wait on {@code c1}, m3 waits on
 * {@code c2}; m4 wakes every waiter of {@code c1} and m5 wakes one waiter of
 * {@code c2}.
 */
public class UseManyCondition {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition c1 = lock.newCondition();
    private final Condition c2 = lock.newCondition();

    /** Waits on condition c1. */
    public void m1() {
        awaitOn(c1, "m1");
    }

    /** Waits on condition c1. */
    public void m2() {
        awaitOn(c1, "m2");
    }

    /** Waits on condition c2. */
    public void m3() {
        awaitOn(c2, "m3");
    }

    /** Wakes every thread waiting on condition c1. */
    public void m4() {
        lock.lock();
        try {
            System.out.println("当前线程:" + Thread.currentThread().getName() + "唤醒..");
            c1.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Wakes one thread waiting on condition c2. */
    public void m5() {
        lock.lock();
        try {
            System.out.println("当前线程:" + Thread.currentThread().getName() + "唤醒..");
            c2.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Takes the lock, announces the wait, waits on the given condition and
     * announces the continuation once woken.
     *
     * @param condition  the condition to wait on
     * @param methodName name of the calling method, used in the messages
     */
    private void awaitOn(Condition condition, String methodName) {
        // Acquire outside the try block so that finally only unlocks a lock
        // that is actually held.
        lock.lock();
        try {
            System.out.println("当前线程:" + Thread.currentThread().getName()
                    + "进入方法" + methodName + "等待..");
            // NOTE(review): this waits on the bare signal, with no guarding
            // predicate, so the demo relies on main() starting the waiters
            // well before the signallers.
            condition.await();
            System.out.println("当前线程:" + Thread.currentThread().getName()
                    + "方法" + methodName + "继续..");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Starts three waiters, then after two seconds the c1 signaller, then
     * after two more seconds the c2 signaller.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final UseManyCondition umc = new UseManyCondition();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m1();
            }
        }, "t1");
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m2();
            }
        }, "t2");
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m3();
            }
        }, "t3");
        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m4();
            }
        }, "t4");
        Thread t5 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m5();
            }
        }, "t5");

        t1.start(); // waits on c1
        t2.start(); // waits on c1
        t3.start(); // waits on c2

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t4.start(); // signals c1
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t5.start(); // signals c2
    }
}
结果:
package com.bjsxt.height.lock021;

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

/**
 * Demonstrates {@link ReentrantReadWriteLock}: {@link #read()} holds the read
 * lock and {@link #write()} holds the write lock while simulating three
 * seconds of work.
 */
public class UseReentrantReadWriteLock {

    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final ReadLock readLock = rwLock.readLock();
    private final WriteLock writeLock = rwLock.writeLock();

    /** Holds the read lock for three seconds, printing on entry and exit. */
    public void read() {
        // Acquire outside the try block so that finally only unlocks a lock
        // that is actually held.
        readLock.lock();
        try {
            System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
            Thread.sleep(3000);
            System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            readLock.unlock();
        }
    }

    /** Holds the write lock for three seconds, printing on entry and exit. */
    public void write() {
        writeLock.lock();
        try {
            System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
            Thread.sleep(3000);
            System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Creates two reader threads (t1, t2) and two writer threads (t3, t4) and
     * starts the two writers. Start a different pair to try another scenario:
     * t1 + t2 for read/read, t1 + t3 for read/write.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final UseReentrantReadWriteLock urrw = new UseReentrantReadWriteLock();

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                urrw.read();
            }
        }, "t1");
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                urrw.read();
            }
        }, "t2");
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                urrw.write();
            }
        }, "t3");
        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                urrw.write();
            }
        }, "t4");

        // Write/write scenario.
        t3.start();
        t4.start();
    }
}
结果:请自行测试。启动不同的线程组合会得到不同的结果:读与读可以同时进行(t1、t2),读与写、写与写之间互斥(t1与t3、t3与t4),从而实现了读写分离。