线程
一、线程创建的3种方式
1、继承Thread 类
public class Thread implements Runnable
重写run方法即可
public class ThreadTest extends Thread {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("继承Thread方法实现多线程");
}
}
//调用
ThreadTest th1=new ThreadTest();
th1.start();
2、实现Runnable接口
本质上和继承Thread类一样,因为Thread类也实现了Rannable接口
public class ThreadTest1 implements Runnable{
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("实现Runable的线程开启方式");
}
}
3、实现Callable接口并返回值
public class ThreadTest2 implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "hello";
}
}
//调用
Callable<String> callable = new ThreadTest2();
FutureTask<String> task=new FutureTask<>(callable);
Thread th4 = new Thread(task);
th4.start();
//task.get() 获取返回值
//这个get方法可能会产生阻塞
System.out.println("从子线程中获取:"+task.get());
分析:FutureTask 实现RunnableFuture,RunnableFuture继承了Runnable接口
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
二、线程池
//ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//ExecutorService threadPool =Executors.newFixedThreadPool(5);//创建固定的线程池大小
//ExecutorService threadPool =Executors.newCachedThreadPool();//可伸缩的线程池
/**
* 最大线程该如何定义
*
* 1. CPU密集型,几核CPU就定义为几,可以保证CPU效率最高 Runtime.getRuntime().availableProcessors()
* 2. IO 密集型 判断你程序中十分消耗IO的线程数
*/
System.out.println("CPU核心数:"+Runtime.getRuntime().availableProcessors());
//自定义线程池 工作中,使用ThreadPoolExecutor 自己创建
ExecutorService threadPool=new ThreadPoolExecutor(2,//核心线程池大小
5,//最大线程池大小
3,//非核心线程闲置存活时间
TimeUnit.SECONDS,//超时单位
new LinkedBlockingQueue<>(5),//阻塞队列
Executors.defaultThreadFactory(),//线程工厂,创建线程,一般不修改
new ThreadPoolExecutor.AbortPolicy()//拒绝策略:对新来的线程不做处理,抛出异常
);
//阻塞队列存放个数+最大线程池个数=同时允许使用线程池的个数
for(int i=0;i<10;i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
//线程池使用完,程序结束,要关闭线程池
threadPool.shutdown();
拒绝策略
AbortPolicy()//对新来的线程不做处理,抛出异常
CallerRunsPolicy()//哪来的回哪里,用线程池的线程处理
DiscardPolicy()//队列满了,丢弃当前任务,不会抛出异常
DiscardOldestPolicay()//队列满了,把队列首个poll丢出,自己加入队列,不抛出异常 e.getQueue().poll();
并发
1、synchronized
非静态方法上
锁的对象是 该方法的类的实例
静态方法上
锁的对象是类的字节码 如:Person.class
同步代码块
锁的对象是自己定义的
2、ReentrantLock
创建锁
Lock lock = new ReentrantLock();
加锁
lock.lock();
解锁
lock.unlock();
3、CAS
java.util.concurrent.atomic
//CAS compareAndSet() 比较并交换
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//如果我期望的值达到了,就更新
atomicInteger.compareAndSet(2020,2021);
System.out.println(atomicInteger.get());
}
线程之间的通信
等待唤醒 synchronized
public synchronized void incrnumber() throws InterruptedException {
while (number!=0){//不要用if判断,应该用while防止虚假唤醒
//等待
this.wait();
}
number ++;
System.out.println(Thread.currentThread().getName()+"number="+number);
//通知其他线程
this.notifyAll();
}
等待唤醒 ReentrantLock
private int number=0;
final Lock lock=new ReentrantLock();
final Condition conditionA = lock.newCondition();
final Condition conditionB = lock.newCondition();
//condition.wait();//等待
//condition.signalAll();//唤醒全部
//+1
public void incrnumber() throws InterruptedException {
//1、加锁
lock.lock();
try{
while (number!=0){//不要用if判断,应该用while防止虚假唤醒
// A 等待
conditionA.await();
}
number ++;
System.out.println(Thread.currentThread().getName()+" number="+number);
//唤醒 B
conditionB.signal();
}catch (Exception e){}finally {
//解锁
lock.unlock();
}
}
//-1
public void decrnumber() throws InterruptedException {
//加锁
lock.lock();
try{
while(number==0){
//B 等待
conditionB.await();
}
number-- ;
System.out.println(Thread.currentThread().getName()+" number="+number);
//唤醒 A
conditionA.signal();
}catch (Exception e){}
finally {
//解锁
lock.unlock();
}
}
减法计数器CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i=0;i<6;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"---go---out");
//数量 减1
countDownLatch.countDown();
},String.valueOf(i)).start();
}
//等待计数器归零 再向下执行
countDownLatch.await();
System.out.println("执行 完成 6");
加法计数器CyclicBarrier
/**
* 集齐7颗龙珠召唤神龙
*/
//计数,线程
//如果收集不齐,会阻塞
CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功");
});
for(int i=0;i<7;i++){
new Thread(()->{
//这里面如果要拿外面的变量,需要拿 final 类型变量
System.out.println(Thread.currentThread().getName()+":收集龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
读写锁ReadWriteLock
/**
* 独占锁:写锁 一次只能被一个线程占有
* 共享锁:读锁 可以被多个线程拥有
*
* 读-读:共存
* 读-写:不能共存
* 写-写:不能共存
*/
class MyCacheLock{
private volatile Map<String,Object> map=new HashMap<>();
//读写锁
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存 写入的时候只希望同时只有一个线程 写
public void put(String key,Object value){
//写锁
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入");
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
readWriteLock.writeLock().unlock();
}
}
//取 所有人都可以读
public void get(String key){
//读锁
readWriteLock.readLock().lock();
try {
map.get(key);
System.out.println(Thread.currentThread().getName()+"读取");
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
readWriteLock.readLock().unlock();
}
}
}
信号量Semaphore
Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。
Semaphore的主要方法摘要:
void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void release():释放一个许可,将其返回给信号量。
int availablePermits():返回此信号量中当前可用的许可数。
boolean hasQueuedThreads():查询是否有线程正在等待获取。
/**
* 抢车位
* 如果已经满了,一直等待直到释放为止
*/
//作用:多个共享资源互斥的使用 并发限流,控制最大线程数
public static void main(String[] args) {
//线程数量 有3个停车位
final Semaphore semaphore = new Semaphore(3);
for (int i=0;i<6;i++){
new Thread(()->{
//得到/获取 semaphore.acquire();
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车位");
//停两秒钟
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//离开车位
semaphore.release();
}
//释放 semaphore.release();
},String.valueOf("线程"+i)).start();
}
}
并发集合
1、List
//并发下arraylist是不安全的
// List<String> list = new ArrayList<>();
//1、解决方案 1
//List<String> list = new Vector<>();
//解决方案 2
// List<String> list =Collections.synchronizedList(new ArrayList<>());
//解决方案 3 写入时复制COW 计算机程序设计领域的一种优化策略
//CopyOnWriteArrayList 比Vector好,Vector有syncthronized,效率会降低
List<String> list=new CopyOnWriteArrayList<>();
for(int i=0;i<10;i++){
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
2、Set
//Set集合在多线程下 也是不安全的
// Set<String> set = new HashSet<>();
//1解决方案
Set<String> set= Collections.synchronizedSet(new HashSet<>());
//2解决方案
// Set<String> set=new CopyOnWriteArraySet<>();
for(int i=0;i<30;i++){
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
3、Map
//Map集合在多线程下 也是不安全的
Map<String,String> map = new HashMap<>();
//1解决方案
//Map<String,String> map= Collections.synchronizedMap(new HashMap<>());
//2解决方案
//Map<String,String> map=new ConcurrentHashMap<>();
for(int i=0;i<30;i++){
new Thread(()->{
map.put(UUID.randomUUID().toString().substring(0,5),"AAAA");
System.out.println(map);
},String.valueOf(i)).start();
}
BlockingQueue
ArrayBlockingQueue
public static void main(String[] args) {
//collection
//List
//Set
//BlockingQueue 不是新的东西
/**
* 什么情况下使用 阻塞队列
* 多线程:线程池,多线程并发处理 A 调用 B
*
*/
}
//抛出异常
@Test
public void test1(){
//要设置队列的大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.add("a"));
System.out.println(arrayBlockingQueue.add("b"));
System.out.println(arrayBlockingQueue.add("c"));
//java.lang.IllegalStateException: Queue full
//超过大小会抛出异常
//System.out.println(arrayBlockingQueue.add("c"));
//移除元素并返回 移除的元素
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
//java.util.NoSuchElementException
//没有元素后移除会抛出异常
//System.out.println(arrayBlockingQueue.remove());
}
/**
* 不抛出异常
*/
@Test
public void test2(){
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
//element()监测 队列首个元素,要抛出异常
//arrayBlockingQueue.element();
//peek监测 队列首个元素 没有返回null 不抛出异常
//System.out.println(arrayBlockingQueue.peek());
//存值 offer
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
//满了 不抛出异常,返回 Boolean 值
System.out.println(arrayBlockingQueue.offer("d"));
// element() 返回队列首个元素值,不做移除
System.out.println(arrayBlockingQueue.element());
System.out.println(arrayBlockingQueue.peek());
//移除 poll 返回移除的值
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
//若移除空了 没有值返回null
System.out.println(arrayBlockingQueue.poll());
}
/**
* 等待,阻塞(一直阻塞)
*/
public void test3() throws InterruptedException {
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
//存值 put
arrayBlockingQueue.put("a");
arrayBlockingQueue.put("b");
arrayBlockingQueue.put("c");
//队列没有位置了,会一直等待 阻塞
//arrayBlockingQueue.put("d");
//取值 take()
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
//如果没有元素 会一直阻塞
System.out.println(arrayBlockingQueue.take());
}
/**
* 等待,阻塞(等待超时)
*/
@Test
public void test4() throws InterruptedException {
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
//存 offer
arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
//队列满了等待2秒 等待超过两秒就退出
arrayBlockingQueue.offer("a",2, TimeUnit.SECONDS);
//取 poll
arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
//等待超过两秒 队列中还没有值就 超时退出
arrayBlockingQueue.poll(2,TimeUnit.SECONDS);
}
总结:
1、add() remove() 添加 移除,当添加时超过大小,移除时没有元素时抛出异常
2、offer() poll() 存值、取值,存值时空间不够返回false,取值时没有时返回Null
3、put() take() 存值、取值 ,存值时空间不够会阻塞,取值时没有元素会一直阻塞
4、offer() pool(),设置阻塞等待的时间,超过时间没有存值或取值就放弃等待退出
SynchronousQueue
/**
* 同步队列
* 和其他的BlockingQueue 不一样 ,SynchronousQueue不存元素
* put 了一个元素,必须从里面先take取出来,否则不能在put进去值
*/
public class SynchronousQueueDemo {
@Test
public void Test1() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
BlockingQueue<String> queue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
queue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
queue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
queue.put("3");
} catch (Exception e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
},"t1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"take"+queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"take"+queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"take"+queue.take());
} catch (Exception e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
},"t2").start();
countDownLatch.await();
}
}