zoukankan      html  css  js  c++  java
  • 4. 线程安全策略

    本章内容

      1.不可变对象:不可变对象条件、final关键字、Collections.unmodifiableXXX、Guava ImmutableXXX

      2.线程封闭:Ad-hoc线程封闭、堆栈封闭、ThreadLocal-JDBC分析

      3.线程不安全类:StringBuilder->StringBufferSimpleDataFormat->JodaTime、ArrayListHashMap

      4.同步容器:ArrayList->Vector,Stack、HashMap->HashTable、Collections.synchronizedXXX

      5.并发容器:ArrayList->CopyOnWriteArrayList、HashSet,TreeSet->CopyOnWriteArraySet,ConcurrentSkipListSet、       HashMap,TreeMap->CopyOnWriteArrayMap,ConcurrentSkipListMap


    一、不可变对象 

      1.不可变对象需要满足的条件:

        ①对象创建之后其状态不能修改

        ②对象所有域都是final类型的

        ③对象是正确创建的(创建期间,this引用没有逸出)

        可参考String类,可以采用的方法是将类声明成final,将所有成员都声明成私有的,对变量不提供set方法。将所有可变成员声明成final,通过构造器初始化所有成员,进行深度拷贝时在get方法中返回对象的拷贝,而不是对象本身。

      2.final关键字

        ①修饰类:该类不能被继承(类中的方法隐式被final修饰)

        ②修饰方法:锁定方法不被子类修改,提高效率,被private修饰的方式隐式的被final修饰

        ③修饰变量:基本数据类型变量、引用类型变量

        特例:一个容器被final修饰,但是容器里的内容可以修改。

      3.Collections.unmodifiableXXX:Collection、List、Set、Map

        <import java.util.Collections;>

        此方法建立的Collection、List、Set、Map为严格上的不可变(内部元素也不可变)。

      4.ImmutableXXX:Collection、List、Set、Map

       <com.google.common.collect<>

       作用和原理与Collection.unmodifyXXX相同。

        

     二、线程封闭

      实现好的并发是一件困难的事情,所以很多时候我们都想躲避并发。避免并发最简单的方法就是线程封闭。线程封闭就是把对象封装到一个线程里,只有这一个线程能看到此对象。那么这个对象就算不是线程安全的也不会出现任何安全问题。实现线程封闭的方法如下:

    1.Ad-hoc线程封闭:程序控制实现,比较差,可忽略

    2.堆栈封闭:堆栈封闭简单的说就是局部变量。多个线程访问一个方法,此方法中的局部变量都会被拷贝一分儿到线程栈中。所以局部变量是不被多个线程所共享的,也就不会出现并发问题。所以能用局部变量就别用全局的变量,全局变量容易引起并发问题

    3.ThreadLocal:使用ThreadLocal是实现线程封闭的最好方法。ThreadLocal内部维护了一个Map,Map的key是每个线程的名称,而Map的值就是我们要封闭的对象。每个线程中的对象都对应着Map中一个值,也就是ThreadLocal利用Map实现了对象的线程封闭

    API:

    void set(Object value) 设置当前线程的线程局部变量的值。 
    public Object get() 该方法返回当前线程所对应的线程局部变量。 
    public void remove() 将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是JDK 5.0新增的方法。
    注意,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。 
    protected Object initialValue() 返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类覆盖而设计的。
    这个方法是一个延迟调用方法,在线程第1次调用get()或set(Object)时才执行,并且仅执行1次。ThreadLocal中的缺省实现直接返回一个null。

     三、线程不安全类

    1.StringBuilder-》StringBuffer

    StringBuffer同一时间只有一个线程执行任务,不出错但是性能差。

     2.SimpleDataFormat-》JodaTime

      SimpleDataFormat线程不安全,使用时应定义为成员变量(堆栈封闭策略)。

      org.joda.time包内的时间转换类线程安全。

    3.、ArrayList、HashMap、HashSet

      都线程不安全

    四、同步容器

    1.ArrayList->Vector,Stack、HashMap->HashTable

      Vector、Stack、HashTable内部方法被synchronized修饰,synchronized关键字将所有使用到非线程安全的容器代码全部同步执行。这种方式存在几个明显的问题:首先编码上存在一定的复杂性,相关的代码段都需要添加锁。其次这种一刀切的做法在高并发情况下性能并不理想,基本相当于串行执行。这三各类并不是线程安全的,synchronized只能保证该方法同一时刻只被一个线程调用,但是多个方法组合依然会导致线程不安全。

      例如下图中remove和get方法,多线程情况下,remove已经全部将vector内元素移除,但是依然有get方法在进行,此时便会报错。

    2.Collections.synchronizedXXX

       此方法是线程安全的

     

     五、并发容器

      并发容器是针对同步容器而出现的,位于JUC包内。

    1.ArrayList->CopyOnWriteArrayList

       CopyOnWriteArrayList、CopyOnWriteArraySet通过多线程下读写分离来达到提高并发行的目的,任何时候都可以读,但写操作需要加锁。对容器的修改加锁后,通过copy一个新的容器来进行修改,修改完之后再将容器替换为新的容器。

      因为需要copy数组,需要消耗内存,可能引发young GC,并且不能做到实时性,适合读多写少的场景。由于修改中copy了新的数组进行替换,同时原数组依然在被使用,那么新的数据就不能及时读取到,这样就造成了数据不一致问题

     2.ConcurrentHashMap

      ConcurrentHashMap容器相较于CopyOnWrite容器在并发加锁粒度上有了更大一步的优化,它通过修改对单个hash桶元素加锁的达到了更细粒度的并发控制。在底层数据结构上,ConcurrentHashMap和HashMap都使用了数组+链表+红黑树的方式,只是在HashMap的基础上添加了并发相关的一些控制,所以这里只对ConcurrentHashMap中并发相关代码做一些分析。

     1 final V putVal(K key, V value, boolean onlyIfAbsent) {
     2     if (key == null || value == null) throw new NullPointerException();
     3     int hash = spread(key.hashCode()); //计算桶的hash值
     4     int binCount = 0;
     5     //循环插入元素,避免并发插入失败
     6     for (Node<K,V>[] tab = table;;) {
     7         Node<K,V> f; int n, i, fh;
     8         if (tab == null || (n = tab.length) == 0)
     9             tab = initTable();
    10         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    11             //如果当前桶无元素,则通过cas操作插入新节点
    12             if (casTabAt(tab, i, null,
    13                             new Node<K,V>(hash, key, value, null)))
    14                 break;                   
    15         }
    16         //如果当前桶正在扩容,则协助扩容
    17         else if ((fh = f.hash) == MOVED)
    18             tab = helpTransfer(tab, f);
    19         else {
    20             V oldVal = null;
    21             //hash冲突时锁住当前需要添加节点的头元素,可能是链表头节点或者红黑树的根节点
    22             synchronized (f) { 
    23                 if (tabAt(tab, i) == f) {
    24                     if (fh >= 0) {
    25                         binCount = 1;
    26                         for (Node<K,V> e = f;; ++binCount) {
    27                             K ek;
    28                             if (e.hash == hash &&
    29                                 ((ek = e.key) == key ||
    30                                     (ek != null && key.equals(ek)))) {
    31                                 oldVal = e.val;
    32                                 if (!onlyIfAbsent)
    33                                     e.val = value;
    34                                 break;
    35                             }
    36                             Node<K,V> pred = e;
    37                             if ((e = e.next) == null) {
    38                                 pred.next = new Node<K,V>(hash, key,
    39                                                             value, null);
    40                                 break;
    41                             }
    42                         }
    43                     }
    44                     else if (f instanceof TreeBin) {
    45                         Node<K,V> p;
    46                         binCount = 2;
    47                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    48                                                         value)) != null) {
    49                             oldVal = p.val;
    50                             if (!onlyIfAbsent)
    51                                 p.val = value;
    52                         }
    53                     }
    54                 }
    55             }
    56             if (binCount != 0) {
    57                 if (binCount >= TREEIFY_THRESHOLD)
    58                     treeifyBin(tab, i);
    59                 if (oldVal != null)
    60                     return oldVal;
    61                 break;
    62             }
    63         }
    64     }
    65     addCount(1L, binCount);
    66     return null;
    67 }
    • 如果当前桶对应的节点还没有元素插入,通过典型的无锁cas操作尝试插入新节点,减少加锁的概率,并发情况下如果插入不成功,很容易想到自旋
    • 如果当前桶正在扩容,则协助扩容。这里是一个重点,ConcurrentHashMap的扩容和HashMap不一样,它在多线程情况下或使用多个线程同时扩容,每个线程扩容指定的一部分hash桶,当前线程扩容完指定桶之后会继续获取下一个扩容任务,直到扩容全部完成。扩容的大小和HashMap一样,都是翻倍,这样可以有效减少移动的元素数量,也就是使用2的幂次方的原因,在HashMap中也一样。
    • 在发生hash冲突时仅仅只锁住当前需要添加节点的头元素即可,可能是链表头节点或者红黑树的根节点,其他桶节点都不需要加锁,大大减小了锁粒度。

      通过ConcurrentHashMap添加元素的过程,知道了ConcurrentHashMap容器是通过CAS + synchronized一起来实现并发控制的。

    3.ConcurrentSkipListMap

      java.util中对应的容器在java.util.concurrent包中基本都可以找到对应的并发容器:List和Set有对应的CopyOnWriteArrayList与CopyOnWriteArraySet,HashMap有对应的ConcurrentHashMap,但是有序的TreeMap或并没有对应的ConcurrentTreeMap。

      这是因为TreeMap内部使用了红黑树来实现,红黑树是一种自平衡的二叉树,当树被修改时,需要重新平衡,重新平衡操作可能会影响树的大部分节点,如果并发量非常大的情况下,这就需要在许多树节点上添加互斥锁,那并发就失去了意义。所以提供了另外一种并发下的有序map实现:ConcurrentSkipListMap。

      ConcurrentSkipListMap内部使用跳表(SkipList)这种数据结构来实现,他的结构相对红黑树来说非常简单理解,实现起来也相对简单,而且在理论上它的查找、插入、删除时间复杂度都为log(n)。在并发上,ConcurrentSkipListMap采用无锁的CAS+自旋来控制。

      跳表简单来说就是一个多层的链表,底层是一个普通的链表,然后逐层减少,通常通过一个简单的算法实现每一层元素是下一层的元素的二分之一,这样当搜索元素时从最顶层开始搜索,可以说是另一种形式的二分查找。

      1 private V doPut(K key, V value, boolean onlyIfAbsent) {
      2     Node<K,V> z;             // added node
      3     if (key == null)
      4         throw new NullPointerException();
      5     Comparator<? super K> cmp = comparator;
      6     outer: for (;;) {
      7         for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前继节点
      8             if (n != null) { //查找到前继节点
      9                 Object v; int c;
     10                 Node<K,V> f = n.next; //获取后继节点的后继节点
     11                 if (n != b.next)  //发生竞争,两次节点获取不一致,并发导致
     12                     break;
     13                 if ((v = n.value) == null) {  // 节点已经被删除
     14                     n.helpDelete(b, f);
     15                     break;
     16                 }
     17                 if (b.value == null || v == n) 
     18                     break;
     19                 if ((c = cpr(cmp, key, n.key)) > 0) { //进行下一轮查找,比当前key大
     20                     b = n;
     21                     n = f;
     22                     continue;
     23                 }
     24                 if (c == 0) { //相等时直接cas修改值
     25                     if (onlyIfAbsent || n.casValue(v, value)) {
     26                         @SuppressWarnings("unchecked") V vv = (V)v;
     27                         return vv;
     28                     }
     29                     break; // restart if lost race to replace value
     30                 }
     31                 // else c < 0; fall through
     32             }
     33 
     34             z = new Node<K,V>(key, value, n); //9. n.key > key > b.key
     35             if (!b.casNext(n, z)) //cas修改值 
     36                 break;         // restart if lost race to append to b
     37             break outer;
     38         }
     39     }
     40 
     41     int rnd = ThreadLocalRandom.nextSecondarySeed(); //获取随机数
     42     if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
     43         int level = 1, max;
     44         while (((rnd >>>= 1) & 1) != 0) // 获取跳表层级
     45             ++level;
     46         Index<K,V> idx = null;
     47         HeadIndex<K,V> h = head;
     48         if (level <= (max = h.level)) { //如果获取的调表层级小于等于当前最大层级,则直接添加,并将它们组成一个上下的链表
     49             for (int i = 1; i <= level; ++i)
     50                 idx = new Index<K,V>(z, idx, null);
     51         }
     52         else { // try to grow by one level //否则增加一层level,在这里体现为Index<K,V>数组
     53             level = max + 1; // hold in array and later pick the one to use
     54             @SuppressWarnings("unchecked")Index<K,V>[] idxs =
     55                 (Index<K,V>[])new Index<?,?>[level+1];
     56             for (int i = 1; i <= level; ++i)
     57                 idxs[i] = idx = new Index<K,V>(z, idx, null);
     58             for (;;) {
     59                 h = head;
     60                 int oldLevel = h.level;
     61                 if (level <= oldLevel) // lost race to add level
     62                     break;
     63                 HeadIndex<K,V> newh = h;
     64                 Node<K,V> oldbase = h.node;
     65                 for (int j = oldLevel+1; j <= level; ++j) //新添加的level层的具体数据
     66                     newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
     67                 if (casHead(h, newh)) {
     68                     h = newh;
     69                     idx = idxs[level = oldLevel];
     70                     break;
     71                 }
     72             }
     73         }
     74         // 逐层插入数据过程
     75         splice: for (int insertionLevel = level;;) {
     76             int j = h.level;
     77             for (Index<K,V> q = h, r = q.right, t = idx;;) {
     78                 if (q == null || t == null)
     79                     break splice;
     80                 if (r != null) {
     81                     Node<K,V> n = r.node;
     82                     // compare before deletion check avoids needing recheck
     83                     int c = cpr(cmp, key, n.key);
     84                     if (n.value == null) {
     85                         if (!q.unlink(r))
     86                             break;
     87                         r = q.right;
     88                         continue;
     89                     }
     90                     if (c > 0) {
     91                         q = r;
     92                         r = r.right;
     93                         continue;
     94                     }
     95                 }
     96 
     97                 if (j == insertionLevel) {
     98                     if (!q.link(r, t))
     99                         break; // restart
    100                     if (t.node.value == null) {
    101                         findNode(key);
    102                         break splice;
    103                     }
    104                     if (--insertionLevel == 0)
    105                         break splice;
    106                 }
    107 
    108                 if (--j >= insertionLevel && j < level)
    109                     t = t.down;
    110                 q = q.down;
    111                 r = q.right;
    112             }
    113         }
    114     }
    115     return null;

      可以分为3大步来理解:第一步获取前继节点后通过CAS来插入节点;第二步对level层数进行判断,如果大于最大层数,则插入一层;第三步插入对应层的数据。整个插入过程全部通过CAS自旋的方式保证并发情况下的数据正确性。

  • 相关阅读:
    pipelinewise 学习二 创建一个简单的pipeline
    pipelinewise 学习一 docker方式安装
    Supercharging your ETL with Airflow and Singer
    ubuntu中使用 alien安装rpm包
    PipelineWise illustrates the power of Singer
    pipelinewise 基于singer 指南的的数据pipeline 工具
    关于singer elt 的几篇很不错的文章
    npkill 一个方便的npm 包清理工具
    kuma docker-compose 环境试用
    kuma 学习四 策略
  • 原文地址:https://www.cnblogs.com/qmillet/p/12084750.html
Copyright © 2011-2022 走看看