zoukankan      html  css  js  c++  java
  • J.U.C体系进阶(四):juc-sync 同步器框架

    Java - J.U.C体系进阶

    作者:Kerwin

    邮箱:806857264@qq.com

    说到做到,就是我的忍道!

    juc-sync 同步器框架

    同步器名称 作用
    CountDownLatch 倒数计数器,构造时设定计数值,当计数值归零后,所有阻塞线程恢复执行;其内部实现了AQS框架
    CyclicBarrier 循环栅栏,构造时设定等待线程数,当所有线程都到达栅栏后,栅栏放行;其内部通过ReentrantLock和Condition实现同步
    Semaphore 信号量,类似于“令牌”,用于控制共享资源的访问数量;其内部实现了AQS框架
    Exchanger 交换器,类似于双向栅栏,用于线程之间的配对和数据交换;其内部根据并发情况有“单槽交换”和“多槽交换”之分
    Phaser 多阶段栅栏,相当于CyclicBarrier的升级版,可用于分阶段任务的并发控制执行;其内部比较复杂,支持树形结构,以减少并发带来的竞争

    CountDownLatch

    注意:CountDownLatch和CyclicBarrier非常相似,且CyclicBarrier是可以重用的,根据具体的场景不同,代码结构不同,其实两者之间可以相互转化,详见CyclicBarrier模块,下文是CountDownLatch-Demo

    // 用法比较简单,直接上代码即可
    // 1.CountDownLatch的同一对象传递
    // 2.构造参数的默认值需要指定
    // 3.线程完成的countDown()->会使默认值减一
    // 4.主线程awiw()等待,所有线程都countDown之后,主线程执行
    // 应用场景:比如五个子线程文件输出导出数据,主线程等所有子线程都完成之后开始压缩操作,上传文件
              
    public class TestCountDownLatch {
        
    	/***
    	 * 关键点:面向对象的方式->参数传递,把CountDownLatch进行传递,使其共用同一个参数
    	 * @param args
    	 */
        public static void main(String[] args) {
        	CountDownLatch latch = new CountDownLatch(5);
    
            ExecutorService executorService = Executors.newCachedThreadPool();
            MyWoker m1 = new MyWoker("work1", latch);
            MyWoker m2 = new MyWoker("work2", latch);
            MyWoker m3 = new MyWoker("work3", latch);
            MyWoker m4 = new MyWoker("work4", latch);
            MyWoker m5 = new MyWoker("work5", latch);
            Boss boss = new Boss("boos", latch);
            executorService.submit(m1);
            executorService.submit(m2);
            executorService.submit(m3);
            executorService.submit(m4);
            executorService.submit(m5);
            executorService.submit(boss);
            
            executorService.shutdown();
        }
    }
    
    class MyWoker implements Callable<String> {
    	
    	private String name;
    	private CountDownLatch latch;
    	
    	public MyWoker (String name, CountDownLatch latch) {
    		this.name = name;
    		this.latch = latch;
    	}
    	
    	@Override
    	public String call() throws Exception {
    		System.out.println(name + " 工人开始工作");
    		int time = (int)(Math.random() * 100) * 50;
    		Thread.sleep(time);
    		System.out.println(name + " 工人已经完成任务!");
    		latch.countDown();
    		return "successful";
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public CountDownLatch getLatch() {
    		return latch;
    	}
    
    	public void setLatch(CountDownLatch latch) {
    		this.latch = latch;
    	}
    }
    
    class Boss implements Callable<String> {
    	
    	private String name;
    	private CountDownLatch latch;
    	
    	public Boss (String name, CountDownLatch latch) {
    		this.name = name;
    		this.latch = latch;
    	}
    	
    	@Override
    	public String call() throws Exception {
    		System.out.println("老板准备就绪,等工人都完成了就来视察~");
    		latch.await();
    		System.out.println("老板来了,快跑啊~");
    		return "successful";
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public CountDownLatch getLatch() {
    		return latch;
    	}
    
    	public void setLatch(CountDownLatch latch) {
    		this.latch = latch;
    	}
    }
    

    CyclicBarrier

    CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点

    一句话概述就是:“人满发车”

    重点理解:

    CountDownLatch主要用于主线程阻塞,等待子线程执行完毕后,主线程执行,例如报表导出压缩上传,子线程处理报表,主线程等都执行完毕后,压缩,上传

    CyclicBarrier侧重点是人满发车,比如LOL,需要等待是个用户都加载好了之后,再开启主线程执行工作,值得注意的是,这是一般意义的CyclicBarrier

    但是,CyclicBarrier提供了另一个构造方法,即可以指定默认额外的执行线程

    CyclicBarrier barrier = new CyclicBarrier(5,  new TotalTask(totalService));
    

    这意味着,在很多情况CyclicBarrier可以代替CountDownLatch,主要看代码的结构设计

    比如刚才的问题:报表导出压缩上传,子线程处理报表,主线程等都执行完毕后,压缩,上传

    如果我用着这种构造方法,配合awit()的位置,让压缩上传线程默认作为最后执行的线程,即可保证执行的顺序,来看个Demo吧:

    Demo-1 CyclicBarrier普通使用方法:

    public class TestCyclicBarrier {
    
    	public static void main(String[] args) throws InterruptedException {
    		CyclicBarrier cycli = new CyclicBarrier(10);
    		
    		for (int i = 0; i < 9; i++) {
    			new Thread(new BarrierThread("张" + i, cycli)).start();
    		}
    		
    		Thread.sleep(3000);
    		new Thread(new BarrierThread("张" + 10, cycli)).start();
    		
    		Thread.sleep(5000);
    	}
    
    }
    
    class BarrierThread implements Runnable{
    	
    	private String  name;
    	private CyclicBarrier cycli;
    	
    	public BarrierThread(String name, CyclicBarrier cycli) {
    		super();
    		this.name = name;
    		this.cycli = cycli;
    	}
    
    	@Override
    	public void run() {
    		System.out.println(name + " 准备就绪");
    		try {
    			cycli.await();
    		} catch (InterruptedException | BrokenBarrierException e) {
    			e.printStackTrace();
    		}
    		System.out.println(name + " 开始执行");
    	}
    	
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public CyclicBarrier getCycli() {
    		return cycli;
    	}
    
    	public void setCycli(CyclicBarrier cycli) {
    		this.cycli = cycli;
    	}
    }
    

    Demo-2 CyclicBarrier 另一种构造方法的使用:

    注意 CyclicBarrier barrier = new CyclicBarrier(5, new TotalTask(totalService));

    再注意子线程中,awit等待的代码位置,这个代码位置在程序的最后面,因此CyclicBarrier 的灵活性完全可以由我们来把控,到底在哪一点阻塞,完全是我们自己控制的,这样的变化,就可以在一定程度上替代CountDownLatch,达到更加灵活的目的,CyclicBarrier 且是可重复使用的,细节可以再去深入了解

    /**  
     * 各省数据独立,分库存偖。为了提高计算性能,统计时采用每个省开一个线程先计算单省结果,最后汇总。  
     *   
     * @author guangbo email:weigbo@163.com  
     *   
     */  
    public class Total {   
      
        // private ConcurrentHashMap result = new ConcurrentHashMap();   
      
        public static void main(String[] args) {   
            TotalService totalService = new TotalServiceImpl();   
            CyclicBarrier barrier = new CyclicBarrier(5,   
                    new TotalTask(totalService));   
      
            // 实际系统是查出所有省编码code的列表,然后循环,每个code生成一个线程。   
            new BillTask(new BillServiceImpl(), barrier, "北京").start();   
            new BillTask(new BillServiceImpl(), barrier, "上海").start();   
            new BillTask(new BillServiceImpl(), barrier, "广西").start();   
            new BillTask(new BillServiceImpl(), barrier, "四川").start();   
            new BillTask(new BillServiceImpl(), barrier, "黑龙江").start();   
      
        }   
    }   
      
    /**  
     * 主任务:汇总任务  
     */  
    class TotalTask implements Runnable {   
        private TotalService totalService;   
      
        TotalTask(TotalService totalService) {   
            this.totalService = totalService;   
        }   
      
        public void run() {   
            // 读取内存中各省的数据汇总,过程略。   
            totalService.count();   
            System.out.println("=======================================");   
            System.out.println("开始全国汇总");   
        }   
    }   
      
    /**  
     * 子任务:计费任务  
     */  
    class BillTask extends Thread {   
        // 计费服务   
        private BillService billService;   
        private CyclicBarrier barrier;   
        // 代码,按省代码分类,各省数据库独立。   
        private String code;   
      
        BillTask(BillService billService, CyclicBarrier barrier, String code) {   
            this.billService = billService;   
            this.barrier = barrier;   
            this.code = code;   
        }   
      
        public void run() {   
            System.out.println("开始计算--" + code + "省--数据!");   
            billService.bill(code);   
            // 把bill方法结果存入内存,如ConcurrentHashMap,vector等,代码略   
            System.out.println(code + "省已经计算完成,并通知汇总Service!");   
            try {   
                // 通知barrier已经完成   
                barrier.await();   
            } catch (InterruptedException e) {   
                e.printStackTrace();   
            } catch (BrokenBarrierException e) {   
                e.printStackTrace();   
            }   
        }   
      
    } 
    

    相关方法:

    — getParties()

    获取CyclicBarrier打开屏障的线程数量,也成为方数。

    — getNumberWaiting()

    获取正在CyclicBarrier上等待的线程数量。

    —await()

    —await(timeout,TimeUnit)

    —isBroken()

    获取是否破损标志位broken的值,此值有以下几种情况:

    CyclicBarrier初始化时,broken=false,表示屏障未破损。
    如果正在等待的线程被中断,则broken=true,表示屏障破损。
    如果正在等待的线程超时,则broken=true,表示屏障破损。
    如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。
    —reset()

    使得CyclicBarrier回归初始状态,直观来看它做了两件事:

    如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
    将是否破损标志位broken置为false。

    CountDownLatch和CyclicBarrier的主要联系和区别如下:

    1.闭锁CountDownLatch做减计数,而栅栏CyclicBarrier则是加计数。

    2.CountDownLatch是一次性的,CyclicBarrier可以重用。

    3.CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成。

    4.鉴于上面的描述,CyclicBarrier在一些场景中可以替代CountDownLatch实现类似的功能

    Semaphore

    信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可

    Semaphore用来控制并发线程数,但有个问题FixedThreadPool也可以控制最大并发数,那两者有何不一样呢?首先,量级来看,Semaphore轻量级,是一个并发工具类,线程池重量级无疑,其次特点来讲,Semaphore内的线程是我们实实在在自己创建的,FixedThreadPool是分配给我们的线程池里面的线程

    另外,Semaphore如果默认大小为1的时候,还可以当作互斥锁使用,且有公平锁和非公平锁之分(是否按顺序执行,是则就是公平的,但是非常耗性能)

    代码Demo,线程队列和Semaphore配合使用

    public class TestQueeThread_2 {
    	static Semaphore semaphore = new Semaphore(1);
        public static void main(String[] args) throws InterruptedException {
            System.out.println("begin:" + (System.currentTimeMillis() / 1000)); 
            BlockingQueue<String> myQueue = new ArrayBlockingQueue<String>(1);
           
            for (int i = 0; i < 100; i++) {
            	new Thread(new Runnable() {
    				@Override
    				public void run() {
    					try {
    						semaphore.acquire();
    						String log = myQueue.take();
    						System.out.println(Thread.currentThread().getName() + ":" + doSome(log));
    						semaphore.release();
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    			}).start();
            }
            
            for (int i = 0; i < 100; i++) { // 这行代码不能改动
                String input = i + "";
                myQueue.put(input);
            }
        }
    
        public static String doSome(String input) {
        	String output = null;
            try {
                Thread.sleep(1000);
                output = input + ":" + (System.currentTimeMillis() / 1000);
                return output;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
    		return output;
        }
    }
    
    // 这种用法可能违背了oracle的本意,我们来看看oracle的官方Demo
    // Semaphore是用来约束线程使用共享资源的,控制数据一致性,当然还得是锁
    class Pool {
        private static final int MAX_AVAILABLE = 100; // 可同时访问资源的最大线程数
        private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
        protected Object[] items = new Object[MAX_AVAILABLE];   //共享资源
        protected boolean[] used = new boolean[MAX_AVAILABLE];
        public Object getItem() throws InterruptedException {
            available.acquire();
            return getNextAvailableItem();
        }
        public void putItem(Object x) {
            if (markAsUnused(x))
                available.release();
        }
        private synchronized Object getNextAvailableItem() {
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                if (!used[i]) {
                    used[i] = true;
                    return items[i];
                }
            }
            return null;
        }
        private synchronized boolean markAsUnused(Object item) {
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                if (item == items[i]) {
                    if (used[i]) {
                        used[i] = false;
                        return true;
                    } else
                        return false;
                }
            }
            return false;
        }
    }
    

    Exchanger

    Exchanger是用作线程并发协作的工具类,简单一句话讲,如果A,B线程都拥有Exchanger对象,如果某一个调用Exchanger的交换方法exchange时候,快的那个会主动等慢的那个(等的意思就是挂起),然后都到位之后,互相唤醒交换数据

    代码Demo:

    public class ExchangerTest {
    	static class Producer extends Thread {
    		private Exchanger<Integer> exchanger;
    		private static int data = 0;
    
    		Producer(String name, Exchanger<Integer> exchanger) {
    			super("Producer-" + name);
    			this.exchanger = exchanger;
    		}
    
    		@Override
    		public void run() {
    			for (int i = 1; i < 5; i++) {
    				try {
    					TimeUnit.SECONDS.sleep(1);
    					data = i;
    					System.out.println(getName() + " 交换前:" + data);
    					data = exchanger.exchange(data);
    					System.out.println(getName() + " 交换后:" + data);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    
    	static class Consumer extends Thread {
    		private Exchanger<Integer> exchanger;
    		private static int data = 0;
    
    		Consumer(String name, Exchanger<Integer> exchanger) {
    			super("Consumer-" + name);
    			this.exchanger = exchanger;
    		}
    
    		@Override
    		public void run() {
    			while (true) {
    				data = 0;
    				System.out.println(getName() + " 交换前:" + data);
    				try {
    					TimeUnit.SECONDS.sleep(1);
    					data = exchanger.exchange(data);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    				System.out.println(getName() + " 交换后:" + data);
    			}
    		}
    	}
    
    	public static void main(String[] args) throws InterruptedException {
    		Exchanger<Integer> exchanger = new Exchanger<Integer>();
    		new Producer("", exchanger).start();
    		new Consumer("", exchanger).start();
    		TimeUnit.SECONDS.sleep(7);
    		System.exit(-1);
    	}
    }
    
    结果打印:
    Consumer- 交换前:0
    Producer- 交换前:1
    Consumer- 交换后:1
    Producer- 交换后:0
    Consumer- 交换前:0
    Producer- 交换前:2
    Producer- 交换后:0
    Consumer- 交换后:2
    Consumer- 交换前:0
    Producer- 交换前:3
    Producer- 交换后:0
    Consumer- 交换后:3
    Consumer- 交换前:0
    Producer- 交换前:4
    Producer- 交换后:0
    Consumer- 交换后:4
    Consumer- 交换前:0
    

    我暂时没有碰到用需要此特点的需求,不过很显然它可以用作生产者消费者模式,通过网上的解释来看,Exchanger的实现是非常复杂的,主要是依赖CAS自旋操作

    Phase

    Phaser 是一个多栅栏的同步工具

    phase(阶段) - Phaser也有栅栏,在Phaser中,栅栏的名称叫做phase(阶段),在任意时间点,Phaser只处于某一个phase(阶段),初始阶段为0,最大达到Integerr.MAX_VALUE,然后再次归零。当所有parties参与者都到达后,phase值会递增

    parties(参与者) - 其实就是CyclicBarrier中的参与线程的概念,CyclicBarrier中的参与者在初始构造指定后就不能变更,而Phaser既可以在初始构造时指定参与者的数量,也可以中途通过register、bulkRegister、arriveAndDeregister等方法注册/注销参与者

    arrive(到达) / advance(进阶) - Phaser注册完parties(参与者)之后,参与者的初始状态是unarrived的,当参与者到达(arrive)当前阶段(phase)后,状态就会变成arrived。当阶段的到达参与者数满足条件后(注册的数量等于到达的数量),阶段就会发生进阶(advance)——也就是phase值+1

    public class SwimmerTest {
    
    	// 游泳选手个数
    	private static int swimmerNum = 6;
    
    	public static void main(String[] args) {
    
    		Phaser phaser = new Phaser(7){
                @Override
                protected boolean onAdvance(int phase, int registeredParties) {
                    System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
                    return phase >= 2  || registeredParties == 0;
                }
            };
    
    		for (int i = 0; i < swimmerNum; i++) {
    			new Thread(new Swimmer(phaser), "swimmer" + i).start();
    		}
    
    		// 主线程到达,开启第二阶段
    		phaser.arriveAndAwaitAdvance();
    		
    		// 主线程销毁,开启第三阶段
    		phaser.arriveAndDeregister();
    		
    		// 加while是为了防止其它线程没结束就打印了"比赛结束"
    		while (!phaser.isTerminated()) {}
    
    		System.out.println("===== 比赛结束 =====");
    	}
    }
    
    class Swimmer implements Runnable {
    	
    	private Phaser phaser;
    
    	public Swimmer(Phaser phaser) {
    		this.phaser = phaser;
    	}
    
    	@Override
    	public void run() {
    
    		// 从这里到第一个phaser.arriveAndAwaitAdvance()是第一阶段做的事
    		System.out.println("游泳选手-" + Thread.currentThread().getName() + ":已到达赛场");
    
    		phaser.arriveAndAwaitAdvance();
    
    		// 从这里到第二个phaser.arriveAndAwaitAdvance()是第二阶段做的事
    		System.out.println("游泳选手-" + Thread.currentThread().getName() + ":已准备好");
    
    		phaser.arriveAndAwaitAdvance();
    
    		// 从这里到第三个phaser.arriveAndAwaitAdvance()是第三阶段做的事
    		System.out.println("游泳选手-" + Thread.currentThread().getName() + ":完成比赛");
    
    		phaser.arriveAndAwaitAdvance();
    	}
    }
    

    上文说到,Phase阶段概念,且注册数量和到达数量一致的之后,就会进入下一个阶段,代码中即是如此,一开始注册七个指标,游泳子线程会运行到达6个,然后由主线程控制到达,进入到下一个阶段,注意的是参与的线程可以注册也可以销毁,所以主线程阶段二是到达,阶段三测试了销毁

    Phaser 的onAdvance方法:

    Phaser phaser = new Phaser(7){
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
            return phase >= 2  || registeredParties == 0;
        }
    };
    

    当前阶段,最后一个线程到达后,会触发onAdvance方法,此处是打印了信息,且写明了Phaser终止的标志,注册线程数为0或阶段数到达2 (0,1,2)

  • 相关阅读:
    mybatis中resultMap配置细则
    关于mybatis中typeHandler的两个案例
    Sharding模式
    Data Partitioning Guidance
    算法相关——Java排序算法之桶排序(一)
    Java并发——线程中断学习
    Android开发——ListView使用技巧总结(二)
    Android开发——ListView使用技巧总结(一)
    [原]Jenkins(十八) jenkins再出发之jenkins 内置变量
    [原]Jenkins(十七) jenkins再出发之配置SVN
  • 原文地址:https://www.cnblogs.com/kkzhilu/p/12859506.html
Copyright © 2011-2022 走看看