zoukankan      html  css  js  c++  java
  • Java并发编程核心方法与框架-Semaphore的使用

    Semaphore中文含义是信号、信号系统,这个类的主要作用就是限制线程并发数量。如果不限制线程并发数量,CPU资源很快就会被耗尽,每个线程执行的任务会相当缓慢,因为CPU要把时间片分配给不同的线程对象,而且上下文切换也要耗时,最终造成系统运行效率大幅降低,所以限制并发线程的数量是很有必要的。

    类Semaphore的同步性

    类Semaphore的构造方法参数permits表示同一时间内,最多允许多少个线程同时执行acquire()和release()之间的代码。

    public class Service {
    	private Semaphore semaphore = new Semaphore(1);
    	public void testMethod() {
    		try {
    			System.out.println(Thread.currentThread().getName() + "进入testMethod方法 time:" + System.currentTimeMillis());
    			semaphore.acquire();
    			System.out.println(Thread.currentThread().getName() + " begin time:" + System.currentTimeMillis());
    			Thread.sleep(2000);
    			System.out.println(Thread.currentThread().getName() + " end time:" + System.currentTimeMillis());
    			semaphore.release();	
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    public class ThreadA extends Thread {
    	private Service service;
    	public ThreadA(Service service) {
    		super();
    		this.service = service;
    	}
    	@Override
    	public void run() {
    		service.testMethod();
    	}
    }
    
    public class ThreadB extends Thread {
    	private Service service;
    	public ThreadB(Service service) {
    		super();
    		this.service = service;
    	}
    	@Override
    	public void run() {
    		service.testMethod();
    	}
    }
    
    public class ThreadC extends Thread {
    	private Service service;
    	public ThreadC(Service service) {
    		super();
    		this.service = service;
    	}
    	@Override
    	public void run() {
    		service.testMethod();
    	}
    }
    
    public class Main {
    	public static void main(String[] args) {
    		Service service = new Service();
    		ThreadA a = new ThreadA(service);
    		a.setName("A");
    		ThreadB b = new ThreadB(service);
    		b.setName("B");
    		ThreadC c = new ThreadC(service);
    		c.setName("C");
    		a.start();
    		b.start();
    		c.start();
    	}
    }
    

    运行程序,控制台打印结果如下:

    A进入testMethod方法 time:1468497756729
    C进入testMethod方法 time:1468497756729
    B进入testMethod方法 time:1468497756729
    A begin time:1468497756729
    A end time:1468497758733
    C begin time:1468497758733
    C end time:1468497760738
    B begin time:1468497760738
    B end time:1468497762742
    

    从打印结果来看,A、B、C三个线程同时进入testMethod方法,三个线程排队执行acquire()和release()之间的代码。修改Semaphore的构造方法参数:

    public class Service {
    	private Semaphore semaphore = new Semaphore(2);
    	public void testMethod() {
    		try {
    			System.out.println(Thread.currentThread().getName() + "进入testMethod方法 time:" + System.currentTimeMillis());
    			semaphore.acquire();
    			System.out.println(Thread.currentThread().getName() + " begin time:" + System.currentTimeMillis());
    			Thread.sleep(2000);
    			System.out.println(Thread.currentThread().getName() + " end time:" + System.currentTimeMillis());
    			semaphore.release();	
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    重新运行程序,控制台打印结果如下:

    B进入testMethod方法 time:1468497974695
    C进入testMethod方法 time:1468497974695
    A进入testMethod方法 time:1468497974695
    C begin time:1468497974695
    B begin time:1468497974695
    B end time:1468497976699
    C end time:1468497976699
    A begin time:1468497976699
    A end time:1468497978703
    

    可见,此A、B、C三个线程同时进入testMethod方法,时B、C线程同时开始执行acquire()和release()之间的代码。B、C线程执行完后,A线程开始执行acquire()和release()之间的代码。


    方法acquire(int permits)参数作用及动态添加permits许可数量

    有参方法acquire(int permits)的功能是每调用1次此方法,就使用permits个许可。

    修改以上Service类:

    //Semaphore的构造方法参数permits表示同一时间内
    //最多允许多少个线程同时执行acquire()和release()之前的代码
    public class Service {
    	private Semaphore semaphore = new Semaphore(10);//一共有10个许可
    	public void testMethod() {
    		try {
    			System.out.println(Thread.currentThread().getName() + "进入testMethod方法 time:" + System.currentTimeMillis());
    			semaphore.acquire(2);//每次执行消耗掉2个许可
    			System.out.println(Thread.currentThread().getName() + " begin time:" + System.currentTimeMillis());
    			Thread.sleep(2000);
    			System.out.println(Thread.currentThread().getName() + " end time:" + System.currentTimeMillis());
    			semaphore.release(2);	
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    public class Main {
    	public static void main(String[] args) {
    		Service service = new Service();
    		ThreadA[] a = new ThreadA[10];//ThreadA类同上面的ThreadA类
    		for (int i = 0; i < a.length; i++) {
    			a[i] = new ThreadA(service);
    			a[i].start();
    		}
    	}
    }
    

    程序运行结果如下:

    Thread-0进入testMethod方法 time:1468587142883
    Thread-3进入testMethod方法 time:1468587142883
    Thread-2进入testMethod方法 time:1468587142883
    Thread-1进入testMethod方法 time:1468587142883
    Thread-5进入testMethod方法 time:1468587142883
    Thread-2 begin time:1468587142883
    Thread-3 begin time:1468587142883
    Thread-4进入testMethod方法 time:1468587142883
    Thread-0 begin time:1468587142883
    Thread-8进入testMethod方法 time:1468587142883
    Thread-7进入testMethod方法 time:1468587142883
    Thread-6进入testMethod方法 time:1468587142883
    Thread-5 begin time:1468587142883
    Thread-1 begin time:1468587142883
    Thread-9进入testMethod方法 time:1468587142884
    Thread-0 end time:1468587144888
    Thread-3 end time:1468587144888
    Thread-2 end time:1468587144888
    Thread-5 end time:1468587144888
    Thread-1 end time:1468587144888
    Thread-6 begin time:1468587144889
    Thread-7 begin time:1468587144889
    Thread-8 begin time:1468587144889
    Thread-4 begin time:1468587144889
    Thread-9 begin time:1468587144889
    Thread-7 end time:1468587146892
    Thread-4 end time:1468587146892
    Thread-8 end time:1468587146892
    Thread-6 end time:1468587146892
    Thread-9 end time:1468587146892
    

    由程序运行结果可见,10个线程同时进入testMethod()方法。由于一共有10个许可,每个线程acquire()时消耗2个许可,所以第一批有5个线程可以同时执行acquire()方法和release()方法之间的代码。第一批的5个线程执行完毕之后,每个线程释放掉2个许可,一共释放掉10个许可。剩下的5个线程一共获取10个许可同时开始执行。


    方法acquireUninterruptibly()的使用

    方法acquireUninterruptibly()的作用是使等待进入acquire()方法的线程不允许被中断。

    先看一段能中断的代码

    public class Service {
    	private Semaphore semaphore = new Semaphore(1);
    	public void testMethod() {
    		try {
    			semaphore.acquire();
    			System.out.println(Thread.currentThread().getName() + " begin time=" + System.currentTimeMillis());
    			for (int i = 0; i < Integer.MAX_VALUE/50; i++) {
    				String newString = new String();
    				Math.random();
    			}
    			System.out.println(Thread.currentThread().getName() + " end time=" + System.currentTimeMillis());
    			semaphore.release();
    		} catch (Exception e) {
    			System.out.println(Thread.currentThread().getName() + " 进入了catch");
    			e.printStackTrace();
    		}
    	}
    }
    //省略ThreadA和ThreadB的代码
    public class Main {
    	public static void main(String[] args) throws InterruptedException {
    		Service service = new Service();
    		ThreadA a = new ThreadA(service);
    		a.setName("A");
    		a.start();
    		
    		ThreadB b = new ThreadB(service);
    		b.setName("B");
    		b.start();
    		
    		Thread.sleep(1000);
    		
    		b.interrupt();
    		System.out.println("main中断了a");
    	}
    }
    

    程序运行结果如下:

    A begin time=1468592693921
    main中断了a
    java.lang.InterruptedException
    B 进入了catch
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:996)
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
    	at java.util.concurrent.Semaphore.acquire(Semaphore.java:317)
    	at com.concurrent.chapter1.concurrent02.Service.testMethod(Service.java:9)
    	at com.concurrent.chapter1.concurrent02.ThreadB.run(ThreadB.java:11)
    A end time=1468592695193
    

    线程B成功被中断。对以上代码做如下修改:

    public class Service {
    	private Semaphore semaphore = new Semaphore(1);
    	public void testMethod() {
    		try {
    			semaphore.acquireUninterruptibly();//不允许被中断
    			System.out.println(Thread.currentThread().getName() + " begin time=" + System.currentTimeMillis());
    			for (int i = 0; i < Integer.MAX_VALUE/50; i++) {
    				String newString = new String();
    				Math.random();
    			}
    			System.out.println(Thread.currentThread().getName() + " end time=" + System.currentTimeMillis());
    			semaphore.release();
    		} catch (Exception e) {
    			System.out.println(Thread.currentThread().getName() + " 进入了catch");
    			e.printStackTrace();
    		}
    	}
    }
    

    重新运行程序,控制台的打印结果如下:

    A begin time=1468592930516
    main中断了a
    A end time=1468592931792
    B begin time=1468592931792
    B end time=1468592932998
    

    acquireUninterruptibly()方法还有重载的写法acquireUninterruptibly(int permits),作用是在等待许可的情况下不允许中断,如果成功获得锁,则取得指定permits个许可。


    方法availablePermits()和drainPermits()

    availablePermits()返回此Semaphore对象中当前可用的许可数。

    public class Service {
    	private Semaphore semaphore = new Semaphore(10);//一共有10个许可
    	public void testMethod() {
    		try {
    			semaphore.acquire(2);//每次执行消耗掉2个许可
    			System.out.println(semaphore.getQueueLength() + "个线程正在等待");
    			System.out.println("是否有线程正在等待semaphore:" + semaphore.hasQueuedThreads());
    			System.out.println("可用许可个数" + semaphore.availablePermits());
    			Thread.sleep(2000);
    			semaphore.release(2);
    			System.out.println("可用许可个数" + semaphore.availablePermits());
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    //省略ThreadA类代码
    
    public class Main {
    	public static void main(String[] args) throws InterruptedException {
    		Service service = new Service();
    		ThreadA[] a = new ThreadA[10];
    		for (int i = 0; i < a.length; i++) {
    			a[i] = new ThreadA(service);
    			a[i].start();
    			Thread.sleep(100);
    		}
    	}
    }
    

    运行程序,控制台打印结果如下:

    0个线程正在等待
    是否有线程正在等待semaphore:false
    可用许可个数8
    0个线程正在等待
    是否有线程正在等待semaphore:false
    可用许可个数6
    0个线程正在等待
    是否有线程正在等待semaphore:false
    可用许可个数4
    0个线程正在等待
    是否有线程正在等待semaphore:false
    可用许可个数2
    0个线程正在等待
    是否有线程正在等待semaphore:false
    可用许可个数0
    可用许可个数2
    4个线程正在等待
    是否有线程正在等待semaphore:true
    可用许可个数0
    可用许可个数0
    3个线程正在等待
    是否有线程正在等待semaphore:true
    可用许可个数0
    可用许可个数2
    2个线程正在等待
    是否有线程正在等待semaphore:true
    可用许可个数0
    可用许可个数2
    1个线程正在等待
    是否有线程正在等待semaphore:true
    可用许可个数0
    可用许可个数2
    0个线程正在等待
    是否有线程正在等待semaphore:false
    可用许可个数0
    可用许可个数2
    可用许可个数4
    可用许可个数6
    可用许可个数8
    可用许可个数10
    

    availablePermits()通常用于调试,因为许可的数量有可能实时在改变。

    drainPermits()可以获取并返回立即可用的许可数,并且将许可置为0.

    getQueueLength()获取等待许可的线程的个数。

    hasQueuedThreads()判断是否有线程在等待这个许可。


    公平与非公平信号量的测试

    公平信号量是获得锁的顺序与线程启动顺序有关,但不代表100%得获得信号量,仅仅是在概率上能得到保证。非公平信号量就是获得锁的顺序与线程启动顺序无关。

    public class Service {
    	private boolean isFair = false;
    	private Semaphore semaphore = new Semaphore(1, isFair);
    	public void testMethod() {
    		try {
    			semaphore.acquire();
    			System.out.println(Thread.currentThread().getName());
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			semaphore.release();
    		}
    	}
    }
    
    public class MyThread extends Thread {
    	private Service service;
    	public MyThread(Service service) {
    		super();
    		this.service = service;
    	}
    	@Override
    	public void run() {
    		System.out.println(Thread.currentThread().getName() + "启动了");
    		service.testMethod();
    	}
    }
    
    public class Main {
    	public static void main(String[] args) {
    		Service service = new Service();
    		MyThread thread = new MyThread(service);
    		thread.start();
    		MyThread[] threads = new MyThread[4];
    		for (int i = 0; i < threads.length; i++) {
    			threads[i] = new MyThread(service);
    			threads[i].start();
    		}
    	}
    }
    

    运行程序,控制台打印结果如下:

    Thread-0启动了
    Thread-3启动了
    Thread-2启动了
    Thread-1启动了
    Thread-4启动了
    Thread-0
    Thread-3
    Thread-2
    Thread-1
    Thread-4
    

    此时的信号量为非公平信号量,线程的启动顺序与其调用semaphore.acquire()无关。先启动的线程不一定先获得许可。

    对以上程序做如下修改:

    public class Service {
    	private boolean isFair = true;//公平锁
    	private Semaphore semaphore = new Semaphore(1, isFair);
    	public void testMethod() {
    		try {
    			semaphore.acquire()
    			System.out.println(Thread.currentThread().getName());
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			semaphore.release();
    		}
    	}
    }
    

    重新运行程序,控制台打印结果如下:

    Thread-0启动了
    Thread-3启动了
    Thread-2启动了
    Thread-1启动了
    Thread-0
    Thread-4启动了
    Thread-3
    Thread-2
    Thread-1
    Thread-4
    

    此时线程启动的顺序与线程执行semaphore.acquire()的顺序一致。先启动的线程先获得许可(非100%)。


    方法tryAcquire()的使用

    无参方法tryAcquire()的作用是尝试获得1一个许可,如果获取不到则返回false,此方法通常与if语句结合使用,具有无阻塞的特点。

    public class Service {
    	private Semaphore semaphore = new Semaphore(1);
    	public void testMethod() {
    		if (semaphore.tryAcquire()) {
    			System.out.println(Thread.currentThread().getName() + "首选进入");
    			for (int i = 0; i < Integer.MAX_VALUE; i++) {
    				String newString = new String();
    				Math.random();
    			}
    			semaphore.release();
    		} else {
    			System.out.println(Thread.currentThread().getName() + "未成功进入");
    		}
    	}
    }
    //省略ThreadA、ThreadB代码
    public class Main {
    	public static void main(String[] args) throws InterruptedException {
    		Service service = new Service();
    		ThreadA a = new ThreadA(service);
    		a.setName("A");
    		ThreadB b = new ThreadB(service);
    		b.setName("B");
    		a.start();
    		b.start();
    	}
    }
    

    程序运行结果如下:

    A首选进入
    B未成功进入
    

    方法tryAcquire(long timeout, TimeUnit unit)的使用

    有参方法tryAcquire(long timeout, TimeUnit unit)的作用是在指定的时间内尝试获得1个许可,如果获取不到就返回false。

    public class Service {
    	private Semaphore semaphore = new Semaphore(1);
    	public void testMethod() {
    		try {
    			if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
    				System.out.println(Thread.currentThread().getName() + "首选进入");
    				for (int i = 0; i < Integer.MAX_VALUE; i++) {
    					String newString = new String();
    					Math.random();
    				}
    				semaphore.release();
    			} else {
    				System.out.println(Thread.currentThread().getName() + "未成功进入");
    			}
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    //省略ThreadA、ThreadB
    //省略面函数
    

    运行程序,控制台打印结果如下:

    A首选进入
    B未成功进入
    

    对以上代码做如下修改:

    public class Service {
    	private Semaphore semaphore = new Semaphore(1);
    	public void testMethod() {
    		try {
    			if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
    				System.out.println(Thread.currentThread().getName() + "首选进入");
    				for (int i = 0; i < Integer.MAX_VALUE; i++) {
    					//String newString = new String();
    					//Math.random();
    				}
    				semaphore.release();
    			} else {
    				System.out.println(Thread.currentThread().getName() + "未成功进入");
    			}
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    重新运行程序,控制台打印结果如下:

    A首选进入
    B首选进入
    

    方法tryAcquire(int permits, long timeout, TimeUnit unit)的使用

    有参方法tryAcquire(int permits, long timeout, TimeUnit unit)的作用是在指定的时间内尝试去的permits个许可,如果获取不到则返回false。

    public class Service {
    	private Semaphore semaphore = new Semaphore(3);
    	public void testMethod() {
    		try {
    			if (semaphore.tryAcquire(3, 3, TimeUnit.SECONDS)) {
    				System.out.println(Thread.currentThread().getName() + "首选进入");
    				for (int i = 0; i < Integer.MAX_VALUE; i++) {
    					String newString = new String();
    					Math.random();
    				}
    				semaphore.release(3);
    			} else {
    				System.out.println(Thread.currentThread().getName() + "未成功进入");
    			}
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    //省略ThreadA、ThreadB
    //省略面函数
    

    运行程序,控制台打印结果如下:

    A首选进入
    B未成功进入
    

    注释掉for循环中的两行代码,A、B就都可以获得许可。


    多进路-多处理-多出路实验

    public class Service {
    	private Semaphore semaphore = new Semaphore(3);
    	public void sayHello() {
    		try {
    			semaphore.acquire();
    			System.out.println(Thread.currentThread().getName() + "准备:" + System.currentTimeMillis());
    			for (int i = 0; i < 5; i++) {
    				System.out.println(Thread.currentThread().getName() + "打印:" + i);
    			}
    			System.out.println(Thread.currentThread().getName() + "结束:" + System.currentTimeMillis());
    			semaphore.release();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    //省略MyThread代码
    public class Main {
    	public static void main(String[] args) {
    		Service service = new Service();
    		MyThread[] threads = new MyThread[10];
    		for (int i = 0; i < threads.length; i++) {
    			threads[i] = new MyThread(service);
    			threads[i].start();
    		}
    	}
    }
    

    运行程序,控制台打印结果如下:

    Thread-0准备:1469151758503
    Thread-2准备:1469151758503
    Thread-1准备:1469151758503
    Thread-2打印:0
    Thread-2打印:1
    Thread-0打印:0
    Thread-2打印:2
    Thread-1打印:0
    Thread-2打印:3
    Thread-0打印:1
    Thread-2打印:4
    Thread-1打印:1
    Thread-1打印:2
    Thread-2结束:1469151758504
    Thread-0打印:2
    Thread-3准备:1469151758504
    Thread-1打印:3
    Thread-3打印:0
    Thread-0打印:3
    Thread-0打印:4
    Thread-3打印:1
    Thread-1打印:4
    Thread-3打印:2
    Thread-0结束:1469151758505
    Thread-3打印:3
    Thread-4准备:1469151758505
    Thread-1结束:1469151758505
    Thread-4打印:0
    Thread-4打印:1
    Thread-3打印:4
    Thread-4打印:2
    Thread-5准备:1469151758505
    Thread-4打印:3
    Thread-3结束:1469151758505
    Thread-4打印:4
    Thread-5打印:0
    Thread-4结束:1469151758505
    Thread-6准备:1469151758505
    Thread-7准备:1469151758505
    Thread-6打印:0
    Thread-5打印:1
    Thread-5打印:2
    Thread-6打印:1
    Thread-7打印:0
    Thread-6打印:2
    Thread-5打印:3
    Thread-6打印:3
    Thread-7打印:1
    Thread-6打印:4
    Thread-5打印:4
    Thread-6结束:1469151758506
    Thread-7打印:2
    Thread-8准备:1469151758506
    Thread-5结束:1469151758506
    Thread-8打印:0
    Thread-9准备:1469151758506
    Thread-7打印:3
    Thread-9打印:0
    Thread-8打印:1
    Thread-8打印:2
    Thread-9打印:1
    Thread-7打印:4
    Thread-9打印:2
    Thread-9打印:3
    Thread-9打印:4
    Thread-8打印:3
    Thread-8打印:4
    Thread-9结束:1469151758507
    Thread-7结束:1469151758506
    Thread-8结束:1469151758507
    

    可见,在某一时刻最多有三个线程同时在执行。


    多进路-单处理-多出路实验

    对以上代码做如下修改:

    public class Service {
    	private Semaphore semaphore = new Semaphore(3);
    	private ReentrantLock lock = new ReentrantLock();
    	public void sayHello() {
    		try {
    			semaphore.acquire();
    			System.out.println(Thread.currentThread().getName() + "准备:" + System.currentTimeMillis());
    			lock.lock();//加锁
    			for (int i = 0; i < 5; i++) {
    				System.out.println(Thread.currentThread().getName() + "打印:" + i);
    			}
    			System.out.println(Thread.currentThread().getName() + "结束:" + System.currentTimeMillis());
    			lock.unlock();
    			semaphore.release();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    运行程序,控制台打印结果如下:

    Thread-1准备:1469151895747
    Thread-0准备:1469151895747
    Thread-2准备:1469151895747
    Thread-1打印:0
    Thread-1打印:1
    Thread-1打印:2
    Thread-1打印:3
    Thread-1打印:4
    Thread-1结束:1469151895748
    Thread-0打印:0
    Thread-3准备:1469151895748
    Thread-0打印:1
    Thread-0打印:2
    Thread-0打印:3
    Thread-0打印:4
    Thread-0结束:1469151895748
    Thread-2打印:0
    Thread-4准备:1469151895748
    Thread-2打印:1
    Thread-2打印:2
    Thread-2打印:3
    Thread-2打印:4
    Thread-2结束:1469151895748
    Thread-3打印:0
    Thread-5准备:1469151895748
    Thread-3打印:1
    Thread-3打印:2
    Thread-3打印:3
    Thread-3打印:4
    Thread-3结束:1469151895748
    Thread-6准备:1469151895748
    Thread-4打印:0
    Thread-4打印:1
    Thread-4打印:2
    Thread-4打印:3
    Thread-4打印:4
    Thread-4结束:1469151895749
    Thread-5打印:0
    Thread-7准备:1469151895749
    Thread-5打印:1
    Thread-5打印:2
    Thread-5打印:3
    Thread-5打印:4
    Thread-5结束:1469151895749
    Thread-6打印:0
    Thread-8准备:1469151895749
    Thread-6打印:1
    Thread-6打印:2
    Thread-6打印:3
    Thread-6打印:4
    Thread-6结束:1469151895749
    Thread-7打印:0
    Thread-9准备:1469151895749
    Thread-7打印:1
    Thread-7打印:2
    Thread-7打印:3
    Thread-7打印:4
    Thread-7结束:1469151895749
    Thread-8打印:0
    Thread-8打印:1
    Thread-8打印:2
    Thread-8打印:3
    Thread-8打印:4
    Thread-8结束:1469151895750
    Thread-9打印:0
    Thread-9打印:1
    Thread-9打印:2
    Thread-9打印:3
    Thread-9打印:4
    Thread-9结束:1469151895750
    

    此时,某一时刻最多只有一个线程在运行,执行任务的顺序是同步的。


    使用Semaphore创建字符串池

    Semaphore类可以有效地对并发执行任务的线程数量进行限制,这个功能可以应用在pool池技术中,可以设置同时访问pool池中数据的线程数量。

    public class ListPool {
    	private int poolMaxSize = 3;
    	private int semaphorePermits = 5;
    	private List<String> list = new ArrayList<>();
    	private Semaphore concurrentSemaphore = new Semaphore(semaphorePermits);
    	private ReentrantLock lock = new ReentrantLock();
    	private Condition condition = lock.newCondition();
    	
    	public ListPool() {
    		super();
    		for (int i = 0; i < poolMaxSize; i++) {
    			list.add("test-" + (i + 1));
    		}
    	}
    	
    	public String get() {
    		String string = null;
    		try {
    			concurrentSemaphore.acquire();
    			lock.lock();
    			while (list.size() == 0) {
    				condition.await();
    			}
    			string = list.remove(0);
    			lock.unlock();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return string;
    	}
    	
    	public void put(String string) {
    		lock.lock();
    		list.add(string);
    		condition.signalAll();
    		lock.unlock();
    		concurrentSemaphore.release();
    	}
    }
    
    public class MyThread extends Thread {
    	private ListPool listPool;
    	public MyThread(ListPool listPool) {
    		super();
    		this.listPool = listPool;
    	}
    	@Override
    	public void run() {
    		for (int i = 0; i < Integer.MAX_VALUE; i++) {
    			String string = listPool.get();
    			System.out.println(Thread.currentThread().getName() + "取值:" + string);
    			listPool.put(string);
    		}
    	}
    }
    
    public class Main {
    	public static void main(String[] args) {
    		ListPool pool = new ListPool();
    		MyThread[] threads = new MyThread[12];
    		for (int i = 0; i < threads.length; i++) {
    			threads[i] = new MyThread(pool);
    		}
    		for (int i = 0; i < threads.length; i++) {
    			threads[i].start();;
    		}
    	}
    }
    

    程序运行结果如下:

    ......
    Thread-10取值:test-3
    Thread-10取值:test-3
    Thread-10取值:test-3
    Thread-10取值:test-3
    Thread-2取值:test-2
    Thread-10取值:test-3
    Thread-2取值:test-2
    Thread-8取值:test-1
    Thread-2取值:test-2
    Thread-10取值:test-3
    Thread-10取值:test-3
    Thread-10取值:test-3
    Thread-10取值:test-3
    Thread-10取值:test-3
    ......
    

    使用Semaphore实现多生产者/多消费者模式

    使用Semaphore可以限制生产者与消费者的数量。Semaphore提供了限制并发线程数的功能,synchronized不提供这个功能。

    public class Service {
    	volatile private Semaphore setSemaphore = new Semaphore(10);//厨师 生产者
    	volatile private Semaphore getSemaphore = new Semaphore(20);//就餐者 消费者
    	volatile private ReentrantLock lock = new ReentrantLock();
    	volatile private Condition setCondition = lock.newCondition();
    	volatile private Condition getCondition = lock.newCondition();
    	volatile private Object[] producePosition = new Object[4];//4个盒子存放菜品
    	
    	private boolean isEmpty() {
    		boolean isEmpty = true;
    		for (int i = 0; i < producePosition.length; i++) {
    			if (producePosition[i] != null) {
    				isEmpty = false;
    				break;
    			}
    		}
    		return isEmpty;
    	}
    	
    	private boolean isFull() {
    		boolean isFull = true;
    		for (int i = 0; i < producePosition.length; i++) {
    			if (producePosition[i] == null) {
    				isFull = false;
    				break;
    			}
    		}
    		return isFull;
    	}
    	
    	public void set() {//生产
    		try {
    			setSemaphore.acquire();//最多允许10个厨师同时生产
    			lock.lock();
    			while (isFull()) {
    				setCondition.await();
    			}
    			for (int i = 0; i < producePosition.length; i++) {
    				if (producePosition[i] == null) {
    					producePosition[i] = "数据";
    					System.out.println(Thread.currentThread().getName() + "生产了:" + producePosition[i]);
    					break;
    				}
    			}
    			getCondition.signalAll();
    			lock.unlock();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			setSemaphore.release();
    		}
    	}
    	
    	public void get() {//消费
    		try {
    			getSemaphore.acquire();
    			lock.lock();
    			while (isEmpty()) {
    				getCondition.await();
    			}
    			for (int i = 0; i < producePosition.length; i++) {
    				if (producePosition[i] != null) {
    					System.out.println(Thread.currentThread().getName() + "消费了:" + producePosition[i]);
    					producePosition[i] = null;
    					break;
    				}
    			}
    			setCondition.signalAll();
    			lock.unlock();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			getSemaphore.release();
    		}
    	}
    }
    
    public class ThreadP extends Thread {
    	private Service service;
    	public ThreadP(Service service) {
    		super();
    		this.service = service;
    	}
    	@Override
    	public void run() {
    		service.set();
    	}
    }
    
    public class ThreadC extends Thread {
    	private Service service;
    	public ThreadC(Service service) {
    		super();
    		this.service = service;
    	}
    	@Override
    	public void run() {
    		service.get();
    	}
    }
    
    public class Main {
    	public static void main(String[] args) throws InterruptedException {
    		Service service = new Service();
    		ThreadP[] threadPs = new ThreadP[60];
    		ThreadC[] threadCs = new ThreadC[60];
    		for (int i = 0; i < threadCs.length; i++) {
    			threadPs[i] = new ThreadP(service);
    			threadCs[i] = new ThreadC(service);
    		}
    		Thread.sleep(2000);
    		for (int i = 0; i < threadCs.length; i++) {
    			threadPs[i].start();;
    			threadCs[i].start();;
    		}
    	}
    }
    

    运行程序,控制台打印结果如下:

    ......
    Thread-19消费了:数据
    Thread-4生产了:数据
    Thread-20生产了:数据
    Thread-5消费了:数据
    Thread-23消费了:数据
    Thread-24生产了:数据
    Thread-6生产了:数据
    Thread-7消费了:数据
    Thread-28生产了:数据
    ......
    
  • 相关阅读:
    企业nginx应用实例(功能拆分记录)
    Squid代理服务器的安装与配置
    关于mysql主从架构master宕机后,请求转移问题解决办法
    记一次innobackupex备份恢复数据库过程
    elasticsearch6.6及其插件安装记录(较详细)
    redis一主二从加哨兵
    nginx反向代理proxy_pass的问题
    spring程序打包war,直接通过-jar启动,并指定spring.profiles.active参数控制多环境配置
    Debian下配置防火墙iptables
    内连接和外连接
  • 原文地址:https://www.cnblogs.com/umgsai/p/5671625.html
Copyright © 2011-2022 走看看