zoukankan      html  css  js  c++  java
  • Java多线程基础知识例子

    一、管理

    1、创建线程

    Thread
    public class Main {
    
    	public static void main(String[] args) {
    		
    		MyThread myThread = new MyThread();
    		myThread.start();
    	}
    }
    
    /**
     * 继承Thread来创建线程
     * 缺点是不能再继承其他的类了
     */
    public class MyThread extends Thread {
    	
    	@Override
    	public void run() {
    		System.out.println("继承Thread");
    	}
    }
    
    Runnable
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Thread myThread = new Thread(new MyThread());
    		myThread.start();
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	@Override
    	public void run() {
    		System.out.println("实现Runnable");
    	}
    }
    
    ThreadPool
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		//newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    		//newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    		//newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
    		//newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    		ExecutorService executorService = Executors.newFixedThreadPool(5);
    		
    		MyThread thread = new MyThread();
    		executorService.execute(thread);
    		
    		System.out.println("线程池创建thread");
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	@Override
    	public void run() {
    		System.out.println("实现Runnable");
    	}
    }
    
    Callable
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		MyThread myThread = new MyThread();
    		FutureTask<Integer> result = new FutureTask<Integer>(myThread);
    		Thread thread = new Thread(result);
    		thread.start();
    		
    		try {
    			System.out.println(result.get());
    			System.out.println("end");
    		} catch (InterruptedException | ExecutionException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 实现Callable来创建线程并返回结果
     * 优点是可以返回结果,可以捕获异常
     */
    public class MyThread implements Callable<Integer> {
    	
    	@Override
    	public Integer call() {
    		System.out.println("实现Callable");
    		try {
    			TimeUnit.SECONDS.sleep(3);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		return 233;
    	}
    }
    

    2、状态切换

    import java.util.concurrent.TimeUnit;
    
    /**
     * 线程状态:NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED
     */
    public class Main {
    
    	public static void main(String[] args) {
    		
    		try {
    			
    			Object lock = new Object();
    			System.out.println("线程运行:创建线程");
    			Thread myThread = new Thread(new MyThread(lock));
    			TimeUnit.SECONDS.sleep(1);
    			
    			//NEW
    			System.out.println("线程状态:" + myThread.getState());
    			TimeUnit.SECONDS.sleep(1);
    			
    			//启动线程然后等待lock
    			System.out.println("线程运行:启动线程");
    			myThread.start();
    			TimeUnit.SECONDS.sleep(2);
    			
    			//WAITING
    			System.out.println("线程状态:" + myThread.getState());
    			TimeUnit.SECONDS.sleep(1);
    			
    			//启动解锁线程唤醒lock
    			//TIME_WAITING
    			Thread unLockThread = new Thread(new UnLockThread(lock));
    			unLockThread.start();
    			TimeUnit.SECONDS.sleep(2);
    			System.out.println("线程状态:" + myThread.getState());
    			
    			TimeUnit.SECONDS.sleep(1);
    			Thread lockThread = new Thread(new LockThread(lock));
    			lockThread.start();
    			
    			//BLOCKED
    			TimeUnit.SECONDS.sleep(2);
    			System.out.println("线程状态:" + myThread.getState());
    			
    			//RUNNABLE
    			TimeUnit.SECONDS.sleep(4);
    			System.out.println("线程状态:" + myThread.getState());
    			
    			TimeUnit.SECONDS.sleep(1);
    			System.out.println("线程运行:中断线程");
    			myThread.interrupt();
    			
    			//TERMINATED
    			TimeUnit.SECONDS.sleep(2);
    			System.out.println("线程状态:" + myThread.getState());
    		
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	private Object lock;
    	
    	public MyThread(Object lock) {
    		this.lock = lock;
    	}
    	
    	@Override
    	public void run() {
    		try {
    			TimeUnit.SECONDS.sleep(1);
    			synchronized (lock) {
    				System.out.println("线程运行:进入等待lock");
    				lock.wait();
    			}
    			
    			TimeUnit.SECONDS.sleep(1);
    			System.out.println("线程运行:进入超时等待");
    			
    			TimeUnit.SECONDS.sleep(3);
    			System.out.println("线程运行:进入阻塞lock");
    			synchronized (lock) {
    				TimeUnit.SECONDS.sleep(1);
    				System.out.println("线程运行:获得锁");
    			}
    			TimeUnit.SECONDS.sleep(1);
    			System.out.println("线程运行:无限运行");
    			boolean flag = true;
    			do {
    				if(Thread.interrupted()) {
    					flag = false;
    					//System.out.println("线程运行:线程中断");
    				}
    			} while(flag);
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println("线程运行:结束执行");
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class LockThread implements Runnable {
    	
    private Object lock;
    	
    	public LockThread(Object lock) {
    		this.lock = lock;
    	}
    	
    	@Override
    	public void run() {
    		System.out.println("线程运行:LockThread占有锁3秒");
    		synchronized (lock) {
    			try {
    				TimeUnit.SECONDS.sleep(3);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			System.out.println("线程运行:LockThread释放锁");
    		}
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class UnLockThread implements Runnable {
    	
    private Object lock;
    	
    	public UnLockThread(Object lock) {
    		this.lock = lock;
    	}
    	
    	@Override
    	public void run() {
    		System.out.println("线程运行:UnlockThread唤醒lock");
    		synchronized (lock) {
    			lock.notify();
    		}
    	}
    }
    

    3、守护线程

    import java.util.concurrent.TimeUnit;
    
    /**
     * 守护线程会在所有线程结束后结束
     */
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Thread daemonThread = new Thread(new DaemonThread());
    		daemonThread.setDaemon(true);
    		daemonThread.start();
    		
    		Thread myThread = new Thread(new MyThread());
    		myThread.start();
    		
    		try {
    			//等待线程结束
    			myThread.join();
    		} catch (InterruptedException e1) {
    			e1.printStackTrace();
    		}
    		
    		try {
    			TimeUnit.SECONDS.sleep(5);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println("主线程便当");
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	@Override
    	public void run() {
    		for (int i = 0; i < 10; i++) {
    			try {
    				TimeUnit.SECONDS.sleep(1);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			System.out.println("子线程存活确认");
    		}
    		System.out.println("子线程便当");
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class DaemonThread implements Runnable {
    	
    	@Override
    	public void run() {
    		while (true) {
    			try {
    				TimeUnit.SECONDS.sleep(1);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			System.out.println("守护线程存活确认");
    		}
    	}
    }
    

    4、异常捕获

    UncaughtExceptionHandler
    import org.sy.lab.Java7并发编程实战.一线程管理.八线程中不可控异常的处理.ExceptionHandler;
    
    //可捕获的异常分为非运行时异常和运行时异常
    //非运行时异常必须在方法的生命体里throw或捕获
    //运行时异常发生时如果没有捕获,只会在控制台输出堆栈信息,所以一般需要给线程设置未捕获异常处理器去捕获和处理异常
    //顺序:线程的未捕获异常处理器 -> 线程组的未捕获异常处理器 -> 全局线程的未捕获异常处理器
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Thread myThread = new Thread(new MyThread());
    		myThread.setUncaughtExceptionHandler(new ExceptionHandler());
    		myThread.start();
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	@Override
    	public void run() {
    		int a = Integer.parseInt("TTT");
    	}
    }
    
    import java.lang.Thread.UncaughtExceptionHandler;
    
    public class ExceptionHandler implements UncaughtExceptionHandler {
    
    	@Override
    	public void uncaughtException(Thread t, Throwable e) {
    		System.out.printf("An exception has been captured
    ");
    		System.out.printf("Thread: %s
    ", t.getId());
    		System.out.printf("Exception: %s: %s
    ", e.getClass().getName(), e.getMessage());
    		System.out.printf("Stack Trace: 
    ");
    		e.printStackTrace();
    		System.out.printf("Thread status: %s
    ", t.getState());
    	}
    }
    
    线程组uncaughtException
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		MyThread myThread = new MyThread();
    		MyThreadGroup group = new MyThreadGroup("xs");
    		for(int i=0; i<5; i++) {
    			Thread thread = new Thread(group, myThread);
    			thread.setUncaughtExceptionHandler(new ExceptionHandler());
    			thread.start();
    		}
    		
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		
    		System.out.println("线程数量:" + group.activeCount());
    		
    		group.list();
    		
    		Thread[] threads = new Thread[group.activeCount()];
    		group.enumerate(threads);
    		for (int i = 0; i < threads.length; i++) {
    			System.out.println("线程" + threads[i].getName() + "状态:" + threads[i].getState() + " id:" + threads[i].getId());
    		}
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	@Override
    	public void run() {
    		System.out.println("实现Runnable");
    		try {
    			TimeUnit.SECONDS.sleep(2);
    			Integer a = Integer.valueOf("a");
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    public class MyThreadGroup extends ThreadGroup {
    
    	public MyThreadGroup(String name) {
    		super(name);
    	}
    	
    	@Override
    	public void uncaughtException(Thread t, Throwable e) {
    		System.out.println("捕获到线程组的异常,异常线程:" + t.getName());
    		e.printStackTrace();
    		interrupt();
    	}
    }
    
    import java.lang.Thread.UncaughtExceptionHandler;
    
    public class ExceptionHandler implements UncaughtExceptionHandler {
    
    	@Override
    	public void uncaughtException(Thread t, Throwable e) {
    		System.out.printf("An exception has been captured
    ");
    		System.out.printf("Thread: %s
    ", t.getId());
    		System.out.printf("Exception: %s: %s
    ", e.getClass().getName(), e.getMessage());
    		System.out.printf("Stack Trace: 
    ");
    		e.printStackTrace();
    		System.out.printf("Thread status: %s
    ", t.getState());
    	}
    }
    
    Future
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		MyThread myThread = new MyThread();
    		FutureTask<Integer> result = new FutureTask<Integer>(myThread);
    		Thread thread = new Thread(result);
    		thread.start();
    		
    		try {
    			TimeUnit.SECONDS.sleep(3);
    			System.out.println(result.get());
    		} catch (InterruptedException | ExecutionException e) {
    			System.out.println("捕获到异常");
    			e.printStackTrace();
    		}
    	}
    }
    
    import java.util.concurrent.Callable;
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Callable<Integer> {
    	
    	@Override
    	public Integer call() throws Exception {
    		int a = Integer.parseInt("TTT");
    		return a;
    	}
    }
    

    5、局部变量

    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		MyThread unsafeThread = new MyThread();
    		for (int i = 0; i < 5; i++) {
    			Thread myThread = new Thread(unsafeThread);
    			myThread.start();
    			try {
    				TimeUnit.SECONDS.sleep(1);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	//在当前的线程中保存变量的副本
    	//ThreadLocal.get: 获取ThreadLocal中当前线程共享变量的值
    	//ThreadLocal.set: 设置ThreadLocal中当前线程共享变量的值
    	//ThreadLocal.remove: 移除ThreadLocal中当前线程共享变量的值
    	//ThreadLocal.initialValue: ThreadLocal没有被当前线程赋值时或当前线程刚调用remove方法后调用get方法,返回此方法值
    	ThreadLocal<Integer> num = new ThreadLocal<Integer>() {
    		protected Integer initialValue() {
    			return 0;
    		};
    	};
    	
    	@Override
    	public void run() {
    		for (int i = 0; i < 10; i++) {
    			num.set(num.get() + 1);
    		}
    		System.out.println(num.get());
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class UnsafeThread implements Runnable {
    	
    	private Integer num = 0;
    	
    	@Override
    	public void run() {
    		for (int i = 0; i < 10; i++) {
    			num++;
    		}
    		System.out.println(num);
    	}
    }
    

    6、线程组

    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		MyThread myThread = new MyThread();
    		ThreadGroup group = new ThreadGroup("xs");
    		for(int i=0; i<5; i++) {
    			Thread thread = new Thread(group, myThread);
    			thread.start();
    		}
    		
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		
    		System.out.println("线程数量:" + group.activeCount());
    		
    		group.list();
    		
    		Thread[] threads = new Thread[group.activeCount()];
    		group.enumerate(threads);
    		for (int i = 0; i < threads.length; i++) {
    			System.out.println("线程" + threads[i].getName() + "状态:" + threads[i].getState() + " id:" + threads[i].getId());
    		}
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	@Override
    	public void run() {
    		System.out.println("实现Runnable");
    		try {
    			TimeUnit.SECONDS.sleep(2);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    7、

    public class Main {
    
    	public static void main(String[] args) {
    		
    		MyThread myThread = new MyThread();
    		MyThreadFactory factory = new MyThreadFactory();
    		Thread thread = factory.newThread(myThread);
    		thread.start();
     	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	@Override
    	public void run() {
    		System.out.println("实现Runnable");
    	}
    }
    
    import java.util.concurrent.ThreadFactory;
    
    public class MyThreadFactory implements ThreadFactory {
    
    	@Override
    	public Thread newThread(Runnable r) {
    		
    		System.out.println("工厂创建线程");
    		Thread thread = new Thread(r);
    		return thread;
    	}
    }
    

    二、同步

    1、synchronized

    锁的对象
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		//UnsafeTick tick = new UnsafeTick();
    		
    		//锁住该对象
    		//public synchronized void method { } 方法锁
    		//synchronized(this) { } 代码锁
    		//Tick1 tick = new Tick1();
    		
    		//锁住该类
    		//public synchronized static void method { } 方法锁
    		//synchronized(Test.class) { } 代码锁
    		//Tick2 tick = new Tick2();
    		
    		//锁住非依赖属性
    		//synchronized(o) {} 代码锁
    		Tick3 tick = new Tick3();
    		
    		MyThread myThread = new MyThread(tick);
    		
    		for (int i = 0; i < 10; i++) {
    			Thread thread = new Thread(myThread);
    			thread.start();
    		}
    		
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println(tick.getNum());
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	private ITick tick;
    	
    	public MyThread(ITick tick) {
    		this.tick = tick;
    	}
    	
    	@Override
    	public void run() {
    		
    		for (int i = 0; i < 10; i++) {
    			tick.subtract();
    			try {
    				Thread.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    public interface ITick {
    	
    	public void add();
    	
    	public void subtract();
    	
    	public int getNum();
    }
    
    public class Tick1 implements ITick {
    	
    	private int num = 100;
    	
    	@Override
    	public synchronized void add() {
    		num++;
    	}
    	
    	@Override
    	public synchronized void subtract() {
    		num--;
    	}
    	
    	@Override
    	public int getNum() {
    		return num;
    	}
    }
    
    public class Tick2 implements ITick {
    	
    	private int num = 100;
    	
    	@Override
    	public void add() {
    		synchronized(ITick.class) {
    			num++;
    		}
    	}
    	
    	@Override
    	public void subtract() {
    		synchronized(ITick.class) {
    			num--;
    		}
    	}
    	
    	@Override
    	public int getNum() {
    		return num;
    	}
    }
    
    public class Tick3 implements ITick {
    	
    	private int num = 100;
    	private Object obj = new Object();
    	
    	@Override
    	public void add() {
    		synchronized(obj) {
    			num++;
    		}
    	}
    	
    	@Override
    	public void subtract() {
    		synchronized(obj) {
    			num--;
    		}
    	}
    	
    	@Override
    	public int getNum() {
    		return num;
    	}
    }
    
    public class UnsafeTick implements ITick {
    	
    	private int num = 100;
    	
    	@Override
    	public void add() {
    		num++;
    	}
    	
    	@Override
    	public void subtract() {
    		num--;
    	}
    	
    	@Override
    	public int getNum() {
    		return num;
    	}
    }
    
    使用条件
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Storage storage = new Storage();
    		
    		Producer producer1 = new Producer(storage, 10);
    		Producer producer2 = new Producer(storage, 10);
    		Producer producer3 = new Producer(storage, 10);
    		Producer producer4 = new Producer(storage, 50);
    		Producer producer5 = new Producer(storage, 10);
    		Producer producer6 = new Producer(storage, 10);
    		Producer producer7 = new Producer(storage, 50);
    		Producer producer8 = new Producer(storage, 10);
    		Producer producer9 = new Producer(storage, 10);
    		Producer producer10 = new Producer(storage, 10);
    		
    		Consumer consumer1 = new Consumer(storage, 50);
    		Consumer consumer2 = new Consumer(storage, 20);
    		Consumer consumer3 = new Consumer(storage, 30);
    		
    		Thread thread1 = new Thread(producer1, "生产者1");
    		Thread thread2 = new Thread(producer2, "生产者2");
    		Thread thread3 = new Thread(producer3, "生产者3");
    		Thread thread4 = new Thread(producer4, "生产者4");
    		Thread thread5 = new Thread(producer5, "生产者5");
    		Thread thread6 = new Thread(producer6, "生产者6");
    		Thread thread7 = new Thread(producer7, "生产者7");
    		Thread thread8 = new Thread(producer8, "生产者8");
    		Thread thread9 = new Thread(producer9, "生产者9");
    		Thread thread10 = new Thread(producer10, "生产者10");
    		Thread thread11 = new Thread(consumer1, "消费者1");
    		Thread thread12 = new Thread(consumer2, "消费者2");
    		Thread thread13 = new Thread(consumer3, "消费者3");
    		
    		thread11.start();
    		thread12.start();
    		thread13.start();
    		thread1.start();
    		thread2.start();
    		thread3.start();
    		thread4.start();
    		thread5.start();
    		thread6.start();
    		thread7.start();
    		thread8.start();
    		thread9.start();
    		thread10.start();
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	@Override
    	public void run() {
    		System.out.println("实现Runnable");
    	}
    }
    
    public class Consumer implements Runnable {
    	
    	private Storage storage;
    	private int num;
    	
    	public Consumer(Storage storage, int num) {
    		this.storage = storage;
    		this.num = num;
    	}
    	
    	@Override
    	public void run() {
    		storage.subtract(num);
    	}
    }
    
    public class Producer implements Runnable {
    	
    	private Storage storage;
    	private int num;
    	
    	public Producer(Storage storage, int num) {
    		this.storage = storage;
    		this.num = num;
    	}
    	
    	@Override
    	public void run() {
    		storage.add(num);
    	}
    }
    
    /**
     * 仓库
     */
    public class Storage {
    	
    	private int max = 100;
    	private int num = 0;
    	
    	public synchronized void add(int n) {
    		
    		while(num + n > max) {
    			try {
    				System.out.println(Thread.currentThread().getName() + "  要增加:" + n + "  仓库存量:" + num + "  无法加入,进入等待");
    				wait();
    				System.out.println(Thread.currentThread().getName() + "  被唤醒");
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		num += n;
    		System.out.println(Thread.currentThread().getName() + "  已增加:" + n + "  现仓库存量:" + num);
    		notifyAll();
    	}
    	
    	public synchronized void subtract(int n) {
    		
    		while(num - n < 0) {
    			try {
    				System.out.println(Thread.currentThread().getName() + "  要取出:" + n + "  仓库存量:" + num + "  无法取出,进入等待");
    				wait();
    				System.out.println(Thread.currentThread().getName() + "  被唤醒");
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		num -= n;
    		System.out.println(Thread.currentThread().getName() + "  已取出:" + n + "  现仓库存量:" + num);
    		notifyAll();
    	}
    	
    	public int getNum() {
    		return num;
    	}
    }
    

    2、ReentrantLock

    创建锁
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Tick tick = new Tick();
    		
    		MyThread myThread = new MyThread(tick);
    		
    		for (int i = 0; i < 10; i++) {
    			Thread thread = new Thread(myThread);
    			thread.start();
    		}
    		
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println(tick.getNum());
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	private Tick tick;
    	
    	public MyThread(Tick tick) {
    		this.tick = tick;
    	}
    	
    	@Override
    	public void run() {
    		
    		for (int i = 0; i < 10; i++) {
    			tick.subtract();
    			try {
    				Thread.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Tick {
    	
    	private int num = 100;
    	private Lock lock = new ReentrantLock();
    	
    	public void add() {
    		lock.lock();
    		num++;
    		lock.unlock();
    	}
    	
    	public void subtract() {
    		lock.lock();
    		num--;
    		lock.unlock();
    	}
    	
    	public int getNum() {
    		return num;
    	}
    }
    
    读写锁
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Counter counter = new Counter();
    		Read read = new Read(counter);
    		Write write = new Write(counter);
    		
    		Thread thread1 = new Thread(read);
    		thread1.start();
    		
    		Thread thread2 = new Thread(write);
    		thread2.start();
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class Counter {
    	
    	private int num = 10;
    	private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    	
    	public void add() {
    		lock.writeLock().lock();
    		System.out.println("--开始写入");
    		try {
    			for (int i = 0; i < 5; i++) {
    				num++;
    				try {
    					TimeUnit.SECONDS.sleep(1);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			System.out.println("--结束写入");
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			lock.writeLock().unlock();
    		}
    	}
    	
    	public int read() {
    		lock.readLock().lock();
    		try {
    			System.out.println("读取:" + num);
    			return num;
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			lock.readLock().unlock();
    		}
    		return 0;
    	}
    	
    	public int unsafeRead() {
    		System.out.println("读取:" + num);
    		return num;
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class Read implements Runnable {
    	
    	private Counter counter;
    	
    	public Read(Counter counter) {
    		this.counter = counter;
    	}
    	
    	@Override
    	public void run() {
    		
    		for (int i = 0; i < 10; i++) {
    			counter.read();
    			try {
    				TimeUnit.SECONDS.sleep(1);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class Write implements Runnable {
    	
    	private Counter counter;
    	
    	public Write(Counter counter) {
    		this.counter = counter;
    	}
    	
    	@Override
    	public void run() {
    		try {
    			TimeUnit.SECONDS.sleep(3);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		
    		counter.add();
    	}
    }
    
    公平锁
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Tick tick = new Tick();
    		
    		MyThread myThread = new MyThread(tick);
    		
    		for (int i = 0; i < 5; i++) {
    			Thread thread = new Thread(myThread);
    			thread.start();
    			try {
    				Thread.sleep(1);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    /**
     * 实现Runnable来创建线程
     * 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
     */
    public class MyThread implements Runnable {
    	
    	private Tick tick;
    	
    	public MyThread(Tick tick) {
    		this.tick = tick;
    	}
    	
    	@Override
    	public void run() {
    		tick.add();
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Tick {
    	
    	private Lock lock = new ReentrantLock(false);
    	
    	public void add() {
    		lock.lock();
    		System.out.println(Thread.currentThread().getName() + "进入锁");
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		//System.out.println(Thread.currentThread().getName() + "离开锁");
    		lock.unlock();
    		
    		lock.lock();
    		System.out.println(Thread.currentThread().getName() + "进入锁2");
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		//System.out.println(Thread.currentThread().getName() + "离开锁2");
    		lock.unlock();
    	}
    }
    
    使用条件
    public class Main {
    
    	//酒吧里有100瓶老酒,吧台有3个位置,1次只能喝1瓶
    	//1个酒保把老酒拿到吧台
    	//5个酒鬼在吧台抢酒喝
    	public static void main(String[] args) {
    		
    		Bar bar = new Bar();
    		Counter counter = new Counter();
    		Bartender bartender = new Bartender(bar, counter);
    		Drunkard drunkard = new Drunkard(counter);
    		
    		Thread thread = new Thread(bartender, "酒保");
    		thread.start();
    		
    		for (int i = 1; i <= 5; i++) {
    			Thread thread1 = new Thread(drunkard, "酒鬼" + i);
    			thread1.start();
    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    public class Bar {
    	
    	private int beer = 100;
    	
    	public void getBeer() {
    		beer--;
    		System.out.println(Thread.currentThread().getName() + "取走1瓶酒,酒窖剩余:" + beer);
    	}
    	
    	public boolean hasBeer() {
    		return beer>0 ? true:false;
    	}
    }
    
    public class Bartender implements Runnable {
    	
    	private Bar bar;
    	private Counter counter;
    	
    	public Bartender(Bar bar, Counter counter) {
    		this.bar = bar;
    		this.counter = counter;
    	}
    
    	@Override
    	public void run() {
    		while (bar.hasBeer()) {
    			bar.getBeer();
    			counter.setBeer();
    			try {
    				Thread.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		counter.setOpen(false);
    	}
    }
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Counter {
    	
    	private int beer = 0;
    	private int max = 3;
    	private Lock lock = new ReentrantLock(false);
    	private Condition set = lock.newCondition();
    	private Condition get = lock.newCondition();
    	private boolean open = true;
    	
    	public void setBeer() {
    		lock.lock();
    		try {
    			while(beer == max) {
    				set.await();
    			}
    			beer++;
    			get.signalAll();
    			System.out.println(Thread.currentThread().getName() + "拿出1瓶酒,吧台老酒数量:" + beer);
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			lock.unlock();
    		}
    	}
    	
    	public void getBeer() {
    		lock.lock();
    		try {
    			while(beer == 0) {
    				get.await();
    			}
    			beer--;
    			set.signalAll();
    			System.out.println("--" + Thread.currentThread().getName() + "喝掉1瓶酒,吧台老酒数量:" + beer);
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			lock.unlock();
    		}
    	}
    	
    	public boolean hasBeer() {
    		return beer>0 ? true:false;
    	}
    
    	public boolean isOpen() {
    		return open;
    	}
    
    	public void setOpen(boolean open) {
    		this.open = open;
    	}
    }
    
    public class Drunkard implements Runnable {
    	
    	private Counter counter;
    	
    	public Drunkard(Counter counter) {
    		this.counter = counter;
    	}
    
    	@Override
    	public void run() {
    		while (counter.isOpen() || counter.hasBeer()) {
    			counter.getBeer();
    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }

    三、同步辅助类

    1、Semaphore信号量

    //Semaphore有限流的效果
    //10个文件,3台打印机,同时最多有3个文件处于正在打印的状态
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Queue queue = new Queue();
    		MyThread myThread = new MyThread(queue);
    		
    		for (int i = 0; i < 10; i++) {
    			Thread thread = new Thread(myThread, "打印机" + i);
    			thread.start();
    			try {
    				Thread.sleep(1);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    public class MyThread implements Runnable {
    	
    	private Queue queue;
    	
    	public MyThread(Queue queue) {
    		this.queue = queue;
    	}
    	
    	@Override
    	public void run() {
    		queue.print(Thread.currentThread().getName());
    	}
    }
    
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    public class Queue {
    	
    	private Semaphore semaphore = new Semaphore(3);
    	
    	public void print(String name) {
    		
    		try {
    			semaphore.acquire();
    			System.out.println(name + "开始打印");
    			TimeUnit.SECONDS.sleep(1);
    			System.out.println("--" + name + "打印结束");
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			semaphore.release();
    		}
    	}
    }
    

    2、CountDownLatch门阀

    import java.util.concurrent.TimeUnit;
    
    //CountDownLatch可以提供类似join的功能
    //await等待其他线程,在计数减为0的时候继续执行
    //例子里10个人参与众筹,在6个人参加后众筹成功
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Crowdfunding crowdfunding = new Crowdfunding(6);
    		Thread thread = new Thread(crowdfunding);
    		thread.start();
    		
    		MyThread myThread = new MyThread(crowdfunding);
    		for (int i = 1; i <= 10; i++) {
    			Thread thread2 = new Thread(myThread, "客户" + i);
    			thread2.start();
    			try {
    				TimeUnit.SECONDS.sleep(1);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    public class MyThread implements Runnable {
    	
    	private Crowdfunding crowdfunding;
    	
    	public MyThread(Crowdfunding crowdfunding) {
    		this.crowdfunding = crowdfunding;
    	}
    	
    	@Override
    	public void run() {
    		crowdfunding.join(Thread.currentThread().getName());
    	}
    }
    
    import java.util.concurrent.CountDownLatch;
    
    public class Crowdfunding implements Runnable {
    	
    	private CountDownLatch countDownLatch;
    	
    	public Crowdfunding(int num) {
    		countDownLatch = new CountDownLatch(num);
    	}
    	
    	public void join(String name) {
    		
    		countDownLatch.countDown();
    		System.out.println(name + "参与众筹,还需要数量:" + countDownLatch.getCount());
    	}
    
    	@Override
    	public void run() {
    		System.out.println("众筹开始,需求数量:" + countDownLatch.getCount());
    		try {
    			countDownLatch.await();
    			Thread.sleep(10);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println("众筹成功");
    	}
    }
    

    3、CyclicBarrier同步屏障

    import java.util.concurrent.TimeUnit;
    
    //CyclicBarrier允许两个或者多个线程在某个点上进行同步
    //可以传入另一个Runnable对象作为初始化参数,当所有线程都到达集合点后,CyclicBarrier类将这个Runnable对象作为线程优先去执行
    //比CountDownLatch功能强大,支持用reset()方法重置,getNumberWaiting()可以获得阻塞的线程数量,isBroken()判断是否被中断
    //例子里5个人参与众筹,在5个人参加后众筹成功,生产完产品,5个人结束众筹
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Product product = new Product();
    		Crowdfunding crowdfunding = new Crowdfunding(5, product);
    		
    		MyThread myThread = new MyThread(crowdfunding);
    		for (int i = 1; i <= 5; i++) {
    			Thread thread = new Thread(myThread, "客户" + i);
    			thread.start();
    			try {
    				TimeUnit.SECONDS.sleep(1);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    public class MyThread implements Runnable {
    	
    	private Crowdfunding crowdfunding;
    	
    	public MyThread(Crowdfunding crowdfunding) {
    		this.crowdfunding = crowdfunding;
    	}
    	
    	@Override
    	public void run() {
    		crowdfunding.join(Thread.currentThread().getName());
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class Product implements Runnable {
    
    	@Override
    	public void run() {
    		
    		try {
    			System.out.println("众筹成功,生产产品..");
    			TimeUnit.SECONDS.sleep(3);
    			System.out.println("产品发货");
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class Crowdfunding {
    	
    	private CyclicBarrier cyclicBarrier;
    	
    	public Crowdfunding(int num, Runnable runnable) {
    		cyclicBarrier = new CyclicBarrier(num, runnable);
    	}
    	
    	public void join(String name) {
    		
    		System.out.println("当前参与人数:" + cyclicBarrier.getNumberWaiting());
    		System.out.println(name + "参与众筹");
    		try {
    			cyclicBarrier.await();
    		} catch (InterruptedException | BrokenBarrierException e) {
    			e.printStackTrace();
    		}
    		System.out.println(name + "完成众筹");
    	}
    }
    

    4、Phaser阶段器

    //Phaser是一种可重用的同步屏障,比CountDownLatch和CyclicBarrier更灵活
    //Phaser默认不会进行异常处理,休眠的线程不会响应中断事件
    //arriveAndAwaitAdvance()等待参与者到达指定的数量
    //arriveAndDeregister注销当前线程,可以模拟CountDownLatch的功能
    //继承Phaser可以实现更多的触发行为,其中onAdvance()返回true表示deregister所有线程并解除阻塞
    //例子中模拟火箭分3个阶段升空的过程,有3个模块在每个阶段都要同时就绪,燃油要在前两个阶段就绪,然后在第2个阶段结束后回收
    public class Main {
    
    	public static void main(String[] args) {
    		
    		Rocket rocket = new Rocket();
    		
    		Module module1 = new Module(rocket);
    		Module module2 = new Module(rocket);
    		Module module3 = new Module(rocket);
    		Fuel fuel = new Fuel(rocket);
    		
    		Thread thread1 = new Thread(module1, "模块1");
    		Thread thread2 = new Thread(module2, "模块2");
    		Thread thread3 = new Thread(module3, "模块3");
    		Thread thread4 = new Thread(fuel, "燃料");
    		
    		thread1.start();
    		thread2.start();
    		thread3.start();
    		thread4.start();
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class Fuel implements Runnable {
    	
    private Rocket rocket = new Rocket();
    	
    	public Fuel(Rocket rocket) {
    		this.rocket = rocket;
    	}
    	
    	@Override
    	public void run() {
    		
    		try {
    			
    			TimeUnit.SECONDS.sleep((int)(Math.random()*5));
    			rocket.first(Thread.currentThread().getName());
    			
    			TimeUnit.SECONDS.sleep((int)(Math.random()*5));
    			rocket.secondRemove(Thread.currentThread().getName());
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class Module implements Runnable {
    
    	private Rocket rocket = new Rocket();
    	
    	public Module(Rocket rocket) {
    		this.rocket = rocket;
    	}
    	
    	@Override
    	public void run() {
    		
    		try {
    			
    			TimeUnit.SECONDS.sleep((int)(Math.random()*5));
    			rocket.first(Thread.currentThread().getName());
    			
    			TimeUnit.SECONDS.sleep((int)(Math.random()*5));
    			rocket.second(Thread.currentThread().getName());
    			
    			TimeUnit.SECONDS.sleep((int)(Math.random()*5));
    			rocket.third(Thread.currentThread().getName());
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    import java.util.concurrent.Phaser;
    
    public class MyPhaser extends Phaser {
    	
    	public MyPhaser(int parties) {
            super(null, parties);
        }
    	
    	@Override
    	protected boolean onAdvance(int phase, int registeredParties) {
    		switch (phase) {
    		case 0:
    			System.out.println("火箭第1阶段就绪,就绪部件数量:" + registeredParties + "
    ");
    			return false;
    		case 1:
    			System.out.println("火箭第2阶段就绪,就绪部件数量:" + registeredParties + "
    ");
    			return false;
    		case 2:
    			System.out.println("火箭第3阶段就绪,就绪部件数量:" + registeredParties + "
    ");
    			System.out.println("火箭升空
    ");
    			return false;
    		default:
    			return true;
    		}
    	}
    }
    
    public class Rocket {
    	
    	private MyPhaser phaser = new MyPhaser(4);
    	
    	public void first(String name) {
    		System.out.println(name + "在阶段1就绪");
    		phaser.arriveAndAwaitAdvance();
    	}
    	
    	public void second(String name) {
    		System.out.println("--" + name + "在阶段2就绪");
    		phaser.arriveAndAwaitAdvance();
    	}
    	
    	public void third(String name) {
    		System.out.println("----" + name + "在阶段3就绪");
    		phaser.arriveAndAwaitAdvance();
    	}
    	
    	public void secondRemove(String name) {
    		System.out.println("--" + name + "在阶段2就绪");
    		phaser.arriveAndAwaitAdvance();
    		
    		System.out.println("--" + name + "在阶段2结束后移除");
    		phaser.arriveAndDeregister();
    	}
    }
    

    5、Exchanger交换者

    import java.util.concurrent.Exchanger;
    
    //Exchanger用于两个线程在到达同步点时交换数据
    //可以用于遗传算法和校对工作
    //例子中对产生后的2个银行流水进行比较
    public class Main {
    	
    	public static void main(String[] args) {
    		
    		Exchanger<Integer> exchanger = new Exchanger<Integer>();
    		
    		Account1 account1 = new Account1(exchanger);
    		Account2 account2 = new Account2(exchanger);
    		
    		Thread thread1 = new Thread(account1, "银行账单1");
    		Thread thread2 = new Thread(account2, "银行账单2");
    		
    		thread1.start();
    		thread2.start();
    	}
    }
    
    import java.util.concurrent.Exchanger;
    import java.util.concurrent.TimeUnit;
    
    public class Account1 implements Runnable {
    
    	private Exchanger<Integer> exchanger;
    	
    	public Account1(Exchanger<Integer> exchanger) {
    		this.exchanger = exchanger;
    	}
    	
    	@Override
    	public void run() {
    		try {
    			TimeUnit.SECONDS.sleep(2);
    			System.out.println(Thread.currentThread().getName() + "已出账");
    			
    			Integer account = exchanger.exchange(233);
    			System.out.println(Thread.currentThread().getName() + "对方金额:" + account);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    import java.util.concurrent.Exchanger;
    import java.util.concurrent.TimeUnit;
    
    public class Account2 implements Runnable {
    
    	private Exchanger<Integer> exchanger;
    	
    	public Account2(Exchanger<Integer> exchanger) {
    		this.exchanger = exchanger;
    	}
    	
    	@Override
    	public void run() {
    		try {
    			TimeUnit.SECONDS.sleep(5);
    			System.out.println(Thread.currentThread().getName() + "已出账");
    			
    			Integer account = exchanger.exchange(666);
    			System.out.println(Thread.currentThread().getName() + "对方金额:" + account);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    四、执行器

    1、创建线程池

    newFixedThreadPool
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    
    //创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
    public class Main {
    	
    	public static void main(String[] args) {
    		
    		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    		
    		for (int i = 0; i < 5; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		
    		for (int i = 5; i < 10; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(300);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		
    		for (int i = 10; i < 15; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		executor.shutdown();
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class MyThread implements Runnable {
    	
    	private String name;
    	
    	public MyThread(String name) {
    		this.name = name;
    	}
    	
    	@Override
    	public void run() {
    		
    		System.out.println(Thread.currentThread().getName() + "开始执行" + name);
    		try {
    			TimeUnit.SECONDS.sleep(5);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println(Thread.currentThread().getName() + "执行结束");
    	}
    }
    
    newCachedThreadPool
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    
    import org.sy.lab.多线程基础._4_执行器._1_创建线程池._1_newFixedThreadPool.MyThread;
    
    //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
    public class Main {
    	
    	public static void main(String[] args) {
    		
    		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
    		
    		for (int i = 0; i < 5; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		
    		for (int i = 5; i < 10; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(300);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		
    		for (int i = 10; i < 15; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		executor.shutdown();
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class MyThread implements Runnable {
    	
    	private String name;
    	
    	public MyThread(String name) {
    		this.name = name;
    	}
    	
    	@Override
    	public void run() {
    		
    		System.out.println(Thread.currentThread().getName() + "开始执行" + name);
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println(Thread.currentThread().getName() + "执行结束");
    	}
    }
    
    newSingleThreadExecutor
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.sy.lab.多线程基础._4_执行器._1_创建线程池._1_newFixedThreadPool.MyThread;
    
    //创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
    public class Main {
    	
    	public static void main(String[] args) {
    		
    		ExecutorService executor = Executors.newSingleThreadExecutor();
    		
    		for (int i = 0; i < 5; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		
    		for (int i = 5; i < 10; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(300);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		
    		for (int i = 10; i < 15; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		executor.shutdown();
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class MyThread implements Runnable {
    	
    	private String name;
    	
    	public MyThread(String name) {
    		this.name = name;
    	}
    	
    	@Override
    	public void run() {
    		
    		System.out.println(Thread.currentThread().getName() + "开始执行" + name);
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println(Thread.currentThread().getName() + "执行结束");
    	}
    }
    
    newScheduledThreadPool
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import org.sy.lab.多线程基础._4_执行器._1_创建线程池._1_newFixedThreadPool.MyThread;
    
    //创建一个定长线程池,支持定时及周期性任务执行
    public class Main {
    	
    	public static void main(String[] args) {
    		
    		ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(3);
    		
    		for (int i = 0; i < 5; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.schedule(myThread, 10, TimeUnit.SECONDS);
    		}
    		
    		for (int i = 5; i < 10; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(300);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		
    		for (int i = 10; i < 15; i++) {
    			MyThread myThread = new MyThread("任务" + (i+1));
    			try {
    				Thread.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			executor.execute(myThread);
    		}
    		executor.shutdown();
    		try {
    			executor.awaitTermination(1, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class MyThread implements Runnable {
    	
    	private String name;
    	
    	public MyThread(String name) {
    		this.name = name;
    	}
    	
    	@Override
    	public void run() {
    		
    		System.out.println(Thread.currentThread().getName() + "开始执行" + name);
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println(Thread.currentThread().getName() + "执行结束");
    	}
    }
    
    周期性任务
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import org.sy.lab.多线程基础._4_执行器._1_创建线程池._1_newFixedThreadPool.MyThread;
    
    //创建一个定长线程池,支持定时及周期性任务执行
    //scheduleAtFixedRate      第3个参数表示前一个任务的开始时间和后一个任务的开始时间间隔
    //scheduleWithFixedDelay 第3个参数表示前一个任务的结束时间和后一个任务的开始时间间隔
    public class Main {
    	
    	public static void main(String[] args) {
    		
    		ScheduledExecutorService  executor = Executors.newScheduledThreadPool(1);
    		
    		MyThread myThread = new MyThread("任务");
    		executor.scheduleAtFixedRate(myThread, 0, 5, TimeUnit.SECONDS);
    		//executor.scheduleWithFixedDelay(myThread, 0, 5, TimeUnit.SECONDS);
    		
    		executor.shutdown();
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class MyThread implements Runnable {
    	
    	private String name;
    	
    	public MyThread(String name) {
    		this.name = name;
    	}
    	
    	@Override
    	public void run() {
    		
    		System.out.println(Thread.currentThread().getName() + "开始执行" + name);
    		try {
    			TimeUnit.SECONDS.sleep(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println(Thread.currentThread().getName() + "执行结束");
    	}
    }
    

    2、Future线程控制

    返回结果
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    		
    		MyCallable myCallable = new MyCallable();
    		
    		Future<String> result = executor.submit(myCallable);
    		try {
    			System.out.println(result.get());
    		} catch (InterruptedException | ExecutionException e) {
    			e.printStackTrace();
    		}
    		executor.shutdown();
    	}
    }
    
    import java.util.concurrent.Callable;
    
    public class MyCallable implements Callable<String> {
    
    	@Override
    	public String call() throws Exception {
    		
    		return "这是callable返回的结果";
    	}
    }
    
    处理第一个结果
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    
    import org.sy.lab.多线程基础._4_执行器._2_Future线程控制._3_处理所有结果.MyCallable;
    
    //invokeAny 返回第一个完成没有抛异常的任务结果
    public class Main {
    
    	public static void main(String[] args) {
    		
    		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
    		
    		MyCallable myCallable = new MyCallable(5, "任务1");
    		MyCallable myCallable2 = new MyCallable(2, "任务2");
    		
    		List<MyCallable> list =new ArrayList<MyCallable>();
    		list.add(myCallable);
    		list.add(myCallable2);
    		
    		try {
    			System.out.println(executor.invokeAny(list));
    		} catch (InterruptedException | ExecutionException e) {
    			e.printStackTrace();
    		}
    		executor.shutdown();
    	}
    }
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    
    public class MyCallable implements Callable<String> {
    
    	private int time;
    	private String name;
    	
    	public MyCallable(int time, String name) {
    		this.time = time;
    		this.name = name;
    	}
    	
    	@Override
    	public String call() throws Exception {
    		TimeUnit.SECONDS.sleep(time);
    		return "这是" + name + "返回的结果";
    	}
    }
    
    处理所有结果
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
    
    //invokeAll 接收一个任务列表,等待所有任务完成
    public class Main {
    
    	public static void main(String[] args) {
    		
    		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
    		
    		MyCallable myCallable = new MyCallable(5, "任务1");
    		MyCallable myCallable2 = new MyCallable(2, "任务2");
    		
    		List<MyCallable> list =new ArrayList<MyCallable>();
    		list.add(myCallable);
    		list.add(myCallable2);
    		
    		List<Future<String>> result = new ArrayList<Future<String>>();
    		try {
    			result = executor.invokeAll(list);
    			for (Iterator<Future<String>> iterator = result.iterator(); iterator.hasNext();) {
    				Future<String> future = (Future<String>) iterator.next();
    				System.out.println(future.get());
    			}
    		} catch (InterruptedException | ExecutionException e) {
    			e.printStackTrace();
    		}
    		executor.shutdown();
    	}
    }
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    
    public class MyCallable implements Callable<String> {
    
    	private int time;
    	private String name;
    	
    	public MyCallable(int time, String name) {
    		this.time = time;
    		this.name = name;
    	}
    	
    	@Override
    	public String call() throws Exception {
    		TimeUnit.SECONDS.sleep(time);
    		return "这是" + name + "返回的结果";
    	}
    }
    
    取消任务
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    		
    		MyCallable myCallable = new MyCallable();
    		
    		Future<String> result = executor.submit(myCallable);
    		System.out.println("isCancelled:" + result.isCancelled());
    		System.out.println("isDone:" + result.isDone());
    		try {
    			TimeUnit.SECONDS.sleep(2);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println("取消任务");
    		result.cancel(true);
    		System.out.println("isCancelled:" + result.isCancelled());
    		System.out.println("isDone:" + result.isDone());
    		executor.shutdown();
    	}
    }
    
    import java.util.concurrent.Callable;
    
    public class MyCallable implements Callable<String> {
    
    	@Override
    	public String call() throws Exception {
    		while(true) {
                System.out.printf("Task: Test
    ");
                Thread.sleep(100);
            }
    	}
    }
    
    控制任务的完成
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import org.sy.lab.多线程基础._4_执行器._2_Future线程控制._4_取消任务.MyCallable;
    
    public class Main {
    
    public static void main(String[] args) {
    		
    		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    		
    		MyCallable myCallable = new MyCallable();
    		MyFuture myFuture = new MyFuture(myCallable);
    		
    		executor.submit(myFuture);
    		executor.shutdown();
    		System.out.println("isCancelled:" + myFuture.isCancelled());
    		System.out.println("isDone:" + myFuture.isDone());
    		try {
    			TimeUnit.SECONDS.sleep(2);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println("取消任务");
    		myFuture.cancel(true);
    		System.out.println("isCancelled:" + myFuture.isCancelled());
    		System.out.println("isDone:" + myFuture.isDone());
    	}
    }
    
    import java.util.concurrent.Callable;
    
    public class MyCallable implements Callable<String> {
    
    	@Override
    	public String call() throws Exception {
    		while(true) {
                System.out.printf("Task: Test
    ");
                Thread.sleep(100);
            }
    	}
    }
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;
    
    public class MyFuture extends FutureTask<String> {
    
    	public MyFuture(Callable<String> callable) {
    		super(callable);
    	}
    	
    	@Override
    	protected void done() {
    		if(isCancelled()) {
    			System.out.println("cancel");
    		} else {
    			System.out.println("done");
    		}
    	}
    }
    

    3、分离任务的启动和结束

    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
    	public static void main(String[] args) {
    		
    		ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    		CompletionService<String> service = new ExecutorCompletionService<String>(executorService);
    		
    		Producer producer1 = new Producer("生产者1", service);
    		Producer producer2 = new Producer("生产者2", service);
    		Consumer consumer = new Consumer("消费者", service);
    		
    		Thread thread1 = new Thread(producer1);
    		Thread thread2 = new Thread(producer2);
    		Thread thread3 = new Thread(consumer);
    		
    		thread1.start();
    		thread2.start();
    		thread3.start();
    		
    		try {
    			TimeUnit.SECONDS.sleep(5);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		executorService.shutdown();
    		consumer.setEnd();
    	}
    }
    
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class Consumer implements Runnable {
    
    	private String name;
    	private boolean end = false;
    	private CompletionService<String> service;
    	
    	public Consumer(String name, CompletionService<String> service) {
    		this.name = name;
    		this.service = service;
    	}
    	
    	@Override
    	public void run() {
    		while(!end) {
    			Future<String> result = service.poll();
    			try {
    				if(result != null) {
    					System.out.println(name + "消费 " + result.get());
    				}
    			} catch (InterruptedException | ExecutionException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    	
    	public void setEnd() {
    		end = true;
    	}
    }
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    
    public class Processor implements Callable<String> {
    
    	private String name;
    	
    	public Processor(String name) {
    		this.name = name;
    	}
    	
    	@Override
    	public String call() throws Exception {
    		TimeUnit.SECONDS.sleep((long) (Math.random() * 3));
    		System.out.println(name + "生产产品完成");
    		return name + "生产的产品";
    	}
    }
    
    import java.util.concurrent.CompletionService;
    
    public class Producer implements Runnable {
    
    	private String name;
    	private CompletionService<String> service;
    	
    	public Producer(String name, CompletionService<String> service) {
    		this.name = name;
    		this.service = service;
    	}
    	
    	@Override
    	public void run() {
    		Processor processor = new Processor(name);
    		service.submit(processor);
    	}
    }
    

    4、处理被拒绝的任务

    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    
    //ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    //ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    //ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    //ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
    //自定义处理继承RejectedExecutionHandler类
    public class Main {
    	public static void main(String[] args) {
    		RejectedTaskController controller = new RejectedTaskController();
    		ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
    		executor.setRejectedExecutionHandler(controller);
    		System.out.printf("Main: Starting.
    ");
    		for(int i=0; i<3; i++) {
    			Task task = new Task("Task" + i);
    			executor.submit(task);
    		}
    		System.out.printf("Main: Shutting down the Executor.
    ");
    		executor.shutdown();
    		System.out.printf("Main: Sending another Task.
    ");
    		Task task = new Task("RejectedTask");
    		executor.submit(task);
    		System.out.printf("Main: End.
    ");
    	}
    }
    
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    public class RejectedTaskController implements RejectedExecutionHandler {
    	@Override
    	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    		System.out.printf("RejectedTaskController: The task %s has been rejected
    ", r.toString());
    		System.out.printf("RejectedTaskController: %s
    ", executor.toString());
    		System.out.printf("RejectedTaskController: Terminating: %s
    ", executor.isTerminating());
    		System.out.printf("RejectedTaksController: Terminated: %s
    ", executor.isTerminated());
    	}
    }
    
    import java.util.concurrent.TimeUnit;
    
    public class Task implements Runnable {
    	private String name;
    	public Task(String name) {
    		this.name = name;
    	}
    	@Override
    	public void run() {
    		System.out.printf("Task " + name + ": Starting
    ");
    		try {
    			long duration = (long) (Math.random() * 10);
    			System.out.printf("Task %s: ReportGenerator: Generating a report during %d seconds
    ", name, duration);
    			TimeUnit.SECONDS.sleep(duration);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.printf("Task %s: Ending
    ", name);
    	}
    	public String toString() {
    		return name;
    	}
    }
    

    五、ForkJoin

    六、集合

    七、定制

    八、监控

    九、代码下载

    代码下载:gitee

  • 相关阅读:
    delphi RTTI 反射技术
    delphi 自我删除和线程池(1000行代码,需要仔细研究)
    寻找两个已序数组中的第k大元素
    OpenCV中的神器Image Watch
    PYTHON 之 【RE模块的正则表达式学习】
    Call U
    微软IE11浏览器的7大变化
    集群应用及运维经验小结
    逆序对:从插入排序到归并排序
    Jquery 图片轮播实现原理总结
  • 原文地址:https://www.cnblogs.com/ctxsdhy/p/9583932.html
Copyright © 2011-2022 走看看