Java还为线程安全提供了一些工具类,如ThreadLocal类,它代表一个线程局部变量,通过把数据放在ThreadLocal中就可以让每个线程创建一个该变量的副本,从而避免并发访问的线程的安全问题。除此之外Java 5还新增了大量的线程安全类。
一、ThreadLocal类
1.1 概述
JDK 1.2提供了一个ThreadLocal类;从Java 5以后,Java引入了泛型支持,即ThreadLocal
ThreadLocal,是Thread Local Variable(线程局部变量)的意思,也许将它命名为ThreadLocalVar更加合适。线程局部变量(ThreadLocal)其实的功用非常简单,就是为每一个使用该变量的线程都提供一个变量值的副本,使每一个线程都可以独立地改变自己的副本,而不会和其它线程的副本冲突。从线程的角度看,就好像每一个线程都完全拥有该变量。
1.2 ThreadLocal的方法
ThreadLocal类的用法非常简单,它只提供了如下三个public方法:
(1)T get():返回此线程局部变量中当前线程副本中的值。
(2)void remove():删除此线程局部变量中当前线程的值。
(3)void set(T value):设置此线程局部变量中当前线程副本中的值。
下面程序证明ThreadLocal的作用:
package section9;
class Account
{
//定义一个ThreadLocal类型的变量,该变量将是一个线程局部变量
//每个线程都会保留该剧不变量的一个副本
private ThreadLocal<String> name=new ThreadLocal<>();
//定义一个name成员变量的构造器
public Account(String str)
{
this.name.set(str);
//下面代码用于访问当前线程name副本的值
System.out.println("-----"+this.name.get());
}
public String getName()
{
return this.name.get();
}
public void setName(String str)
{
this.name.set(str);
}
}
class MyTest extends Thread
{
//定义一个Account类型的成员变量
private Account acct;
public MyTest(Account acct,String name)
{
super(name);
this.acct=acct;
}
public void run()
{
//循环10次
for(var i=0;i<10;i++)
{
//当i=6时输出将账户名替换为当前线程名
if(i==6)
{
acct.setName(getName());
}
//输出同一个账户的账户名和循环遍历
System.out.println(acct.getName()+"账户i的值:"+i);
}
}
}
public class ThreadLocalTest
{
public static void main(String[] args)
{
//启动两个线程,两个线程共享一个Account
var at=new Account("初始化名");
/*虽然两个线程共享一个账户,即只有一个账户名
*但由于账户名都是ThreadLocal类型的,所以,诶个线程都完全拥有账户名副本,
* 因此在i=6时,将看到两个线程访问同一个账户时出现不同的账户名
*/
new MyTest(at,"线程甲").start();
new MyTest(at,"线程乙").start();
}
}
由于程序中的账户名是一个ThreadLocal变量,虽然只有一个Account对象,但时两个子线程将会产生两个账户名(主线程耶持有一个账户名的副本)。两个线程进行循环时都会在i==6时将账户名改为与线程名相同,这样就可以看到两个线程拥有两个账户名的情形,如图所示:
从上面的结果可以看出,实际上账户名有三个副本,主线程一个,另外两个启动的线程各一个,它们的值互不干扰,每个线程完全拥有自己的ThreadLocal变量,这就是ThreadLocal的用途。
1.3 ThreadLocal和同步机制的区别
LocalThread和其他所有的同步机制一样,都是为了解决多线程对同一变量的访问冲突,在普通的同步机制中,通过对象加锁来实现多个线程对同一个变量的安全访问。改变量是多个线程共享的,所以要使用这种同步机制,需要很细致地分析在什么时候对变量进行读写,什么时候需要锁定某个对象,什么时候释放该对象的锁。在这种情况下,系统中并没有将这份资源复制多份,只是采用安全机制来控制对这份资源的访问而已。
ThreadLocal从另一个角度解决了多线程的并发问题,ThreadLocal将需要并发访问的资源复制多份,每个线程拥有一份资源,每个线程拥有自己的资源副本,从而也就没有必要对改变了进行同步了。ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的整个变量封装进ThreadLocal,或则把该对象相关的状态使用ThreadLocal来访问。
通常建议是:如果多个线程需要共享资源,以达到线程之间的通信功能,就使用同步机制;如果仅仅只是需要隔离多个线程之间的共享冲突,则可以使用ThreadLocal。
二、包装线程不安全的集合
前面介绍Java集合时所讲的ArrayList、LinkedList、HashSet、HashMap、TreeMap等都是线程不安全的,也就是说,当多个线程向这些集合存、取元素时,就可能会破坏这些集合的数据完整性。
如果程序有多条线程可能访问以上ArrayList、HashMap等集合,可以使用Collections提供的静态方法来把这些集合包装成线程安全的集合。Collections提供了如下几个静态方法:
(1)static
(2)static
(3)static <K,V> Map<K,V> synchronizedMap(Map<K,V> m):返回指定Map对象对应的线程安全的Map对象。
(4)static
(5)static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> m):返回指定SortedMap对象对应的线程安全的SortedMap对象。
(6)static
例如需要在多线程中使用线程安全的HashMap对象,则可以采用以下代码:
//使用Collections的synchronizedMap方法将一个普通的HashMap包装成线程安全的类
HashMap m=Collections.synchronizedMap(new HashMap);
注意:如果需要把某个集合包装成线程安全的集合,则应该在创建后立即包装。
三、线程安全的集合类
实际上从Java 5开始,在java.util.concurrent包下提供了大量支持高效并发访问集合的接口和实现类,如图所示:
从上图可以看出,这些线程安全的集合类可分为两类:
(1)以Concurrent开头的集合类代表了支持并发访问的集合,它们支持多线程并发写入访问,这些写入线程的所有操作都是线程安全的,但读取操作不必锁定。
(2)以CopyOnWrite开头的集合类采用更复杂的算法来保证永远不会锁住整个集合,因此在并发写入时有较好的性能。
四、Java 9新增的发布-订阅框架
Java 9新增了一个发布-订阅框架,这个发布-订阅框架是基于异步响应流的。这个发布-订阅框架可以非常方便地处理异步线程之间的流数据交换(比如两个线程之间需要交换数据)。而且这个发布-订阅框架不需要使用数据中心来缓冲数据,同时具有非常高效的性能。
这个发布-订阅框架使用Flow类的四个静态内部接口作为核心API:
(1)Flow.Publisher:代表数据发布者、生产者。
(2)Flow.Subscriber:代表数据订阅者、消费者。
(3)Flow.Subscription:代表发布者和订阅者之间的链接纽带。订阅者通过调用该对象的request()方法来获取
(4)Flow.Processor:数据处理器,它可同时作为发布者和订阅者使用。
Flow.Publisher发布者作为生产者,负责发布数据项,并注册订阅者。Flow.Publisher接口定义了以下方法来注册订阅者:
(1)void subscribe(Flow.Subscriber<? super T> subscriber):程序调用此方法注册订阅者时,会触发订阅者的onSubscribe()方法,而Flow.Subscription对象作为参数传给该方法;如果注册失败,将会触发订阅者的onError()方法。
Flow.Subscriber接口定义了如下方法。
(1)void onSubscribe(Flow.Subscription subscription):订阅者注册时自动触发该方法。
(2)void onComplete():当订阅结束时触发发该方法。
(3)void onError(Throwable throwable):订阅失败时触发该方法。
(4)void onNext(T item):订阅者从发布者处获取数据项触发该方法,订阅者可通过该方法获取数据项。
为了处理一些通用发布者的场景,Java 9为Flow.Publisher提供了一个SubmissionPublisher实现类,它可以向当前订阅者异步提交非空的数据项,直到它被关闭。每个订阅者都能以相同的顺序接受到新提交的数据项。
程序创建SubmissionPublisher对象时,需要传入一个线程池作为支撑;该类也提供一个无参数的构造器,该构造器使用ForkJoinPool.commonPool()方法来提交发布者,以此实现发布者向订阅者提供数据项的异步特性。
下面示范了使用SubmissionPublisher作为发布者的用法:
import java.util.concurrent.Flow.*;
import java.util.*;
import java.util.concurrent.*;
/**
* Description:<br>
* 网站: <a href="http://www.crazyit.org">疯狂Java联盟</a><br>
* Copyright (C), 2001-2020, Yeeku.H.Lee<br>
* This program is protected by copyright laws.<br>
* Program Name:<br>
* Date:<br>
* @author Yeeku.H.Lee kongyeeku@163.com<br>
* @version 5.0
*/
public class PubSubTest
{
public static void main(String[] args)
{
// 创建一个SubmissionPublisher作为发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 创建订阅者
MySubscriber<String> subscriber = new MySubscriber<>();
// 注册订阅者
publisher.subscribe(subscriber);
// 发布几个数据项
System.out.println("开发发布数据...");
List.of("Java", "Kotlin", "Go", "Erlang", "Swift", "Lua")
.forEach(im -> {
// 提交数据
publisher.submit(im);
try
{
Thread.sleep(500);
}
catch (Exception ex){}
});
// 发布结束
publisher.close();
// 发布结束后,为了让发布者线程不会死亡,暂停线程
synchronized ("fkjava")
{
try
{
"fkjava".wait();
}
catch (Exception ex){}
}
}
}
// 创建订阅者
class MySubscriber<T> implements Subscriber<T>
{
// 发布者与订阅者之间的纽带
private Subscription subscription;
@Override // 订阅时触发该方法
public void onSubscribe(Subscription subscription)
{
this.subscription = subscription;
// 开始请求数据
subscription.request(1);
}
@Override // 接收到数据时触发该方法
public void onNext(T item)
{
System.out.println("获取到数据: " + item);
// 请求下一条数据
subscription.request(1);
}
@Override // 订阅出错时触发该方法
public void onError(Throwable t)
{
t.printStackTrace();
synchronized ("fkjava")
{
"fkjava".notifyAll();
}
}
@Override // 订阅结束时触发该方法
public void onComplete()
{
System.out.println("订阅结束");
synchronized ("fkjava")
{
"fkjava".notifyAll();
}
}
}