线程安全的并发集合
java.util.concurrent包
ConcurrentHashMap ConcurrentLinkedDeque
CopyOnWriteArrayList 读取不会加锁,写入操作也不会阻塞读取,只是写入之间要实现同步,它的原理是当执行写入操作时,进行一次自我复制,把修改的数据写入副本,再用把副本和原来的数据替换。
它内部的array数组不会被修改,但是在写操作中被替换呈修改过后的副本。
注意一点,如果想要获取这些并发集合的大小,只能通过遍历的方法,或者用mappingCount方法确认。
AbstractQueuedSynchronizer 一个非常重要的基类
三个主要方向的方法:
tryAcquire
tryAcquireShared
getQueuedThreads
内部用的是链式列表,双端队列
ThreadLocal 的内存泄露问题
引用链问题,用过元素后,要使用remove移除掉
并发集合的批处理:搜索,归约,迭代。
LockSupport类
定义了一组公共的静态方法,这些方法提供了最基本的线程阻塞和唤醒功能。
阻塞的方法以park为开头,唤醒的方法以unpark为开头。
它采用一种类似信号量机制,与Thread.suspend相比,弥补了由于resume在之前发生,导致线程无法继续执行;和wait相比,不需要获得某个对象的锁,也不会抛出中断异常。
原子类
java.util.concurrent.atomic包
原子更新基本类型,原子更新数组,原子更新引用,原子更新属性
AtomicBoolean AtomicInteger AtomicLong
并发的工具类
等待多线程完成的CountDownLatch
CountDownLatch的构造器需要传入一个N的整数,大于0
CountDownLatch的对象调用countDown方法会使得N减去1 ;调用对象的 await 方法会阻塞当前线程,直到N的个数为0
同步屏障CyclicBarrier
让一组线程到达一个屏障时被阻塞,屏障也可称为同步点,直到最后一个线程到达同步点时,屏障才会开门。
默认的构造函数要传入一个整数N,代表有多少个线程需要到达屏障
调用CyclicBarrier对象的await方法,告知CyclicBarrier对象已经到达同步点。
高级的构造函数,需要传入一个N和一个Runnable对象,用于线程到达屏障时,优先执行这个
Runnable对象对象。
可以和其他的并发工具搭配,用于处理数据的计算。
控制并发线程数的Semaphore
Semaphore信号量 用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
流量控制
构造器传入的数量N,代表进入特定资源的线程个数
特定的资源前后要用对象的acquire release方法获取信号量和释放信号量
线程间交互数据的Exchanger
它提供一个同步点,在这个同步点,两个线程可以交互彼此的数据
通过exchange方法交换数据
如果一个线程先执行了该方法,那么它会一直等待第二个线程也只想该方法。
可以用于遗传算法 银行流水的校对工作
构造函数传入的类型是 交互的数据类型
exchange方法要传入交互的数据,同时返回另一个线程提供的数据
阻塞队列
对于多线程问题,如果是生产者和消费者问题,可以使用阻塞队列替代同步方法,使得数据同步。
生产者向队尾添加元素,消费者从队头取出元素,当队列满时添加元素,或队列空时取出元素,阻塞队列导致线程阻塞。
add,remove,element在队列满或空时操作会抛出异常,所以不要用这些方法。
而是阻塞队列当管理工具使用采用put take方法,多线程采用 offer poll peek方法。
操作 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add | offer | put | offer |
移除方法 | remove | poll | take | poll |
检查方法 | element | peel |
-
-
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
-
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
-
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
-
SynchronousQueue:一个不存储元素的阻塞队列。
-
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
-
LinkedBlockingDeque
fork join框架
实现并行计算,可以将一个大规模计算问题分解成为两个小规模的问题,如果需要合并最终的结果,那么就通过join方法合并最终的结果。
想要实现这个框架,需要继承其中的一个类RecursiveTask<T>类或者 RecursiveActicn类,RecursiveTask类 和RecursiveActicn类相比compute方法,有返回值,如果要解决的问题不需要返回值,那么就继承RecursiveActicn类。
那么,使用invokeAll 方法,将分解的两个子问题交给线程池处理,使用join方法而不是get方法获取返回值,get可能会抛出异常,但是compute方法不允许抛出异常。
class SumTask extends RecursiveTask<Integer> { private int threshold ; private static final int segmentation = 10; private int[] src; private int fromIndex; private int toIndex; public SumTask(int formIndex,int toIndex,int[] src){ this.fromIndex = formIndex; this.toIndex = toIndex; this.src = src; this.threshold = src.length/segmentation; } @Override protected Integer compute() { //如果该条件满足,那么直接把执行任务,得到结果 if((toIndex - fromIndex)<threshold ){ int count = 0; System.out.println(" from index = "+fromIndex +" toIndex="+toIndex); for(int i = fromIndex;i<=toIndex;i++){ count+=src[i]; } return count; //join得到的结果 }else{ //条件不满足,拆分任务得到两个子任务 int mid = (fromIndex+toIndex)/2; SumTask left = new SumTask(fromIndex,mid,src); SumTask right = new SumTask(mid+1,toIndex,src); //将子任务插入线程池,执行子任务 invokeAll(left,right); //返回子任务的结果 return left.join()+right.join(); } } public static void main(String[] args) { int[] array = new int[40]; Arrays.fill(array,10); //专门的线程池 ForkJoinPool forkJoinPool= new ForkJoinPool(); SumTask sumTask = new SumTask(0,array.length-1,array); long start = System.currentTimeMillis(); //提交任务 forkJoinPool.invoke(sumTask); System.out.println("The count is "+sumTask.join() +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } }
线程池
三个接口,一个包装器,一个线程池接口
Runnable Callable Future FutureTask ExecutorService
FutureTask包装器,可以将Callable转换成Runnable 和Future
Executor类有许多的静态工厂方法用来构建线程池
ScheduledExecutorService 接口,具有用于预定任务或者重复执行的任务的方法
ExecutorService 接口的submit方法,返回一个future接口;execute方法提交没有返回值的线程
invokeAny 提交一个 Callable 对象的集合给线程池 , 并返回某个已经完成了的任务的结果。 无法知道返回的究竟是哪个任务的结果 ,也许是最先完成的那个任务的结果。对于搜索问题 , 如果你愿意接受任何一种解决方案的话,你就可以使用这个方法 。
invokeAll 同上方法,只不过返回所有任务的结果。
注意一点,创建线程池 一般使用的是
new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,millseconds,runnableTaskQueue,handler); //** runnableTaskQueue任务队列,用于保存等待执行的任务的阻塞队列,可以旋转4种阻塞队列 handler参数指明了拒绝策略,及线程池超载了,已经塞不下新的任务了,该如何处理新的任务既是拒绝策略。JDK内部有4种策略,一般采用的是不做任何处理,允许丢失DiscardPolicy策略。 */
自定义线程池
ThreadFactory接口,只有一个方法,那就是创建线程 Thread newThread(Runnable r) 可以在这个方法内部对线程进行设置改造,这个接口可以作为参数传入 ThreadPoolExecutor 实例对象,但需要实现ThreadFactory接口。
ThreadPoolExecutor 也是一个可扩展的线程池,它提供了三个方法,对线程池运行之前,之后,消耗进行控制。
beforeExecute afterExecute terminated
ThreadPoolExecutor .Worker 是ThreadPoolExecutor 的内部类,它实现了 Runnable接口,ThreadPoolExecutor 线程池的工作线程就是Worker的实例对象
Worker.runTask()方法会以多线程异步的形式调用
合理地选择线程池的线程数量
线程池需要考虑CPU,内存的大小
线程数量= CPU数量 * 目标CPU使用率 *(1+ 等待时间/计算时间)
CPU数量= Runtime.getRuntime().availableProcessors()
线程池中寻找堆栈信息
如何向线程池讨要堆栈信息
一个简单的方法就是 使用 execute 放弃 submit
另外的办法就是 继承ThreadPoolExecutor ,让它在任务调度前,保持堆栈信息。wrap方法
多线程框架
Future框架
无锁缓存的Disruptor框架
新的并发模型 Actor ,Akka框架