
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
当阻塞队列是空,从队列中获取元素的操作将会被阻塞,直到其他线程往空的队列插入新的元素
当阻塞队列是满,往队列里添加元素的操作将会被阻塞.直到其他线程从队列中移除一个或多元素或者完全清空队列,是队列变得空闲起来才能新增.
分类:
ArrayBlockingQueue :由数组结构组成的有界阻塞队列
LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Inter.MAX_VALUE)阻塞队列
PriorityBlockingQueue:支持优先级排序的无界阻塞队列
DelayQueue:使用优先级队列实现的延迟无阻塞队里.
SynchronousQueue:不存储元素的阻塞队列,即单个元素的队列
LinkedTransferQueue:由链表组成的无界阻塞队列
LinkedBlockingDeque:右于链表构成的双向阻塞队列


同步队列的代码:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronouQueueDemo {
//SynchronousQueue没有容量
//与其他的BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue
//每一个put操作必须等待一个take操作,反之不能继续添加元素
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"AAA").start();
new Thread(() -> {
try {
try {
TimeUnit.SECONDS.sleep(5);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" "+ blockingQueue.take());
try {
TimeUnit.SECONDS.sleep(5);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" "+ blockingQueue.take());
try {
TimeUnit.SECONDS.sleep(5);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" "+ blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"BBB").start();
}
}
lock生产一个消费一个代码:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class SharData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception {
lock.lock();
try {
//1.判断
while (number != 0) {
//等待,不生产
condition.await();
}
//2.干活
number++;
System.out.println(Thread.currentThread().getName() + " " + number);
//3.通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception {
lock.lock();
try {
//判断
while (number == 0)
condition.await();
number--;
System.out.println(Thread.currentThread().getName() + " " + number);
//3.通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
//1.线程操作资源类
public class ProduConsumer_TraditionDemo {
public static void main(String[] args) {
SharData sharData = new SharData();
new Thread(() -> {
for (int i = 0; i <= 5; i++) {
try {
sharData.increment();
}catch(Exception e){
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 0; i <= 5; i++) {
try {
sharData.decrement();
}catch(Exception e){
e.printStackTrace();
}
}
}, "BB").start();
}
}
//多线程中的判断不要用if,用while

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareResoure {
private int number = 1;
private Lock lock = new ReentrantLock();
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
//1.判断
public void print5() {
lock.lock();
try {
while (number != 1) {
c1.await();
}
//干活
for (int i = 0; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
//3.通知
number=2;
c2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//2.干活
public void print10() {
lock.lock();
try {
while (number != 2) {
c1.await();
}
//干活
for (int i = 0; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
//3.通知
number=3;
c3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//3.通知
public void print15() {
lock.lock();
try {
while (number != 3) {
c1.await();
}
//干活
for (int i = 0; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
//3.通知
number=1;
c1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class SyncAndReentrantLockDemo {
public static void main(String[] args) {
ShareResoure shareResoure=new ShareResoure();
new Thread(()->{
for(int i=1;i<=10;i++){
shareResoure.print5();
}
},"A").start();
new Thread(()->{
for(int i=1;i<=10;i++){
shareResoure.print10();
}
},"B").start();
new Thread(()->{
for(int i=1;i<=10;i++){
shareResoure.print15();
}
},"C").start();
}
}
线程池

Java 中的线程池是通过Excutor框架来实现的,该框架中用到了Excutor,Excutors,ExcutorService,ThreadPoolExecutor这几个类


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class MyThreadPoolDemo {
public static void main(String[] args) {
//cpu核数
//System.out.println(Runtime.getRuntime().availableProcessors());
// ExecutorService threadPool = Executors.newFixedThreadPool(5);//一池5个处理线程
// ExecutorService threadPool=Executors.newSingleThreadExecutor();//一池1個處理線程
ExecutorService threadPool=Executors.newCachedThreadPool();//一池N个线程
try {
for(int i=0;i<=10;i++){
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName()+" 辦理業務");
});
try {
TimeUnit.MILLISECONDS.sleep(200);
}catch (InterruptedException e){
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}




七大参数:

七大参数描述:

拒绝请求执行的runnable的策略。
corePoolSize:


拒绝策略:

jdk内置的拒绝策略划分:



手写线程池:
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i = 0; i <= 5; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 辦理業務");
});
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}