zoukankan      html  css  js  c++  java
  • Java并发编程 (六) 线程安全策略

    个人博客网:https://wushaopei.github.io/    (你想要这里多有)

    一、不可变对象-1

       有一种安全的发布对象,即不可变对象。

    1、不可变对象需要满足的条件

            ① 对象创建以后其状态就不能修改

            ② 对象所有域都是final类型

            ③ 对象是正确创建的(在对象创建期间,this引用没有逸出)

    2、final关键字:

    final 关键字可以用来修饰:类、方法、变量

    修饰类:不能被继承,final类中的成员方法都会被隐式的指定为final方法

    修饰方法:1、锁定方法不被继承类修改;2、效率

        注意:一个类的private方法会被隐式的指定为final方法

    修饰变量:基本数据类型变量、引用类型变量

    被final修饰的数值不能再被修改;被final修饰的引用类型不能再指向另一个对象。

    重点:fianl修饰数据类型变量和引用类型变量的区别

    3、fianl的使用代码演示:

    @Slf4j
    @NoThreadSafe
    public class immutableExample1 {
    
        private final static Integer a = 1;
        private final static String b = "2";
        private final static Map<Integer,Integer> map = Maps.newHashMap();
    
        static {
            map.put(1,2);
            map.put(3,4);
            map.put(5,6);
        }
    
        public static void main(String[] args) {
    //        a = 2;
    //        b = "3";
    //        map = Maps.newHashMap();
            map.put(1,3);
            log.info("{}",map.get(1));
        }
        
        private void test(final int a ){
    //        a = 1;
        }    
    }

    执行结果:

    17:13:54.256 [main] INFO com.mmall.concurrency.example.immutable.immutableExample1 - 3

    分析:由图中可知,被final修饰的变量无法被重新赋值,被final修饰的map引用类型变量也不能指向新的内存引用。

    又由代码执行结果可知,被final修饰的引用数据类型如map的值是可以改变的。

    4、常见不可变对象

      Collections.unmodifiableXXX : Collection 、List、Set、Map ..,

      Guava : ImmutableXXX : Collection 、List、Set、Map....

    注意:

    使用Collections的unmodifiableXXX生成的引用变量就不能再被修改了;

    使用Guava的ImmutableXXX生成的引用变量就不能再被修改了;

    5、  Collections.unmodifiableXXX :代码演示:

    @Slf4j
    @NoThreadSafe
    public class immutableExample2 {
    
        private static Map<Integer,Integer> map = Maps.newHashMap();
    
        static {
            map.put(1,2);
            map.put(3,4);
            map.put(5,6);
            map = Collections.unmodifiableMap(map);
        }
    
        public static void main(String[] args) {
            map.put(1,3);
            log.info("{}",map.get(1));
        }
    }
    

    执行结果:

    Exception in thread "main" java.lang.UnsupportedOperationException
    	at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
    	at com.mmall.concurrency.example.immutable.immutableExample2.main(immutableExample2.java:31)
    
    Process finished with exit code 1

    由执行结果可知,被Collections.unmodifiableMap()修改过的map不能再被重新赋值,虽然声明时没有报错,但是编译运行时却抛出了异常。

    二、不可变对象-2

    1、从源码对原因进行分析:

    unmodifiableMap调用了UnmodifiableMap方法;

    public static <K,V> Map<K,V> unmodifiableMap(Map<? extends K, ? extends V> m) {
        return new UnmodifiableMap<>(m);
    }

    UnmodifiableMap方法中,进行了序列化已经使用final对传入参数m进行修饰:

    private static final long serialVersionUID = -1034234728574286014L;
    
    private final Map<? extends K, ? extends V> m;

    相当于將原本的map使用另一个map进行替代,并将所有的更新方法在操作时进行异常的抛出,相关源码如下:

    @Override
    public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
        throw new UnsupportedOperationException();
    }
    
    @Override
    public V putIfAbsent(K key, V value) {
        throw new UnsupportedOperationException();
    }
    
    @Override
    public boolean remove(Object key, Object value) {
        throw new UnsupportedOperationException();
    }
    
    @Override
    public boolean replace(K key, V oldValue, V newValue) {
        throw new UnsupportedOperationException();
    }
    
    @Override
    public V replace(K key, V value) {
        throw new UnsupportedOperationException();
    }
    .............

    当第一次声明值时,会对引用变量的长度和数据进行副本备份,如果有第二次修改时,会进行校验,发现传入参数和底层取出的值不同时,抛出异常。

    2、ImmutableXXX代码演示:

    @Slf4j
    @ThreadSafe
    public class immutableExample3 {
    
        private final static ImmutableList list = ImmutableList.of(1,2,3);
    
        private final static ImmutableSet set = ImmutableSet.copyOf(list);
    
        private final static ImmutableMap<Integer,Integer> map = ImmutableMap.of(1,2,3,4);
    
        private final static ImmutableMap<Integer,Integer> map2 = ImmutableMap.<Integer,Integer>builder().put(1,2).put(3,4).put(5,6).build();
    
        public static void main(String[] args) {
    //        list.add(4);
    
    //        map.put(1,4);
            System.out.println(map.get(3));
        }
    }

    ImmutableList 重新赋值测试结果:

    Exception in thread "main" java.lang.UnsupportedOperationException
    	at com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:221)
    	at com.mmall.concurrency.example.immutable.immutableExample3.main(immutableExample3.java:33)
    
    Process finished with exit code 1

    ImmutableMap重新赋值测试结果:

    Exception in thread "main" java.lang.UnsupportedOperationException
    	at com.google.common.collect.ImmutableMap.put(ImmutableMap.java:495)
    	at com.mmall.concurrency.example.immutable.immutableExample3.main(immutableExample3.java:35)
    
    Process finished with exit code 1

    查询ImmutableMap的value值:

    4
    
    Process finished with exit code 0

    由结果可知,ImmutableXXX声明的对象为不可变对象,是线程安全的,同时不影响值的获取。

     

    三、线程封闭-1

    避免并发,除了设置不可变对象,还有线程封闭。

    1、什么是线程封闭

    所谓线程封闭,就是把对象锁定到一个线程里,只有这个线程可以看到对象,那么,这个对象就算不是线程安全的,也不会出现线程安全的问题了。因为它只能在一个线程内访问。

    2、线程封闭共有三种:

    第一种线程封闭:

       Ad-hoc 线程封闭 : 程序控制实现,最糟糕,忽略

    第二种线程封闭:

      堆栈封闭:局部变量,无并发问题

    多个线程访问一个方法的时候,局部变量都会被拷贝一份到线程的栈中,所以局部变量是不会被多个线程所共享的,因此也就不会出现并发问题。

    全局的变量容易出现并发问题。

    在一个方法内定义局部变量来完成各种操作,就是属于堆栈封闭的范畴。

    第三种线程封闭:

     ThreadLocal线程封闭:特别好的封闭方法

    原因:

    ThreadLocal内部维护了一个mapmapkey是每一个线程的名称,而map的值就是我们要封闭的对象,每个map中的对象都对应了一个线程中的值,也就是ThreadLocal利用map实现了线程封闭。

    3、ThreadLocal线程封闭——代码演示:

    package com.mmall.concurrency.example.threadLocal;
    
    import com.mmall.concurrency.annoations.ThreadSafe;
    
    /**
     * @ClassName RequestHolder
     * @Description TODO
     * @Author wushaopei
     * @Date 2019/11/1 11:12
     * @Version 1.0
     */
    public class RequestHolder {
    
        private final static ThreadLocal<Long> requestHolder = new ThreadLocal<>();
    
        //在接口未实际处理之前,在filter中将值添加到ThreadLocal中,等到url被调用处理时,再从ThreadLocal中取出相应的值
        public static void add(Long id){
            requestHolder.set(id);
        }
    
        public static Long getId(){
            return requestHolder.get();
        }
    
        //在接口真正处理完之后进行处理
        public static void remove(){
            requestHolder.remove();
        }
    
    
    }

    Fitler :

    package com.mmall.concurrency;
    
    
    import com.mmall.concurrency.example.threadLocal.RequestHolder;
    
    import javax.servlet.*;
    import javax.servlet.http.HttpServletRequest;
    import java.io.IOException;
    
    /**
     * @ClassName HttpFilter
     * @Description TODO
     * @Author wushaopei
     * @Date 2019/11/1 11:18
     * @Version 1.0
     */
    public class HttpFilter implements Filter {
    
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
    
        }
    
        @Override
        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
            HttpServletRequest request = (HttpServletRequest) servletRequest;
            RequestHolder.add(Thread.currentThread().getId());
            filterChain.doFilter(servletRequest,servletResponse);
        }
    
        @Override
        public void destroy() {
    
        }
    }
    

    四、线程封闭-2

    1、HttpFilter.java

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) servletRequest;
        log.info("do filter,{} - {}",Thread.currentThread().getId(),request.getServletPath());
        RequestHolder.add(Thread.currentThread().getId());
        filterChain.doFilter(servletRequest,servletResponse);
    }

    2、配置Filter,将HttpFilter添加到容器

    @SpringBootApplication
    public class ConcurrencyApplication {
    
       public static void main(String[] args) {
          SpringApplication.run(ConcurrencyApplication.class, args);
       }
    
       @Bean
       public FilterRegistrationBean httpFilter(){
          FilterRegistrationBean<Filter> registrationBean = new FilterRegistrationBean<>();
          registrationBean.setFilter(new HttpFilter());
          registrationBean.addUrlPatterns("/threadLocal/*");
          return registrationBean;
       }
    }

    3、Handler适配器实现接口实现前后的拦截、过滤操作

    package com.mmall.concurrency;
    
    import com.mmall.concurrency.example.threadLocal.RequestHolder;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.lang.Nullable;
    import org.springframework.web.servlet.ModelAndView;
    import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    /**
     * @ClassName HttpInterceptor
     * @Description TODO
     * @Author wushaopei
     * @Date 2019/11/1 11:25
     * @Version 1.0
     */
    @Slf4j
    public class HttpInterceptor extends HandlerInterceptorAdapter {
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            log.info("preHandle");
            return true;
        }
    
        //接口执行完成后删除ThreadLocal线程变量
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable Exception ex) throws Exception {
            RequestHolder.remove();
            log.info("afterCompletion");
            return ;
    
        }
    
    }

    4、在启动器中将HttpInterceptor 的Bean配置到容器中:

    @SpringBootApplication
    public class ConcurrencyApplication extends WebMvcConfigurerAdapter{
    
    
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
       registry.addInterceptor(new HttpInterceptor()).addPathPatterns("/**");
    }
    
    创建ThreadLocal测试接口:
    
    @Controller
    @RequestMapping("/threadLocal")
    public class ThreadLocalController {
    
        @RequestMapping("/test")
        @ResponseBody
        public Long test(){
            return RequestHolder.getId();
        }
    }

    启动并执行测试接口:

    2019-11-01 12:19:08.757  INFO 23868 --- [p-nio-80-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
    2019-11-01 12:19:08.763  INFO 23868 --- [p-nio-80-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 6 ms
    2019-11-01 12:19:08.769  INFO 23868 --- [p-nio-80-exec-2] com.mmall.concurrency.HttpFilter         : do filter,30 , /threadLocal/test
    2019-11-01 12:19:08.779  INFO 23868 --- [p-nio-80-exec-2] com.mmall.concurrency.HttpInterceptor    : preHandle
    2019-11-01 12:19:08.852  INFO 23868 --- [p-nio-80-exec-2] com.mmall.concurrency.HttpInterceptor    : afterCompletio

    由执行结果可知,使用ThreadLocal实现了线程封闭,以为着ThreadLocal是线程安全的。

    5、分析:

    实现流程:

    当请求进来的时候,通过Filter将线程ID存储到了ThreadLocal里面,当接口被处理调用的时候,就可以从ThreadLocal里去取出线程ID,当接口处理完后,再通过HttpInterception适配器中的afterCompletion方法将线程ID给移除掉。

    分析:

    这里在使用ThreadLocal的时候,定义了三个方法,分别是从ThreaadLocal里面放数据、移除数据、获取数据,放数据一般是通过Filter来放数据,先拦截住接口,在拦截器里面把数据放进去,数据处理完之后在Interceptor里面将数据移除出去,避免内存泄露。

    扩展:

    线程封闭技术的常见应用:

    数据库连接对应JDBC的Connection对象,Connection对象在实现时并没有对线程安全做太多的处理,在相应的JDBC规范里也没有要求Connection对象一定是线程安全的,实际上在服务器应用程序中线程从连接池获取了一个Connection对象,使用完之后再将对象返回给连接池,由于大多数请求都是采用单线程同步的方式处理的,在Connection对象返回之前,连接池不会将它分配给其他线程,因此这种管理模式在请求时隐含的将对象封闭在线程里面。我们使用Connnection对象虽然本身不是线程安全的,但是通过线程封闭也做到了线程安全。

    五、线程不安全类与写法-1

    线程不安全的类:如果一个类的对象同时可以被多个线程访问,如果你不做特殊的同步处理。那么,它就容易表现出线程不安全的现象。

    如:抛出异常、逻辑处理错误等等。

    这种类就成为线程不安全的类。

    1、字符串拼接

      StringBuilder - > StringBuffer

    1) StringBuilder 线程安全代码演示:

    package com.mmall.concurrency.example.commonUnsafe;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    /**
     * @ClassName StringExample1
     * @Description TODO
     * @Author wushaopei
     * @Date 2019/11/1 12:45
     * @Version 1.0
     */
    @Slf4j
    @NoThreadSafe
    public class StringExample1 {
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static StringBuilder stringBuilder = new StringBuilder();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update();
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",stringBuilder.length());
        }
    
        private static void  update(){
            stringBuilder.append("1");
        }
    }

    如果线程安全的话,打印结果应该为5000!

    执行打印结果:

    12:48:15.598 [main] INFO com.mmall.concurrency.example.commonUnsafe.StringExample1 - size:4937
    
    Process finished with exit code 0

    由打印结果可知,size的值小于5000,意味着StringBuilder是线程不安全的类。

    2) StringBuffer 线程安全代码演示:

    public static StringBuffer stringBuffer = new StringBuffer();

    执行打印结果:

    12:51:06.859 [main] INFO com.mmall.concurrency.example.commonUnsafe.StringExample2 - size:5000
    
    Process finished with exit code 0

    由打印结果可知,size的值等于5000,意味着StringBuffer线程安全的类。

    3) StringBuffer和StringBuilder线程分析:

    由截图可知,StringBuffer的底层实现方法都添加synchronized同步锁,是线程安全的

    StringBuilder底层的方法没有添加synchronized同步锁,存在线程安全的问题。

    4)为什么java要同时提供StringBuilderStringBuffer两个类?

    之所以java同时提供StringBuilder和StringBuffer两个线程安全和不安全的类,是因为在StringBuffer中,使用synchronized锁机制会导致同时只有一个线程可以操作该对象,对性能和效率有损耗。StringBuffer只有在多线程并发且声明为成员变量时使用就可以保证线程的安全;而在业务层逻辑方法中声明StringBuilder局部变量时,由于存在堆栈封闭的关系,同一时间内只会有一个线程调用该类变量,所以不存在线程安全的问题。

    2、日期转换的类

    SimpleDateFormat - > JodaTimie

    1)SimpleDateFormat类线程安全测试:

    @Slf4j
    @NoThreadSafe
    public class DateFormatExample1 {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update();
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
        }
    
        private static void  update(){
            try {
                simpleDateFormat.parse("20191101");
            }catch (Exception e){
                log.error("parse exception",e);
            }
        }
    }

    执行结果:

    14:25:08.453 [pool-1-thread-162] ERROR com.mmall.concurrency.example.commonUnsafe.DateFormatExample1 - parse exception
    java.lang.NumberFormatException: multiple points
    	at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890)
    	at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
    	....................
    	at com.mmall.concurrency.example.commonUnsafe.DateFormatExample1.update(DateFormatExample1.java:51)
    	at com.mmall.concurrency.example.commonUnsafe.DateFormatExample1.lambda$main$0(DateFormatExample1.java:37)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    14:25:08.446 [pool-1-thread-74] ERROR com.mmall.concurrency.example.commonUnsafe.DateFormatExample1 - parse exception
    java.lang.NumberFormatException: For input string: ".20192019E"
    	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    	at java.lang.Long.parseLong(Long.java:578)
    	..........
    	at com.mmall.concurrency.example.commonUnsafe.DateFormatExample1.update(DateFormatExample1.java:51)
    	at com.mmall.concurrency.example.commonUnsafe.DateFormatExample1.lambda$main$0(DateFormatExample1.java:37)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	............

    由执行结果可知,SimpleDateFormat在进行并发执行时抛出了大量异常,这说明了SimpleDateFormat类的线程是不安全的。SimpleDateFormat声明的实例不能直接以成员变量的形式声明来被多线程使用。

     

    正确使用SimpleDateFormat类,应该以局部变量的方式声明该类的实例:

    代码如下:

    private static void  update(){
        try {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
            simpleDateFormat.parse("20191101");
        }catch (Exception e){
            log.error("parse exception",e);
        }
    }

    执行结果没有抛出异常:

    //空行
    Process finished with exit code 0

    注意:多线程并发使用SimpleDateFormat类时,一定要在方法中以局部变量的方式声明该类的实例。从而避免线程安全的问题。

    2)JodaTimie类线程安全测试:

    导入依赖:

    <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
       <version>2.10.5</version>
    </dependency>

    代码段:

    //声明DateTimeFormatter实例
    private static DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd");
    //获取当前线程次序
    final int count = i;
    //传递线程次序
    update(count);
    
    //update方法中日志输出线程执行结果
    log.info("{}, {}",i,DateTime.parse("20191101",dateTimeFormatter).toDate());
    

    由执行结果可知,虽然DateTimeFormatter 实例结果是乱序输出的,但是执行线程总数是完全符合要求的,所以DateTimeFormatter的线程是安全的。

    六、线程不安全类与写法-2

     ArrayList , HashSet , HashMap 等 Collections

    1、ArrayList线程安全测试:

    public class ArrayListExample {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = new ArrayList<Integer>();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",list.size());
        }
    
        private static void  update(int i){
            list.add(i);
        }
    }

    执行并打印结果:

    15:02:53.644 [main] INFO com.mmall.concurrency.example.commonUnsafe.ArrayListExample - size:4892
    
    Process finished with exit code 0

    由结果可知,size的长度不为5000,说明了ArrayList的add操作在多线程并发环境下是线程不安全的。

    2、HashSet线程安全测试:

    private static Set<Integer> set = new HashSet<Integer>();
    
    //获取当前线程次序
    final int count = i;
    //传递线程次序
    update(count);
    
    //update方法中日志输出线程执行结果
    log.info("size:{}",set.size());
    
    private static void  update(int i){
        set.add(i);
    }

    执行并打印结果:

    15:08:23.554 [main] INFO com.mmall.concurrency.example.commonUnsafe.HashSetExample - size:4864
    
    Process finished with exit code 0

    由结果可知,size的长度不为5000,说明了HashSetadd操作在多线程并发环境下也是线程不安全的。

    3、HashMap线程安全测试:

    private static Map<Integer,Integer> map = new HashMap<Integer,Integer>();
    
    //获取当前线程次序
    final int count = i;
    //传递线程次序
    update(count);
    
    //update方法中日志输出线程执行结果
    log.info("size:{}",map.size());
    private static void  update(int i){
       map.put(i,i);
    }

    执行并打印结果:

    15:16:26.117 [main] INFO com.mmall.concurrency.example.commonUnsafe.HashMapExample - size:4870
    
    Process finished with exit code 0

    由结果可知,size的长度不为5000,说明了HashMapadd操作在多线程并发环境下也是线程不安全的。

    4、扩展

     先检查再执行: if(condition(a)) { handle(a) ; }

    七、同步容器-1

    线程安全的同步容器:

      ArrayList - > Vecotr , Stack

      HashMap - > HashTable (key 、value 不能为 null)

      Collections.synchronizedXXX(List、Set、Map)

    在多线程环境下,要使用ArrayList时,可以使用Vector、Stack替代
    
    在多线程环境下,要使用HashMap时,可以使用HashTable替代
    
    HashTable底层实现使用synchronized进行修饰,同步锁的存在保证了线程的安全。
    

    1、Vector线程安全测试:

    @Slf4j
    public class VectorExample1 {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Vector<Integer> list = new Vector<>();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",list.size());
        }
    
        private static void  update(int i){
            list.add(i);
        }
    }
    

    测试执行结果:

    15:35:31.360 [main] INFO com.mmall.concurrency.example.syncContainer.VectorExample1 - size:5000
    
    Process finished with exit code 0

    由结果可知,size的长度为5000,说明了Vector的add操作在多线程并发环境下是线程安全的。

    注意:即使是线程安全的Vector也可能发生线程不安全的情况,如下演示

    @Slf4j
    @NoThreadSafe
    public class VectorExample2 {
    
        private static Vector<Integer> vector = new Vector<>();
    
        public static void main(String[] args) {
    
            while (true){
                for (int i = 0 ; i < 10 ; i++ ){
                    vector.add(i);
                }
    
                Thread thread1 = new Thread(){
                    public void run (){
                        for (int i = 0 ; i < vector.size() ; i++){
                            vector.remove(i);
                        }
                    }
                };
                Thread thread2 = new Thread(){
                    public void run (){
                        for (int i = 0 ; i < vector.size() ; i++){
                            vector.get(i);
                        }
                    }
                };
                thread1.start();
                thread2.start();
            }
    
        }
    
    }

    执行结果:

    如图,vector的多线程操作发生了异常,全都集中在执行get()操作时,一直发生数组索引越界的异常问题。

    原因分析:vector 是一个线程同步容器,所有的remove操作都是有synchronized修饰的,get操作也是有synchronized修饰的,如图:

    在Vector中由于有synchronized同步锁机制,保证了当前两个线程即thread1 和 thread2 是属于两个独立的同步线程;但是,在实际执行代码的过程中,当thread1执行了remove()删除操作时,thread2正好也执行到了get()方法,两者由于相对独立且同步,所以当thread1删除了某个索引的值时,thread2依旧会去get()获取那个索引位的值,但这时候对应的值已经被删除了,所以java会抛出索引越界的异常来提示用户,当前所要get()的值是不存在的。

    2、将HashMap替换成Hashtable实现线程安全:

    代码演示:

    @Slf4j
    @ThreadSafe
    public class HashTableExample {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer,Integer> map = new Hashtable<>();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",map.size());
        }
    
        private static void  update(int i){
            map.put(i,i);
        }
    }
    

    执行结果:

    16:07:08.814 [main] INFO com.mmall.concurrency.example.syncContainer.HashTableExample - size:5000
    
    Process finished with exit code 0

    由结果可知:size的长度为5000,说明了HashTableput操作在多线程并发环境下是线程安全的。

    源码查看:

    由上图可知,HashTableputremove等方法的底层实现都是使用synchronized修饰的,是线程安全的。

    八、同步容器-2

    1、synchronizedList测试线程安全:

    @Slf4j
    @ThreadSafe
    public class CollectionsExample1 {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",list.size());
        }
    
        private static void  update(int i){
            list.add(i);
        }
    }
    

    执行测试打印结果:

    16:17:49.852 [main] INFO com.mmall.concurrency.example.syncContainer.CollectionsExample1 - size:5000
    
    Process finished with exit code 0

    由结果可知:size的长度为5000,说明了synchronizedListadd操作在多线程并发环境下是线程安全的。

    2、synchronizedSet测试线程安全:

    @Slf4j
    @ThreadSafe
    public class CollectionsExample2 {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",set.size());
        }
    
        private static void  update(int i){
            set.add(i);
        }
    }
    

    执行测试打印结果:

    16:20:59.015 [main] INFO com.mmall.concurrency.example.syncContainer.CollectionsExample2 - size:5000
    
    Process finished with exit code 0

    由结果可知:size的长度为5000,说明了synchronizedSetadd操作在多线程并发环境下是线程安全的。

    3、synchronizedMap测试线程安全:

    @Slf4j
    @NoThreadSafe
    public class HashMapExample {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer,Integer> map = new HashMap<Integer,Integer>();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",map.size());
        }
    
        private static void  update(int i){
             map.put(i,i);
        }
    }
    

    执行测试打印结果:

    16:23:13.660 [main] INFO com.mmall.concurrency.example.syncContainer.CollectionsExample13 - size:5000
    
    Process finished with exit code 0

    由结果可知:size的长度为5000,说明了synchronizedMapput操作在多线程并发环境下是线程安全的。

    九、并发容器及安全共享策略总结

    ArrayList  - >  CopyOnWriteArrayList

    HashSet 、TreeSet - > CopyOnWriteArraySet ConcurrentSkipListSet

    HashMap 、 TreeMap - > ConcurrentHashMap ConcurrentSkipListMap

    1、用CopyOnWriteArrayList 替代ArrayList

    1) 使用原理:

    写操作时复制,当有新元素添加到CopyOnWriteArrayList时,会从原有数组里面拷贝一份出来,在新的数组上用写操作,写完之后把原来的数组指向新的数组,CopyOnWriteArrayList的整个add()操作都是在锁的保护下进行的,主要是为了避免在多线程情况下复制出多个副本出来把数据搞乱,导致最终返回的数据结果不是我们所期待的。

    2) 适用场景:

         CopyOnWriteArrayList适合读多写少的场景。

    3) CopyOnWriteArrayList的设计思想:

    第一点:读写分离,让读和写分开;

    第二点:最终一致性,因为在copy的过程需要一些时间,而最终一致性保证了数据是对的;

    第三点:使用时另外开辟空间,通过这种方式解决掉并发冲突。

    CopyOnWriteArrayList读操作时是在原数组上读的,不需要加锁;而写操作的时候需要加锁,以避免产生多个副本出来,影响最终的数据结果。

    4) 代码演示验证CopyOnWriteArrayList的写操作线程安全性:

    @Slf4j
    public class CopyOnWriteArrayListExample {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = new CopyOnWriteArrayList<>();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",list.size());
        }
    
        private static void  update(int i){
            list.add(i);
        }
    }
    

    执行测试打印结果:

    16:50:23.972 [main] INFO com.mmall.concurrency.CopyOnWriteArrayListExample - size:5000
    
    Process finished with exit code 0

    由结果可知:size的长度为5000,说明了CopyOnWriteArrayList的add操作在多线程并发环境下是线程安全的。

    2、HashSet 、TreeSet - > CopyOnWriteArraySet ConcurrentSkipListSet

    1) 代码演示验证CopyOnWriteArraySet的写操作线程安全性:

    @Slf4j
    public class CopyOnWriteArraySetExample {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = new CopyOnWriteArraySet<>();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",set.size());
        }
    
        private static void  update(int i){
            set.add(i);
        }
    }
    

    执行测试打印结果:

    16:56:40.047 [main] INFO com.mmall.concurrency.example.concurrent.CopyOnWriteArraySetExample - size:5000
    
    Process finished with exit code 0
    

    由结果可知:size的长度为5000,说明了CopyOnWriteArraySet的add操作在多线程并发环境下是线程安全的。

    2) 代码演示验证ConcurrentSkipListSet的写操作线程安全性:

    @Slf4j
    public class ConcurrentSkipListSetExample {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = new ConcurrentSkipListSet<>();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",set.size());
        }
    
        private static void  update(int i){
            set.add(i);
        }
    }
    

    执行测试打印结果:

    16:58:05.936 [main] INFO com.mmall.concurrency.example.concurrent.ConcurrentSkipListSetExample - size:5000
    
    Process finished with exit code 0

    由结果可知:size的长度为5000,说明了ConcurrentSkipListSetadd操作在多线程并发环境下是线程安全的。

    这里的线程安全仅限于做add操作时,如果是做remove操作,还需要其他锁机制保障线程安全。

    3、HashMap 、 TreeMap - > ConcurrentHashMap ConcurrentSkipListMap

    对并发要求比较高的时候,建议使用ConcurrentSkipListMap

    1) 代码演示验证ConcurrentHashMap的写操作线程安全性:

    @Slf4j
    public class ConcurrentHashMapExample {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer,Integer> map = new ConcurrentHashMap<>();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",map.size());
        }
    
        private static void  update(int i){
            map.put(i,i);
        }
    }
    

    执行测试打印结果:

    17:04:16.524 [main] INFO com.mmall.concurrency.example.concurrent.ConcurrentHashMapExample - size:5000
    
    Process finished with exit code 0

    由结果可知:size的长度为5000,说明了ConcurrentHashMapadd操作在多线程并发环境下是线程安全的。

    2) 代码演示验证ConcurrentSkipListMap的写操作线程安全性:

    @Slf4j
    public class ConcurrentSkipListMapExample {
    
        //请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer,Integer> map = new ConcurrentSkipListMap<>();
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0 ; i < clientTotal ; i++){
                int count = i;
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    }catch (Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",map.size());
        }
    
        private static void  update(int i){
            map.put(i,i);
        }
    }
    

    执行测试打印结果:

    17:06:13.842 [main] INFO com.mmall.concurrency.example.concurrent.ConcurrentSkipListMapExample - size:5000
    
    Process finished with exit code 0

    由结果可知:size的长度为5000,说明了ConcurrentSkipListMapadd操作在多线程并发环境下是线程安全的

     

    总结:

    安全共享对象策略 - 总结:

      线程限制:一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改

      共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它

      线程安全对象:一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它

      被守护对象:被守护对象只能通过获取特定的锁来访问

  • 相关阅读:
    Python创建空DataFrame及添加行数据
    Python读取Excel文件
    Python拆分DataFrame
    Python中识别DataFrame中的nan
    Python线性回归算法【解析解,sklearn机器学习库】
    Python鸢尾花分类实现
    Python机器学习入门
    Python使用map,reduce高阶函数模拟实现Spark的reduceByKey算子功能
    Python参数传递(传值&传引用)
    Python迭代器
  • 原文地址:https://www.cnblogs.com/wushaopei/p/11979085.html
Copyright © 2011-2022 走看看