1.什么是JUC?使用JUC的目的?
并发编程中使用的工具类,Java.util.concurrent 包。
充分利用CPU的资源。
2.线程和进程
进程:一个程序,比如QQ.exe,Music.exe,也可以是程序的集合。
一个进程一般包含多个线程,至少包含一个;线程依赖于进程而存在。
线程:开了一个Typora,打字是一个线程,自动保存是另一个线程。
Java默认有几个线程?如何开启线程?Java真的可以开启线程吗?
默认有2个线程,一个是main,一个是GC。
对于Java而言,有三种方式:Thread、Runnable、Callable
不可以,底层实际上使用native方法调用了c++,Java无法直接操作硬件
程序、进程、线程的区别?
程序:就是一段代码。
进程:运行中的程序。
线程:进程里面执行程序的小单元。
联系与区别 | 线程 | 进程 |
---|---|---|
定义方面 | 线程是进程中的一个执行路径。 | 进程是程序在某个数据集合上的一次运行活动 |
角色方面 | 操作系统调度的最小单元 | 操作系统分配资源的最小单元 |
资源共享方面 | 线程共享所在进程的地址空间和其它资源。同时线程还有自己的栈和栈指针,程序计数器等寄存器。 | 进程之间不能共享资源 |
独立性方面 | 线程没有,线程必须依赖于进程而存在。 | 进程有自己独立的地址空间 |
开销方面 | 线程切换的开销小 | 进程切换的开销较大 |
并发、并行的区别和联系
并发(多个线程同时操作同一个资源):CPU是单核的,模拟出来多条线程,天下武功,唯快不破,快速交替。
并行(多个人一起行走):CPU是多核的,多个线程可以同时执行。
System.out.println(Runtime.getRuntime().availableProcessors());//打印本机的CPU内核个数
线程的状态有哪些?
public enum State {
//新生
NEW,
//运行
RUNNABLE,
//阻塞
BLOCKED,
//等待,死死的等
WAITING,
//超时等待
TIMED_WAITING,
//终止
TERMINATED;
}
wait/sleep的区别和联系?
区别 | wait | sleep |
---|---|---|
来自不同的类 | 来自Object类 | 来自Thread类 |
调用方式不同 | 对象.wait() | Thread.sleep() |
理解不同 | 对象调用该方法,访问对象的其他线程等待 | 哪个位置调用,哪个线程就等待 |
是否需要唤醒 | 需要被唤醒,用notify()/notifyAll() | 不需要唤醒,时间到了自动就醒了 |
是否会释放锁 | 会释放锁 | 抱着锁睡着了,不会释放锁 |
相同点 | ||
功能类似 | 都可以使当前的线程暂停执行,进入阻塞状态 | |
是否需要捕获异常 | 必须要 |
3.锁
传统锁:synchronized,可以加在属性和方法前面,还有代码块
lock锁
synchronized和Lock的区别
区别 | synchronized | Lock |
---|---|---|
出发点不同 | java关键字 | java类 |
是否可以获得锁的状态 | 无法判断获取锁的状态 | 使用tryLock方法可以判断是否获取到了锁 |
是否可以释放锁 | 可以自动释放锁 | 必须手动释放,如果不手动释放,会发生死锁 |
是否是公平锁 | 可重入锁,不可以中断,非公平锁 | 可重入锁,可以判断锁,可以自己设置锁的公平性 |
作用对象 | 适合锁少量的代码同步问题 | 适合锁大量的同步代码 |
公平锁:公平的,先到先得,排队
非公平锁:不公平的,可以插队(java默认使用非公平锁)
4.生产者和消费者问题
两种实现方式:
1.使用传统的synchronized
2.JUC版的
5.8锁现象
锁是什么?锁的是谁?
6.集合类不安全
List不安全
并发情况下ArrayList是不安全的,发生异常:ConcurrentModificationException
解决方案:
1.使用Vector,底层使用synchronized,效率较低
2.List<String> list = Collections.synchronizedList(new ArrayList<>());Set
3.List<String> list = new CopyOnWriteArrayList<>(); 效率较高,底层使用Lock锁
set不安全
并发情况下Set是不安全的,发生异常:ConcurrentModificationException
解决方案:
1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
2.Set<String> set = new CopyOnWriteArraySet<>();
Set的底层是什么?
public HashSet() { //Set本质就是Map,只使用了里面的Key值
map = new HashMap<>();
}
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
Map不安全
原始的:Map<String, String> map = new HashMap<>(); //默认的加载因子是0.75,初始容量是16
解决方案:
1.Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
2.Map<String, String> map = new ConcurrentHashMap<>();
7.Callable
Callable接口类似于Runnable,因为它们都是为其实例可能由另一个线程执行的类设计的。
区别 | Runnable | Callable |
---|---|---|
是否有返回值 | 没有 | 有 |
是否抛出异常 | 否 | 是 |
执行方法不同 | run方法 | call方法 |
8.常用的辅助类
8.1 CountDownLatch
- 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
//总数是6,必须要执行任务的时候再使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" go out");
countDownLatch.countDown();//出去了一个人
},String.valueOf(i)).start();
}
countDownLatch.await();//等待计数器归零,然后再向下执行
System.out.println("close door");
}
}
原理:每次有线程调用countDown方法时,计数器减一;用await方法检测是否减到0,当计数器减到0时,才继续向下执行。
8.2 CyclicBarrier
- 允许一组线程全部等待彼此达到共同屏障点的同步辅助。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
//集齐7颗龙珠召唤神龙
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功");
});
for (int i = 1; i <= 7 ; i++) {
final int temp=i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"线程收集到第"+temp+"颗龙珠");
try {
cyclicBarrier.await();//等待计数器变成7
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
8.3 Semaphore
-
一个计数信号量。在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。每个release()添加许可证,潜在地释放阻塞获取方。但是,没有使用实际的许可证对象;
-
只保留可用数量的计数,并相应地执行。
-
信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。
import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class SemaphoreTest { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3);//假设有三个车位,有六辆车要进入 for (int i = 1; i <= 6; i++) { new Thread(() -> { try { semaphore.acquire();//抢到了车位 System.out.println(Thread.currentThread().getName() + "抢到了车位"); TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName() + "离开了车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release();//释放车位 } }, String.valueOf(i)).start(); } } } 原理: semaphore.acquire();//抢到,如果资源没有了,就等待资源被释放 semaphore.release();//释放,将当前的信号量释放,然后唤醒等待的线程 作用:多个共享资源互斥的使用。用于并发限流、控制最大线程个数。
9.读写锁
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ReadWriteLock三种模式
* 读-读 可以共存
* 读-写 不能共存
* 写-写 不能共存
* 也叫独占锁(写锁),共享锁(读锁)。
*/
public class ReadWriteTest {
public static void main(String[] args) {
// MyCache myCache = new MyCache();
MyCacheLock myCache = new MyCacheLock();
for (int i = 1; i <= 6; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(temp + "", temp + "");
}, String.valueOf(i)).start();
}
for (int i = 1; i <= 6; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(temp + "");
}, String.valueOf(i)).start();
}
}
}
//自定义缓存
class MyCache {
private volatile Map<String, Object> map = new HashMap();
//存,写
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + "写入");
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入完毕");
}
//取,读
public void get(String key) {
System.out.println(Thread.currentThread().getName() + "读取");
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完毕");
}
}
//自定义缓存
class MyCacheLock {
private volatile Map<String, Object> map = new HashMap();
//读写锁:更加细粒度的控制
ReadWriteLock readWriteLock=new ReentrantReadWriteLock();//创建一个读写锁
//存,写入的时候,只希望有一个线程能写
public void put(String key, Object value) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入");
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//取,所有人都可以读
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取");
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
10.阻塞队列
普通队列存在的读写问题
写入:如果队列满了,就不得不阻塞。
读取:如果队列是空的,也不得不阻塞。
什么情况下使用阻塞队列
多线程并发处理、线程池。
队列有哪些
队列有哪些方法
添加、移除
四组API
方式 | 抛出异常 | 不抛出异常,有返回值 | 阻塞 等待(一直等待着) | 超时等待,时间到了自动退出 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(,,) |
移除 | remove() | poll() | take() | poll(,) |
判断队首 | element() | peek() |
//抛出异常
public static void test1() {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);//队列的大小为3
System.out.println(arrayBlockingQueue.add("a"));
System.out.println(arrayBlockingQueue.add("b"));
System.out.println(arrayBlockingQueue.add("c"));
//队列满了,继续添加会抛出异常:IllegalStateException: Queue full
// System.out.println(arrayBlockingQueue.add("d"));
System.out.println(arrayBlockingQueue.element());//查看队首是谁
System.out.println("---------------------");
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
//队列空了,继续读取会抛出异常:java.util.NoSuchElementException
// System.out.println(arrayBlockingQueue.remove());
}
//不抛出异常,有返回值
public static void test2() {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);//队列的大小为3
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
System.out.println(arrayBlockingQueue.offer("d"));
System.out.println("---------------------");
System.out.println(arrayBlockingQueue.peek());//查看队首是谁
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
}
//阻塞 等待(一直等待着)
public static void test3() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
arrayBlockingQueue.put("a");
arrayBlockingQueue.put("b");
arrayBlockingQueue.put("c");
// arrayBlockingQueue.put("d");//队列没有位置了,一直阻塞着
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
// System.out.println(arrayBlockingQueue.take());//队列空了,一直阻塞
}
//超时等待
public static void test4() throws InterruptedException {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
arrayBlockingQueue.offer("a");
arrayBlockingQueue.offer("b");
arrayBlockingQueue.offer("c");
// arrayBlockingQueue.offer("d",2, TimeUnit.SECONDS); //等待超过2秒就退出
System.out.println("---------------------");
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
arrayBlockingQueue.poll(2,TimeUnit.SECONDS); //等待超过2秒就退出
}
SynchronousQueue 同步队列
没有容量,进去一个元素,就必须等待取出来之后,才能再往里面添加一个元素。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueTest {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+" take "+ blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+" take "+ blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+" take "+ blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
11.线程池
池化技术
程序运行的本质:占用系统的资源
如何优化系统资源的使用?利用池化技术。
池化技术:事先准备好一些资源,有人要用就来我这里拿,用完之后还给我。
常见的池化技术:线程池、内存池、常量池、对象池、连接池
线程池的好处:线程复用、可以控制最大并发数、管理线程
- 降低资源的消耗。
- 提高响应的速度。
- 方便管理。
线程池:三大种类(三个方法)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorTest {
public static void main(String[] args) {
// ExecutorService threadPool=Executors.newSingleThreadExecutor();//单个线程
// ExecutorService threadPool=Executors.newFixedThreadPool(3);//创建一个固定大小的线程池
ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩的,遇强则强,遇弱则弱
try {
for (int i = 1; i <= 20; i++) {
//使用了线程池技术后,使用线程池技术来创建线程,不要再用new Thread了
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();//线程池使用完毕后,一定要关闭
}
}
}
线程池:7大参数
3个方法的源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
本质上:三个方法都是调用了ThreadPoolExecutor方法,该方法有7个参数
public ThreadPoolExecutor(int corePoolSize, //核心线程池的大小
int maximumPoolSize, //最大核心线程池的大小
long keepAliveTime, //超时了,没有人调用就会释放
TimeUnit unit, // 超时单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, //线程工程,创建线程的,一般不用动
RejectedExecutionHandler handler //拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
手动创建一个线程池
import java.util.concurrent.*;
public class PoolTest {
public static void main(String[] args) {
/**
* 如何定义最大线程:
* 1.CPU密集型:处理器是几核的就设置为几,可以保持CPU效率最高
* 2.IO密集型:io十分占用资源,判断系统中有多少个大型任务,一般设置为任务个数的两倍。
*/
ExecutorService threadPool= new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),//获取CPU的核数
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人进来,不处理这个人,抛出异常。
new ThreadPoolExecutor.CallerRunsPolicy()//哪里来的去哪里
// new ThreadPoolExecutor.DiscardPolicy()//队列满了,丢掉任务,不抛出异常。
// new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了,尝试和第一个竞争,不抛出异常。
);
try {
//最大承载:max+Deque。如果超过最大承载,抛出异常:RejectedExecutionException:
for (int i = 1; i <= 9; i++) {
//使用了线程池技术后,使用线程池技术来创建线程,不要再用new Thread了
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();//线程池使用完毕后,一定要关闭
}
}
}
四种拒绝策略
new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人进来,不处理这个人,抛出异常。
new ThreadPoolExecutor.CallerRunsPolicy()//哪里来的去哪里
new ThreadPoolExecutor.DiscardPolicy()//队列满了,丢掉任务,不抛出异常。
new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了,尝试和第一个竞争,不抛出异常。
如何定义最带线程个数
-
CPU密集型:处理器是几核的就设置为几,可以保持CPU效率最高
-
IO密集型:io操作十分占用资源,判断系统中有多少个大型任务,一般设置为任务个数的两倍。
12.四大函数式接口
传统的程序员除了掌握基础,还需要掌握注解、泛型、反射、枚举
新时代的程序员要掌握的:lambda表达式、链式编程、函数式接口、Stream流式计算...
函数式接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
定义:
1.如果一个接口只有一个抽象方法,那么该接口就是一个函数式接口。如果该接口只有一个抽象方法,但我们并没有给该接口声明 @FunctionalInterface 注解,那么编译器依旧会将该接口看作是函数式接口。
2.如果我们在某个接口上声明了 @FunctionalInterface 注解,那么编译器就会按照函数式接口的定义来要求该接口。
作用:
函数式接口的实例可以通过 lambda 表达式、方法引用或者构造方法引用来创建。
函数型接口
接口:
public interface Function<K, V> {
V apply(K var1);
}
使用:
import java.util.function.Function;
//函数型接口:有一个输入参数,有一个输出
//只要是函数型接口,都可以使用lambda表达式简化
public class FunctionTest {
public static void main(String[] args) {
// 工具类:输出输入的值
// Function<String, String> function = new Function<String, String>() {
// @Override
// public String apply(String str) {
// return str;
// }
// };
// 使用Lambda表达式简化
Function<String, String> function = (str) -> {
return str;
};
System.out.println(function.apply("hello"));
}
}
断定型接口
接口:
@FunctionalInterface
public interface Predicate<T> {
boolean test(T t);
default Predicate<T> and(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) && other.test(t);
}
default Predicate<T> negate() {
return (t) -> !test(t);
}
default Predicate<T> or(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) || other.test(t);
}
static <T> Predicate<T> isEqual(Object targetRef) {
return (null == targetRef)
? Objects::isNull
: object -> targetRef.equals(object);
}
}
使用:
import java.util.function.Predicate;
//断定型接口:有一个输入参数,返回值只能是布尔值
public class PredicateTest {
public static void main(String[] args) {
//判断一个字符串是否为空
// Predicate<String> predicate = new Predicate<String>() {
// @Override
// public boolean test(String str) {
// return str.isEmpty();
// }
// };
// 使用Lambda表达式简化
Predicate<String> predicate=(str)->{return str.isEmpty();};
System.out.println(predicate.test("hello"));
}
}
消费型接口
接口:
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}
使用:
import java.util.function.Consumer;
//消费型接口:只有输入,没有返回值
public class ConsumerTest {
public static void main(String[] args) {
//打印一个字符串
// Consumer<String> consumer = new Consumer<String>() {
// @Override
// public void accept(String str) {
// System.out.println(str);
// }
// };
//使用lambda表达式简化
Consumer<String> consumer=(str)->{
System.out.println(str);
};
consumer.accept("hello");
}
}
供给型接口
接口:
@FunctionalInterface
public interface Supplier<T> {
T get();
}
使用:
import java.util.function.Supplier;
//供给型接口:没有输入参数,只有返回值
public class SupplierTest {
public static void main(String[] args) {
//返回一个Integer
// Supplier<Integer> supplier=new Supplier<Integer>() {
// @Override
// public Integer get() {
// return 1024;
// }
// };
//使用lambda表达式简化
Supplier<Integer> supplier=()->{
return 1024;
};
System.out.println(supplier.get());
}
}
13.Stream流式计算
什么是Stream流式计算?
大数据:存储+计算
存储:使用集合、数据库等工具。
计算:使用流来操作
import java.util.Arrays;
import java.util.List;
/**
* 题目要求:一分钟内完成这个题目,只能用一行代码实现
* 现在有5个用户,晒选:
* 1.ID必须是偶数;
* 2.年龄必须大于23岁;
* 3.用户名转为大写字母;
* 4.用户名字母倒着排序;
* 5.只输出一个用户;
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1, "a", 21);
User u2 = new User(2, "b", 22);
User u3 = new User(3, "c", 23);
User u4 = new User(4, "d", 24);
User u5 = new User(5, "e", 25);
User u6 = new User(6, "f", 26);
//存储:使用集合
List<User> list = Arrays.asList(u1, u2, u3, u4, u5, u6);
//计算:交给Stream流来操作
list.stream().filter(u -> {
return u.getId() % 2 == 0;
}).filter(u -> {
return u.getAge() > 23;
}).map(u -> {
return u.getName().toUpperCase();
}).sorted((uu1, uu2) -> {
return uu2.compareTo(uu1);
}).limit(1).forEach(System.out::println);
}
}
14.ForkJoin
什么是ForkJoin
ForkJoin是在JDK1.7出现的,主要是并行执行任务,在大数据量时提高效率。
ForkJoin特点:工作窃取。底层通过双端队列实现,先完成的线程可以帮助其他的线程做部分工作。
import java.util.concurrent.RecursiveTask;
/**
* 如何使用ForkJoin
* 1.ForkJoinPool 用它来执行
* 2.计算任务 ForkJoinPool.execute(ForkJoinTask task)
* 3.计算类要继承ForkJoinTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
private Long temp=1000_0000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
//计算方法
@Override
protected Long compute() {
if((end-start)<temp){
Long sum=0L;
for (Long i = start; i <= end ; i++) {
sum+=i;
}
return sum;
}else { //ForkJoin 类似递归
Long middle=(start+end)/2;
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork();//拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
task2.fork();
return task1.join()+task2.join();
}
}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
* 求和计算
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
test2();
test3();
}
//传统的计算方式
public static void test1() {
long sum = 0L;
long start = System.currentTimeMillis();
for (long i = 1L; i <= 10_0000_0000L; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + ",花费时间为:" + (end - start));
}
//使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L,10_0000_0000L); //任务
ForkJoinTask<Long> submit = forkJoinPool.submit(task);//提交任务
long sum=submit.get();//返回结果
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + ",花费时间为:" + (end - start));
}
//使用Stream流
public static void test3() {
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + ",花费时间为:" + (end - start));
}
}
15.异步回调
Future 设计的初衷:对将来的某个事件的结果进行建模。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 异步调用:CompletableFuture
* 异步执行:成功回调、失败回调
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1();
test2();
}
//没有返回值的runAsync 异步回调
public static void test1() throws ExecutionException, InterruptedException {
//发起一个请求
CompletableFuture<Void> completedFuture = CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+": runAsync=>Void");
});
System.out.println("hello");
completedFuture.get();//获取执行的结果
}
//有返回值的 异步回调
public static void test2() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+": supplyAsync=>Integer");
int i=10/0;
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
System.out.println("t=>" + t); //正常的返回结果
System.out.println("u=>" + u);//错误信息:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233;//可以获取到产生错误的返回值
}).get());
}
}
16.JMM
谈谈对volatile的理解
volatile是java虚拟机提供的轻量级的同步机制。
- 保证可见性
- 不保证原子性
- 禁止指令重排
什么是JMM
定义:Java内存模型。不存在的东西,只是一个概念。
关于JMM的一些约定
- 线程解锁前,必须把共享变量立刻刷回主存。
- 线程加锁前,必须读取主存中的最新值到工作内存中。
- 加锁和解锁是同一把锁。
内存交互操作
内存交互操作
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可再分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则: - 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
17.volatile关键字
1.保证可见性
import java.util.concurrent.TimeUnit;
public class JMMDemo {
//不加volatile,程序会发生死循环,加了volatile,保证可见性
// private static int number=0;
private volatile static int number=0;
public static void main(String[] args) {
new Thread(()->{
while (number==0){}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
number=1;
System.out.println(number);
}
}
2.不保证原子性
原子性:不可分割
多线程中,线程A在执行任务的时候,不能被其他线程打扰。要么同时成功,要么同时失败。
public class VDemo02 {
// volatile 不保证原子性
private static int number=0;
public static void add(){
number++;//该操作不是一个原子性操作
}
public static void main(String[] args) {
//理论上,计算结果应该是10_0000
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 5000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){ //GC、main
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+" "+number);
}
}
如果不使用synchronized和lock锁,如何保证原子性?
使用原子类解决原子性问题。
private static AtomicInteger number=new AtomicInteger();
public static void add(){
number.getAndIncrement();//+1操作 使用了CAS原理
}
原子类Atomic的底层都和操作系统挂钩,在内存中修改值。Unsafe类是一个很特殊的存在。
3.指令重排
定义:你写的程序,可能并不是按照你写得那样去执行的。
源代码-->编译器优化的重排-->指令并行也可能会重排-->内存系统也可能会重排-->执行。
处理器在进行指令重排的时候,会考虑:数据之间的依赖性。
inx x=1;//1
int y=2;//2
x=x+3;//3
y=x*x;//4
我们希望该程序执行的过程是1234,但是可能在执行的时候会变成2134或1324
为什么volatile可以禁止指令重排?
内存屏障。其作用有:
- 保证特定的操作的执行顺序。
- 保证某些变量的内存可见性。
18.彻底玩转单例模式
饿汉式
//饿汉式单例
public class Hungry {
private Hungry(){} //构造器私有
private final static Hungry HUNGRY=new Hungry();
public static Hungry getInstance(){
return HUNGRY;
}
}
懒汉式
//懒汉式单例
public class LazyMan01 {
private LazyMan01(){
System.out.println(Thread.currentThread().getName()+" ok");
}
private static LazyMan01 LazyMan;
public static LazyMan01 getInstance(){
if(LazyMan==null){
LazyMan=new LazyMan01();
}
return LazyMan;
}
//单线程下是没有问题的,但是多线程下会有问题
public static void main(String[] args) {
for (int i = 1; i <=100 ; i++) {
new Thread(()->{
LazyMan.getInstance();
}).start();
}
}
}
懒汉式DCL
//懒汉式单例 DCL
public class LazyMan02 {
private LazyMan02(){
//此处加锁之后就不会再被反射破坏了
synchronized (LazyMan02.class){
throw new RuntimeException("不要试图使用反射破坏DCL");
}
}
private volatile static LazyMan02 LazyMan02;//此处volatile是new对象的时候发生指令重排
public static LazyMan02 getInstance(){
//双重锁检验机制,简称DCL懒汉式
if(LazyMan02==null){
synchronized (LazyMan02.class){
LazyMan02=new LazyMan02();
//new对象的过程不是一个原子性操作 1.分配内存空间;2.执行构造方法,初始化对象;3.把这个对象指向这个内存空间
}
}
return LazyMan02;
}
public static void main(String[] args) throws Exception {
// LazyMan02 instance1=LazyMan02.getInstance();
Constructor<LazyMan02> declaredConstructor = LazyMan02.class.getDeclaredConstructor();
declaredConstructor.setAccessible(true);
//使用反射创建对象
LazyMan02 instance1=declaredConstructor.newInstance();
LazyMan02 instance2=declaredConstructor.newInstance();
System.out.println(instance1.equals(instance2));
}
}
静态内部类
//静态内部类
public class Holder {
private Holder(){}
public static Holder getInstance(){
return InnerClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER=new Holder();
}
//但是依然可以使用反射破解
}
枚举
import java.lang.reflect.Constructor;
public enum EnumSingleTon {
INSTANCE;
public EnumSingleTon getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws Exception {
EnumSingleTon instance1 = EnumSingleTon.INSTANCE;
Constructor<EnumSingleTon> declaredConstructor = EnumSingleTon.class.getDeclaredConstructor(String.class,int.class);
//注意此处的构造器是有参构造器
declaredConstructor.setAccessible(true);
EnumSingleTon instance2 = declaredConstructor.newInstance();
System.out.println(instance1.equals(instance2));
}
}
19.深入理解CAS
CAS是CPU的并发原语
import java.util.concurrent.atomic.AtomicInteger;
//CAS:compareAndSet 比较并交换
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//期望、更新:如果我期望的值达到了,那么就更新;否则就不更新。CAS是CPU原语。
System.out.println(atomicInteger.compareAndSet(2020,2021));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2020,2021));
System.out.println(atomicInteger.get());
}
}
CSA底层使用Unsafe类
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);//获取内存地址中的值
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));//内存操作,效率很高
return var5;
}
//使用了自旋锁
CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作;否则就一直循环。使用了自旋锁。
CAS的缺点:
- 循环会耗时。
- 一次只能保证一个共享变量的原子性。
- ABA问题。
CAS中存在的ABA问题
import java.util.concurrent.atomic.AtomicInteger;
public class ABATest {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//期望、更新:如果我期望的值达到了,那么就更新;否则就不更新。CAS是CPU原语。
//捣乱的线程
System.out.println(atomicInteger.compareAndSet(2020,2021));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2021,2020));
System.out.println(atomicInteger.get());
//两次CAS操作后值又变回了期望的值。
System.out.println(atomicInteger.compareAndSet(2020,666));
System.out.println(atomicInteger.get());
}
}
20.原子引用解决ABA问题
带版本号的原子操作,可以解决ABA问题
包装类Integer的一个大坑
Integer使用了对象缓存机制,默认范围是-128~127,推荐使用静态工厂方法valueOf()获取对象实例,而不是使用new,因为valueOf使用缓存,而new一定会创建新的对象分配到新的内存空间。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
public class CASReferenceTest {
public static void main(String[] args) {
//注意:如果泛型是一个包装类,注意对象的引用问题
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
new Thread(()->{
int stamp = atomicStampedReference.getStamp(); //获得版本号
System.out.println("a1->"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(1,2,
atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("a2->"+atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(2,1,
atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("a3->"+atomicStampedReference.getStamp());
},"a").start();
new Thread(()->{
int stamp = atomicStampedReference.getStamp();//获得版本号
System.out.println("b1->"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(1,666,stamp,stamp+1);
System.out.println("b2->"+atomicStampedReference.getStamp());
},"b").start();
}
}
21.各种锁的理解
公平锁、非公平锁
公平锁:公平的,需要排队,先到先得。
非公平锁:不公平的,可以插队,默认的。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁
也叫可重入锁,拿到外面的锁之后,自动获得内部的锁。
synchronized实现
public class SynchronizedLock {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sendMessage();
},"A").start();
new Thread(()->{
phone.sendMessage();
},"B").start();
}
}
class Phone{
public synchronized void sendMessage(){
System.out.println(Thread.currentThread().getName()+"->sendMessage");
call();//这里也加了锁
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName()+"->call");
}
}
ReentraLock实现
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentraLockDemo {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(() -> {
phone.sendMessage();
}, "A").start();
new Thread(() -> {
phone.sendMessage();
}, "B").start();
}
}
class Phone2 {
Lock lock = new ReentrantLock();
public void sendMessage() {
lock.lock();//细节问题:lock.lock();lock.unlock(); 锁必须要成对出现,否则发生死锁
try {
System.out.println(Thread.currentThread().getName() + "->sendMessage");
call();//这里也加了锁
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void call() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "->call");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
自旋锁 spinLock,CAS就是使用了自旋锁
不断的尝试,直到成功为止。
public class SpinLockDemo {
public static void main(String[] args) throws InterruptedException {
SpinLockDemo lock = new SpinLockDemo();
new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnLock();
}
},"T1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnLock();
}
},"T2").start();
}
AtomicReference<Thread> atomicReference = new AtomicReference<>();
//加锁
public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "=>myLock");
//自旋锁
while (!atomicReference.compareAndSet(null, thread)) {
}
}
//解锁
public void myUnLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "=>myUnLock");
atomicReference.compareAndSet(thread, null);
}
}
22.死锁
import java.util.concurrent.TimeUnit;
public class DeadLockDemo {
public static void main(String[] args) {
String locaA="lockA";
String locaB="lockB";
new Thread(new MyThread(locaA,locaB),"T1").start();
new Thread(new MyThread(locaB,locaA),"T2").start();
}
}
class MyThread implements Runnable {
private String lockA;//两个对象
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + "lock:" + lockA + "=>get" + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "lock:" + lockB + "=>get" + lockA);
}
}
}
}
java里面如何解决死锁?
- 使用 jps -l 命令查看进程号。
- 使用【jstack 进程号】 查看死锁信息
- 看日志、堆栈信息等