import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j public class CountDownLatchExample1 { private final static int threadCount=200; public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final CountDownLatch countDownLatch=new CountDownLatch(threadCount); for(int i=0;i<threadCount;i++){ final int threadNum=i; executorService.execute(()->{ try{ test(threadNum); }catch (Exception e){ log.error("exception",e); }finally { countDownLatch.countDown(); } }); } //countDownLatch.await(); //保证前面的线程都执行完 countDownLatch.await(10, TimeUnit.MILLISECONDS); //在规定时间内执行 log.info("finish"); } private static void test(int threadNum) throws Exception{ Thread.sleep(100); log.info("{}",threadNum); Thread.sleep(100); } }
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample1 { private final static int threadCount=20; public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(3); for(int i=0;i<threadCount;i++){ final int threadNum=i; executorService.execute(()->{ try{ if(semaphore.tryAcquire()){ //可以指定执行的时间 test(threadNum); semaphore.release(); } //semaphore.acquire(3);//获取多个许可 //semaphore.acquire();//获取一个许可 // test(threadNum); // semaphore.release();//释放一个许可 //semaphore.release(3);//释放多个许可 }catch (Exception e){ log.error("exception",e); } }); } log.info("finish"); } private static void test(int threadNum) throws Exception{ log.info("{}",threadNum); Thread.sleep(100); } }
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; @Slf4j public class CyclicBarrierExamle1 { //private static CyclicBarrier barrier=new CyclicBarrier(5); private static CyclicBarrier barrier=new CyclicBarrier(5,()->{ log.info("callback is running"); }); public static void main(String[] args) throws Exception{ ExecutorService executor= Executors.newCachedThreadPool(); for(int i=0;i<10;i++){ final int threadNum=i; Thread.sleep(1000); executor.execute(()->{ try { race(threadNum); }catch (Exception e){ log.error("exception",e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception{ Thread.sleep(100); log.info("{} is ready",threadNum); //barrier.await(); try { barrier.await(2000, TimeUnit.MILLISECONDS); }catch (BrokenBarrierException |TimeoutException e){ log.warn("BrokenBarrierException",e); } log.info("{}continue",threadNum); } }
在读取中写入的方法:ReentrantReadWriteLock
线程安全的写法。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class LockExample1 { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static int count=0; private final static Lock lock=new ReentrantLock(); public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i<clientTotal;i++){ final int count=i; executorService.execute(()->{ try { semaphore.acquire(); add(count); semaphore.release(); }catch (Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size{}",count); } private static void add(int i){ lock.lock(); try { count++; }finally { lock.unlock(); } } }
import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @Slf4j public class LockExample2 { private final Map<String, Data> map=new TreeMap<>(); private final ReentrantReadWriteLock lock=new ReentrantReadWriteLock(); private final Lock readLock=lock.readLock(); private final Lock writeLock=lock.writeLock(); public Data get(String key){ readLock.lock(); try { return map.get(key); }finally { readLock.unlock(); } } public Set<String> getAllKeys(){ readLock.lock(); try { return map.keySet(); }finally { readLock.unlock(); } } public Data put(String key, Data value){ writeLock.lock(); try { return map.put(key, value); }finally { readLock.unlock(); } } class Data{ } }
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.StampedLock; @Slf4j public class LockExample5 { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static int count=0; private final static StampedLock lock=new StampedLock(); public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i<clientTotal;i++){ final int count=i; executorService.execute(()->{ try { semaphore.acquire(); add(count); semaphore.release(); }catch (Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size{}",count); } private static void add(int i){ long stamp= lock.writeLock(); try { count++; }finally { lock.unlock(stamp); } } }
import java.util.concurrent.locks.StampedLock; public class LockExample4 { class Point{ private double x,y; private final StampedLock sl=new StampedLock(); void move(double deltaX,double deltaY){ long stamp=sl.writeLock(); try{ x+=deltaX; y+=deltaY; }finally { sl.unlockWrite(stamp); } } //乐观锁案例 double distanceFromOrigin(){ long stamp=sl.tryOptimisticRead();//获得一个乐观读锁 double currentX=x,currentY=y;//将两个字段读入本地局部变量 if(!sl.validate(stamp)){ //检查发出乐观锁后同时是否有其他锁发生 stamp=sl.readLock();//没有,再次获取一个读悲观锁 try{ currentX=x; //将两个字段读入本地局部变量 currentY=y; //将两个字段读入本地局部变量 }finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX*currentX+currentY*currentY); } //悲观锁读写案例 void moveIfAtOrigin(double newX,double newY){ long stamp=sl.readLock(); try { while (x==0.0&&y==0.0){ //循环,检查当前状态是否符合 long ws=sl.tryConvertToWriteLock(stamp);//将读锁转化为写锁 if(ws!=0L){ //确认转为写锁是否成功 stamp=ws; //如果成功,替换票据 x=newX; //进行状态改变 y=newY; //进行状态改变 break; }else { sl.unlockRead(stamp);//如果不能成功转化为写锁 stamp=sl.writeLock();//显示直接进行写锁,然后通过循环再试 } } }finally { sl.unlock(stamp);//释放读锁和写锁 } } } }
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class LockExample6 { public static void main(String[] args) { ReentrantLock reentrantLock=new ReentrantLock(); Condition condition=reentrantLock.newCondition(); new Thread(()->{ try { reentrantLock.lock(); log.info("wait signal"); condition.await(); }catch (InterruptedException e){ e.printStackTrace(); } log.info("get signal"); reentrantLock.unlock(); }).start(); new Thread(()->{ reentrantLock.lock(); log.info("get lock"); try { Thread.sleep(300); }catch (InterruptedException e){ e.printStackTrace(); } condition.signalAll(); log.info("send signal~"); reentrantLock.unlock(); }).start(); } }