zoukankan      html  css  js  c++  java
  • java并发基础(五)--- 线程池的使用

    第8章介绍的是线程池的使用,直接进入正题。

    一、线程饥饿死锁和饱和策略

      1.线程饥饿死锁

      在线程池中,如果任务依赖其他任务,那么可能产生死锁。举个极端的例子,在单线程的Executor中,如果一个任务提交了另一个任务到相同的Executor中,并等待其返回,那么就会发生死锁。第二个任务停留在工作队列中,第一个又一直等待(因为是单线程)。这块记住一个信息,就是如果线程池中的任务是互相依赖的,除非线程池无限大,否则就有可能产生线程饥饿死锁,而且是否产生死锁要看时机,这也就是为什么Executor框架提供的实现中提倡使用newCachedThreadPool作为默认实现,原因之一就是它的线程数无限大(当然是理论上)。

      2.饱和策略

      当线程池的有界队列填满后,该用一种什么样的策略来处理没能添加进来的任务,JDK提供了几种默认实现。

      (1)中止(Abort):默认策略,抛出未检出的RejectedExecutionException。

      (2)抛弃(Discard):新提交的任务无法保存到队列中,则被抛弃。

      (3)抛弃最旧的(Discard-Oldest):抛弃下一个将被执行的任务,然后尝试提交新的任务。这个策略不适合优先队列,因为会抛弃优先级最高的任务。

      (4)调用者运行(Caller-Runs):该策略不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。

      ThreadPoolExecutor的饱和策略通过调用setRejectedExecutionHandler来修改。

    二、示例:搬箱子

      搬箱子计算从初始位置到目标位置的所有合法移动,以及每次移动的结果位置,感觉上有点像阿尔法狗下围棋的逻辑类似,当然不是一个层面的问题,我们只是用这个例子熟悉下线程池的使用。这块涉及到的比如如何判断当前位置是否是目标位置,以及如何计算所有合法的移动等等,我们先不管,用接口代替,毕竟我们要思考的是线程池的使用。

      首先是搬箱子的抽象类,该抽象类应该包括这么几个接口:1.初始化位置 2.判断当前位置是否是目标位置 3.列出所有可能的移动 4.执行移动

    //P:位置类  M:移动类 
    public interface Puzzle<P,M>{
        //初始化位置
        P initialPosition();
        //判断该位置是否是目标位置
        boolean isGoal(P position);
        //列出从position开始的所有合法移动
        Set<M> legalMoves(P position);
        //从指定位置开始移动 返回移动后的结果位置
        P move(P position,M move);
    }

    这个接口可以解决问题,找到合法移动,然后执行移动,接下来,就是如何操作了,我们先看串行代码如何写:

    public class SequentialPuzzleSolver<P,M>{
        private final Puzzle<P,M> puzzle;
        //所有移动位置的集合
        private final Set<P> seen = new HashSet<P>();
        
        public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
            super();
            this.puzzle = puzzle;
        }
        
        public List<M> solve(){
            P pos = puzzle.initialPosition();
            return search(new Node<P,M>(pos, null, null));
        }
        
        private List<M> search(Node<P,M> node){
            if (!seen.contains(node.pos)) {
                seen.add(node.pos);
                if (puzzle.isGoal(node.pos)) {
                    return node.asMoveList();
                }
                for (M move:puzzle.legalMoves(node.pos)) {
                    //向指定位置移动返回最新位置
                    P pos = puzzle.move(node.pos, move);
                    //将最新位置封装成node继续移动
                    Node<P,M> child = new Node<P,M>(pos, move, node);
                    //递归
                    List<M> result = search(child);
                    if (result != null) {
                        return result;
                    }
                }
            }
            
            return null;
        }
        
        
        static class Node<P,M>{
            final P pos;
            final M move;
            final Node<P,M> prev;
            
            Node(P pos, M move, Node<P, M> prev) {
                super();
                this.pos = pos;
                this.move = move;
                this.prev = prev;
            }
            
            List<M> asMoveList(){
                List<M> solution = new LinkedList<M>();//用链表,增删快
                for (Node<P,M> n = this;n.move != null;n=n.prev) {
                    solution.add(0,n.move);//最新一次的移动下标为0
                }
                
                return solution;
            }
            
        }
    }

      Node是对Positon的进一步封装,保存了当前node的位置position和移动move以及前一个节点。这样不断追溯就可以得到完整的移动轨迹。可以看到,串行的思路是先得到所有可能的移动,然后遍历,一个一个移动,每移动一次再查找当前位置的可能移动,再遍历......也就是循环递归调用,这种显然是没有效率的,可以并发的地方也在这里。这里必须明确任务的边界即:一次移动。

    //并发处理
    public class ConcurrentPuzzleSolver<P,M>{
        private final Puzzle<P,M> puzzle;
        private final ExecutorService exec;
        private final ConcurrentHashMap<P, Boolean> seen;
        
        final ValueLatch<Node<P,M>> solution = new ValueLatch<Node<P,M>>();
    
        public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle, ExecutorService exec,
                ConcurrentHashMap<P, Boolean> seen) {
            super();
            this.puzzle = puzzle;
            this.exec = exec;
            this.seen = seen;
        }
        
        public List<M> solve() throws InterruptedException{
            try {
                P p = puzzle.initialPosition();
                exec.execute(newTask(p,null,null));
                //阻塞直到找到答案
                Node<P,M> solnNode = solution.getValue();
                return (solnNode == null)?null:solnNode.asMoveList();
            } catch (Exception e) {
                exec.shutdown();
            }
        }
        
        protected Runnable newTask(P p,M m,Node<P,M> n){
            return new SolverTask(p,m,n);
        }
        
        class SolverTask extends Node<P,M> implements Runnable {
            public SolverTask(P pos, M move, Node<P, M> prev) {
                super(pos, move, prev);
            }
    
            public void run() {
                //首先访问闭锁,如果有答案则停止
                if (solution.isSet()||seen.putIfAbsent(pos, true) != null) {
                    return;
                }
                if (puzzle.isGoal(pos)) {
                    solution.setValue(this);
                }else {
                    for (M m:puzzle.legalMoves(pos)) {
                        exec.execute(newTask(puzzle.move(pos, m), m, this));
                    }
                }
            }
        }
    }
    //有答案后停止 闭锁实现
    public class ValueLatch<T>{
        private T value = null;
        private final CountDownLatch done = new CountDownLatch(1);
        
        public boolean isSet(){
            return (done.getCount() == 0);
        }
        
        public synchronized void setValue(T newValue){
            if (!isSet()) {
                value = newValue;
                done.countDown();
            }
        }
        
        public T getValue() throws InterruptedException{
            done.await();
            synchronized (this) {
                return value;
            }
        }
    }

      ValueLatch的作用是当线程池找到一个答案后停止其他任务,组合CountDownLatch实现,这是闭锁的另一个例子。第一个例子在java并发基础(二)的第三部分同步容器中介绍过了。在获得第一个答案之前,主线程将一直等待,ValueLatch中的getValue将一直阻塞,直到有线程设置了这个值。找到第一个答案后关闭线程池,不再接受新的任务,另外,为了避免抛出RejectedExecutionException,设置线程池饱和策略为Discard。

  • 相关阅读:
    Spring(十一):Spring配置Bean(四)SpEL
    Java中动态代理方式(使用java.lang.reflect.Proxy实现):
    设计模式(八)静态代理与动态代理模式
    Spring(十):Spring配置Bean(三)Bean的作用域、使用外部属性文件
    如何把本地代码提交到git(码云)、github代码管理项目上
    Spring(九):Spring配置Bean(二)自动装配的模式、Bean之间的关系
    centos7安装mysql5.7
    haproxy(单机)+mysql集群负载均衡
    sql server 用户创建与权限管理
    MySQL之 从复制延迟问题排查
  • 原文地址:https://www.cnblogs.com/peterxiao/p/7661187.html
Copyright © 2011-2022 走看看