CPU多级缓存:
read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放到工作内存中的变量副本中
write(写入):作用于主内存的变量,他把store操作从工作内存中一个变量的值,传送到主内存的变量中
不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到内存中
如果对一个变脸执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始变量的值。
可以阻塞线程,并保证线程在满足特定的条件下继续执行,线程执行完成之后,在进行其他的处理
阻塞进程,并同一时间控制请求的并发量,控制同时的并发数
添加几个注解:
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.Target; import java.lang.annotation.RetentionPolicy; //用来标记【不推荐】的类或者写法 @Target(ElementType.TYPE) @Retention(RetentionPolicy.SOURCE) public @interface NotRecommend { String value() default ""; }
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.Target; import java.lang.annotation.RetentionPolicy; //用来标记【线程安全】的类或者写法 @Target(ElementType.TYPE) @Retention(RetentionPolicy.SOURCE) public @interface NotThreadSafe { String value() default ""; }
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.Target; import java.lang.annotation.RetentionPolicy; //用来标记【推荐】的类或者写法 @Target(ElementType.TYPE) @Retention(RetentionPolicy.SOURCE) public @interface Recommend { }
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.Target; import java.lang.annotation.RetentionPolicy; //用来标记【线程安全】的类或者写法 @Target(ElementType.TYPE) @Retention(RetentionPolicy.SOURCE) public @interface ThreadSafe { String value() default ""; }
线程不安全:
import com.example.annoations.NotThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j @NotThreadSafe public class ConcurrencyTest { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static int count=0; public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i<clientTotal;i++){ executorService.execute(()->{ try{ semaphore.acquire(); //是否允许被执行 add(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}",count); } private static void add(){ count++; } }
线程安全性:
import com.example.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @ThreadSafe public class CountExample2 { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static AtomicInteger count= new AtomicInteger(0); public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i<clientTotal;i++){ executorService.execute(()->{ try{ semaphore.acquire(); //是否允许被执行 add(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}",count.get()); } private static void add(){ count.incrementAndGet(); } }
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import com.example.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; @Slf4j @ThreadSafe public class AtomicExample2 { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static AtomicLong count= new AtomicLong(0); public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i<clientTotal;i++){ executorService.execute(()->{ try{ semaphore.acquire(); //是否允许被执行 add(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}",count.get()); } private static void add(){ count.incrementAndGet(); } }
import com.example.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.LongAdder; @Slf4j @ThreadSafe public class AtomicExample6 { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static LongAdder count= new LongAdder(); public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i<clientTotal;i++){ executorService.execute(()->{ try{ semaphore.acquire(); //是否允许被执行 add(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}",count); } private static void add(){ count.increment(); } }
import com.example.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicReference; @Slf4j @ThreadSafe public class AtomicExample7 { private static AtomicReference<Integer> count=new AtomicReference<>(0); public static void main(String[] args) { count.compareAndSet(0,2); count.compareAndSet(0,1); count.compareAndSet(1,3); count.compareAndSet(2,4); count.compareAndSet(3,5); log.info("count:{}",count.get()); } }
import com.example.annoations.ThreadSafe; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @Slf4j @ThreadSafe public class AtomicExample8 { //更新指定的类的某个字段的值,字段用volatile 非static 描述的字段 private static AtomicIntegerFieldUpdater<AtomicExample8> updater= AtomicIntegerFieldUpdater.newUpdater(AtomicExample8.class,"count"); @Getter public volatile int count=100; public static void main(String[] args) { AtomicExample8 example8=new AtomicExample8(); if(updater.compareAndSet(example8,100,200)){ log.info("update success 1,{}",example8.getCount()); } if(updater.compareAndSet(example8,100,200)){ log.info("update success 2,{}",example8.getCount()); } else{ log.info("update failed,{}",example8.getCount()); } } }
import com.example.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @ThreadSafe public class AtomicExample9 { private static AtomicBoolean isHappened=new AtomicBoolean(false);//默认是是否发生 //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i<clientTotal;i++){ executorService.execute(()->{ try{ semaphore.acquire(); //是否允许被执行 test(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("isHappened:{}",isHappened.get()); } private static void test(){ if(isHappened.compareAndSet(false,true)); log.info("execute"); }
同步锁。
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class SynchronizedExample1 { //修饰一个代码块,不同调用对象之间是互相不影响的, public void test1(int j){ synchronized (this){ for (int i=0;i<10;i++){ log.info("test1{}-{}",j,i); } } } //修饰一个方法 不同调用对象之间是互相不影响的,不属于方法声明的一部分 public synchronized void test2(int j){ for (int i=0;i<10;i++){ log.info("test2{}-{}",j,i); } } public static void main(String[] args) { SynchronizedExample1 example1=new SynchronizedExample1(); SynchronizedExample1 example2=new SynchronizedExample1(); ExecutorService excutorService= Executors.newCachedThreadPool(); excutorService.execute(()->{ example1.test2(1); }); excutorService.execute(()->{ example2.test2(1); }); } }
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class SynchronizedExample2 { //修饰一个类 public void test1(int j){ synchronized (this){ for (int i=0;i<10;i++){ log.info("test1{}-{}",j,i); } } } //修改一个静态方法 public static synchronized void test2(int j){ for (int i=0;i<10;i++){ log.info("test2{}-{}",j,i); } } public static void main(String[] args) { SynchronizedExample1 example1=new SynchronizedExample1(); SynchronizedExample1 example2=new SynchronizedExample1(); ExecutorService excutorService= Executors.newCachedThreadPool(); excutorService.execute(()->{ example1.test2(1); }); excutorService.execute(()->{ example2.test2(1); }); } }
原子性对比:
无法保证线程安全:(不具有原子性)
适用场景:作为状态标识量
线程安全性总结: