zoukankan      html  css  js  c++  java
  • 多线程

    线程的启动、常用方法

    1. MyThread extends Thread    new MyThread().start()
    2. MyRun implements Runable new Thread(new MyRun()).start()
    3. new Thread(()->{}).start()
    4. MyCall implements Callable<String> new Thread(new FutureTask<String>(new MyCall())).start
    5. ExecutorService service = Executors.newCachedThreadPool(); service.execute(()->{sout}); service.shutdown();

    线程并列执行:join()

    线程状态

    1. start()之前是 新建状态
    2. Runable = Ready就绪状态 + Running运行状态
    3. TimedWaiting 等待
    4. Waiting 等待
    5. Blocked 阻塞
    6. Teminated 结束状态

     

    锁的概念

    1.访问某一段代码,同一时间需要一个线程操作,就需要加锁
    2.锁的是对象不是代码

    锁的特性

    1.加了synchronized 就不用了加volatile(访问不到其他线程修改的内容),因为sync保证了原子性、可见性
    2.锁定方法和非锁定方法 可以同时执行

    银行账户程序加锁

    写加锁、读不加锁、会产生脏读、具体情况得看业务

    可重入锁

    1.  m1方法加synchronized 、m2方法加synchronized 。方法1可以调用方法2。也就是同一个线程不断地加锁
    2.  为什么?父类方法加锁,子类继承重写父类的方法也必须加锁,如果不能继承,那么父子之间的继承就死锁了

    异常和锁

    1. 默认情况锁会被释放,等待抢锁的会乱入

    synchronized(XXX)

    不能用String、Integer、Long、

    synchronized

    1.锁的是对象不是代码
    2.this xx.class
    3.锁定方法和非锁定方法 同事执行
    4.锁升级四种: 无锁、偏向锁、自旋锁、重量级锁
      偏向锁: 持有一个线程的id标识,判断如果是这个线程就让他进去,如果不是该线程就升级为自旋锁、自旋锁占用CPU,一直在那里循环等待处理,如果多次循环(10次)没有结果就升级为重量级锁

    什么时候用自旋锁(占CPU)、什么时候用重锁(不占CPU)?

    1、执行时间长的用重锁,因为重锁是不占用CPU的,因为第一个进去以后,其他的在等待队列中,等待CPU的唤醒。
    2、自旋锁: 1.执行时间短 2.线程数比较少
    3、重锁=OS锁 :1.执行时间长  2.线程数比较多

    说明

    锁只能升级、不能降级

    Volatile

    1.保证线程可见性: 缓存一致性协议
    2.禁止指令重新排序: 双重判断单例模式必须加Volatile
    3.不能保证原子性:  线程A 将0改1,线程B和C都看到了,然后修改结果变成了2,不是3

    synchronized优化

    1.锁的代码越少越好 --锁的细化
    2.一个方法内有好多的锁, 还不如在该方法上加锁 -- 锁的粗化
    3.锁定某对象obj,如果obj的属性发送改变,不影响锁的使用,但是如果obj变成另外一个对象,则锁定的对象发生改变,应该便面将锁定对象的引用变成另外的对象  -- 加final

    CAS(CompareAndSet)

    1.无锁优化、既: 自旋
    2.Atomic.....开头的都是CAS
    3.AtomicInteger  incrementAndGet()(相当于count++)
    4.原理: 原值、期望值、新值   如果原值 等于 期望值 设置新值,否则就重新执行一次
    5.靠CPU原语支持,中间不能被打断
    6.ABA问题:如何解决:加版本号,Atomic类有自带版本号的。 如果是个int、long不影响CAS结果,如果是引用就有问题。

    LongAdder

    1.分段锁 分段的CAS
    2.- 性能: LongAdder > CAS > synchronized

    ReentrantLock

    1.可重入锁、可以代替synchronized
    2.lock.lock() lock.unlock() 必须在finally内进行解锁
    3.  a.synchronized搞不定会阻塞, b.ReentrantLock可以决定自己是否阻塞: tryLock(time)
    4.lockInterruptibly() 可以打断阻塞的线程
    5.公平锁: new ReentrantLock(true) 线程来了不是先抢锁,而是先访问队列是否有等待的,如果有就跟着等,没有的话再去用锁
    6.lock.newCondition可以创建不同的等待队列

    CountDownLatch

    1. 用来进行线程阻塞
    2. CountDownLatch latch = new CountDownLatch(num)
    3. latch.countDown()   -1操作
    4. latch.await() 什么时候num减少为0 就往下执行

    CyclicBarrier: 循环栅栏

    1.CyclicBarrier barrier = new CyclicBarrier(20,()->sout"out")
    2.每满20,就处理一下

    Phaser

    阶段栅栏,每个阶段人都到齐了在进行下一阶段

    ReadWriteLock

    1. 读写锁 : 共享锁 + 排它锁
    2. 互斥锁 = 排它锁 = ReentrantLock()
    3. 读写锁 = ReentrantReadWriteLock() = readLock() + writeLock()
    4. 读锁 = 共享锁: 读取的线程可以同步读,但是不能写,读完了才可以写
    5. 写锁 = 互斥锁 = 排它锁: 只能写,不能读,写完了才可以读

    semaphore

    1. new Semaphone(2) 同时可以执行两个线程
    2. .acquire()  阻塞到这里,当满足 Semaphone(num)中num个数,就变为0然后放行
    3. .relaese() 将0变回为num
    4. 限制流量。类似车道和收费站
    5. new Semaphone(2, true)公平锁  里面有队列

    Exchange

    1. 线程交换
    2. 线程1和线程2内容交换,只能2个线程交换

    LockSupport

    1.   .park()阻塞
    2.   .unpark(线程)放行
    3.   unpark可以先于park执行
    4.   unpark可以直接指定线程执行

    面试题

     

    一、synichronized和ReentrantLock的不同?

    synichronized
    a.系统自动加锁、自动解锁、不可以出现多个不同的等待队列、没骗人进行四种锁状态的升级
    ReentranLock
    b.需要手动加锁、手动解锁、可以出现多个不同的等待队列(newCondition),CAS实现

    二、实现一个容器,提供两个方法add、size、写两个线程:线程1,添加10个元素到容器中。 线程2,实施监控元素个数,当个数到5个时,线程2给出提示并结束

    方式一:
    package com.xiaoke.netty.thread;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class T04_NotifyFreeLock {
    
        //添加volatile,使t2能够得到通知
        volatile List lists = new ArrayList<>();
    
        public void add(Object o) {
            lists.add(o);
        }
    
        public int size() {
            return lists.size();
        }
    
        public static void main(String[] args) {
            T04_NotifyFreeLock c = new T04_NotifyFreeLock();
    
            final Object lock = new Object();
    
            new Thread(() -> {
                synchronized (lock) {
                    System.out.println("t2启动");
                    if (c.size() != 5) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("t2 结束");
                    //通知t1继续执行
                    lock.notify();
                }
    
            }, "t2").start();
    
            new Thread(() -> {
                System.out.println("t1启动");
                synchronized (lock) {
                    for (int i = 0; i < 10; i++) {
                        c.add(new Object());
                        System.out.println("add " + i);
    
                        if (c.size() == 5) {
                            lock.notify();
                            //释放锁,让t2得以执行
                            try {
                                lock.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }, "t1").start();
        }
    
    }
    
    
    方式二:
        // 添加volatile,使t2能够得到通知
        volatile List lists = new ArrayList<>();
    
        public void add(Object o) {
            lists.add(o);
        }
    
        public int size() {
            return lists.size();
        }
    
        static Thread t1 = null, t2 = null;
    
        public static void main(String[] args) {
            T07_LockSupport_WithoutSleep c = new T07_LockSupport_WithoutSleep();
    
            t1 = new Thread(() -> {
                System.out.println("t1启动");
                for (int i = 0; i < 10; i++) {
                    c.add(new Object());
                    System.out.println("add " + i);
                    if (c.size() == 5) {
                        LockSupport.unpark(t2);
                        LockSupport.park();
                    }
                }
            }, "t1");
    
            t2 = new Thread(() -> {
                LockSupport.park();
                System.out.println("t2 结束");
                LockSupport.unpark(t1);
            }, "t2");
            t2.start();
            t1.start();
        }
    
    
    方式三:
    package com.xiaoke.netty.thread;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Semaphore;
    
    public class T08_Semaphore {
    
        // 添加volatile,使t2能够得到通知
        volatile List lists = new ArrayList<>();
    
        public void add(Object o) {
            lists.add(o);
        }
    
        public int size() {
            return lists.size();
        }
    
        static Thread t1 = null, t2 = null;
    
        public static void main(String[] args) {
            T08_Semaphore c = new T08_Semaphore();
            Semaphore s = new Semaphore(1);
    
            t1 = new Thread(() -> {
                try {
                    s.acquire();
                    for (int i = 0; i < 5; i++) {
                        c.add(new Object());
                        System.out.println("add " + i);
                    }
                    s.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                try {
                    t2.start();
                    t2.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                try {
                    s.acquire();
                    for (int i = 5; i < 10; i++) {
                        System.out.println(i);
                    }
                    s.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }, "t1");
    
            t2 = new Thread(() -> {
                try {
                    s.acquire();
                    System.out.println("t2 结束");
                    s.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "t2");
            t1.start();
        }
    
    }

    三、写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支撑2个生产者线程以及10个消费者的阻塞调用。

    方式一:
    
    
    public class MyContainer1<T> {
    
        final private LinkedList<T> lists = new LinkedList<>();
        final private int MAX = 10; //最多10个元素
        private int count = 0;
    
    
        public synchronized void put(T t) {
            while (lists.size() == MAX) { //想想为什么用while而不是用if?
                try {
                    this.wait(); //effective java
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            lists.add(t);
            ++count;
            this.notifyAll(); //通知消费者线程进行消费
        }
    
        public synchronized T get() {
            T t = null;
            while (lists.size() == 0) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            t = lists.removeFirst();
            count--;
            this.notifyAll(); //通知生产者进行生产
            return t;
        }
    
        public static void main(String[] args) {
            MyContainer1<String> c = new MyContainer1<>();
            //启动消费者线程
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 5; j++) System.out.println(c.get());
                }, "c" + i).start();
            }
    
            //启动生产者线程
            for (int i = 0; i < 2; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 25; j++) c.put(Thread.currentThread().getName() + " " + j);
                }, "p" + i).start();
            }
        }
    }
    
    
    
    
    方式二:
    
    public class MyContainer2<T> {
    
        final private LinkedList<T> lists = new LinkedList<>();
        final private int MAX = 10; //最多10个元素
        private int count = 0;
    
        private Lock lock = new ReentrantLock();
        private Condition producer = lock.newCondition();
        private Condition consumer = lock.newCondition();
    
        public void put(T t) {
            try {
                lock.lock();
                while (lists.size() == MAX) { //想想为什么用while而不是用if?
                    producer.await();
                }
    
                lists.add(t);
                ++count;
                consumer.signalAll(); //通知消费者线程进行消费
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public T get() {
            T t = null;
            try {
                lock.lock();
                while (lists.size() == 0) {
                    consumer.await();
                }
                t = lists.removeFirst();
                count--;
                producer.signalAll(); //通知生产者进行生产
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            return t;
        }
    
        public static void main(String[] args) {
            MyContainer2<String> c = new MyContainer2<>();
            //启动消费者线程
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 5; j++) System.out.println(c.get());
                }, "c" + i).start();
            }
    
            //启动生产者线程
            for (int i = 0; i < 2; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 25; j++) c.put(Thread.currentThread().getName() + " " + j);
                }, "p" + i).start();
            }
        }
    
    }

    ReentranLock源码

    lock.lock() -> sync.lock() -> acquie(1) -> AQS(AbstractQueuedSynchronizer) -> CAS改变状态 + state(1加锁 0无锁) + 监控这个状态的双向链表

    为什么AQS效率高?
    多线程操作需要加锁,那就要看锁的粒度,AQS利用CAS只锁定了双向链表的head和tail

    为什么线程是隔离的?为什么当前线程获取不到其他线程的信息

        public void set(T value) {
            Thread t = Thread.currentThread();
            ThreadLocalMap map = getMap(t);
            if (map != null)
                map.set(this, value);
            else
                createMap(t, value);
        }
    
    
        public T get() {
            Thread t = Thread.currentThread();
            ThreadLocalMap map = getMap(t);
            if (map != null) {
                ThreadLocalMap.Entry e = map.getEntry(this);
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    T result = (T)e.value;
                    return result;
                }
            }
            return setInitialValue();
        }
    
    
    因为ThreadLocal是一个map,key是当前线程,所以获取不到其他线程

    java的四种引用

    1.强引用: new一个对象就是强引用,只有设置对象为空 gc才会回收
    2.软引用:  当内存不够的情况下,会gc
    3.弱引用: 只要是弱引用就会回收
    
    WeakReference<M> m = new WeakReference<>(new M());
    System.gc(); // 会被回收
    
    当有强引用指向弱引用的时候,当强引用消失,这个弱引用也应该消失。一般应用在容器中
    
    ThreadLocal中有应用
    a. ThreadLocal<M> t = new ThreadLocal //强引用
    b. t.set(new M())  key: Thread.currentThread  value: new M()
     key -> 指向 Entry -> WeakReference  key是弱引用
      假设key为强引用,当t =null的时候,但是当前线程key是强引用还指向了 new ThreadLocal,所以 当前对象ThreadLocal不会被回收,所以当服务器运行时间越长就会产生内存泄漏问题。当key为弱引用,t=null时候,强引用消失,弱引用也消失,ThreadLocal被回收,
        内存泄漏: 指的是内存漏掉了一块,永远不会被回收
        OOM: 是内存不够用,内存溢出了
    c. 当强引用消失,弱引用也会被回收,但是当key为null的时候,会导致整个value无法被访问到,value的值无法被回收,仍然存在内存泄漏问题。 解决办法, t.remove().
    d.只要使用完ThreadLocal,就必须t.remove(),防止内存泄漏
    
    
    4.虚引用
    一般写netty用,自己的内存存放在堆外内存(JVM以外的),堆外内存不归jvm管理,不能被gc,可以将使用虚引用,将回收的信息放到Queue中,然后清理堆外内存。
    
    怎么使用堆外内存(分配内存、回收) java使用类Unsafe

    容器

    容器 Collection List

    1.CopyOnWriteList

    2.Vector

    3.ArrayList

    4.LinkedList

     
    Set

    1.HashSet -> LinkedHashSet

    2.SortedSet ->TreeSet

    3.CopyOnWriteArraySet

    4.ConcurrentSkipListSet

     
    Queue

    1.BlockingQueue -----------> 

    2.ConcurrentLinkedQueue

    1.ArrayBlockingQueue

    2.PriorityBlockingQueue

    3.LinkedBlockingQueue

    4.TransferQueue

    5.SynchronousQueue

    Map Map

    1.HashMap -> LinkedHashMap

    2.TreeMap

    3.ConcurrentHashMap

    4.ConcurrentSkipListMap

     

    1.Vector、Collections.synchronizedList()、CopyOnWriteArrayList

    1.Vector所有的方法都被synchronized加锁
    2.CopyOnWriteArrayList加的是ReentrantLock锁,写的时候加锁,读的时候不加锁,因为读的时候copy了一个数据,其实数据还是原来的数据 没有改变

    2.HashTable、ConcurrentHashMap、ConcurrentSkipListMap

    HashTable: 里面的方法都被 Synchronized修饰
    ConcurrentHashMap:插入的时候加锁、查询的时候没加锁、插入的时候加的synchronized锁定代码块,所以性能要以实际压测结果为准。
    ConcurrentSkipListMap:高并发且排序,跳表实现快速查询和插入

    3.TreeMap、LinkedHashMap

    LinkedHashMap:按照插入的顺序排序,hashMap的key是在内存中不连续的,遍历的时候不连续,增加了链表有了指针,可以快速遍历。
    
    TreeMap: 是按照值排序

    ConcurrentLinkedQueue

    add 添加, 超过界限报错
    
    offer 相当于add,超过界限不报错
    
    peek  取第一个值
    
    poll  区第一个值并remove

    BlockingQueue

    Queue和List区别 主要是Queue对线程添加了很多友好的API  如:put add offer offer(加时间) poll
    add添加满了会报错
    put添加满了会阻塞

    1
    .LinkedBlockingQueue
    -> a. 最大Integer.MAX(很可能装到内存溢出) b. put 如果满了我会阻塞住 生产者 c. take如果没了我就会阻塞住 消费者
    2.ArrayBlockingQueue
    -> 可以设置线程数量
    3.DelayQueue
    -> 根据时间进行排序 compareTo 可以自行设定规则, 按实际顺序进行排序, 应用:按时间进行任务调度
    4.SynchronousQueue
    -> add数据报错,不能放数据,容量是0
    -> 用来线程之间传递内容,
     -> 第一个线程先:take() 第二个线程在:put()
    -> 线程之间的任务调度
    5.TransferQueue
    -> SynchronousQueue只能取走一个,TransferQueue可以做成列表传递多个
    -> .transfer("a") 装入数据阻塞,等待被取走
     -> .take() 阻塞,等待放入数据(1.可以通过put装数据 2.可以通过另外的线程transfer放入)
    -> put可以随便装,transfer只能装入一个

    面试题

    两个线程交叉打印数字和字母:
    
    
    方式一:
    static Thread t1 = null, t2 = null;
        public static void main(String[] args) throws Exception {
            char[] aI = "123456789".toCharArray();
            char[] aC = "ABCDEFGhI".toCharArray();
    
            t1 = new Thread(() -> {
    
                for (char c : aI) {
                    System.out.print(c);
                    LockSupport.unpark(t2); //叫醒T2
                    LockSupport.park(); //T1阻塞
                }
    
            }, "t1");
    
            t2 = new Thread(() -> {
    
                for (char c : aC) {
                    LockSupport.park(); //t2阻塞
                    System.out.print(c);
                    LockSupport.unpark(t1); //叫醒t1
                }
    
            }, "t2");
    
            t1.start();
            t2.start();
        }
    
    
    方式二:
    
     enum ReadyToRun {T1, T2}
        static volatile ReadyToRun r = ReadyToRun.T1; //思考为什么必须volatile
        public static void main(String[] args) {
            char[] aI = "1234567".toCharArray();
            char[] aC = "ABCDEFG".toCharArray();
    
            new Thread(() -> {
    
                for (char c : aI) {
                    while (r != ReadyToRun.T1) {
                    }
                    System.out.print(c);
                    r = ReadyToRun.T2;
                }
    
            }, "t1").start();
    
            new Thread(() -> {
    
                for (char c : aC) {
                    while (r != ReadyToRun.T2) {
                    }
                    System.out.print(c);
                    r = ReadyToRun.T1;
                }
            }, "t2").start();
        }
    
    
    方式三:
    
    static BlockingQueue<String> q1 = new ArrayBlockingQueue<>(1);
        static BlockingQueue<String> q2 = new ArrayBlockingQueue(1);
        public static void main(String[] args) throws Exception {
            char[] aI = "1234567".toCharArray();
            char[] aC = "ABCDEFG".toCharArray();
    
            new Thread(() -> {
    
                for (char c : aI) {
                    System.out.print(c);
                    try {
                        q1.put("ok");
                        q2.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
            }, "t1").start();
    
            new Thread(() -> {
    
                for (char c : aC) {
                    try {
                        q1.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.print(c);
                    try {
                        q2.put("ok");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
            }, "t2").start();
        }
    
    
    
    方式四:
    
    public static void main(String[] args) {
            final Object o = new Object();
            char[] aI = "1234567".toCharArray();
            char[] aC = "ABCDEFG".toCharArray();
            new Thread(() -> {
                synchronized (o) {
                    for (char c : aI) {
                        System.out.print(c);
                        try {
                            o.notify();
                            o.wait(); //让出锁
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                    o.notify(); //必须,否则无法停止程序
                }
    
            }, "t1").start();
    
            new Thread(() -> {
                synchronized (o) {
                    for (char c : aC) {
                        System.out.print(c);
                        try {
                            o.notify();
                            o.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                    o.notify();
                }
            }, "t2").start();
        }
    
    
    
    方式五:
    
      private static volatile boolean t2Started = false;
        public static void main(String[] args) {
            final Object o = new Object();
            char[] aI = "1234567".toCharArray();
            char[] aC = "ABCDEFG".toCharArray();
    
            new Thread(() -> {
                //latch.await();
    
                synchronized (o) {
    
                    while (!t2Started) {
                        try {
                            o.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                    //
    
                    for (char c : aI) {
                        System.out.print(c);
                        try {
                            o.notify();
                            o.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                    o.notify();
                }
            }, "t1").start();
    
            new Thread(() -> {
    
                synchronized (o) {
                    for (char c : aC) {
                        System.out.print(c);
                        //latch.countDown()
                        t2Started = true;
                        try {
                            o.notify();
                            o.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    o.notify();
                }
            }, "t2").start();
        }
    
    
    
    方式六:
    
    public static void main(String[] args) {
            char[] aI = "1234567".toCharArray();
            char[] aC = "ABCDEFG".toCharArray();
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            new Thread(() -> {
                try {
                    lock.lock();
    
                    for (char c : aI) {
                        System.out.print(c);
                        condition.signal();
                        condition.await();
                    }
    
                    condition.signal();
    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
    
            }, "t1").start();
    
            new Thread(() -> {
                try {
                    lock.lock();
    
                    for (char c : aC) {
                        System.out.print(c);
                        condition.signal();
                        condition.await();
                    }
    
                    condition.signal();
    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "t2").start();
        }
    
    
    
    方式七:
    
    
    public static void main(String[] args) {
            char[] aI = "1234567".toCharArray();
            char[] aC = "ABCDEFG".toCharArray();
            Lock lock = new ReentrantLock();
            Condition conditionT1 = lock.newCondition();
            Condition conditionT2 = lock.newCondition();
    
            new Thread(() -> {
                try {
                    lock.lock();
    
                    for (char c : aI) {
                        System.out.print(c);
                        conditionT2.signal();
                        conditionT1.await();
                    }
    
                    conditionT2.signal();
    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
    
            }, "t1").start();
    
            new Thread(() -> {
                try {
                    lock.lock();
    
                    for (char c : aC) {
                        System.out.print(c);
                        conditionT1.signal();
                        conditionT2.await();
                    }
    
                    conditionT1.signal();
    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
    
            }, "t2").start();
        }
    
    
    
    方式八:
    
     public static void main(String[] args) {
            char[] aI = "1234567".toCharArray();
            char[] aC = "ABCDEFG".toCharArray();
            TransferQueue<Character> queue = new LinkedTransferQueue<Character>();
            new Thread(() -> {
                try {
                    for (char c : aI) {
                        System.out.print(queue.take());
                        queue.transfer(c);
                    }
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "t1").start();
    
            new Thread(() -> {
                try {
                    for (char c : aC) {
                        queue.transfer(c);
                        System.out.print(queue.take());
                    }
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "t2").start();
        }

    线程池

    1. ThreadPoolExecutor extend ExecutorService extend Executor
    2. Executor -> void execute(Runnable command); 执行线程
    3. ExecutorService = 执行线程 + 生命周期
    4. 

    Callable和Runable区别

    两者功能一样:定义个线程的任务
    Runnable: void run(); 无返回值
    Callable<V>: V call() throws Exception; 有返回值
    FutureTask
    FutureTask imp RunnableFuture  extends Runnable, Future  // 接口可以多继承
    -> 执行线程 + 存储结果
    -> CompletableFuture 管理多个Future的结果
    
    

    CompletableFuture同时拉取数据

        private static double priceOfTM() {
            delay(2000L);
            return 1.00;
        }
    
        private static double priceOfTB() {
            delay(3000L);
            return 2.00;
        }
    
        private static double priceOfJD() {
            delay(4000L);
            return 3.00;
        }
    
        // 模仿访问外部接口的时间
        private static void delay(Long time) {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("After %s sleep!
    ", time);
        }
    
        static long start, end;
    
        public static void main(String[] args) throws Exception {
            start = System.currentTimeMillis();
    
            CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(() -> priceOfTM());
            CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(() -> priceOfTB());
            CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(() -> priceOfJD());
    
            CompletableFuture.allOf(futureTM, futureTB, futureJD).join();
            end = System.currentTimeMillis();
            System.out.println("use completable future! " + (end - start));
    
        }


    结果:use completable future! 4040

    // 可以处理返回值
    CompletableFuture.supplyAsync(()->priceOfTM())
    .thenApply(String::valueOf)
    .thenApply(str-> "price " + str)
    .thenAccept(System.out::println);
    结果: price 1.0

    线程池种类

    1.普通线程池 ThreadPoolExecutor
    2. ForkJoinPool 分叉合并结果

    定义线程池

    ThreadPoolExecutor构造参数:
    参数一:corePoolSize核心线程数,永远存在 参数二:maximumPoolSize最大线程数,核心线程不够用,就进行扩展 参数三:keepAliveTime生存时间 线程有很长时间没干活就把它归还给操作系统 参数四:TimeUnit.Seconds 生存时间的单位 参数五:任务队列 BlockingQueue 参数六:ThreadFactory 线程工厂 一般自定义,这里是生产线程的,要给线程起名字,防止出错定位问题方便 参数七:RejectStrategy 拒绝策略,指的是线程池忙,任务队列爆满情况下,后面来的线程怎么进行处理 a: Abort抛异常 b: Discard 扔掉 不抛异常 c: DiscardOldest 扔掉排队时间最久的 d: CallerRuns 调用者处理服务 注意: 一般拒绝策略不用这四种,需要自己实现,如果是重要内容,比如订单,那么需要保存到kafka、数据库等等。 代码:
    static class Task implements Runnable { private int i; public Task(int i) { this.i = i; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " Task " + i); } @Override public String toString() { return "Task{" + "i=" + i + '}'; } } public static void main(String[] args) { ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 8; i++) { tpe.execute(new Task(i)); } tpe.shutdown(); } //现象: 线程不用启动了 直接去线程池拿 结果: pool-1-thread-1 Task 0 pool-1-thread-1 Task 2 pool-1-thread-4 Task 7 pool-1-thread-4 Task 4 pool-1-thread-3 Task 6 pool-1-thread-2 Task 1 pool-1-thread-4 Task 5 pool-1-thread-1 Task 3
    SingleThreadPool
    优点:顺序执行,线程池就一个线程,其余的任务丢队列中
    ExecutorService service = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { final int j = i; service.execute(() -> { System.out.println(j + " " + Thread.currentThread().getName()); }); } 结果: 0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1 源码: new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(), defaultHandler)
    核心线程1,最大线程1,生存时间和单位也就没意义,
    LinkedBlockingQueue队列
    Executors.defaultThreadFactory() 默认工厂,默认生产线程组和线程名称,当有问题时,难以定位
    defaultHandler 默认拒绝策略:Abort,比较重要的信息会丢失

    CachePool

    优点: 来一个任务就启动一个线程
    缺点: 线程过多,CPU的资源全部浪费在切换线程上面了

        ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for (int i = 0; i < 2; i++) { service.execute(() -> { System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); 结果: java.util.concurrent.ThreadPoolExecutor@2503dbd3[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] java.util.concurrent.ThreadPoolExecutor@2503dbd3[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0] pool-1-thread-1 pool-1-thread-2



    源码:
    new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>()
                      Executors.defaultThreadFactory(), defaultHandler)
    1.没有核心线程
    2.最大线程数量: Integer.MAX_VALUE
    3.SynchronousQueue 队列,来一个处理一个

    FixedThreadPool

            long start = System.currentTimeMillis();
            getPrime(1, 200000);
            long end = System.currentTimeMillis();
            System.out.println(end - start);
    
            final int cpuCoreNum = 4;
    
            ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
    
            MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
            MyTask t2 = new MyTask(80001, 130000);
            MyTask t3 = new MyTask(130001, 170000);
            MyTask t4 = new MyTask(170001, 200000);
    
            Future<List<Integer>> f1 = service.submit(t1);
            Future<List<Integer>> f2 = service.submit(t2);
            Future<List<Integer>> f3 = service.submit(t3);
            Future<List<Integer>> f4 = service.submit(t4);
    
            start = System.currentTimeMillis();
            f1.get();
            f2.get();
            f3.get();
            f4.get();
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    
        static class MyTask implements Callable<List<Integer>> {
            int startPos, endPos;
    
            MyTask(int s, int e) {
                this.startPos = s;
                this.endPos = e;
            }
    
            @Override
            public List<Integer> call() throws Exception {
                List<Integer> r = getPrime(startPos, endPos);
                return r;
            }
    
        }
    
        static boolean isPrime(int num) {
            for (int i = 2; i <= num / 2; i++) {
                if (num % i == 0) return false;
            }
            return true;
        }
    
        static List<Integer> getPrime(int start, int end) {
            List<Integer> results = new ArrayList<>();
            for (int i = start; i <= end; i++) {
                if (isPrime(i)) results.add(i);
            }
    
            return results;
        }
    
    
    
    结果: 1.说明这是个并行线程池  2.有固定的线程池
    1766
    560
    
    源码:
    new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>()
                                          Executors.defaultThreadFactory(), defaultHandler) 
    1.核心线程数和最大线程数一样,且自定义
    
                  
    

      

    阿里开发手册禁用:LinkedBlockingQueue, 因为这个是有上限的最大Integer.MAX 

    ScheduledPool

            ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
            service.scheduleAtFixedRate(()->{
                System.out.println(Thread.currentThread().getName());
            }, 0, 2000, TimeUnit.MILLISECONDS); // 0指的是第一个任务往后面推多久执行,2000 指的是2s循环一次
    
    
    结果: 每隔2秒钟打印一次
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-1

    源码:
    (corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    new DelayedWorkQueue(),Executors.defaultThreadFactory(), defaultHandler)
    1.DelayedWorkQueue  可以根据时间排序的队列,在这里根据时间一直循环(定时)
    
    

    面试题:

    闹钟服务,订阅人数10亿,每天早上7点响,每天早上7点有10亿的并发,请问怎么优化?
    
    1.分而治之,主服务器将数据分发到不同的服务器上
    2.使用定时线程:SecheduledPool 或者 定时器 + CachePool/FixedThreadPool

    自定义拒绝策略

        public static void main(String[] args) {
            ExecutorService service = new ThreadPoolExecutor(4, 4,
                    0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
                    Executors.defaultThreadFactory(),
                    new MyHandler());
        }
    
        static class MyHandler implements RejectedExecutionHandler {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                //log("r rejected")
                //save r kafka mysql redis
                //try 3 times
                if (executor.getQueue().size() < 10000) {
                    //try put again();
                }
            }
        }

    ForkJoinPool

        static int[] nums = new int[1000000];
        static final int MAX_NUM = 50000;
        static Random r = new Random();
    
        static {
            for (int i = 0; i < nums.length; i++) {
                nums[i] = r.nextInt(100);
            }
    
            System.out.println("---" + Arrays.stream(nums).sum()); //stream api
        }
    
    
        static class AddTask extends RecursiveAction {
    
            int start, end;
    
            AddTask(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected void compute() {
    
                if (end - start <= MAX_NUM) {
                    long sum = 0L;
                    for (int i = start; i < end; i++) sum += nums[i];
                    System.out.println("from:" + start + " to:" + end + " = " + sum);
                } else {
    
                    int middle = start + (end - start) / 2;
    
                    AddTask subTask1 = new AddTask(start, middle);
                    AddTask subTask2 = new AddTask(middle, end);
                    subTask1.fork();
                    subTask2.fork();
                }
    
    
            }
    
        }
    
    
        static class AddTaskRet extends RecursiveTask<Long> {
    
            private static final long serialVersionUID = 1L;
            int start, end;
    
            AddTaskRet(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected Long compute() {
    
                if (end - start <= MAX_NUM) {
                    long sum = 0L;
                    for (int i = start; i < end; i++) sum += nums[i];
                    return sum;
                }
    
                int middle = start + (end - start) / 2;
    
                AddTaskRet subTask1 = new AddTaskRet(start, middle);
                AddTaskRet subTask2 = new AddTaskRet(middle, end);
                subTask1.fork();
                subTask2.fork();
    
                return subTask1.join() + subTask2.join();
            }
    
        }
    
        public static void main(String[] args) throws IOException {
            /*ForkJoinPool fjp = new ForkJoinPool();
            AddTask task = new AddTask(0, nums.length);
            fjp.execute(task);*/
    
            T12_ForkJoinPool temp = new T12_ForkJoinPool();
    
            ForkJoinPool fjp = new ForkJoinPool();
            AddTaskRet task = new AddTaskRet(0, nums.length);
            fjp.execute(task);
            long result = task.join();
            System.out.println(result);
    
            //System.in.read();
    
        }

    结果: 将一个任务拆分成多个任务,可以不断地递归分,计算的结果在合并成一个

    ---49488749
    49488749

    待理解

     parallelStream 底层也是ForkJoinPool
    
       public static void main(String[] args) {
            List<Integer> nums = new ArrayList<>();
            Random r = new Random();
            for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000));
    
            //System.out.println(nums);
    
            long start = System.currentTimeMillis();
            nums.forEach(v -> isPrime(v));
            long end = System.currentTimeMillis();
            System.out.println(end - start);
    
            //ʹparallel stream api
    
            start = System.currentTimeMillis();
            nums.parallelStream().forEach(T13_ParallelStreamAPI::isPrime);
            end = System.currentTimeMillis();
    
            System.out.println(end - start);
        }
    
        static boolean isPrime(int num) {
            for (int i = 2; i <= num / 2; i++) {
                if (num % i == 0) return false;
            }
            return true;
        }
    
    结果:
    1112
    182

    JMH

    1.POM:
            <dependency>
                <groupId>org.openjdk.jmh</groupId>
                <artifactId>jmh-core</artifactId>
                <version>1.21</version>
            </dependency>
    
            <dependency>
                <groupId>org.openjdk.jmh</groupId>
                <artifactId>jmh-generator-annprocess</artifactId>
                <version>1.21</version>
                <scope>test</scope>
            </dependency>
    
    2.安装JMH插件
    3.修改系统环境目录
    
    
    测试类:
    public class PSTest {
    
        @Benchmark
        @Warmup(iterations = 1, time = 3)
        @Fork(5)
        @BenchmarkMode(Mode.Throughput)
        @Measurement(iterations = 1, time = 3)
        public void testForeach(){
            PS.foreach();
        }
    }

    ops:operation per second 每秒操作次数

    Disruptor:最快的单机mq

    无锁(大量CAS操作)、高并发、环形Buffer,直接覆盖旧的数据,降低GC频率,用于生产者消费者。消费者就是通过观察者模式进行消费。
    
    对比
    1.ConcurrentArrayQueue 需要维护头和尾、需要两头加锁,Disruptor是环形的,只需要维护位置就行
    2.固定长度,里面是一个数组,首尾连接了起来

    什么是进程、线程、钎程

    1.双击QQ,会先去磁盘中找,然后加载进内存,开辟一块空间           --进程
    
    2.在双击QQ,会先去磁盘中找,然后加载进内存,在开辟一块空间   -- 进程
    
    3.当需要计算的时候,内存将需要计算的数据丢给CPU,这就是一个线程
    
    
    
    
    
    什么是程序: QQ.exe
    
    什么是进程:会有资源分配,资源分配的基本单位
    
    什么是线程:是进程执行的基本单位,多个线程共享进程资源
    
    线程切换:线程1正在计算,中间需要拉取其他数据等待,线程2来了,着急计算,会让出位置,线程1放进CPU的缓存中,线程从1切换到2

    单核CPU设定多线程是否有意义?

    有意义。比如我的第一个线程在计算中需要网络IO请求数据、或者sleep中,那么就让出位置,让第二个线程计算,这样就可以充分利用CPU资源。

    工作线程数是不是越大越好?

    不是,线程太多,全部用到线程切换上了

    工作线程数(线程池中线程数量)设置多少合适?

    1.根据压测结果,初始设定一个值和CPU的核数有关
    2.根据公式:N(threads) = N(CPU ) * Ucpu * (1 +W/C)
           Ucpu:CPU的利用率  W/C等待时间和计算时间的比率
    3.根据性能分析工具profiler等

    只有syc都是blocked状态,其他锁是wait状态

     

    如何优雅的打断一个线程

    使用打断 lock.lockInterruptibly   interrupt不能打断正在抢锁的线程

    比如上传大文件,中间点击取消,怎么优雅的结束

    1.自然结束
    2.stop() 
    3.interrupt

    缓存行64bytes

    缓存一致性协议: 两个CPU之间缓存行的数据被修改,需要通知另外一个重新去拿数据

  • 相关阅读:
    restful风格 webapi 发送put delete请求 405 错误
    mysql 连接数据库间接性连接异常
    .net core 发布到centos The configuration file 'appsettings.json' was not found and is not optional. 找不到文件
    .net list<int>、int[]参数类型 前端调用传参方法
    long? long 可空类型转换
    EF 多对多循环引用序列化失败 解决办法
    HTML5学习之HTML标记类型
    电脑高手常用的5个按钮!(太有用了!留下了!)
    final关键字用法总结
    Java程序员面试失败的5大原因 //转自:极客网
  • 原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/15121344.html
Copyright © 2011-2022 走看看