zoukankan      html  css  js  c++  java
  • 基于马士兵老师的高并发笔记

    一、分析下面程序输出:

    /**
     * 分析一下这个程序的输出
     * @author mashibing
     */
    
    package yxxy.c_005;
    
    public class T implements Runnable {
    
        private int count = 10;
        
        public synchronized void run() { 
            count--;
            System.out.println(Thread.currentThread().getName() + " count = " + count);
        }
        
        public static void main(String[] args) {
            T t = new T();
            for(int i=0; i<5; i++) {
                new Thread(t, "THREAD" + i).start();
            }
        }
        
    }
    THREAD0 count = 9
    THREAD4 count = 8
    THREAD1 count = 7
    THREAD3 count = 6
    THREAD2 count = 5
    

    分析:

    启动了5个线程,thread0先拿到这把锁,开始执行,thread1-4都在等待准备抢这把锁;thread0执行完之后,释放锁;thread4率先抢到了这把锁,开始执行;执行完之后thread1又抢到了这把锁,开始执行....;
    所以看到每次线程访问一次,count-1;而且thread执行的先后顺序每次执行的结果不同,因为你不知道哪个线程先执行了;

    二、对比上一个程序,分析这个程序的输出:

    /**
     * 对比上面一个小程序,分析一下这个程序的输出
     * @author mashibing
     */
    
    package yxxy.c_006;
    
    public class T implements Runnable {
    
        private int count = 10;
        
        public synchronized void run() { 
            count--;
            System.out.println(Thread.currentThread().getName() + " count = " + count);
        }
        
        public static void main(String[] args) {
            
            for(int i=0; i<5; i++) {
                T t = new T();
                new Thread(t, "THREAD" + i).start();
            }
        }
        
    }
    THREAD0 count = 9
    THREAD4 count = 9
    THREAD3 count = 9
    THREAD1 count = 9
    THREAD2 count = 9
    

    分析:

    启动了5个线程,因为每次都是new了一个t,每个线程都能锁住t,一共有5个t,5个count;所以这里5个线程执行完,count都是9;
    但是因为不知道哪个线程先被cpu执行,所以thread名字的顺序是随机的;

    三、同步和非同步方法是否可以同时调用?

    /**
     * 同步和非同步方法是否可以同时调用?
     * @author mashibing
     */
    
    package yxxy.c_007;
    
    public class T {
    
        public synchronized void m1() { 
            System.out.println(Thread.currentThread().getName() + " m1 start...");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " m1 end");
        }
        
        public void m2() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " m2 ");
        }
        
        public static void main(String[] args) {
            T t = new T();
            
            new Thread(()->t.m1(), "t1").start();
            new Thread(()->t.m2(), "t2").start();
        }
        
    }
    t1 m1 start...
    t2 m2 
    t1 m1 end
    

    分析:

    t1线程执行m1方法,开始睡10s,在这过程之中,t2线程执行m2方法,5s之后打印了m2;由此可见在m1执行的过程之中,m2是可以运行的。
    同步方法的执行过程中,非同步方法是可以执行的。只有synchronized这样的方法在运行时候才需要申请那把锁,而别的方法是不需要申请那把锁的。
    new Thread(()->t.m1())这个写法是java8里面的Lambda表达式,一种简写,还可以写成这样:

     

    public static void main(String[] args) {
            T t = new T();
            
            new Thread(new Runnable(){
                @Override
                public void run() {
                    t.m1();
                }
            }, "t1").start();
            
            new Thread(new Runnable(){
                @Override
                public void run() {
                    t.m2();
                }
            }, "t2").start();
        }
    public static void main(String[] args) {
            T t = new T();
    
            new Thread(t::m1, "t1").start();
            new Thread(t::m2, "t2").start();*/
        
    }
    

    四:对业务写方法加锁,对业务读方法不加锁,容易产生脏读问题(dirtyRead)

    脏读:读到没有写过程中没有完成的数据

     

    /**
     * 对业务写方法加锁
     * 对业务读方法不加锁
     * 容易产生脏读问题(dirtyRead)
     */
    
    package yxxy.c_008;
    
    import java.util.concurrent.TimeUnit;
    
    public class Account {
        String name;
        double balance;
        
        public synchronized void set(String name, double balance) {
            this.name = name;
            
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            this.balance = balance;
        }
        
        public double getBalance(String name) {
            return this.balance;
        }
        
        
        public static void main(String[] args) {
            Account a = new Account();
            new Thread(()->a.set("zhangsan", 100.0)).start();
            
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            System.out.println(a.getBalance("zhangsan"));
            
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            System.out.println(a.getBalance("zhangsan"));
        }
    }
    0.0
    100.0
    

      

    分析:

    主线程里面第一次读zhangsan里面的钱是0.0,第二次读是100.0;原因是set修改钱的时候过程中,sleep了2s钟;为什么sleep 2s就是放大了在线程的执行过程之中的时间差,set钱方法里面this.name=name和this.balance=balance之间可能是会被别的程序执行的;
    在线程的执行过程set钱之中,尽管写的这个方法set加上了synchronized锁定了这个对象,锁定这个对象过程之中,它仍然有可能被那些非锁定的方法/非同步方法访问的;
    尽管对写进行了加锁,但是由于没有对读加锁,那么有可能会读到在写的过程中还没有完成的数据,产生了脏读问题;
     
    解决:
    对读方法枷锁:
    public synchronized double getBalance(String name) {
        return this.balance;
    }
    

    五、一个同步方法可以调用另外一个同步方法:  

    一个线程已经拥有某个对象的锁,再次申请的时候仍然会得到该对象的锁.

     
    /**
     * 一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个对象的锁,再次申请的时候仍然会得到该对象的锁.
     * 也就是说synchronized获得的锁是可重入的.(可重入的意思就是获得锁之后还可以再获得一遍)
     * @author mashibing
     */
    package yxxy.c_009;
    
    import java.util.concurrent.TimeUnit;
    
    public class T {
        synchronized void m1() {
            System.out.println("m1 start");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            m2();
        }
        
        synchronized void m2() {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("m2");
        }
    }
    

      

    分析:

    对t执行m1的时候,需要在t上面加把锁,拿到这个锁了,开始执行,执行锁定的过程之中,调用了m2();
    调用m2的过程中,发现m2也是需要申请一把锁,而申请的这把锁就是当前自己已经持有的这把锁;
    严格来讲,这把锁m1已经持有了,m2还能持有吗?由于是在同一个线程里面,这个是没关系的。它可以再去申请我自己已经拥有的这把锁,实际上就在这把锁上加个数字,从1变成2,锁定了2次。总而言之,再去申请当前持有的这把锁没问题,仍然会得到该对象的锁。

    六、重入锁的另外一种情形,继承中子类的同步方法调用父类的同步方法

    /**
     * 一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个对象的锁,再次申请的时候仍然会得到该对象的锁.
     * 也就是说synchronized获得的锁是可重入的
     * 这里是继承中有可能发生的情形,子类调用父类的同步方法
     * @author mashibing
     */
    package yxxy.c_010;
    
    import java.util.concurrent.TimeUnit;
    
    public class T {
        synchronized void m() {
            System.out.println("m start");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("m end");
        }
        
        public static void main(String[] args) {
            new TT().m();
        }
        
    }
    
    class TT extends T {
        @Override
        synchronized void m() {
            System.out.println("child m start");
            super.m();
            System.out.println("child m end");
        }
    }
    

    七、synchronized同步方法如果遇到异常,锁就会被释放

    /**
     * 程序在执行过程中,如果出现异常,默认情况锁会被释放
     * 所以,在并发处理的过程中,有异常要多加小心,不然可能会发生不一致的情况。
     * 比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,
     * 在第一个线程中抛出异常,其他线程就会进入同步代码区,有可能会访问到异常产生时的数据。
     * 因此要非常小心的处理同步业务逻辑中的异常
     * @author mashibing
     */
    package yxxy.c_011;
    
    import java.util.concurrent.TimeUnit;
    
    public class T {
        int count = 0;
        synchronized void m() {
            System.out.println(Thread.currentThread().getName() + " start");
            while(true) {
                count ++;
                System.out.println(Thread.currentThread().getName() + " count = " + count);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                if(count == 5) {
                    int i = 1/0; //此处抛出异常,锁将被释放,要想不被释放,可以在这里进行catch,然后让循环继续
                }
            }
        }
        
        public static void main(String[] args) {
            T t = new T();
            
            new Thread(()->t.m(), "t1").start();
            
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            new Thread(()->t.m(), "t2").start();
        }
        
    }
    

      执行结果

    t1 start
    t1 count = 1
    t1 count = 2
    t1 count = 3
    t1 count = 4
    t1 count = 5
    t2 start
    t2 count = 6
    Exception in thread "t1" java.lang.ArithmeticException: / by zero
        at yxxy.c_011.T.m(T.java:28)
        at yxxy.c_011.T.lambda$0(T.java:36)
        at java.lang.Thread.run(Thread.java:745)
    t2 count = 7
    t2 count = 8
    t2 count = 9 
    分析:
    t1线程启动后,如果int i=1/0这里抛了异常后,锁不被释放的话,t2线程就永远启动不起来,永远执行不了;
    但是抛出异常之后,锁被释放了,t2得到了执行;
     
    解决:
    处理异常,锁不被释放,循环继续,t2线程永远执行不了:
    /**
     * 程序在执行过程中,如果出现异常,默认情况锁会被释放
     * 所以,在并发处理的过程中,有异常要多加小心,不然可能会发生不一致的情况。
     * 比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,
     * 在第一个线程中抛出异常,其他线程就会进入同步代码区,有可能会访问到异常产生时的数据。
     * 因此要非常小心的处理同步业务逻辑中的异常
     * @author mashibing
     */
    package yxxy.c_011;
    
    import java.util.concurrent.TimeUnit;
    
    public class T {
        int count = 0;
        synchronized void m() {
            System.out.println(Thread.currentThread().getName() + " start");
            while(true) {
                count ++;
                System.out.println(Thread.currentThread().getName() + " count = " + count);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                if(count == 5) {
                    try{
                        int i = 1/0; //此处抛出异常,锁将被释放,要想不被释放,可以在这里进行catch,然后让循环继续
                    }catch(Exception e){
                        System.out.println(e.getMessage());
                    }
                }
            }
        }
        
        public static void main(String[] args) {
            T t = new T();
            
            new Thread(()->t.m(), "t1").start();
            
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            new Thread(()->t.m(), "t2").start();
        }
        
    }
    t1 start
    t1 count = 1
    t1 count = 2
    t1 count = 3
    t1 count = 4
    t1 count = 5
    / by zero
    t1 count = 6
    t1 count = 7
    t1 count = 8
    t1 count = 9
    t1 count = 10
    t1 count = 11
    t1 count = 12
    

      

    八、volatile关键字

    /**
     * volatile 关键字,使一个变量在多个线程间可见
     * A B线程都用到一个变量,java默认是A线程中保留一份copy,这样如果B线程修改了该变量,则A线程未必知道
     * 使用volatile关键字,会让所有线程都会读到变量的修改值
     * 
     * 在下面的代码中,running是存在于堆内存的t对象中
     * 当线程t1开始运行的时候,会把running值从内存中读到t1线程的工作区,在运行过程中直接使用这个copy,并不会每次都去
     * 读取堆内存,这样,当主线程修改running的值之后,t1线程感知不到,所以不会停止运行
     * 
     * 使用volatile,将会强制所有线程都去堆内存中读取running的值
     * 
     * 可以阅读这篇文章进行更深入的理解
     * http://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html
     * 
     * volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized
     * @author mashibing
     */
    package yxxy.c_012;
    
    import java.util.concurrent.TimeUnit;
    
    public class T {
        volatile boolean running = true; //对比一下有无volatile的情况下,整个程序运行结果的区别
        void m() {
            System.out.println("m start");
            while(running) {
            }
            System.out.println("m end!");
        }
        
        public static void main(String[] args) {
            T t = new T();
            
            new Thread(t::m, "t1").start();
            
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            t.running = false;
        }
        
    }
    

      

    分析:

    不加volatile是不行的,线程1没法结束,那么volatile到底是干嘛的?
    线程之间要让running这个值进行可见,这里要涉及到java的内存模型,java对于线程处理的内存模型;
    在jmm(java memory model)里面有个内存它叫主内存,我们所熟识的栈内存,堆内存都可以认为是主内存;每一个线程在执行的过程之中,它有一个线程自己的一块内存,(实际上不能认为这块是内存,有可能它是内存,还有cpu上的缓冲区,是一个统称,就是线程存放它自己变量的一块内存),如果两个cpu在运行不同线程的话,每个线程上都有自己的一块缓冲区,缓冲区就是把主内存JMM里面的内容读过来在缓冲区里面进行修改,如果+1,+1加了好多次再写回去;
    现在有个running在主内存里面,值是true,占一个字节;
    第一个线程启动的时候会把这个字节copy到自己的缓冲区里面,cpu在处理的过程之中就不再去主内存里面读了;它在运行这个线程的过程之中,由于这个cpu非常的忙,在while(running)里面,没空再去主线程里面去刷一下running值了;它一直读自己缓存里面的内容,running永远是true;
    第二个主线程里面,它首先也是把running读到它自己的缓冲区,然后把running改成false,发现running已经改了那就把running写回到主内存里面去;写回到主内存之后,但是第一个线程它没有在主内存重新读啊,所以第一个线程永远结束不了;
     
    加了volatile,第一个线程运行中,不是要求你每次while(running)循环的时候都要到主内存里面读一次running的值,而是说一旦主内存running这个值发生改变后会通知别的线程,说你们的缓冲区里面内容过期了请重新读一下,第一个线程再去读的时候running已经改了,所以线程结束了。
    加了volatile的意思就是当running改了后会通知其他的所有线程的缓冲区,说你们那边的值已经过期了,请你们再去主内存里面重新读一下。
    而并不是通知所有的线程cpu执行的时候每次用的时候都要去主内存读一下,不是,是写完之后进行缓存过期通知。
     
    要保证线程之间的可见性,那么需要对两个线程共同访问的变量加上volatile;如果不想加volatile那只能用synchronized;但volatile的效率要比synchronized高的多;所以在很多高并发的框架里面好多的volatile关键字都在用;比如JDK的并发容器的源码;能用volatile的时候就不要加锁,程序的并发性就要提高很多;

     图:

    九、volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized

    /**
     * volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized
     * 运行下面的程序,并分析结果
     * @author mashibing
     */
    package yxxy.c_013;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class T {
        volatile int count = 0; 
        void m() {
            for(int i=0; i<10000; i++) count++;
        }
        
        public static void main(String[] args) {
            T t = new T();
            
            List<Thread> threads = new ArrayList<Thread>();
            
            for(int i=0; i<10; i++) {
                threads.add(new Thread(t::m, "thread-"+i));
            }
            
            threads.forEach((o)->o.start());
            
            threads.forEach((o)->{
                try {
                    o.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            
            System.out.println(t.count);
            
        }
        
    }
    

      

    volatile和synchronized区别?

    volatile只保证可见性,并不保证原子性;
    synchronized既保证可见性,又保证原子性;但效率要比volatile低不少;
    如果只需要保证可见性的时候,使用volatile,不要使用synchronized;
     
    Thread.join()方法简单解释:插队,等待该线程完成后执行该线程
     

    十、对比上一个程序,可以用synchronized解决

    /**
     * 解决同样的问题的更高效的方法,使用AtomXXX类
     * AtomXXX类本身方法都是原子性的,但不能保证多个方法连续调用是原子性的
     * @author mashibing
     */
    package yxxy.c_015;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class T {
        AtomicInteger count = new AtomicInteger(0); 
    
        void m() {
            for (int i = 0; i < 10000; i++)
                count.incrementAndGet();  //count++
        }
    
        public static void main(String[] args) {
            T t = new T();
    
            List<Thread> threads = new ArrayList<Thread>();
    
            for (int i = 0; i < 10; i++) {
                threads.add(new Thread(t::m, "thread-" + i));
            }
    
            threads.forEach((o) -> o.start());
    
            threads.forEach((o) -> {
                try {
                    o.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            System.out.println(t.count);
        }
    }
    

      

    运行结果:100000

    AtomicInteger:原子性操作的int类型;
    incrementAndGet(): 原子方法,你可以认为它是加了synchronized的,当然它内部实现不是用synchronized的而是用系统相当底层的实现来去完成的;它的效率要比synchronized高很多;

    十二、synchronized优化

    /**
     * 锁定某对象o,如果o的属性发生改变,不影响锁的使用
     * 但是如果o变成另外一个对象,则锁定的对象发生改变
     * 应该避免将锁定对象的引用变成另外的对象
     * @author mashibing
     */
    package yxxy.c_017;
    
    import java.util.concurrent.TimeUnit;
    
    public class T {
        Object o = new Object();
    
        void m() {
            synchronized(o) {
                while(true) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                }
            }
        }
        
        public static void main(String[] args) {
            T t = new T();
            //启动第一个线程
            new Thread(t::m, "t1").start();
            
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //创建第二个线程
            Thread t2 = new Thread(t::m, "t2");
            
            t.o = new Object(); //锁对象发生改变,所以t2线程得以执行,如果注释掉这句话,线程2将永远得不到执行机会
            
            t2.start();
        }
    }
    

      

    分析:

    m2()的并发效率要比m1()高不少;细粒度的锁执行效率要比粗粒度的锁执行效率要高不少;

    十三、避免将锁定对象的引用变成另外的对象,例子:

    /**
     * 锁定某对象o,如果o的属性发生改变,不影响锁的使用
     * 但是如果o变成另外一个对象,则锁定的对象发生改变
     * 应该避免将锁定对象的引用变成另外的对象
     * @author mashibing
     */
    package yxxy.c_017;
    
    import java.util.concurrent.TimeUnit;
    
    public class T {
        Object o = new Object();
    
        void m() {
            synchronized(o) {
                while(true) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                }
            }
        }
        
        public static void main(String[] args) {
            T t = new T();
            //启动第一个线程
            new Thread(t::m, "t1").start();
            
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //创建第二个线程
            Thread t2 = new Thread(t::m, "t2");
            
            t.o = new Object(); //锁对象发生改变,所以t2线程得以执行,如果注释掉这句话,线程2将永远得不到执行机会
            
            t2.start();
        }
    }
    

      

    分析:

    t.o = new Object();锁的对象发生改变,就不需要锁原来的对象,直接锁新对象就行了;而新对象还没有锁的,所以t2线程就被执行了;
    所以,这就证明这个锁是锁在什么地方?是锁在堆内存里new出来的对象上,不是锁在栈内存里头o的引用,不是锁的引用,而是锁new出来的真正的对象;
    锁的信息是记录在堆内存里的。


    十四、不要以字符串常量作为锁定对象

    /**
     * 不要以字符串常量作为锁定对象
     * 在下面的例子中,m1和m2其实锁定的是同一个对象
     * 这种情况还会发生比较诡异的现象,比如你用到了一个类库,在该类库中代码锁定了字符串“Hello”,
     * 但是你读不到源码,所以你在自己的代码中也锁定了"Hello",这时候就有可能发生非常诡异的死锁阻塞,
     * 因为你的程序和你用到的类库不经意间使用了同一把锁
     * 
     * jetty
     * 
     * @author mashibing
     */
    package yxxy.c_018;
    
    public class T {
        
        String s1 = "Hello";
        String s2 = "Hello";
    
        void m1() {
            synchronized(s1) {
                
            }
        }
        
        void m2() {
            synchronized(s2) {
                
            }
        }
    }
    

      

     十五:分析一道面试题

    /**
     * 曾经的面试题:(淘宝?)
     * 实现一个容器,提供两个方法,add,size
     * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
     * 
     * 分析下面这个程序,能完成这个功能吗?
     * @author mashibing
     */
    package yxxy.c_019;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    
    public class MyContainer1 {
    
        List lists = new ArrayList();
    
        public void add(Object o) {
            lists.add(o);
        }
    
        public int size() {
            return lists.size();
        }
        
        public static void main(String[] args) {
            MyContainer1 c = new MyContainer1();
    
            new Thread(() -> {
                for(int i=0; i<10; i++) {
                    c.add(new Object());
                    System.out.println("add " + i);
                    
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t1").start();
            
            new Thread(() -> {
                while(true) {
                    if(c.size() == 5) {
                        break;
                    }
                }
                System.out.println("t2 结束");
            }, "t2").start();
        }
    }
    

      

    分析:
    不能完成这个功能;
    添加volatile关键字,修改为如下:
    添加volatile:
    /**
     * 曾经的面试题:(淘宝?)
     * 实现一个容器,提供两个方法,add,size
     * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
     * 
     * 给lists添加volatile之后,t2能够接到通知,但是,t2线程的死循环很浪费cpu,如果不用死循环,该怎么做呢?
     * @author mashibing
     */
    package yxxy.c_019;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    
    public class MyContainer2 {
    
        //添加volatile,使t2能够得到通知
        volatile List lists = new ArrayList();
    
        public void add(Object o) {
            lists.add(o);
        }
    
        public int size() {
            return lists.size();
        }
        
        public static void main(String[] args) {
            MyContainer2 c = new MyContainer2();
    
            new Thread(() -> {
                for(int i=0; i<10; i++) {
                    c.add(new Object());
                    System.out.println("add " + i);
                    
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t1").start();
            
            new Thread(() -> {
                while(true) {
                    if(c.size() == 5) {
                        break;
                    }
                }
                System.out.println("t2 结束");
            }, "t2").start();
        }
    }
    

      

    但是上面代码还存在两个问题:

    1)由于没加同步,c.size()等于5的时候,假如另外一个线程又往上增加了1个,实际上这时候已经等于6了才break,所以不是很精确;

    2)浪费CPU,t2线程的死循环很浪费cpu

     使用wait和notify

    /**
     * 曾经的面试题:(淘宝?)
     * 实现一个容器,提供两个方法,add,size
     * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
     * 
     * 给lists添加volatile之后,t2能够接到通知,但是,t2线程的死循环很浪费cpu,如果不用死循环,该怎么做呢?
     * 
     * 这里使用wait和notify做到,wait会释放锁,而notify不会释放锁
     * 需要注意的是,运用这种方法,必须要保证t2先执行,也就是首先让t2监听才可以
     * 
     * 阅读下面的程序,并分析输出结果
     * 可以读到输出结果并不是size=5时t2退出,而是t1结束时t2才接收到通知而退出
     * 想想这是为什么?
     * @author mashibing
     */
    package yxxy.c_019;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    
    public class MyContainer3 {
    
        //添加volatile,使t2能够得到通知
        volatile List lists = new ArrayList();
    
        public void add(Object o) {
            lists.add(o);
        }
    
        public int size() {
            return lists.size();
        }
        
        public static void main(String[] args) {
            MyContainer3 c = new MyContainer3();
            
            final Object lock = new Object();
            
            new Thread(() -> {
                synchronized(lock) {
                    System.out.println("t2启动");
                    if(c.size() != 5) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("t2 结束");
                }
                
            }, "t2").start();
            
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
    
            new Thread(() -> {
                System.out.println("t1启动");
                synchronized(lock) {
                    for(int i=0; i<10; i++) {
                        c.add(new Object());
                        System.out.println("add " + i);
                        
                        if(c.size() == 5) {
                            lock.notify();
                        }
                        
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "t1").start();
            
            
        }
    }
    

      

    分析:

    1)解释wait和notify,notifyAll方法:

     wait:让正在运行的线程进入等待状态,并且释放锁

    notify:唤醒某个正在等待的线程,不能精确换新某个线程

    notifyAll:唤醒所有正在等待的线程

    2)为什么size=5了,t2线程没有结束?
    由于notify不会释放锁,即便你通知了t2,让它起来了,它起来之后想往下运行,wait了之后想重新继续往下运行是需要重新得到lock这把锁的,可是很不幸的是t1已经把这个锁锁定了,所以只有等t1执行完了,t2才会继续执行。
    /**
     * 曾经的面试题:(淘宝?)
     * 实现一个容器,提供两个方法,add,size
     * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
     * 
     * 给lists添加volatile之后,t2能够接到通知,但是,t2线程的死循环很浪费cpu,如果不用死循环,该怎么做呢?
     * 
     * 这里使用wait和notify做到,wait会释放锁,而notify不会释放锁
     * 需要注意的是,运用这种方法,必须要保证t2先执行,也就是首先让t2监听才可以
     * 
     * 阅读下面的程序,并分析输出结果
     * 可以读到输出结果并不是size=5时t2退出,而是t1结束时t2才接收到通知而退出
     * 想想这是为什么?
     * 
     * notify之后,t1必须释放锁,t2退出后,也必须notify,通知t1继续执行
     * 整个通信过程比较繁琐
     * @author mashibing
     */
    package yxxy.c_019;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    
    public class MyContainer4 {
    
        //添加volatile,使t2能够得到通知
        volatile List lists = new ArrayList();
    
        public void add(Object o) {
            lists.add(o);
        }
    
        public int size() {
            return lists.size();
        }
        
        public static void main(String[] args) {
            MyContainer4 c = new MyContainer4();
            
            final Object lock = new Object();
            
            new Thread(() -> {
                synchronized(lock) {
                    System.out.println("t2启动");
                    if(c.size() != 5) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("t2 结束");
                    //通知t1继续执行
                    lock.notify();
                }
                
            }, "t2").start();
            
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
    
            new Thread(() -> {
                System.out.println("t1启动");
                synchronized(lock) {
                    for(int i=0; i<10; i++) {
                        c.add(new Object());
                        System.out.println("add " + i);
                        
                        if(c.size() == 5) {
                            lock.notify();
                            //释放锁,让t2得以执行
                            try {
                                lock.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "t1").start();
            
            
        }
    }
    

      流程图:

     

    使用门闩
    CountDownLatch(1),CountDown往下数,当1变为0的时候门闩就开了,latch.countDown()调用一次数就往下-1;
    latch.await(),门闩的等待是不需要锁定任何对象的;
    /**
     * 曾经的面试题:(淘宝?)
     * 实现一个容器,提供两个方法,add,size
     * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
     * 
     * 给lists添加volatile之后,t2能够接到通知,但是,t2线程的死循环很浪费cpu,如果不用死循环,该怎么做呢?
     * 
     * 这里使用wait和notify做到,wait会释放锁,而notify不会释放锁
     * 需要注意的是,运用这种方法,必须要保证t2先执行,也就是首先让t2监听才可以
     * 
     * 阅读下面的程序,并分析输出结果
     * 可以读到输出结果并不是size=5时t2退出,而是t1结束时t2才接收到通知而退出
     * 想想这是为什么?
     * 
     * notify之后,t1必须释放锁,t2退出后,也必须notify,通知t1继续执行
     * 整个通信过程比较繁琐
     * 
     * 使用Latch(门闩)替代wait notify来进行通知
     * 好处是通信方式简单,同时也可以指定等待时间
     * 使用await和countdown方法替代wait和notify
     * CountDownLatch不涉及锁定,当count的值为零时当前线程继续运行
     * 当不涉及同步,只是涉及线程通信的时候,用synchronized + wait/notify就显得太重了
     * 这时应该考虑countdownlatch/cyclicbarrier/semaphore
     * @author mashibing
     */
    package yxxy.c_019;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class MyContainer5 {
    
        // 添加volatile,使t2能够得到通知
        volatile List lists = new ArrayList();
    
        public void add(Object o) {
            lists.add(o);
        }
    
        public int size() {
            return lists.size();
        }
    
        public static void main(String[] args) {
            MyContainer5 c = new MyContainer5();
    
            CountDownLatch latch = new CountDownLatch(1);
    
            new Thread(() -> {
                System.out.println("t2启动");
                if (c.size() != 5) {
                    try {
                        latch.await();
                        
                        //也可以指定等待时间
                        //latch.await(5000, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t2 结束");
    
            }, "t2").start();
    
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
    
            new Thread(() -> {
                System.out.println("t1启动");
                for (int i = 0; i < 10; i++) {
                    c.add(new Object());
                    System.out.println("add " + i);
    
                    if (c.size() == 5) {
                        // 打开门闩,让t2得以执行
                        latch.countDown();
                    }
    
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
            }, "t1").start();
    
        }
    }
    

      

    十六:ReentrantLock

    jdk里面提供了一个新的锁,是手工锁,它是用来替代synchronized的,叫ReentrantLock,重入锁,其实synchronized也是可重入的,但是这把锁是和synchronized是有区别的,ReentrantLock是用新的同步方法写的时候经常用的一个工具;
    复习之前讲的synchronized同步:
    /**
     * reentrantlock用于替代synchronized
     * 本例中由于m1锁定this,只有m1执行完毕的时候,m2才能执行
     * 这里是复习synchronized最原始的语义
     * @author mashibing
     */
    package yxxy.c_020;
    
    import java.util.concurrent.TimeUnit;
    
    public class ReentrantLock1 {
        synchronized void m1() {
            for(int i=0; i<10; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
            }
            
        }
        
        synchronized void m2() {
            System.out.println("m2 ...");
        }
        
        public static void main(String[] args) {
            ReentrantLock1 rl = new ReentrantLock1();
            new Thread(rl::m1).start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(rl::m2).start();
        }
    }

     使用ReentrantLock完成同样功能

    /**
     * reentrantlock用于替代synchronized
     * 使用reentrantlock可以完成同样的功能
     * 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
     * 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
     * @author mashibing
     */
    package yxxy.c_020;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ReentrantLock2 {
        Lock lock = new ReentrantLock();
    
        void m1() {
            try {
                lock.lock(); //synchronized(this)
                for (int i = 0; i < 10; i++) {
                    TimeUnit.SECONDS.sleep(1);
    
                    System.out.println(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        void m2() {
            lock.lock();
            System.out.println("m2 ...");
            lock.unlock();
        }
    
        public static void main(String[] args) {
            ReentrantLock2 rl = new ReentrantLock2();
            new Thread(rl::m1).start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(rl::m2).start();
        }
    }

     十七:RenntrantLock的tryLock

    /**
     * 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
     * @author mashibing
     */
    package yxxy.c_020;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ReentrantLock3 {
        Lock lock = new ReentrantLock();
    
        void m1() {
            try {
                lock.lock();
                for (int i = 0; i < 10; i++) {
                    TimeUnit.SECONDS.sleep(1);
    
                    System.out.println(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
         * 可以根据tryLock的返回值来判定是否锁定
         * 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
         */
        void m2() {
            /*
            boolean locked = lock.tryLock();
            System.out.println("m2 ..." + locked);
            if(locked) lock.unlock();
            */
            
            boolean locked = false;
            
            try {
                locked = lock.tryLock(5, TimeUnit.SECONDS);
                System.out.println("m2 ..." + locked);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if(locked) lock.unlock();
            }
            
        }
    
        public static void main(String[] args) {
            ReentrantLock3 rl = new ReentrantLock3();
            new Thread(rl::m1).start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(rl::m2).start();
        }
    }
    1
    3
    5
    m2 ...false
    7
    9

    十八:ReentrantLock的lockInterruptibly方法

    /**
     * 使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,
     * 在一个线程等待锁的过程中,可以被打断
     * 
     * @author mashibing
     */
    package yxxy.c_020;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Function;
    
    public class ReentrantLock4 {
            
        public static void main(String[] args) {
            Lock lock = new ReentrantLock();
            
            
            Thread t1 = new Thread(()->{
                try {
                    lock.lock();
                    System.out.println("t1 start");
                    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
                    System.out.println("t1 end");
                } catch (InterruptedException e) {
                    System.out.println("interrupted!");
                } finally {
                    lock.unlock();
                }
            });
            t1.start();
            
            Thread t2 = new Thread(()->{
                try {
                    //lock.lock();
                    lock.lockInterruptibly(); //可以对interrupt()方法做出响应
                    System.out.println("t2 start");
                } catch (InterruptedException e) {
                    System.out.println("interrupted!");
                } finally {
                    lock.unlock();
                }
            });
            t2.start();
            
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            t2.interrupt(); //打断线程2的等待
            
        }
    }
    t1 start
    interrupted!
    Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
        at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
        at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
        at yxxy.c_020.ReentrantLock4.lambda$1(ReentrantLock4.java:42)
        at java.lang.Thread.run(Thread.java:745)

     分析:

    t1线程牢牢的拿到锁之后,一直sleep不会释放,如果t2线程中的run方法使用lock.lock(),那么t2线程就会一直傻傻的等着这把锁,不能被其他线程打断;

    而使用lockInterruptibly()方法是可以被打断的,主线程main调用t2.interrupt()来打断t2,告诉他是不会拿到这把锁的,别等了;

    报错是因为lock.unlock()这个方法报错的,因为都没有拿到锁,无法unlock();是代码的问题,应该判断有锁,已经锁定的情况下才lock.unlock();

     十九:ReentrantLock还可以指定为公平锁

    公平锁:等待时间长的线程先执行

    竞争锁:多个线程一起竞争一个锁

    竞争锁相对效率高

    /**
     * ReentrantLock还可以指定为公平锁
     * 
     * @author mashibing
     */
    package yxxy.c_020;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ReentrantLock5 extends Thread {
            
        private static ReentrantLock lock = new ReentrantLock(); //参数为true表示为公平锁,请对比输出结果
        
        public void run() {
            for(int i=0; i<100; i++) {
                lock.lock();
                try{
                    System.out.println(Thread.currentThread().getName()+"获得锁");
                }finally{
                    lock.unlock();
                }
            }
        }
        public static void main(String[] args) {
            ReentrantLock5 rl=new ReentrantLock5();
            Thread th1=new Thread(rl);
            Thread th2=new Thread(rl);
            th1.start();
            th2.start();
        }
    }

    二十:面试经典(生产者消费者问题) 

    要求:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用

    同步容器:多个线程共同访问的时候,不能出问题,就是要加锁了,下面这个是阻塞式的同步容器;
    代码:
    复制代码
    /**
     * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
     * 能够支持2个生产者线程以及10个消费者线程的阻塞调用
     * 
     * 使用wait和notify/notifyAll来实现
     * 
     * @author mashibing
     */
    package yxxy.c_021;
    
    import java.util.LinkedList;
    import java.util.concurrent.TimeUnit;
    
    public class MyContainer1<T> {
        final private LinkedList<T> lists = new LinkedList<>();
        final private int MAX = 10; //最多10个元素
        private int count = 0;
        
        
        public synchronized void put(T t) {
            while(lists.size() == MAX) { //想想为什么用while而不是用if?
                try {
                    this.wait(); //effective java
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            lists.add(t);
            ++count;
            this.notifyAll(); //通知消费者线程进行消费
        }
        
        public synchronized T get() {
            T t = null;
            while(lists.size() == 0) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            t = lists.removeFirst();
            count --;
            this.notifyAll(); //通知生产者进行生产
            return t;
        }
        
        public static void main(String[] args) {
            MyContainer1<String> c = new MyContainer1<>();
            //启动消费者线程
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    for(int j=0; j<5; j++) System.out.println(c.get());
                }, "c" + i).start();
            }
            
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            //启动生产者线程
            for(int i=0; i<2; i++) {
                new Thread(()->{
                    for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
                }, "p" + i).start();
            }
        }
    }

    1.为什么用while而不用if?

    假设容器中已经满了,如果用的是if,这个线程A发现list.size()==max已经满了,就this.wait()住了;
    如果容器中被拿走了元素,线程A被叫醒了,它会从this.wait()开始继续往下运行,准备执行lists.add(),可是它被叫醒了之后还没有往里扔的时候,另外一个线程往list里面扔了一个,线程A拿到锁之后不再进行if判断,而是继续执行lists.add()就会出问题了;
    如果用while,this.wait()继续往下执行的时候需要在while中再检查一遍,就不会出问题;
     
    2.put()方法中为什么使用notifyAll而不是notify?
    如果使用notify,notify是叫醒一个线程,那么就有可能叫醒的一个线程又是生产者,整个程序可能不动了,都wait住了;

    使用wati和notify写线程程序的时候写起来会比较费劲,使用Lock和Condition

    /**
     * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
     * 能够支持2个生产者线程以及10个消费者线程的阻塞调用
     * 
     * 使用Lock和Condition来实现
     * 对比两种方式,Condition的方式可以更加精确的指定哪些线程被唤醒
     * 
     * @author mashibing
     */
    package yxxy.c_021;
    
    import java.util.LinkedList;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class MyContainer2<T> {
        final private LinkedList<T> lists = new LinkedList<>();
        final private int MAX = 10; //最多10个元素
        private int count = 0;
        
        private Lock lock = new ReentrantLock();
        private Condition producer = lock.newCondition();
        private Condition consumer = lock.newCondition();
        
        public void put(T t) {
            try {
                lock.lock();
                while(lists.size() == MAX) {
                    producer.await();
                }
                
                lists.add(t);
                ++count;
                consumer.signalAll(); //通知消费者线程进行消费
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        
        public T get() {
            T t = null;
            try {
                lock.lock();
                while(lists.size() == 0) {
                    consumer.await();
                }
                t = lists.removeFirst();
                count --;
                producer.signalAll(); //通知生产者进行生产
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            return t;
        }
        
        public static void main(String[] args) {
            MyContainer2<String> c = new MyContainer2<>();
            //启动消费者线程
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    for(int j=0; j<5; j++){
                        System.out.println(c.get());
                    }
                }, "c" + i).start();
            }
            
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            //启动生产者线程
            for(int i=0; i<2; i++) {
                new Thread(()->{
                    for(int j=0; j<25; j++) {
                        c.put(Thread.currentThread().getName() + " " + j);
                    }
                }, "p" + i).start();
            }
        }
    }

    使用lock和condition好处在于可以精确的通知那些线程被叫醒,哪些线程不必被叫醒,这个效率显然要比notifyAll把所有线程全叫醒要高很多。

     二十一:ThreadLocal

    /**
     * ThreadLocal线程局部变量
     */
    package yxxy.c_022;
    
    import java.util.concurrent.TimeUnit;
    
    public class ThreadLocal1 {
        volatile static Person p = new Person();
        
        public static void main(String[] args) {
                    
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                System.out.println(p.name);
            }).start();
            
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                p.name = "lisi";
            }).start();
        }
    }
    
    class Person {
        String name = "zhangsan";
    }
    现在这两个线程是互相影响的;第二个线程改了名字之后,第一个线程就能读的到了;
    有的时候就想线程2的改变,不想让线程1知道,这时候怎么做?
    /**
     * ThreadLocal线程局部变量
     *
     * ThreadLocal是使用空间换时间,synchronized是使用时间换空间
     * 比如在hibernate中session就存在与ThreadLocal中,避免synchronized的使用
     *
     * 运行下面的程序,理解ThreadLocal
     */
    package yxxy.c_022;
    
    import java.util.concurrent.TimeUnit;
    
    public class ThreadLocal2 {
        static ThreadLocal<Person> tl = new ThreadLocal<>();
        
        public static void main(String[] args) {
                    
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                System.out.println(tl.get());
            }).start();
            
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                tl.set(new Person());
            }).start(); 
        }
        
        static class Person {
            String name = "zhangsan";
        }
    }

    console输出:null

    ThreadLocal的意思就是,tl里面的变量,自己的线程自己用;你别的线程里要想用的话,不好意思你自己往里扔;不能用我线程里面放的东西;相当于每个线程都有自己的变量,互相之间不会产生冲突;
    可以理解为person对象每个线程里面拷贝了一份,改的都是自己那份,都是自己线程本地的变量,所以空间换时间;ThreadLocal在效率上会更高一些;
    有一些需要加锁的对象,如果它们在使用的时候自己进行的改变,自己维护这个状态,不用通知其他线程,那么这个时候可以使用ThreadLocal;

     二十二:高并发容器

    一、需求背景:          

    有N张火车票,每张票都有一个编号,同时有10个窗口对外售票, 请写一个模拟程序。

    分析下面的程序可能会产生哪些问题?重复销售?超量销售?

    /**
     * 有N张火车票,每张票都有一个编号
     * 同时有10个窗口对外售票
     * 请写一个模拟程序
     * 
     * 分析下面的程序可能会产生哪些问题?
     * 重复销售?超量销售?
     * 
     * @author 马士兵
     */
    package yxxy.c_024;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class TicketSeller1 {
        static List<String> tickets = new ArrayList<>();
        
        static {
            for(int i=0; i<10000; i++) tickets.add("票编号:" + i);
        }
        
        public static void main(String[] args) {
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    while(tickets.size() > 0) {
                        System.out.println("销售了--" + tickets.remove(0));
                    }
                }).start();
            }
        }
    }

     可能卖重;一张票可能对多个线程同时remove(0),所以可能一张票被卖出去多次;也可能最后一张票的时候都被多个线程remove(),程序会报错,总之,不加锁是不行的。

    ArrayList不是同步的,remove、add等各种方法全都不是同步的;一定会出问题;

    二、使用Vector

    /**
     * 使用Vector或者Collections.synchronizedXXX
     * 分析一下,这样能解决问题吗?
     * 
     * @author 马士兵
     */
    package yxxy.c_024;
    
    import java.util.Vector;
    import java.util.concurrent.TimeUnit;
    
    public class TicketSeller2 {
        static Vector<String> tickets = new Vector<>();
        
        static {
            for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
        }
        
        public static void main(String[] args) {
            
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    while(tickets.size() > 0) {
                        
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        
                        System.out.println("销售了--" + tickets.remove(0));
                    }
                }).start();
            }
        }
    }

     Vector是一个同步容器,所有的方法都是加锁的;

    虽然说在Vector里面remove方法是原子的,但是while条件中判断和remove是分离的;如果在while条件和remove之间被打断的话,问题依旧;(假设剩下最后一张票,多个线程争抢同一张票,每一个线程判断的size大于0,虽然size和remove都是原子性的,但是在判断和remove中间的这段过程中,还是可能被打断,A线程判断了size>0,还没有remove的时候被打断了,B线程把票拿走了,A线程继续往下执行的时候再remove就出问题了。)
    所以只是把List换成同步容器Vector,问题依旧;

     三、使用synchronized加锁:

    /**
     * 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步
     * 就像这个程序,判断size和进行remove必须是一整个的原子操作
     * 
     * @author 马士兵
     */
    package yxxy.c_024;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    public class TicketSeller3 {
        static List<String> tickets = new LinkedList<>();
        
        
        static {
            for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
        }
        
        public static void main(String[] args) {
            
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    while(true) {
                        synchronized(tickets) {
                            if(tickets.size() <= 0) break;
                            
                            try {
                                TimeUnit.MILLISECONDS.sleep(10);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            
                            System.out.println("销售了--" + tickets.remove(0));
                        }
                    }
                }).start();
            }
        }
    }
    相当于把判断和销售都加到了一个原子操作里去了;可以解决问题;
    不过加锁后效率并不是很高;每销售一张票的时候都要把整个队列tickets锁定;

     四、使用ConcurrentLinkedQueue提供并发性

    /**
     * 使用ConcurrentQueue提高并发性
     * 
     * @author 马士兵
     */
    package yxxy.c_024;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class TicketSeller4 {
        static Queue<String> tickets = new ConcurrentLinkedQueue<>();
        
        static {
            for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
        }
        
        public static void main(String[] args) {
            
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    while(true) {
                        String s = tickets.poll();
                        if(s == null) {
                            break;
                        }else {
                            System.out.println("销售了--" + s);
                        }
                    }
                }).start();
            }
        }
    }
    这里面没有加锁,同样的也有判断,但是这个不会出问题;为什么?
    因为在做了s==null判断后,再也没有对队列进行修改操作;(上个程序都是做了判断之后,需要对队列进行修改操作remove一下)
    假如A线程执行完String s = tickets.poll(),还没有来得及执行if(s==null) break就被打断了,另外一个线程把队列拿空了,大不了while(true)返过头来再拿一遍得到null,所以不会出问题;

     五、ConcurrentHashMap

    复制代码
    /**
     * http://blog.csdn.net/sunxianghuang/article/details/52221913 
     * http://www.educity.cn/java/498061.html
     * 阅读concurrentskiplistmap
     */
    package yxxy.c_025;
    
    import java.util.Arrays;
    import java.util.Hashtable;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.CountDownLatch;
    
    public class T01_ConcurrentMap {
        public static void main(String[] args) {
    //        Map<String, String> map = new ConcurrentHashMap<>();
            Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序
            
    //        Map<String, String> map = new Hashtable<>();
            //Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX
            //TreeMap
            Random r = new Random();
            Thread[] ths = new Thread[100];
            CountDownLatch latch = new CountDownLatch(ths.length);
            long start = System.currentTimeMillis();
            for(int i=0; i<ths.length; i++) {
                ths[i] = new Thread(()->{
                    for(int j=0; j<10000; j++) map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000));
                    latch.countDown();
                });
            }
            
            Arrays.asList(ths).forEach(t->t.start());
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            long end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }
    不同的Map容器执行完这段代码的时间:
    HashTable:445;
    ConcurrentHashMap:402;
     
    多线程的环境下ConcurrentHashMap的效率要比hashTable高一些,高在哪?
    hashTable往里加任何一个数据的时候,都是要锁定整个hashTable对象,而concurrentHashMap默认的是把容器分成16段,每次往里插数据的时候只锁定16段其中的一个部分;把锁细化了;当很多线程共同往里插数据的时候,线程A插的是其中一段,线程B是往另一段里插,那么这两个线程就可以同时并发的往里插;因此多线程环境下要比hashTable高;
     
    ConcurrentSkipListMap:是支持排序的,所以插入的时候慢了一些;
    Collections.synchronizedList/Collections.synchronizedMap(Map<K, V>):往里面传一个不加锁的Map,将它包装一下,返回一个加了锁的Map;
     
    注:以上所有的map,都可以换成set;因为set只是使用了map的key。
    注2:currentHashMap不是绝对的线程安全(在put的时候get会出问题(生产环境实验所得))

     六:copyOnWriteList

    复制代码
    /**
     * 写时复制容器 copy on write
     * 多线程环境下,写时效率低,读时效率高
     * 适合写少读多的环境
     * @author 马士兵
     */
    package yxxy.c_025;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    import java.util.Vector;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    public class T02_CopyOnWriteList {
        public static void main(String[] args) {
            List<String> lists = 
                    //new ArrayList<>(); //这个会出并发问题!
                    //new Vector();
                    new CopyOnWriteArrayList<>();
            Random r = new Random();
            Thread[] ths = new Thread[100];
            
            for(int i=0; i<ths.length; i++) {
                Runnable task = new Runnable() {
        
                    @Override
                    public void run() {
                        for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000));
                    }
                    
                };
                ths[i] = new Thread(task);
            }
            
            runAndComputeTime(ths);
            
            System.out.println(lists.size());
        }
        
        static void runAndComputeTime(Thread[] ths) {
            long s1 = System.currentTimeMillis();
            Arrays.asList(ths).forEach(t->t.start());
            Arrays.asList(ths).forEach(t->{
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            long s2 = System.currentTimeMillis();
            System.out.println(s2 - s1);
        }
    }

    用于读少写多的场景 

     七、ConcurrentLinkedQueue:

    package yxxy.c_025;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class T04_ConcurrentQueue {
        public static void main(String[] args) {
            Queue<String> strs = new ConcurrentLinkedQueue<>();
            
            for(int i=0; i<10; i++) {
                strs.offer("a" + i);  //add
            }
            
            System.out.println(strs);
            
            System.out.println(strs.size());
            
            System.out.println(strs.poll());
            System.out.println(strs.size());
            
            System.out.println(strs.peek());
            System.out.println(strs.size());
            
            //双端队列Deque
        }
    }
    [a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
    10
    a0
    9
    a1
    9
    Queue:队列,在并发容器里面最重要的也是应用的最多的容器;有很多种实现,ConcurrentLinkedQueue,BlockingQueue;
    常见操作:
    offer: 类似于add方法,但是add方法加的时候会出问题,如果有容量的限制话add就会抛异常;offer不会抛异常,返回值boolean代表是否加成功;
    poll(): 从头部拿出来一个元素,同时把原来的删掉;
    peek(): 从头部拿出来一个,但是原来的不删;
    八、LinkedBlockingQueue和ArrayBlockingQueue
    package yxxy.c_025;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class T05_LinkedBlockingQueue {
    
        static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
    
        static Random r = new Random();
    
        public static void main(String[] args) {
            new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    try {
                        strs.put("a" + i); //如果满了,就会等待
                        TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "p1").start();
    
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    for (;;) {
                        try {
                            System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, "c" + i).start();
    
            }
        }
    }
    复制代码
    Queue在高并发的情况下可以使用两种队列:
    ConcurrentLinkedQueue:内部加锁的
    BlockingQueue:阻塞式队列,如LinkedBlockingQueue,ArrayBlockingQueue。阻塞式的意思是,生产者消费者模式中生产者已经生产满了直接等待wait,消费如果空了消费者就会直接等待。
    LinkedBockingQueue是链表实现的阻塞式容器,是无界队列(往里扔多少个元素都可以,内存满足的情况下)
    ArrayBlockingQueue:有界队列
    package yxxy.c_025;
    
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class T06_ArrayBlockingQueue {
    
        static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); //有界队列,最多装10个元素
    
        static Random r = new Random();
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 10; i++) {
                strs.put("a" + i);
            }
            
            strs.put("aaa"); //满了就会等待,程序阻塞,无限制的阻塞下去
            //strs.add("aaa");  //报异常,Queue full
            //strs.offer("aaa"); //不会报异常,但是加不进去;boolean带表是否加成功;这是add和offer的区别 
            //strs.offer("aaa", 1, TimeUnit.SECONDS); //1s钟之后加不进去就加不进了;按时间段阻塞
            
            System.out.println(strs);
        }
    }
    复制代码

     九:DelayQueue

    package yxxy.c_025;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class T07_DelayQueue {
    
        static BlockingQueue<MyTask> tasks = new DelayQueue<>();
    
        static Random r = new Random();
        
        static class MyTask implements Delayed {
            long runningTime;
            
            MyTask(long rt) {
                this.runningTime = rt;
            }
    
            @Override
            public int compareTo(Delayed o) {
                if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                    return -1;
                else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) 
                    return 1;
                else 
                    return 0;
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
            
            @Override
            public String toString() {
                return "" + runningTime;
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            long now = System.currentTimeMillis();
            MyTask t1 = new MyTask(now + 1000);
            MyTask t2 = new MyTask(now + 2000);
            MyTask t3 = new MyTask(now + 1500);
            MyTask t4 = new MyTask(now + 2500);
            MyTask t5 = new MyTask(now + 500);
            
            tasks.put(t1);
            tasks.put(t2);
            tasks.put(t3);
            tasks.put(t4);
            tasks.put(t5);
            
            System.out.println(tasks);
            
            for(int i=0; i<5; i++) {
                System.out.println(tasks.take());
            }
        }
    }

    console

    [1534606492700, 1534606493200, 1534606493700, 1534606494700, 1534606494200]
    1534606492700
    1534606493200
    1534606493700
    1534606494200
    1534606494700
    DelayQueue:无界队列,加进去的每一个元素,如果理解为一个任务的话,这个元素什么时候可以让消费者往外拿呢?每一个元素记载着我还有多长时间可以从队列中被消费者拿走;这个队列默认是排好顺序的,等待的时间最长的排在最前面,先往外拿;
    DelayQueue往里添加的元素是要实现Delayed接口;

     可以用来执行定时任务;

    十、TransferQueue:  
    TransferQueue:提供了transfer方法,一般是这种情形,有一个队列,消费者线程先启动,然后生产者生产一个东西的时候不是往队列里头仍,它首先去找有没有消费者,如果有消费者,生产的东西不往队列里扔了而是直接给消费者消费;如果没有消费者的话,调用transfer线程就会阻塞;
     
    比如场景:坦克大战中多个坦克客户端链接服务器,坦克A移动了,服务端需要把A移动的位置消息发送给其他客户端,服务端存在一个消息队列,消息都交给不同的线程处理,有一种是都往消息队列里扔,然后再往外拿,不过这种太慢了;假如有一大推消费者线程等着,那么直接把消息扔给消费者线程就行了,不要再往队列里扔了,效率会更高一些;所以TransferQueue是用在更高的并发的情况下。
     
    例子程序:
    1.先起消费者,在起生产者transfer,程序正常:
    package yxxy.c_025;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    public class T08_TransferQueue {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
            
            new Thread(() -> {
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            
            strs.transfer("aaa");
        }
    }
    复制代码

     2.如果先起生产者transfer,然后再起消费者take,程序就会阻塞住了:

    package yxxy.c_025;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    public class T08_TransferQueue {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
            
            strs.transfer("aaa");
    
            new Thread(() -> {
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    3.如果transfer换成put(或者add、offer),也不会有问题,因为不会阻塞:

    package yxxy.c_025;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    public class T08_TransferQueue {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
            
            //strs.transfer("aaa");
            
            strs.put("aaa");
    
            new Thread(() -> {
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    十一、SynchronousQueue

    package yxxy.c_025;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    
    public class T09_SynchronusQueue { //容量为0
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> strs = new SynchronousQueue<>();
            
            new Thread(()->{
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            
            strs.put("aaa"); //阻塞等待消费者消费
            //strs.add("aaa");
            System.out.println(strs.size());
        }
    }
    SynchronousQueue:同步队列,一种特殊的transferQueue,前面说的TransferQueue如果生产者生产了东西,这时候没有消费者,如果使用put/add,还可以扔到队列里,这个队列还是有一定的容量的;
    而SynchronousQueue叫做没有容量的队列,容量为0,生产者生产的东西必须马上消费掉,如果不消费掉就会出问题;调add抛异常(Queue full),调put程序阻塞;

    总结:
    1:对于map/set的选择使用
    HashMap 不需要多线程的情况下使用
    TreeMap 不需要多线程的情况下使用
    LinkedHashMap 不需要多线程的情况下使用

    Hashtable 并发量比较小
    Collections.sychronizedXXX 并发量比较小

    ConcurrentHashMap 高并发
    ConcurrentSkipListMap 高并发同时要求排好顺序

    2:队列
    ArrayList 不需要同步的情况
    LinkedList 不需要同步的情况
    Collections.synchronizedXXX 并发量低
    Vector 并发量低
    CopyOnWriteList 写的时候少,读时候多
    Queue
    CocurrentLinkedQueue //concurrentArrayQueue 高并发队列
    BlockingQueue 阻塞式
    LinkedBQ 无界
    ArrayBQ 有界
    TransferQueue 直接给消费者线程,如果没有消费者阻塞
    SynchronusQueue 特殊的transferQueue,容量0
    DelayQueue执行定时任务

     

     二十三:高并发线程池

    一、认识Executor、ExecutorService、Callable、Executors

    /**
     * 认识Executor
     */
    package yxxy.c_026;
    
    import java.util.concurrent.Executor;
    
    public class T01_MyExecutor implements Executor {
    
        public static void main(String[] args) {
            new T01_MyExecutor().execute(new Runnable(){
    
                @Override
                public void run() {
                    System.out.println("hello executor");
                }
                
            });
        }
    
        @Override
        public void execute(Runnable command) {
            //new Thread(command).run();
            command.run();
        }
    
    }
    Executor执行器是一个接口,只有一个方法execute执行任务,在java的线程池的框架里边,这个是最顶层的接口;
    ExecutorService:从Executor接口继承。
    Callable:里面call方法,和Runnable接口很像,设计出来都是被其他线程调用的;但是Runnable接口里面run方法是没有返回值的也不能抛出异常;而call方法有返回值可以抛异常;
    Executors: 操作Executor的一个工具类;以及操作ExecutorService,ThreadFactory,Callable等;

     二、ThreadPool:

    /**
     * 线程池的概念
     */
    package yxxy.c_026;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class T05_ThreadPool {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newFixedThreadPool(5); //execute submit
            for (int i = 0; i < 6; i++) {
                service.execute(() -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                });
            }
            System.out.println(service);
            
            service.shutdown();
            System.out.println(service.isTerminated());
            System.out.println(service.isShutdown());
            System.out.println(service);
            
            TimeUnit.SECONDS.sleep(5);
            System.out.println(service.isTerminated());
            System.out.println(service.isShutdown());
            System.out.println(service);
        }
    }
    创建了一个线程池,扔了5个线程,接下来要执行6个任务,扔进去线程池里面就启一个线程帮你执行一个,因为这里最多就起5个线程,接下来扔第6个任务的时候,不好意思,它排队了,排在线程池所维护的一个任务队列里面,任务队列大多数使用的都是BlockingQueue,这是线程池的概念;
    有什么好处?好处在于如果这个任务执行完了,这个线程不会消失,它执行完任务空闲下来了,如果有新的任务来的时候,直接交给这个线程来运行就行了,不需要新启动线程;从这个概念上讲,如果你的任务和线程池线程数量控制的比较好的情况下,你不需要启动新的线程就能执行很多很多的任务,效率会比较高,并发性好;
     
    service.shutdown():关闭线程池,shutdown是正常的关闭,它会等所有的任务都执行完才会关闭掉;还有一个是shutdownNow,二话不说直接就给关了,不管线程有没有执行完;
    service.isTerminated(): 代表的是这里所有执行的任务是不是都执行完了。isShutdown()为true,注意它关了但并不代表它执行完了,只是代表正在关闭的过程之中(注意打印Shutting down)
    打印5个线程名字,而且第一个线程执行完了之后,第6个任务来了,第1个线程继续执行,不会有线程6;
     
    当所有线程全部执行完毕之后,线程池的状态为Terminated,表示正常结束,complete tasks=6
     
    线程池里面维护了很多线程,等着你往里扔任务,而扔任务的时候它可以维护着一个任务列表,还没有被执行的任务列表,同样的它还维护着另外一个队列,complete tasks,结束的任务队列,任务执行结束扔到这个队列里,所以,一个线程池维护着两个队列;

     三、Future 

    /**
     * 认识future
     */
    package yxxy.c_026;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    
    public class T06_Future {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            /*FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>(){
                @Override
                public Integer call() throws Exception {
                    TimeUnit.MILLISECONDS.sleep(3000);
                    return 1000;
                }
            });*/
            
            FutureTask<Integer> task = new FutureTask<>(()->{
                TimeUnit.MILLISECONDS.sleep(3000);
                return 1000;
            });
            
            new Thread(task).start();
            
            System.out.println(task.get()); //阻塞
            
            //*******************************
            ExecutorService service = Executors.newFixedThreadPool(5);
            Future<Integer> f = service.submit(()->{
                TimeUnit.MILLISECONDS.sleep(5000);
                return 1;
            });
            System.out.println(f.isDone());
            System.out.println(f.get());
            System.out.println(f.isDone());
            
        }
    }

    console

    1000
    false
    1
    true


     Future: ExecutorService里面有submit方法,它的返回值是Future类型,因为你扔一个任务进去需要执行一段时间,未来的某一个时间点上,任务执行完了产生给你一个结果,这个Future代表的就是那个Callable的返回值;

    四、并行计算的例子:

    /**
     * 线程池的概念
     * nasa
     */
    package yxxy.c_026;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class T07_ParallelComputing {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            long start = System.currentTimeMillis();
            List<Integer> results = getPrime(1, 200000); 
            long end = System.currentTimeMillis();
            System.out.println(end - start);
            
            final int cpuCoreNum = 4;
            
            ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
            
            MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20(越大的数计算是不是素数的时间越长)
            MyTask t2 = new MyTask(80001, 130000);
            MyTask t3 = new MyTask(130001, 170000);
            MyTask t4 = new MyTask(170001, 200000);
            
            Future<List<Integer>> f1 = service.submit(t1);
            Future<List<Integer>> f2 = service.submit(t2);
            Future<List<Integer>> f3 = service.submit(t3);
            Future<List<Integer>> f4 = service.submit(t4);
            
            start = System.currentTimeMillis();
            f1.get();
            f2.get();
            f3.get();
            f4.get();
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
        
        static class MyTask implements Callable<List<Integer>> {
            int startPos, endPos;
            
            MyTask(int s, int e) {
                this.startPos = s;
                this.endPos = e;
            }
            
            @Override
            public List<Integer> call() throws Exception {
                List<Integer> r = getPrime(startPos, endPos);
                return r;
            }
            
        }
        
        //判断是否是质数
        static boolean isPrime(int num) {
            for(int i=2; i<=num/2; i++) {
                if(num % i == 0) return false;
            }
            return true;
        }
        
        static List<Integer> getPrime(int start, int end) {
            List<Integer> results = new ArrayList<>();
            for(int i=start; i<=end; i++) {
                if(isPrime(i)) results.add(i);
            }
            
            return results;
        }
    }
    复制代码

     

     console:

    2280
    818

    五、CachedThreadPool 

    package yxxy.c_026;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class T08_CachedPool {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newCachedThreadPool();
            System.out.println(service);
            
            for (int i = 0; i < 2; i++) {
                service.execute(() -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                });
            }
            
            System.out.println(service);
            
            TimeUnit.SECONDS.sleep(80); //cachedthreadPool里面的线程空闲状态默认60s后销毁,这里保险起见
            
            System.out.println(service);
            
            
        }
    }

     console

    java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
    pool-1-thread-2
    pool-1-thread-1
    java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
    FixedThreadPool为固定个数的线程池;
    CachedThreadPool:刚开始一个线程都没有,来一个任务就起一个线程,假设起了两个线程A,B,如果来了第三个任务,这时候恰好线程B任务执行完了,线程池里面有空闲的,这时候直接让线程池里空闲的线程B来执行;最多起多少个线程?你的系统能支撑多少个为止;默认的情况下,只要一个线程空闲的状态超过60s,这个线程就自动的销毁了,alivetime=60s;这个值也可以自己指定。

     六、SingleThreadExecutor

    package yxxy.c_026;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class T09_SingleThreadPool {
        public static void main(String[] args) {
            ExecutorService service = Executors.newSingleThreadExecutor();
            for(int i=0; i<5; i++) {
                final int j = i;
                service.execute(()->{
                    
                    System.out.println(j + " " + Thread.currentThread().getName());
                });
            }
        }
    }
    0 pool-1-thread-1
    1 pool-1-thread-1
    2 pool-1-thread-1
    3 pool-1-thread-1
    4 pool-1-thread-1
    SingleThreadExecutor:线程池里就1个线程;扔5个任务,也永远只有1个线程执行;
    它能保证任务前后一定是顺序执行,先扔的任务一定先执行完;只有等第一个任务执行完才执行第二个任务
    用于顺序执行任务

    七、ScheduledThreadPool 

    package yxxy.c_026;
    
    import java.util.Random;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class T10_ScheduledPool {
        public static void main(String[] args) {
            ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
            service.scheduleAtFixedRate(()->{
                try {
                    TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }, 0, 500, TimeUnit.MILLISECONDS);
        }
    }

    用于定时重复执行 某个任务

     八、WorkStealingPool(工作窃取线程池,为精灵线程)

    /**
     *
     */
    package yxxy.c_026;
    
    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class T11_WorkStealingPool {
        public static void main(String[] args) throws IOException {
            ExecutorService service = Executors.newWorkStealingPool();
            int count = Runtime.getRuntime().availableProcessors();    //看cpu多少核的;如果是4核,默认就帮你起4个线程
            System.out.println(count);    
            
            service.execute(new R(1000));
            for(int i=0; i<count; i++){
                service.execute(new R(2000));
            }
            
            //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
            System.in.read(); 
        }
    
        static class R implements Runnable {
            int time;
    
            R(int t) {
                this.time = t;
            }
    
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                System.out.println(time  + " " + Thread.currentThread().getName());
            }
        }
    }

    console

    1000 ForkJoinPool-1-worker-1
    ForkJoinPool-1-worker-2
    ForkJoinPool-1-worker-0
    ForkJoinPool-1-worker-5
    ForkJoinPool-1-worker-3
    ForkJoinPool-1-worker-6
    ForkJoinPool-1-worker-7
    ForkJoinPool-1-worker-4
    ForkJoinPool-1-worker-1

     工作窃取线程池:本来执行完自己的线程应该变为等待状态,但是这个会去别的线程里面拿任务执行

     workStealing用于什么场景:就说任务分配的不是很均匀,有的线程维护的任务队列比较长,有些线程执行完任务就结束了不太合适,所以他执行完了之后可以去别的线程维护的队列里去偷任务,这样效率更高。

     九、ForkJoinPool(有点类似归并)

     ForkJoinPool: forkjoin的意思就是如果有一个难以完成的大任务,需要计算量特别大,时间特别长,可以把大任务切分成一个个小任务,如果小任务还是太大,它还可以继续分,至于分成多少你可以自己指定,... 分完之后,把结果进行合并,最后合并到一起join一起,产生一个总的结果。而里面任务的切分你可以自己指定,线程的启动根据你任务切分的规则,由ForkJoinPool这个线程池自己来维护。

     

    package yxxy.c_026;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Random;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.RecursiveTask;
    
    public class T12_ForkJoinPool {
        static int[] nums = new int[1000000];
        static final int MAX_NUM = 50000;
        static Random r = new Random();
        
        static {
            for(int i=0; i<nums.length; i++) {
                nums[i] = r.nextInt(100);
            }
            
            System.out.println(Arrays.stream(nums).sum()); //stream api 
        }
        
        
        static class AddTask extends RecursiveAction { 
            
            int start, end;
            
            AddTask(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected void compute() {
                
                if(end-start <= MAX_NUM) {
                    long sum = 0L;
                    for(int i=start; i<end; i++) sum += nums[i];
                    System.out.println("from:" + start + " to:" + end + " = " + sum);
                } else {
                    int middle = start + (end-start)/2;
                    AddTask subTask1 = new AddTask(start, middle);
                    AddTask subTask2 = new AddTask(middle, end);
                    subTask1.fork();
                    subTask2.fork();
                }
            }
        }
        
        public static void main(String[] args) throws IOException {
            ForkJoinPool fjp = new ForkJoinPool();
            AddTask task = new AddTask(0, nums.length);
            fjp.execute(task);
            
            System.in.read();
            
        }
    }
    复制代码
    49494882
    from:906250 to:937500 = 1545274
    from:968750 to:1000000 = 1537201
    from:593750 to:625000 = 1548289
    from:718750 to:750000 = 1546396
    from:468750 to:500000 = 1550373
    from:843750 to:875000 = 1543421
    from:218750 to:250000 = 1549856
    from:93750 to:125000 = 1548384
    from:562500 to:593750 = 1541814
    from:812500 to:843750 = 1547885
    from:187500 to:218750 = 1546831
    from:687500 to:718750 = 1554064
    from:437500 to:468750 = 1547434
    from:937500 to:968750 = 1547676
    from:875000 to:906250 = 1551839
    from:62500 to:93750 = 1548576
    from:531250 to:562500 = 1550943
    from:656250 to:687500 = 1544991
    from:156250 to:187500 = 1548367
    from:406250 to:437500 = 1539881
    from:125000 to:156250 = 1548128
    from:500000 to:531250 = 1545229
    from:781250 to:812500 = 1544296
    from:625000 to:656250 = 1545283
    from:375000 to:406250 = 1553931
    from:31250 to:62500 = 1544024
    from:750000 to:781250 = 1543573
    from:343750 to:375000 = 1546407
    from:0 to:31250 = 1539743
    from:281250 to:312500 = 1549470
    from:312500 to:343750 = 1552190
    from:250000 to:281250 = 1543113
    例子解释:
    对数组中100万个数求和计算,第一种方式是普通的将所有数加在一起(for循环);
    第二种方式使用ForkJoinPool计算,分而治之,它里面执行的任务必须是ForkJoinTask,这个任务可以自动进行切分,一般用的时候从RecursiveAction或RecursiveTask继承,RecursiveTask递归任务,因为它切分任务还可以在切分。RecursiveAction没有返回值,RecursiveTask有返回值。
     
    例子2:
    package yxxy.c_026;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Random;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.RecursiveTask;
    
    public class T12_ForkJoinPool {
        static int[] nums = new int[1000000];
        static final int MAX_NUM = 50000;
        static Random r = new Random();
        
        static {
            for(int i=0; i<nums.length; i++) {
                nums[i] = r.nextInt(100);
            }
            
            System.out.println(Arrays.stream(nums).sum()); //stream api 
        }
        
        
        static class AddTask extends RecursiveTask<Long> {
            
            int start, end;
            
            AddTask(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected Long compute() {
                
                if(end-start <= MAX_NUM) {
                    long sum = 0L;
                    for(int i=start; i<end; i++) sum += nums[i];
                    return sum;
                } 
                
                int middle = start + (end-start)/2;
                
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
                
                return subTask1.join() + subTask2.join();
            }
        }
        
        public static void main(String[] args) throws IOException {
            ForkJoinPool fjp = new ForkJoinPool();
            AddTask task = new AddTask(0, nums.length);
            fjp.execute(task);
            
            long result = task.join();
            System.out.println(result);
        }
    }
    49498457
    49498457
    和例子1差不多,唯一的区别是有返回值了,RecursiveTask<V>中的V泛型就是返回值类型。
    long result = task.join(),因为join本身就是阻塞的,只有等所有的都执行完了,最后才得出总的执行结果。所以不需要System.in.read了;
  • 相关阅读:
    第二十三篇 jQuery 学习5 添加元素
    第二十二篇 jQuery 学习4 内容和属性
    第二十一篇 jQuery 学习3 特效效果
    第二十篇 jQuery 初步学习2
    第十九篇 jQuery初步学习
    第十八篇 JS传参数
    第十七篇 JS验证form表单
    第十六篇 JS实现全选操作
    第十五篇 JS 移入移出事件 模拟一个二级菜单
    第十四篇 JS实现加减乘除 正则表达式
  • 原文地址:https://www.cnblogs.com/xhlwjy/p/11244574.html
Copyright © 2011-2022 走看看