目录
1. 锁用来做什么?
2. 锁的实现方式
4. 生产者消费者中的锁
5. Condition 接口
6. ReadWriteLock
7. 关于使用线程安全的集合
8. 关于队列
9. 关于i++的不安全问题与AtomicInteger
1. 锁用来做什么?
解决线程同步问题,当多线程共同访问同一个对象(临界资源)的时候, 如果破坏了不可分割的操作(原子操作),就可能发生数据不一致,有可能出现多个线程先后更改数据,造成所得到的数据是脏数据。锁是锁定临界资源。
2. 锁的实现方式
在Java中通常实现锁有两种方式,一种是synchronized关键字,另一种是Lock。
①使用 synchronized。必须要获取当前对象的互斥锁标记,如果得不到就被阻塞,直到得到互斥锁标记。线程执行完同步方法,会自动归还互斥锁标记
②使用Lock。 Lock接口的常用实现类 ReentrantLock /riː'entrənt/ :互斥锁
两者的区别:
①首先最大的不同:synchronized是基于JVM层面实现的,而Lock是基于JDK层面实现的。
②synchronized是一个关键字,Lock是一个接口.
③synchronized代码块执行完成之后会自动释放锁对象,Lock必须手动调用方法释放锁对象。
④synchronized代码块出现了异常也会自动释放锁对象,Lock接口中出现异常也需要手动释放锁对象。
⑤在并发量比较小的情况下,使用synchronized;但是在并发量比较高的情况下,其性能下降会很严重,此时推荐使用ReentrantLock。
实例:
package day20; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestLock { public static void main(String[] args) throws Exception { MyList2 list = new MyList2(); Thread t1 = new Thread(new Runnable(){ public void run(){ list.add("C"); } }); Thread t2 = new Thread(new Runnable(){ public void run(){ list.add("D"); } }); t1.start(); t2.start(); t1.join(); t2.join(); list.add("E"); list.print(); } } class MyList2{ String[] data = {"A","B","","","",""}; int index = 2; Lock lock = new ReentrantLock();//Lock接口,ReentrantLock为实现类 public void add(String s){ try{ lock.lock();//加锁 //lock.tryLock();尝试加锁,失败时返回false,此时可进行其他操作,但有可能造成活锁。 data[index] = s ; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } index++; } finally{ lock.unlock();//释放锁,为了避免锁内的代码块出现异常后直接返回而没有释放锁的问题,将此句代码放到Finally中 } } public void print(){ for(int i = 0 ; i < data.length ; i++){ System.out.println(data[i]); } } }
怎么来避免死锁?
o.wait():线程会释放锁标记,进入等待状态
o.notify()/o.notifyAll():从等待状态中释放一个/全部线程
注意:以上三个方法必须出现在对o加锁的同步代码块中
4. 生产者消费者中的锁
使用准则:
1 永远在synchronized的方法或对象里使用wait、notify和notifyAll,不然Java虚拟机会生成 IllegalMonitorStateException。
2 永远在while循环里而不是if语句下使用wait。这样,循环会在线程睡眠前后都检查wait的条件,并在条件实际上并未改变的情况下处理唤醒通知。
3 永远在多线程间共享的对象(在生产者消费者模型里即缓冲区队列)上使用wait。
实例:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestProducerConsumer { public static void main(String[] args) { MyStack stack = new MyStack(); Runnable task1 = new Runnable(){ public void run(){ for(char c = 'A' ; c<='Z' ; c++){ stack.push(c+""); } } }; Runnable task2 = new Runnable(){ public void run(){ for(int i = 1 ; i <= 26; i++){ stack.pop(); } } }; new Thread(task1).start(); new Thread(task1).start(); new Thread(task2).start(); new Thread(task2).start(); } } class MyStack{ String[] data = {"","","","","",""}; int index; Lock lock = new ReentrantLock(); Condition full = lock.newCondition();//获得Condition实例 Condition empty = lock.newCondition(); public void push(String s){ try { lock.lock(); while (data.length == index) { try { full.await();//(不符合条件的等待)满了即等待 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(s + " pushed "); data[index] = s; index++; print(); empty.signalAll();//通知消费者 } finally{ lock.unlock(); } } public void pop(){ try { lock.lock(); while (index == 0) { try { empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } index--; String o = data[index]; data[index] = ""; System.out.print(o + " poped "); print(); full.signalAll(); //通知生产者 } finally{ lock.unlock(); } } public void print(){ for(int i = 0 ; i < data.length ; i++){ System.out.print(data[i]+" "); } System.out.println(); } }
5. Condition 接口
public interface Condition (接口)
条件(Conditio也称为条件队列或条件变量 )为一个线程的暂停执行(“等待”)提供了一种方法,直到另一个线程通知某些状态现在可能为真。
Condition取代了对象监视器方法的使用。可以使用两个Condition实例来实现
一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例的Condition实例,使用其newCondition()方法。
void | await() 导致当前线程等到发信号或 interrupted 。 |
---|---|
void | signal() 唤醒一个等待线程。 |
void | signalAll() 唤醒所有等待线程。 |
例如,假设我们有一个有限的缓冲区,它支持put
和take
方法。 如果在一个空的缓冲区尝试一个take
,则线程将阻塞直到一个项目可用; 如果put
试图在一个完整的缓冲区,那么线程将阻塞,直到空间变得可用。 我们希望在单独的等待集中等待put
线程和take
线程,以便我们可以在缓冲区中的项目或空间可用的时候使用仅通知单个线程的优化。 这可以使用两个Condition
实例来实现。
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await();//满了,put等待 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal();//唤醒take } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await();//空了,take等待 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal();//唤醒put return x; } finally { lock.unlock(); } } }
( ArrayBlockingQueue
类提供此功能,因此没有理由实现此示例使用类。)
实例:数字和字母交替打印
public class TestNumberCharPrint { public static void main(String[] args) throws InterruptedException { final Object o = new Object();//全局对象,用于分别不同时间拿到锁标记来交替 Runnable task1 = new Runnable(){ public void run(){ synchronized (o) {//加锁保证此处原子操作 for (int i = 1; i <= 52; i++) { System.out.println(i); if (i % 2 ==0){ o.notifyAll();//释放字母线程 try { if(i!=52) o.wait();//若等于52时进入等待,则此线程已完成全部任务单还没结束 } catch (InterruptedException e) { e.printStackTrace(); } } } } } }; Runnable task2 = new Runnable(){ public void run(){ synchronized (o) { for (char c = 'A'; c <= 'Z'; c++) { System.out.println(c); o.notifyAll();//释放数字线程 try { if (c!='Z') o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }; Thread t1 = new Thread(task1); Thread t2 = new Thread(task2); t1.start(); Thread.sleep(1); t2.start(); } }
6.
Lock接口的一个实现类
ReadWriteLock (读写锁)是什么
ReadWriteLock维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。
读锁和写锁不能被同时加载,写锁加载则不能读,读锁加载则不能写。
若写锁未被加载,读取锁可以多个读线程同时保持,
若读锁未被加载,写入锁也是独占的,不能同时写。
实例:
package day20; import java.util.ArrayList; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class TestMyList { public static void main(String[] args) { CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); list.add("A"); } } //改造ArrayList为线程安全的(部分方法,通过加读或写锁来实现) class MyList extends ArrayList{ ReadWriteLock rwl = new ReentrantReadWriteLock();//读写锁 Lock rl = rwl.readLock();//读锁 Lock wl = rwl.writeLock();//写锁 @Override public int size() { try{ rl.lock(); return super.size(); } finally{ rl.unlock(); } } @Override public Object get(int index) { try{ rl.lock(); return super.get(index); } finally{ rl.unlock(); } } @Override public boolean add(Object e) { try{ wl.lock(); return super.add(e); } finally{ wl.unlock(); } } @Override public Object remove(int index) { try{ wl.lock(); return super.remove(index); } finally{ wl.unlock(); } } @Override public void clear() { try{ wl.lock(); super.clear(); } finally{ wl.unlock(); } } }
7. 关于使用线程安全的集合
Collection
List (ArrayList LinkedList Vector CopyOnWriteArrayList)
Set (HashSet LinkedHashSet CopyOnWriteArraySet)
SortedSet (TreeSet)
Queue (LinkedList ConcurrentLinkedQueue)
BlockingQueue (ArrayBlockingQueue LinkedBlockingQueue)
Map (HashMap LinkedHashMap Hashtable Properties ConcurrentHashMap )
SortedMap (TreeMap)
以下集合的效率都比直接加锁的效率高
CopyOnWriteArrayList 利用复制数组的方式实现数组元素的修改, 写效率低 读效率高(读操作远多于写操作) 总体效率提高
CopyOnWriteArraySet 线程安全的Set集合
ConcurrentHashMap 分段锁,将HashMap的数组链表分为16段,多个线程读取和写入同一段时,需依次进行(需等待),读取或写入不同段时互不 影响,由于HashCode相等的概率不大,所以效率远高于HashTable。
ConcurrentLinkedQueue 线程安全的队列(链表实现的) 利用一个无锁算法(CAS,和预期值比较,不同则重试)实现线程安全——效率高
8. 关于队列
Queue:队列 FIFO
常用方法:
add() :添加元素
offer():添加元素 优先使用
remove():删除元素
poll ():删除元素,优先使用
element():获取队列的头元素
peek():获取队列的头元素 优先使用
实现类:LinkedList ConcurrentLinkedQueue
BlockQueue 阻塞队列 (是个接口)
put () 添加元素到队列中 如果队列满,则等待
take()删除队列头元素 , 如果队列空,则等待
实现类:ArrayBlockingQueue 数组实现 有界队列 put方法可能会等待
LinkedBlockingQueue 链表实现 无界队列 put方法不等待
package day21; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestBlockingQueue { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<String>(6);//队列 Runnable task1 = new Runnable(){ public void run(){ for(int i = 1 ; i<= 100; i++){ try { queue.put("A"+i); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable task2 = new Runnable(){ public void run(){ for(int i = 1 ; i<= 100; i++){ try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }; ExecutorService es = Executors.newFixedThreadPool(2); es.submit(task1); es.submit(task2); es.shutdown(); } }
9. 关于i++的不安全问题与AtomicInteger
i++是先把数i读到另外一个寄存器,加1运算后再写回到原寄存器,中间过程被另外一个线程打断时就不是原子操作了,会造成结果不一致
package day21; import java.util.concurrent.atomic.AtomicInteger; public class TestAtomicInteger { static int i = 0 ; //error! static AtomicInteger a = new AtomicInteger(0);//利用AtomicInteger解决,还有AtomicBoolean等,利用了不加锁的比较算法(不是预期值时,撤回,重新加) static Integer b = Integer.valueOf(0);//error!无临界资源,因为Integer+1后成为了另外一个对象Integer,可以定义其他类型的对象来加锁,如下列的obj static MyObject obj = new MyObject(); public static void main(String[] args) throws Exception{ Thread[] ts = new Thread[10]; for(int k = 0 ; k<ts.length ; k++){ ts[k] = new Thread(new Runnable(){ public void run(){ for(int k = 1 ; k <= 10000; k++){ i++; a.incrementAndGet(); synchronized(b){ b = Integer.valueOf(b.intValue()+1); } synchronized(obj){ obj.x++; } } } }); ts[k].start(); } for(int k=0; k <ts.length ; k++){ ts[k].join(); } System.out.println(i); System.out.println(a); System.out.println(b); System.out.println(obj.x); } } class MyObject{ public int x=0; }
Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量(控制线程的数量)。
方法 acquire( int permits ) 参数作用,及动态添加 permits 许可数量
acquire( int permits ) 中的参数是什么意思呢?
new Semaphore(6) 表示初始化了 6个通路,
semaphore.acquire(2) 表示每次线程进入将会占用2个通路,
semaphore.release(2) 运行时表示归还2个通路。没有通路,则线程就无法进入代码块。
void | acquire() 从该信号量获取许可证,阻止直到可用,或线程为 interrupted 。 |
---|---|
void | acquire(int permits) 从该信号量获取给定数量的许可证,阻止直到所有可用,否则线程为 interrupted 。 |
void | release() 释放许可证,将其返回到信号量。 |
---|---|
void | release(int permits) 释放给定数量的许可证,将其返回到信号量。 |
package day21; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; public class TestSemaphore { public static void main(String[] args) { List<PhoneRoom> rooms = new ArrayList<>(); rooms.add(new PhoneRoom("Room 1")); rooms.add(new PhoneRoom("Room 2")); rooms.add(new PhoneRoom("Room 3")); rooms.add(new PhoneRoom("Room 4")); rooms.add(new PhoneRoom("Room 5")); Semaphore s = new Semaphore(5); class Task implements Runnable{ public void run(){ try { s.acquire(); } catch (InterruptedException e1) { e1.printStackTrace(); } for(int i = 0 ; i < rooms.size() ; i++){ PhoneRoom room = rooms.get(i); if (room.isFree()){ room.setFree(false); System.out.println(Thread.currentThread().getName()+" entered "+room.getName()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" exited "+room.getName()); room.setFree(true); s.release(); return; } } } } for(int i = 1 ; i <= 10 ; i++){ Runnable task = new Task(); Thread t = new Thread(task); t.start(); } } } class PhoneRoom{ AtomicBoolean isFree = new AtomicBoolean(true);//原子操作的boolean,只允许一个线程拿到 String name; public PhoneRoom(String name) { super(); this.name = name; } public boolean isFree() { return isFree.get(); } public void setFree(boolean flag) { this.isFree.set(flag); } public String getName(){ return name; } }
Package java.util.concurrent.atomic
一个小型工具包,支持单个变量上的无锁线程安全编程。
常用:
AtomicBoolean | 一个 boolean 值可以用原子更新。 |
---|---|
AtomicInteger | 可能原子更新的 int 值。 |