Java 8 并发教程:同步和锁
这篇文章中展示的中心概念也适用于Java的旧版本,然而代码示例适用于Java 8,并严重依赖于lambda表达式和新的并发特性。如果你还不熟悉lambda,我推荐你先阅读我的Java 8 教程。
出于简单的因素,这个教程的代码示例使用了定义在这里的两个辅助函数sleep(seconds)
和 stop(executor)
。
同步
在上一章中,我们学到了如何通过执行器服务同时执行代码。当我们编写这种多线程代码时,我们需要特别注意共享可变变量的并发访问。假设我们打算增加某个可被多个线程同时访问的整数。
我们定义了count
字段,带有increment()
方法来使count
加一:
int count = 0;
void increment() {
count = count + 1;
}
当多个线程并发调用这个方法时,我们就会遇到大麻烦:
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
.forEach(i -> executor.submit(this::increment));
stop(executor);
System.out.println(count); // 9965
我们没有看到count
为10000的结果,上面代码的实际结果在每次执行时都不同。原因是我们在不同的线程上共享可变变量,并且变量访问没有同步机制,这会产生竞争条件。
增加一个数值需要三个步骤:(1)读取当前值,(2)使这个值加一,(3)将新的值写到变量。如果两个线程同时执行,就有可能出现两个线程同时执行步骤1,于是会读到相同的当前值。这会导致无效的写入,所以实际的结果会偏小。上面的例子中,对count
的非同步并发访问丢失了35次增加操作,但是你在自己执行代码时会看到不同的结果。
幸运的是,Java自从很久之前就通过synchronized
关键字支持线程同步。我们可以使用synchronized
来修复上面在增加count
时的竞争条件。
synchronized void incrementSync() {
count = count + 1;
}
在我们并发调用incrementSync()
时,我们得到了count
为10000的预期结果。没有再出现任何竞争条件,并且结果在每次代码执行中都很稳定:
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
.forEach(i -> executor.submit(this::incrementSync));
stop(executor);
System.out.println(count); // 10000
synchronized
关键字也可用于语句块:
void incrementSync() {
synchronized (this) {
count = count + 1;
}
}
Java在内部使用所谓的“监视器”(monitor),也称为监视器锁(monitor lock)或内在锁( intrinsic lock)来管理同步。监视器绑定在对象上,例如,当使用同步方法时,每个方法都共享相应对象的相同监视器。
所有隐式的监视器都实现了重入(reentrant)特性。重入的意思是锁绑定在当前线程上。线程可以安全地多次获取相同的锁,而不会产生死锁(例如,同步方法调用相同对象的另一个同步方法)。
锁
并发API支持多种显式的锁,它们由Lock
接口规定,用于代替synchronized
的隐式锁。锁对细粒度的控制支持多种方法,因此它们比隐式的监视器具有更大的开销。
锁的多个实现在标准JDK中提供,它们会在下面的章节中展示。
ReentrantLock
ReentrantLock
类是互斥锁,与通过synchronized
访问的隐式监视器具有相同行为,但是具有扩展功能。就像它的名称一样,这个锁实现了重入特性,就像隐式监视器一样。
让我们看看使用ReentrantLock
之后的上面的例子。
ReentrantLock lock = new ReentrantLock();
int count = 0;
void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
锁可以通过lock()
来获取,通过unlock()
来释放。把你的代码包装在try-finally
代码块中来确保异常情况下的解锁非常重要。这个方法是线程安全的,就像同步副本那样。如果另一个线程已经拿到锁了,再次调用lock()
会阻塞当前线程,直到锁被释放。在任意给定的时间内,只有一个线程可以拿到锁。
锁对细粒度的控制支持多种方法,就像下面的例子那样:
executor.submit(() -> {
lock.lock();
try {
sleep(1);
} finally {
lock.unlock();
}
});
executor.submit(() -> {
System.out.println("Locked: " + lock.isLocked());
System.out.println("Held by me: " + lock.isHeldByCurrentThread());
boolean locked = lock.tryLock();
System.out.println("Lock acquired: " + locked);
});
stop(executor);
在第一个任务拿到锁的一秒之后,第二个任务获得了锁的当前状态的不同信息。
Locked: true
Held by me: false
Lock acquired: false
tryLock()
方法是lock()
方法的替代,它尝试拿锁而不阻塞当前线程。在访问任何共享可变变量之前,必须使用布尔值结果来检查锁是否已经被获取。
ReadWriteLock
ReadWriteLock
接口规定了锁的另一种类型,包含用于读写访问的一对锁。读写锁的理念是,只要没有任何线程写入变量,并发读取可变变量通常是安全的。所以读锁可以同时被多个线程持有,只要没有线程持有写锁。这样可以提升性能和吞吐量,因为读取比写入更加频繁。
ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
executor.submit(() -> {
lock.writeLock().lock();
try {
sleep(1);
map.put("foo", "bar");
} finally {
lock.writeLock().unlock();
}
});
上面的例子在暂停一秒之后,首先获取写锁来向映射添加新的值。在这个任务完成之前,两个其它的任务被启动,尝试读取映射中的元素,并暂停一秒:
Runnable readTask = () -> {
lock.readLock().lock();
try {
System.out.println(map.get("foo"));
sleep(1);
} finally {
lock.readLock().unlock();
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
当你执行这一代码示例时,你会注意到两个读任务需要等待写任务完成。在释放了写锁之后,两个读任务会同时执行,并同时打印结果。它们不需要相互等待完成,因为读锁可以安全同步获取,只要没有其它线程获取了写锁。
StampedLock
Java 8 自带了一种新的锁,叫做StampedLock
,它同样支持读写锁,就像上面的例子那样。与ReadWriteLock
不同的是,StampedLock
的锁方法会返回表示为long
的标记。你可以使用这些标记来释放锁,或者检查锁是否有效。此外,StampedLock
支持另一种叫做乐观锁(optimistic locking)的模式。
让我们使用StampedLock
代替ReadWriteLock
重写上面的例子:
ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.writeLock();
try {
sleep(1);
map.put("foo", "bar");
} finally {
lock.unlockWrite(stamp);
}
});
Runnable readTask = () -> {
long stamp = lock.readLock();
try {
System.out.println(map.get("foo"));
sleep(1);
} finally {
lock.unlockRead(stamp);
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
通过readLock()
或 writeLock()
来获取读锁或写锁会返回一个标记,它可以在稍后用于在finally
块中解锁。要记住StampedLock
并没有实现重入特性。每次调用加锁都会返回一个新的标记,并且在没有可用的锁时阻塞,即使相同线程已经拿锁了。所以你需要额外注意不要出现死锁。
就像前面的ReadWriteLock
例子那样,两个读任务都需要等待写锁释放。之后两个读任务同时向控制台打印信息,因为多个读操作不会相互阻塞,只要没有线程拿到写锁。
下面的例子展示了乐观锁:
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.tryOptimisticRead();
try {
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
sleep(1);
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
sleep(2);
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
} finally {
lock.unlock(stamp);
}
});
executor.submit(() -> {
long stamp = lock.writeLock();
try {
System.out.println("Write Lock acquired");
sleep(2);
} finally {
lock.unlock(stamp);
System.out.println("Write done");
}
});
stop(executor);
乐观的读锁通过调用tryOptimisticRead()
获取,它总是返回一个标记而不阻塞当前线程,无论锁是否真正可用。如果已经有写锁被拿到,返回的标记等于0。你需要总是通过lock.validate(stamp)
检查标记是否有效。
执行上面的代码会产生以下输出:
Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false
乐观锁在刚刚拿到锁之后是有效的。和普通的读锁不同的是,乐观锁不阻止其他线程同时获取写锁。在第一个线程暂停一秒之后,第二个线程拿到写锁而无需等待乐观的读锁被释放。此时,乐观的读锁就不再有效了。甚至当写锁释放时,乐观的读锁还处于无效状态。
所以在使用乐观锁时,你需要每次在访问任何共享可变变量之后都要检查锁,来确保读锁仍然有效。
有时,将读锁转换为写锁而不用再次解锁和加锁十分实用。StampedLock
为这种目的提供了tryConvertToWriteLock()
方法,就像下面那样:
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.readLock();
try {
if (count == 0) {
stamp = lock.tryConvertToWriteLock(stamp);
if (stamp == 0L) {
System.out.println("Could not convert to write lock");
stamp = lock.writeLock();
}
count = 23;
}
System.out.println(count);
} finally {
lock.unlock(stamp);
}
});
stop(executor);
第一个任务获取读锁,并向控制台打印count
字段的当前值。但是如果当前值是零,我们希望将其赋值为23
。我们首先需要将读锁转换为写锁,来避免打破其它线程潜在的并发访问。tryConvertToWriteLock()
的调用不会阻塞,但是可能会返回为零的标记,表示当前没有可用的写锁。这种情况下,我们调用writeLock()
来阻塞当前线程,直到有可用的写锁。
信号量
除了锁之外,并发API也支持计数的信号量。不过锁通常用于变量或资源的互斥访问,信号量可以维护整体的准入许可。这在一些不同场景下,例如你需要限制你程序某个部分的并发访问总数时非常实用。
下面是一个例子,演示了如何限制对通过sleep(5)
模拟的长时间运行任务的访问:
ExecutorService executor = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(5);
Runnable longRunningTask = () -> {
boolean permit = false;
try {
permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
if (permit) {
System.out.println("Semaphore acquired");
sleep(5);
} else {
System.out.println("Could not acquire semaphore");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
} finally {
if (permit) {
semaphore.release();
}
}
}
IntStream.range(0, 10)
.forEach(i -> executor.submit(longRunningTask));
stop(executor);
执行器可能同时运行10个任务,但是我们使用了大小为5的信号量,所以将并发访问限制为5。使用try-finally
代码块在异常情况中合理释放信号量十分重要。
执行上述代码产生如下结果:
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
信号量限制对通过sleep(5)
模拟的长时间运行任务的访问,最大5个线程。每个随后的tryAcquire()
调用在经过最大为一秒的等待超时之后,会向控制台打印不能获取信号量的结果。
Java 8 并发教程:原子变量和 ConcurrentMap
译者:飞龙
欢迎阅读我的Java8多线程编程系列教程的第三部分。这个教程包含并发API的两个重要部分:原子变量和ConcurrentMap
。由于最近发布的Java8中的lambda表达式和函数式编程,二者都有了极大的改进。所有这些新特性会以一些简单易懂的代码示例来描述。希望你能喜欢。
- 第一部分:线程和执行器
- 第二部分:同步和锁
- 第三部分:原子变量和 ConcurrentMap
出于简单的因素,这个教程的代码示例使用了定义在这里的两个辅助函数sleep(seconds)
和 stop(executor)
。
AtomicInteger
java.concurrent.atomic
包包含了许多实用的类,用于执行原子操作。如果你能够在多线程中同时且安全地执行某个操作,而不需要synchronized
关键字或上一章中的锁,那么这个操作就是原子的。
本质上,原子操作严重依赖于比较与交换(CAS),它是由多数现代CPU直接支持的原子指令。这些指令通常比同步块要快。所以在只需要并发修改单个可变变量的情况下,我建议你优先使用原子类,而不是上一章展示的锁。
译者注:对于其它语言,一些语言的原子操作用锁实现,而不是原子指令。
现在让我们选取一个原子类,例如AtomicInteger
:
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> executor.submit(atomicInt::incrementAndGet));
stop(executor);
System.out.println(atomicInt.get()); // => 1000
通过使用AtomicInteger
代替Integer
,我们就能线程安全地并发增加数值,而不需要同步访问变量。incrementAndGet()
方法是原子操作,所以我们可以在多个线程中安全调用它。
AtomicInteger
支持多种原子操作。updateAndGet()
接受lambda表达式,以便在整数上执行任意操作:
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.updateAndGet(n -> n + 2);
executor.submit(task);
});
stop(executor);
System.out.println(atomicInt.get()); // => 2000
accumulateAndGet()
方法接受另一种类型IntBinaryOperator
的lambda表达式。我们在下个例子中,使用这个方法并发计算0~1000所有值的和:
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.accumulateAndGet(i, (n, m) -> n + m);
executor.submit(task);
});
stop(executor);
System.out.println(atomicInt.get()); // => 499500
其它实用的原子类有AtomicBoolean
、AtomicLong
和 AtomicReference
。
LongAdder
LongAdder
是AtomicLong
的替代,用于向某个数值连续添加值。
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> executor.submit(adder::increment));
stop(executor);
System.out.println(adder.sumThenReset()); // => 1000
LongAdder
提供了add()
和increment()
方法,就像原子数值类一样,同样是线程安全的。但是这个类在内部维护一系列变量来减少线程之间的争用,而不是求和计算单一结果。实际的结果可以通过调用sum()
或sumThenReset()
来获取。
当多线程的更新比读取更频繁时,这个类通常比原子数值类性能更好。这种情况在抓取统计数据时经常出现,例如,你希望统计Web服务器上请求的数量。LongAdder
缺点是较高的内存开销,因为它在内存中储存了一系列变量。
LongAccumulator
LongAccumulator
是LongAdder
的更通用的版本。LongAccumulator
以类型为LongBinaryOperator
lambda表达式构建,而不是仅仅执行加法操作,像这段代码展示的那样:
LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10)
.forEach(i -> executor.submit(() -> accumulator.accumulate(i)));
stop(executor);
System.out.println(accumulator.getThenReset()); // => 2539
我们使用函数2 * x + y
创建了LongAccumulator
,初始值为1。每次调用accumulate(i)
的时候,当前结果和值i
都会作为参数传入lambda表达式。
LongAccumulator
就像LongAdder
那样,在内部维护一系列变量来减少线程之间的争用。
ConcurrentMap
ConcurrentMap
接口继承自Map
接口,并定义了最实用的并发集合类型之一。Java8通过将新的方法添加到这个接口,引入了函数式编程。
在下面的代码中,我们使用这个映射示例来展示那些新的方法:
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
forEach()
方法接受类型为BiConsumer
的lambda表达式,以映射的键和值作为参数传递。它可以作为for-each
循环的替代,来遍历并发映射中的元素。迭代在当前线程上串行执行。
map.forEach((key, value) -> System.out.printf("%s = %s
", key, value));
新方法putIfAbsent()
只在提供的键不存在时,将新的值添加到映射中。至少在ConcurrentHashMap
的实现中,这一方法像put()
一样是线程安全的,所以你在不同线程中并发访问映射时,不需要任何同步机制。
String value = map.putIfAbsent("c3", "p1");
System.out.println(value); // p0
getOrDefault()
方法返回指定键的值。在传入的键不存在时,会返回默认值:
String value = map.getOrDefault("hi", "there");
System.out.println(value); // there
replaceAll()
接受类型为BiFunction
的lambda表达式。BiFunction
接受两个参数并返回一个值。函数在这里以每个元素的键和值调用,并返回要映射到当前键的新值。
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2")); // d3
compute()
允许我们转换单个元素,而不是替换映射中的所有值。这个方法接受需要处理的键,和用于指定值的转换的BiFunction
。
map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo")); // barbar
除了compute()
之外还有两个变体:computeIfAbsent()
和 computeIfPresent()
。这些方法的函数式参数只在键不存在或存在时被调用。
最后,merge()
方法可以用于以映射中的现有值来统一新的值。这个方法接受键、需要并入现有元素的新值,以及指定两个值的合并行为的BiFunction
。
map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
System.out.println(map.get("foo")); // boo was foo
ConcurrentHashMap
所有这些方法都是ConcurrentMap
接口的一部分,因此可在所有该接口的实现上调用。此外,最重要的实现ConcurrentHashMap
使用了一些新的方法来改进,便于在映射上执行并行操作。
就像并行流那样,这些方法使用特定的ForkJoinPool
,由Java8中的ForkJoinPool.commonPool()
提供。该池使用了取决于可用核心数量的预置并行机制。我的电脑有四个核心可用,这会使并行性的结果为3:
System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3
这个值可以通过设置下列JVM参数来增减:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
我们使用相同的映射示例来展示,但是这次我们使用具体的ConcurrentHashMap
实现而不是ConcurrentMap
接口,所以我们可以访问这个类的所有公共方法:
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
Java8引入了三种类型的并行操作:forEach
、search
和 reduce
。这些操作中每个都以四种形式提供,接受以键、值、元素或键值对为参数的函数。
所有这些方法的第一个参数是通用的parallelismThreshold
。这一阈值表示操作并行执行时的最小集合大小。例如,如果你传入阈值500,而映射的实际大小是499,那么操作就会在单线程上串行执行。在下一个例子中,我们使用阈值1,始终强制并行执行来展示。
forEach
forEach()
方法可以并行迭代映射中的键值对。BiConsumer
以当前迭代元素的键和值调用。为了将并行执行可视化,我们向控制台打印了当前线程的名称。要注意在我这里底层的ForkJoinPool
最多使用三个线程。
map.forEach(1, (key, value) ->
System.out.printf("key: %s; value: %s; thread: %s
",
key, value, Thread.currentThread().getName()));
// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main
search
search()
方法接受BiFunction
并为当前的键值对返回一个非空的搜索结果,或者在当前迭代不匹配任何搜索条件时返回null
。只要返回了非空的结果,就不会往下搜索了。要记住ConcurrentHashMap
是无序的。搜索函数应该不依赖于映射实际的处理顺序。如果映射的多个元素都满足指定搜索函数,结果是非确定的。
String result = map.search(1, (key, value) -> {
System.out.println(Thread.currentThread().getName());
if ("foo".equals(key)) {
return value;
}
return null;
});
System.out.println("Result: " + result);
// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar
下面是另一个例子,仅仅搜索映射中的值:
String result = map.searchValues(1, value -> {
System.out.println(Thread.currentThread().getName());
if (value.length() > 3) {
return value;
}
return null;
});
System.out.println("Result: " + result);
// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo
reduce
reduce()
方法已经在Java 8 的数据流之中用过了,它接受两个BiFunction
类型的lambda表达式。第一个函数将每个键值对转换为任意类型的单一值。第二个函数将所有这些转换后的值组合为单一结果,并忽略所有可能的null
值。
String result = map.reduce(1,
(key, value) -> {
System.out.println("Transform: " + Thread.currentThread().getName());
return key + "=" + value;
},
(s1, s2) -> {
System.out.println("Reduce: " + Thread.currentThread().getName());
return s1 + ", " + s2;
});
System.out.println("Result: " + result);
// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar
文章来源 Github 上找到这篇文章中所有的代码示例,所以欢迎你fork这个仓库,并收藏它。
¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥
感谢三xx丰xx云提供的免费云服务器和免费虚拟主机,1C1G5M配置,安装的centos7,运行十分流畅,毫无延迟,
可以学习linux系统,yum之类,也可以把自己本地服搭配内网穿透给别人访问,真香,看视频听歌曲无压力,可以发布文件到服务给朋友随时下载*
运行起来也相当流畅,网速个人使用是真的赞,特别是延迟,通过域名可以很快查找,响应速度也特别快,存储空间也是相当可以,作为自己云盘也是无压力的
满足了基本使用,搭建一些web服务,文件云盘属于自己的博客都是极好的,欢迎大家使用
¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥