zoukankan      html  css  js  c++  java
  • JDK的多线程与并发库

    1.创建多线程

    public class MultiThread {
    
        public static void main(String[] args) {
            // 通过继承Thread类
            Thread thread = new Thread(){
                @Override
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("1:" + Thread.currentThread().getName());
                        System.out.println("2:" + this.getName());
                    }
                }
            };
            thread.start();
            
            // 通过实现Runnable接口
            Thread thread2 = new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("1:" + Thread.currentThread().getName());
    
                    }                
                    
                }
            });
            thread2.start();
            
            // 如果既继承runnable接口又实现了Thread类, 会执行哪个?
            new Thread(
                    new Runnable(){
                        public void run() {
                            while(true){
                                try {
                                    Thread.sleep(500);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println("runnable :" + Thread.currentThread().getName());
    
                            }                            
                        }
                    }
            ){
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("thread :" + Thread.currentThread().getName());
    
                    }    
                }
            }.start();
            
        }
    }

    2.定时器Timer

    定时任务就是靠多线程实现的

    public class TimerTest {
        private static int count = 0;
       
    public static void main(String[] args) { class MyTimerTask extends TimerTask{ @Override public void run() { count = (count+1)%2; System.out.println("bombing!"); new Timer().schedule(new MyTimerTask(),2000+2000*count); } } new Timer().schedule(new MyTimerTask(), 2000); while(true){ System.out.println(new Date().getSeconds()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }

     

    3.互斥 synchronized 

    保证线程安全(数据完整性)

    public class MultiThreadMutex {
    
        public static void main(String[] args) {
            new MultiThreadMutex().init();
        }
        
        private void init(){
            final Outputer outputer = new Outputer();
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        outputer.output("javaIsAPurelyObjectOrientedProgrammingLanguage");
                    }
                    
                }
            }).start();
            
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        outputer.output3("c++IsAMulti-paradigmSystems-levelProgrammingLanguage");
                    } 
                }
            }).start();
            
        }
    
        static class Outputer{
            
            public void output(String name){
                int len = name.length();
                synchronized (Outputer.class) 
                {
                    for(int i=0;i<len;i++){
                        System.out.print(name.charAt(i));
                    }
                    System.out.println();
                }
            }
            
            public synchronized void output2(String name){
                int len = name.length();
                for(int i=0;i<len;i++){
                        System.out.print(name.charAt(i));
                }
                System.out.println();
            }
            
            public static synchronized void output3(String name){
                int len = name.length();
                for(int i=0;i<len;i++){
                        System.out.print(name.charAt(i));
                }
                System.out.println();
            }    
        }
    }

    4.同步 wait/notify

    保证线程间执行次序

    // 1. wait notify成对出现, 并且处于互斥锁的范围内
    // 2. 要用while(condition)围住mutex.wait(), 因为存在虚假唤醒
    public class MultiThreadSynchronization {
    
        public static void main(String[] args) {
            
            final Business business = new Business();
            new Thread(
                    new Runnable() {
                        @Override
                        public void run() {
                            for(int i=1;i<=50;i++){
                                business.sub(i);
                            }
                        }
                    }
            ).start();
            
            for(int i=1;i<=50;i++){
                business.main(i);
            }
        }
    }
    
    class Business {
        private boolean bShouldSub = true;
    
        public synchronized void sub(int i) {
            while (!bShouldSub) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("sub thread sequence of " + j + ",loop of " + i);
            }
            bShouldSub = false;
            this.notify();
        }
    
        public synchronized void main(int i) {
            while (bShouldSub) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 100; j++) {
                System.out.println("main thread sequence of " + j + ",loop of " + i);
            }
            bShouldSub = true;
            this.notify();
        }
    }

     5.线程间传递参数

    共享变量

    / 多个线程共享变量      
    // 以类中变量为中介; 以传入的共同参数为中介; 匿名内部类以主线程main中变量为中介;
    public class MultiThreadShareData {
    
        public static void main(String[] args) {
            // 传入共享参数  每个线程执行相同的代码
            ShareData1 data1 = new ShareData1();
            new Thread(data1).start();
            new Thread(data1).start();
            
            // 传入共享参数
            ShareData2 data2 = new ShareData2();
            new Thread(new MyRunnable1(data2)).start();
            new Thread(new MyRunnable2(data2)).start();
            
            // 匿名内部类实现变量的写法更简洁, 不需要传参
            final ShareData2 data3 = new ShareData2();
            new Thread(new Runnable(){
                @Override
                public void run() {
                    data3.decrement();
                }
            }).start();
            new Thread(new Runnable(){
                @Override
                public void run() {
                    data3.increment();
                }
            }).start();
        }
    }
        
        
    // 方式1. 如果每个线程执行相同的代码 -> 多个Thread共享同一个runnable中的对象    少有可能
    class ShareData1 implements Runnable {
        private int count = 100;
        @Override
        public void run() { 
            while (true) {
                synchronized(this) {
                    count--;
                }
            }
        }
    }
    
    
    // 方式2.   
    class ShareData2 {
        private int j = 0;
        public synchronized void increment() {   
            j++;
        }
        public synchronized void decrement() {
            j--;
        }
    }
    class MyRunnable1 implements Runnable {
        private ShareData2 data1;
        public MyRunnable1(ShareData2 data1) {
            this.data1 = data1;
        }
        public void run() {
            data1.decrement();
        }
    }
    class MyRunnable2 implements Runnable {
        private ShareData2 data1;
        public MyRunnable2(ShareData2 data1) {
            this.data1 = data1;
        }
        public void run() {
            data1.increment();
        }
    }

    管道

    public class MultiThreadPipe {
        
        public static void main(String[] args) {
            PipedOutputStream pos = new PipedOutputStream();
            PipedInputStream pis = new PipedInputStream();
            try {
                pos.connect(pis);
            } catch (IOException e) {
                e.printStackTrace();
            }
            new Consumer(pis).start();
            new Producer(pos).start();
        }
    }
    
    class Producer extends Thread {
        private PipedOutputStream pos;
        public Producer(PipedOutputStream pos) {
            this.pos = pos;
        }
    
        public void run() {
            int i = 8;
            try {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                pos.write(i);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    class Consumer extends Thread {
        private PipedInputStream pis;
        public Consumer(PipedInputStream pis) {
            this.pis = pis;
        }
        public void run() {
            try {
                System.out.println(pis.read());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    6.ThreadLocal

    该变量形式上共享, 但却是by线程独立

    public class ThreadLocalExample {
    
        private static ThreadLocal<Integer> x = new ThreadLocal<Integer>(); 
        public static void main(String[] args) {
            for (int i = 0; i < 2; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        int data = new Random().nextInt();
                        System.out.println(Thread.currentThread().getName()
                                + " has put data :" + data);
                        x.set(data);
                        Person.getInstance().setName("name" + data);
                        Person.getInstance().setAge(data);
                        new A().print();
                        new B().print();
                    }
                }).start();
            }
        }
        
        static class A{
            public void print(){
                int data = x.get();
                System.out.println("A from " + Thread.currentThread().getName() 
                        + " get data :" + data);
                Person myData = Person.getInstance();
                System.out.println("A from " + Thread.currentThread().getName() 
                        + " getMyData: " + myData.getName() + "," +
                        myData.getAge());
            }
        }
        
        static class B{
            public void print(){
                int data = x.get();            
                System.out.println("B from " + Thread.currentThread().getName() 
                        + " get data :" + data);
                Person myData = Person.getInstance();
                System.out.println("B from " + Thread.currentThread().getName() 
                        + " getMyData: " + myData.getName() + "," +
                        myData.getAge());            
            }        
        }
    }
    
    // javaBean的by线程的单例
    class Person {
        private static ThreadLocal<Person> personThreadLocal = new ThreadLocal<Person>();   
        private Person(){}
        public static /*无需synchronized*/ Person getInstance(){
            Person instance = personThreadLocal.get();
            if(instance == null){
                instance = new Person();
                personThreadLocal.set(instance);
            }
            return instance;
        }
        
        private String name;
        private int age;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public int getAge() {
            return age;
        }
        public void setAge(int age) {
            this.age = age;
        }
    }

    ThreadLocal实现原理

    public class ThreadLocalSimulation {
    
        private static Map<Thread, Integer> threadData = new HashMap<Thread, Integer>();  //核心
    
        public static void main(String[] args) {
            for (int i = 0; i < 2; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        int data = new Random().nextInt();
                        System.out.println(Thread.currentThread().getName()
                                + " has put data :" + data);
                        threadData.put(Thread.currentThread(), data);
                        new A().get();
                        new B().get();
                    }
                }).start();
            }
        }
        
        static class A{
            public void get(){
                int data = threadData.get(Thread.currentThread());
                System.out.println("A from " + Thread.currentThread().getName() 
                        + " get data :" + data);
            }
        }
        
        static class B{
            public void get(){
                int data = threadData.get(Thread.currentThread());            
                System.out.println("B from " + Thread.currentThread().getName() 
                        + " get data :" + data);
            }        
        }
    }

     7. 线程池

    池化技术都是防止频繁开关来提高系统性能, 代价是必须损耗一定空间来保存池

    // 池化技术之线程池
    public class ThreadPoolTest {
    
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(3);  // 限制线程数量
            //ExecutorService threadPool = Executors.newCachedThreadPool();  // 动态控制线程数量
            //ExecutorService threadPool = Executors.newSingleThreadExecutor();  // 跟一个线程类似, 但可以保证线程挂了有新线程接替
            for(int i=1; i<=10; i++){
                final int task = i;
                threadPool.execute(new Runnable(){
                    @Override
                    public void run() {
                        for(int j = 1; j <= 10; j++){
                            try {
                                Thread.sleep(20);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println(Thread.currentThread().getName() + " task:" + task + " loop:" + j);
                        }
                    }
                });
            }
            System.out.println("all of 10 tasks have committed!");
            threadPool.shutdown();    // 如果是shutdownNow方法会停止正在执行的任务
            
            // 带定时器的线程池 schedule方法:xx时间以后执行; scheduleAtFiexedRate方法:xx时间后每隔yy时间执行
            Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println("bombing!");
    
                }
            }, 6, 2, TimeUnit.SECONDS); 
        }
    
    }

    8. Callable接口与Future

    能实现返回线程执行结果 的效果

    // 返回结果的任务
    public class CallableAndFuture {
    
        public static void main(String[] args) {
            // 其一
            ExecutorService threadPool = Executors.newSingleThreadExecutor();
            Future<String> future = threadPool.submit(     // submit Callable<resultType>而非execute Runnable
                    new Callable<String>() {
                        public String call() throws Exception {
                            // 模拟handling
                            Thread.sleep(2000);
                            return "hello";
                        };
                    });
            System.out.println("等待结果");
            
            try {
                System.out.println("拿到结果:" + future.get());    //阻塞等待结果, 还有个get方法的重载版本,带超时参数, 超时抛异常. future/get的特点在于, 我们可以把任务合理分解, 在需要任务结果时调用get
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
            threadPool.shutdown();  //不用该函数主线程是不会退出的
            
            
            // 其二
            // ExecutorCompletionService包装线程池, take方法返回最先完成的Future任务
            ExecutorService threadPool2 =  Executors.newFixedThreadPool(10);
            CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool2);
            for (int i = 1; i <= 10; i++) {
                final int seq = i;
                completionService.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        // 模拟handling
                        Thread.sleep(new Random().nextInt(5000));
                        return seq;
                    }
                });
            }
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println(completionService.take().get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            threadPool2.shutdown();
        }
    }

    9.Lock

    ReentrantLock是具有synchronized功能的类
    ReentrantReadWriteLock 粒度更细, 读与读不互斥, 写与写互斥,  读与写互斥

    // 使用Lock改写synchronized例子
    public class LockTest {
    
        public static void main(String[] args) {
            new LockTest().init();
        }
        
        private void init(){
            final Outputer outputer = new Outputer();
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        outputer.output("javaIsAPurelyObjectOrientedProgrammingLanguage");
                    }
                    
                }
            }).start();
            
            
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        outputer.output("c++IsAMulti-paradigmSystems-levelProgrammingLanguage");
                    }
                    
                }
            }).start();
            
        }
    
        static class Outputer {
            Lock lock = new ReentrantLock();
    
            public void output(String name) {
                int len = name.length();
                lock.lock();
                try {
                    for (int i = 0; i < len; i++) {
                        System.out.print(name.charAt(i));
                    }
                    System.out.println();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    使用读写锁模拟缓存

    // 模拟缓存
    // 加锁解锁要一致: 解没加过的锁会抛出异常; 加锁不解会造成死锁
    public class CacheSimulation {
    
        public static void main(String[] args) {
            for (int i = 0; i < 10; ++i) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        String i = (String) getData("key");
                        // out.println()参数为常量无并发问题; 为表达式时存在并发问题
                        System.out.println(i);
                    }
                }).start();
            }
        }
        
        private static Map<String, Object> cache = new HashMap<String, Object>();  //保存缓存
        private static ReadWriteLock rwl = new ReentrantReadWriteLock();
        
        public static Object getData(String key) {
            rwl.readLock().lock();       
            Object value = cache.get(key);
            if (value == null) {
                rwl.readLock().unlock(); 
                rwl.writeLock().lock();
                if (cache.get(key) == null) {    // 防止几个线程都阻塞在writeLock.lock()
                    value = "abcde";     // 模拟获取数据
                    System.out.println("get");
                    cache.put(key, value);
                }
                rwl.writeLock().unlock();
            }
            return value;
        }
    }

    10.Condition

    Condition具有wait/notify功能的类, 同样要配合Lock使用. 但与synchronized的waitnotify不同, 这里同一个Lock下可以创建多个Condition对象, 来实现粒度更细的控制

    一个condition

    // 使用Condition改写线程同步示例, Condition由Lock.newCondition()而来
    // Condition.await/signal 对应 Mutex.wait/notify
    public class ConditionTest {
    
        public static void main(String[] args) {
    
            final Business business = new Business();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 1; i <= 50; i++) {
                        business.sub(i);
                    }
                }
            }).start();
    
            for (int i = 1; i <= 30; i++) {
                business.main(i);
            }
    
        }
    
        static class Business {
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            private boolean bShouldSub = true;
    
            public void sub(int i) {
                lock.lock();
                try {
                    while (!bShouldSub) {
                        try {
                            condition.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    for (int j = 1; j <= 10; j++) {
                        System.out.println("sub thread sequence of " + j + ",loop of " + i);
                    }
                    bShouldSub = false;
                    condition.signal();
                } finally {
                    lock.unlock();
                }
            }
    
            public void main(int i) {
                lock.lock();
                try {
                    while (bShouldSub) {
                        try {
                            condition.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    for (int j = 1; j <= 20; j++) {
                        System.out.println("main thread sequence of " + j + ",loop of " + i);
                    }
                    bShouldSub = true;
                    condition.signal();
                } finally {
                    lock.unlock();
                }
            }
    
        }
    }

    两个condition,  下面模拟了数组阻塞队列

    // 有界缓冲区/数组阻塞队列 的模拟
    class ArrayBlockingQueueSimulation {
        final Lock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();
        final Object[] items = new Object[100];  // 长度
        int putptr, takeptr, count;  // 初始为0
    
        public void put(Object x) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) {
                    notFull.await();
                }
                items[putptr] = x;
                if (++putptr == items.length) {
                    putptr = 0;
                }
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public Object take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    notEmpty.await();
                }
                Object x = items[takeptr];
                if (++takeptr == items.length) {
                    takeptr = 0;
                }
                --count;
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }

    三个condition,  如下实现了三个线程轮流执行

    public class ThreeThreadsSynchronization {
    
        public static void main(String[] args) {
    
            final Business business = new Business();
            
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 1; i <= 50; i++) {
                        business.sub2(i);
                    }
    
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 1; i <= 50; i++) {
                        business.sub3(i);
                    }
                }
            }).start();
    
            for (int i = 1; i <= 50; i++) {
                business.main(i);
            }
    
        }
    
        static class Business {
            Lock lock = new ReentrantLock();
            Condition condition1 = lock.newCondition();
            Condition condition2 = lock.newCondition();
            Condition condition3 = lock.newCondition();
            private int shouldSub = 1;
    
            public void sub2(int i) {
                lock.lock();
                try {
                    while (shouldSub != 2) {
                        try {
                            condition2.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    for (int j = 1; j <= 20; j++) {
                        System.out.println("sub2 thread sequence of " + j + ",loop of " + i);
                    }
                    shouldSub = 3;
                    condition3.signal();
                } finally {
                    lock.unlock();
                }
            }
    
            public void sub3(int i) {
                lock.lock();
                try {
                    while (shouldSub != 3) {
                        try {
                            condition3.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    for (int j = 1; j <= 10; j++) {
                        System.out.println("sub3 thread sequence of " + j + ",loop of " + i);
                    }
                    shouldSub = 1;
                    condition1.signal();
                } finally {
                    lock.unlock();
                }
            }
    
            public void main(int i) {
                lock.lock();
                try {
                    while (shouldSub != 1) {
                        try {
                            condition1.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    for (int j = 1; j <= 30; j++) {
                        System.out.println("main thread sequence of " + j + ",loop of " + i);
                    }
                    shouldSub = 2;
                    condition2.signal();
                } finally {
                    lock.unlock();
                }
            }
    
        }
    }

    11. Semaphore

    Semaphore信号量, 互斥锁保证多个线程同时访问同一个资源时的线程安全性, 信号量让线程动态匹配现有资源数, 来保证同时访问多个资源时的线程安全性, 并发更高. 

    Lock是哪个线程拿哪个线程负责释放; 信号量可以是一个线程获取, 另一个线程释放, 这个特性能用于死锁恢复.

    public class SemaphoreTest {
        
        public static void main(String[] args) {
            ExecutorService service = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(3);
            for (int i = 0; i < 10; i++) {
                Runnable runnable = new Runnable() {
                    public void run() {
                        try {
                            semaphore.acquire();  //线程进入时获取信号量
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("线程" + Thread.currentThread().getName() + "进入,当前已有" + (3 - semaphore.availablePermits()) + "个并发");
                        try {
                            Thread.sleep((long) (Math.random() * 1000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("线程" + Thread.currentThread().getName() + "即将离开");
                        semaphore.release();   //线程结束时释放信号量
                        // 下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
                        System.out.println("线程" + Thread.currentThread().getName() + "已离开,当前已有" + (3 - semaphore.availablePermits()) + "个并发");
                    }
                };
                service.execute(runnable);
            }
    service.shutdown(); } }

    12. CyclicBarrier

    多个线程阶段点同步

    public class CyclicBarrierTest {
    
        public static void main(String[] args) {
            ExecutorService service = Executors.newCachedThreadPool();
            final CyclicBarrier cb = new CyclicBarrier(3);
            for (int i = 0; i < 3; i++) {
                Runnable runnable = new Runnable() {
                    public void run() {
                        try {
                            // 模拟handling
                            Thread.sleep((long) (Math.random() * 1000));
                            System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点1,当前已有" 
                            + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走" : "正在等候"));
                            cb.await();              //第一个同步点
                            Thread.sleep((long) (Math.random() * 1000));
                            System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点2,当前已有"
                            + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走" : "正在等候"));
                            cb.await();              //第二个同步点
                            Thread.sleep((long) (Math.random() * 1000));
                            System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点3,当前已有"
                            + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走" : "正在等候"));
                            cb.await();             //第三个同步点
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                service.execute(runnable);
            }
            service.shutdown();
        }
    }

    13.CountDownLatch

    线程通过等待计数器归零来实现同步   实现一个人/多个人等待一个人/多个人的完成

    public class CountdownLatchTest {
    
        public static void main(String[] args) {
            ExecutorService service = Executors.newCachedThreadPool();
            final CountDownLatch cdOrder = new CountDownLatch(1);  //初始计数器的数为1
            final CountDownLatch cdAnswer = new CountDownLatch(3);
            for (int i = 0; i < 3; i++) {
                Runnable runnable = new Runnable() {
                    public void run() {
                        try {
                            System.out.println("线程" + Thread.currentThread().getName() + "准备接受执行命令");
                            cdOrder.await();
                            System.out.println("线程" + Thread.currentThread().getName() + "已接到命令, 开始执行");
                            // 模拟handling
                            Thread.sleep((long) (Math.random() * 5000));
                            System.out.println("线程" + Thread.currentThread().getName() + "的分任务完成");
                            cdAnswer.countDown();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                service.execute(runnable);
            }
            try {
                Thread.sleep((long) (Math.random() * 5000));
                System.out.println("线程" + Thread.currentThread().getName() + "即将发送执行命令");
                cdOrder.countDown();
                System.out.println("线程" + Thread.currentThread().getName() + "已发送命令, 任务正在处理");
                cdAnswer.await();
                System.out.println("线程" + Thread.currentThread().getName() + "主管的所有任务完成");
            } catch (Exception e) {
                e.printStackTrace();
            }
            service.shutdown();
        }
    }

    14. Exchanger

    两个线程间互相交换数据

    public class ExchangerTest {
    
        public static void main(String[] args) {
            ExecutorService service = Executors.newCachedThreadPool();
            final Exchanger<String> exchanger = new Exchanger<>();
            service.execute(new Runnable(){
                public void run() {
                    try {                
                        String data1 = "王老吉";
                        System.out.println("线程" + Thread.currentThread().getName() + "正在把数据 " + data1 +" 换出去");
                        Thread.sleep((long)(Math.random()*2000));
                        String data2 = (String)exchanger.exchange(data1);
                        System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为 " + data2);
                    }catch(Exception e){
                        e.printStackTrace();
                    }
                }    
            });
            
            service.execute(new Runnable(){
                public void run() {
                    try {                
                        String data1 = "加多宝";
                        System.out.println("线程" + Thread.currentThread().getName() + "正在把数据 " + data1 +" 换出去");
                        Thread.sleep((long)(Math.random()*2000));                    
                        String data2 = (String)exchanger.exchange(data1);
                        System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为 " + data2);
                    }catch(Exception e){
                        e.printStackTrace();
                    }                
                }    
            });        
            
            service.shutdown();
        }
    }

    15. 阻塞队列

    阻塞队列实现了BlockingQueue接口,  是生产者消费者模型的典范, 通过锁实现

    put和take方法才具有阻塞功能
    阻塞队列与线程同步      :  两个大小为1的空/满阻塞队列可以实现condition或wait/notify的效果
    阻塞队列与Semaphore :  阻塞队列是一个线程存入数据, 一个线程取出数据; Semaphore一般用作同一线程获取和释放

    public class BlockingQueueTest {
        
        public static void main(String[] args) {
            final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
            for (int i = 0; i < 2; i++) {
                new Thread() {
                    public void run() {
                        while (true) {
                            try {
                                Thread.sleep((long) (Math.random() * 1000));
                                System.out.println(Thread.currentThread().getName() + "准备放数据!");
                                queue.put(1);
                                System.out.println(Thread.currentThread().getName() + "已经放入数据, " + "队列目前有" + queue.size() + "个数据");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
            }
    
            new Thread() {
                public void run() {
                    while (true) {
                        try {
                            Thread.sleep(1000);
                            System.out.println(Thread.currentThread().getName() + "准备取数据!");
                            queue.take();
                            System.out.println(Thread.currentThread().getName()
                                    + "已经取走数据, " + "队列目前有" + queue.size() + "个数据");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
        }
    }

    两个长度为1的空/满队列实现condition的效果

    public class BlockingQueueImplSynchronization {
    
        public static void main(String[] args) {
    
            final Business business = new Business();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 1; i <= 20; i++) {
                        business.sub1(i);
                    }
                }
            }).start();
    
            for (int i = 1; i <= 20; i++) {
                business.sub2(i);
            }
    
        }
    
        static class Business {
    
            BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);
            BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);
            
            {
                try {
                    System.out.println("init");
                    queue2.put(1);    //queue1为空  queue为满
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            public void sub1(int i) {
                try {
                    queue1.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 1; j <= 10; j++) {
                    System.out.println("sub thread sequece of " + j + ", loop of " + i);
                }
                try {
                    queue2.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            public void sub2(int i) {
                try {
                    queue2.put(1);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                for (int j = 1; j <= 20; j++) {
                    System.out.println("main thread sequece of " + j + ", loop of " + i);
                }
                try {
                    queue1.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    16. 线程安全的非阻塞容器

    并发集合
    在JDK5之前, 多线程中对容器的操作部分需要手动加synchronized块保证线程安全
    稍微轻便点的方式是使用Collections.synchronizedXXX()生成集合. 实现原理:通过装饰器模式在同名方法前添加synchronized(this), 来达到实现线程安全
    但这是不完整的解决方案, 因为
    1)装饰类的迭代器相关的代码没有加synchronized. 涉及到迭代还依然需要手动加synchronized块
    2)迭代器遍历过程中除该迭代器外不能用其他方式增删元素(单线程在自身循环内, 多线程在不同线程执行不同部分), 否则抛出并发修改异常
    3)最重要的, 并发低

    // 在集合的迭代器迭代过程中, 除了迭代器外不能对集合进行修改, 否则会抛出ConcurrentModificationException
    // ConcurrentModificationException的实现: 乐观锁, 记录一个版本号, 版本号不对抛异常
    public class ConcurrentModificationExceptionExample {
        
        public static void main(String[] args) {
            // Collection users = new CopyOnWriteArrayList();  //若使用同步集合, 非迭代器修改就正常
            Collection<User> users = new ArrayList<>();
            users.add(new User("张三", 28));
            users.add(new User("李四", 25));
            users.add(new User("王五", 31));
            Iterator itrUsers = users.iterator();
    
            while (itrUsers.hasNext()) {
                System.out.println("mark");
                User user = (User) itrUsers.next();
                if ("张三".equals(user.getName())) {
                    users.remove(user);   // 非迭代器修改抛出异常
                    //itrUsers.remove();  // 若使用迭代器修改, 则正常
                } else {
                    System.out.println(user);
                }
            }
        }
    } 

    JDK5之后提出了很多线程安全的容器, 与前辈synchronized方式比起来, 它们的亮点并不是保证了线程安全, 而是它们在保证线程安全的同时尽量避免并发瓶颈

    基本上限制条件多的容器都能实现Concurrent版本, 保持一定的读写并发;  像ArrayList LinkedList很难避开并发瓶颈, 退而求其次ArrayList实现了CopyOn保证了读并发;

     LinkedList只能是通过Collections.synchronizedList()的synchronized方式(读|读都有锁), 尽量用其他集合替代.

    ps:Collections.synchronizedList()或Vector的区别:  1.扩容量不同Vector 100%, SynchronizedList 50%.  2.Vector已对迭代器加锁, SynchronizedList需要手动加锁

    原有集合 并发集合 原理
    HashMap ConcurrentHashMap 锁分段技术
    HashSet Collections.newSetFromMap(new ConcurrentHashMap()) 用map版本实现
    TreeMap ConcurrentSkipListMap

    用SkipList替代红黑树, CAS

    TreeSet ConcurrentSkipListSet 用map版本实现
    Queue接口 ConcurrentLinkedQueue  非阻塞 CAS
    ArrayList CopyOnWriteArrayList 写时复制, 提高了读并发度;  以空间换取了部分写并发, 这点好坏需测试
    Set接口 CopyOnWriteArraySet 用ArrayList版本实现, 用addIfAbsent()方法实现元素去重,写时还要复制, 因此写效率不佳

    CAS原理类似乐观锁, 处理器保证底层实现, 理念:多次尝试肯定有一个能成(版本匹配则操作成功->该操作即具有原子性), 但会做很多无用功; 相比加锁能提高并发

    17. 原子类

    AtomicInteger等原子类用的是CAS, 并发比加锁高

    实现多线程下安全性的要素:  原子性(不能被其他影响某变量的程序段打断) +  内存可见性 (一个线程修改, 另一个线程马上能看到)

    synchronized: 实现了原子性和可见性 
    volatile: 实现内存可见性   CAS: 实现原子性

  • 相关阅读:
    系统集成项目管理工程师计算题(成本管理计算)
    系统集成项目管理工程师计算题(进度管理计算)
    系统集成项目管理工程师计算题(期望值)
    系统集成项目管理工程师计算题(三点估算)
    系统集成项目管理工程师计算题(沟通渠道)
    Asp.net core web Api 应用Jwt 验证
    Linux vmstat命令
    关于Java集合的小抄
    @Resource和@Autowire
    Servlet是线程安全的吗?
  • 原文地址:https://www.cnblogs.com/myJavaEE/p/6707001.html
Copyright © 2011-2022 走看看