===============================================================================================
基于进程的服务:
系统初始化线程:
负责进程池,监控,安全,日志,进程心跳检测机制,定期检测服务是否僵死
业务进程服务:
具体的业务逻辑执行
================================================================================================
线程池:类似与数据库连接池(来回切换,可能中断CPU的通信总线)
任务队列:对请求可以高效的处理---高效的任务队列 一种缓存机制
执行策略:失败重试、取消的中断策略、异步获取结果
并发:信号量机制对资源进行同步控制
编程模型:
==================================================================================================
1、指令重排序---只要保证程序执行的结果一直,计算机可以对指令进行重排序,可以最好的提高CPU的执行效率
2、Happens-before原则:一个执行完毕,另一个看到结果再执行
3、对象都有一个监视器,同时监视器附带一个锁
4、volatile:一致性(无锁的一致性)
5、锁导致通信总线的阻塞,最好的方式其实是比较重试(自选锁-死循环)
6、CAS机制(无阻塞机制)--通过JNI,借助CPU的指令来执行,synchronized会因为争取锁而阻塞
7、AQS一个JDK中JUC里面最为核心的类AbstractQueuedSynchronizer-同步状态计数器64位,LockSupport挂起线程的方法 CHL队列机制保证有序
-
tryAcquire(int)试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。此方法总是由执行 acquire 的线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。也就是说此方法是一种尝试性方法,如果成功获取锁那最好,如果没有成功也没有关系,直接返回false。
tryRelease(int)试图设置状态来反映独占模式下的一个释放。 此方法总是由正在执行释放的线程调用。释放锁可能失败或者抛出异常,这个在后面会具体分析。tryAcquireShared(int) 试图在共享模式下获取对象状态。tryReleaseShared(int) 试图设置状态来反映共享模式下的一个释放。isHeldExclusively() 如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回true。
可重入锁:就是可以立即获取或者获取失败,不会阻塞
自旋锁:非阻塞
==================================================================================================
下面介绍一些线程编程例子:
1.有一个任务(有三个阶段-前期准备,任务完成,后期检查),要求多个工作者参与共同完成,每个阶段必须所有的工作者完成后才可以进行下一个阶段,三个阶段都完成,总部(一个特殊的工作者)完成后期总结。
CyclicBarrier:故障点,可以重复使用---await(串并转换--分解合并分解合并-fork、join)类似与map、reduce
static class SummaryService {
private Random random;
SummaryService(Random random) {
this.random = random;
}
void doService(String name, String phase) {
System.out
.println("-----------------------------------------------");
System.out.println("任务阶段:" + phase + ",当期工作者:" + name + ",正在汇总数据");
try {
Thread.currentThread().sleep(random.nextInt(300));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务阶段:" + phase + ",当期工作者:" + name + ",汇总数据完成");
System.out
.println("-----------------------------------------------");
}
}
static class DataTask implements Runnable {
private String name;
private CyclicBarrier barrier;
private SummaryService service;
DataTask(String name, CyclicBarrier barrier, SummaryService service) {
this.name = name;
this.barrier = barrier;
this.service = service;
}
@Override
public void run() {
service.doService(name, "前期任务准备");
System.out.println("体系单位:[" + name
+ "]汇总[前期任务准备都已完成],等待进入[完成任务阶段],未完成前期的单位还有:"
+ barrier.getNumberWaiting() + "个");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
service.doService(name, "完成任务");
System.out.println("体系单位:[" + name
+ "]汇总[完成任务工作都已完成],等待进入[监控任务阶段],未完成前期的单位还有:"
+ barrier.getNumberWaiting() + "个");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
service.doService(name, "监控任务阶段");
System.out.println("体系单位:[" + name + "]汇总[监控任务阶段都已完成]"
+ barrier.getNumberWaiting() + "个");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException,
IOException {
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("总部汇总工作!!!");
}
});
SummaryService service = new SummaryService(new Random());
new Thread(new DataTask("A单位", barrier, service)).start();
new Thread(new DataTask("B单位", barrier, service)).start();
new Thread(new DataTask("C单位", barrier, service)).start();
}
2.CountDownLatch --await(等待所有的任务完成),countDown(某一个任务已经完成)-不可重复使用
static class CTask implements Runnable {
private String name;
private CountDownLatch countDownLatch;
CTask(String name,CountDownLatch countDownLatch) {
this.name = name;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("工作者:" + name + "处理任务....");
countDownLatch.countDown();
}
}
public static void main(String[] args) throws InterruptedException,
IOException {
final CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(new CTask("小云", countDownLatch )).start();
new Thread(new CTask("小航", countDownLatch )).start();
new Thread(new CTask("小月", countDownLatch )).start();
countDownLatch.await();
System.out.println("任务都已完成!!");
}
3.exchanger-交换器(分片思想)--两个线程交换数据
public static void main(String[] args) throws InterruptedException,
IOException {
final Exchanger exchanger = new Exchanger();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.currentThread().sleep(1000);
System.out.println("换出数据:A,换回的数据:" + exchanger.exchange("A") );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("换出数据:B,换回的数据:" + exchanger.exchange("B") );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
4.Semaphore(申请许可方可运行线程,负责线程处于挂起状态)--运行线程执行前申请许可,方可运行。
static class SemaphoreTask implements Runnable {
String name;
Semaphore semaphore ;
public SemaphoreTask(String name,Semaphore semaphore) {
this.name = name;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(name + "获取锁....");
System.out.println(name + " do task .....");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
semaphore.release();
System.out.println(name + "释放锁");
}
}
}
public static void main(String[] args) throws InterruptedException,
IOException {
Semaphore semaphore = new Semaphore(3);
for(int i = 0 ; i < 10 ; i ++ ){
new Thread(new SemaphoreTask("task" + i , semaphore)).start();
}
}
发出中断请求,停止线程运行--本质就是在任务中能够处理中中断请求
public static void main(String[] args) throws InterruptedException,
IOException {
Thread tha = new Thread(new Task_Service_A());
tha.start();
Thread.sleep(3000);
tha.interrupt();//tha发出中断请求命令
System.out.println("结束");
}
static class Task_Service_A implements Runnable {
MyService myService = new MyService();
@Override
public void run() {
try {
myService.doWork();
}catch(InterruptedException e ) {
System.out.println("响应中断请求");
}
}
}
static class MyService {
void doWork() throws InterruptedException{
System.out.println("do work....");
int seed = new Random().nextInt(10);
System.out.println(seed);
if(seed == 8 ){
System.out.println("耗时操作需要取消");
throw new InterruptedException();//业务方法执行期间肯可能会出现异常
}else {
System.out.println("非耗时操作");
}
}
}
编程模型:
Sync:隐式的锁,不可中断在申请期间
Lock lock = new ReentrantLock();//显示锁,可以中断 Condition condition = lock.newCondition(); condition.signal(); condition.await(); lock.tryLock();//自旋(未加锁,死循环)-可以取消 lock.unlock(); lock.lockInterruptibly();//可以响应中断取消 ReentrantReadWriteLock lock2 = new ReentrantReadWriteLock(); lock2.readLock().lock(); lock2.readLock().unlock(); lock2.writeLock().lock(); lock2.writeLock().unlock();
LockSupport.park(); //禁用当前线程
5.Lock,Condition-对象监视器的锁与锁资源的通信协调机制
每个共享资源都是一个对象,而对象就有一个默认的监视器,监视器默认有一个锁对象,多个线程对于这个共享资源(对象)的操作都是原子的,所以首先要获取对象的锁,之后进入资源的临界区,操作资源,其他的线程对象,则处在资源对应的队列上边等待,对资源可以有各种操作,可以说是一种谓词,一个资源对象可以对应多个谓词,同时也就存在多个队列(存储等到此资源谓词的操作),所以,一个资源上有一个锁,一个锁上可以有多个Condition(多路等待)。
===========================================================================================
1.线程状态(运行,等待,睡眠,结束)
2.线程协作
3.线程通信
4.线程的中断机制(其他的线程可以发送指令,终止另一个线程------阻塞或者运行中(运行线程的run业务可以响应中断指令即可))
5.线程的同步原语
6.编程模型