zoukankan      html  css  js  c++  java
  • Java高并发

    Java并发程序基础
    1. 线程的状态:new, runnable, blocked, waiting, timed_waiting, terminated。
    public enum State {
        NEW,
        RUNNABLE,
        BLOCKED,
        WAITING,
        TIMED_WAITING,
        TERMINATED;
    }
    1. Thread.run()和Thread.start()的区别是run不会开启新线程,在当前线程中,执行run()中的代码。 
    2. 新建线程:可以使用重载实现run()方法或者使用public Thread(Runnable runnable)来定义线程;
    3. 终止线程:不能直接使用stop(),因为在结束线程时,会直接终止线程,并会立即释放这个线程所持有的锁。而这些锁恰恰是用来维持对象的一致性。可以使用变量的方式实现:
    public class DemoThread extends Thread {
        volatile boolean stop = false; //退出线程标记
     
        public void stopThread() {
            stop = true;
        }
     
        @Override
        public void run() {
            while (true) {
                if (stop) {
                    break; //退出线程
                }
     
                synchronized (this) {
                    //执行逻辑
                }
     
                Thread.yield(); //谦让,线程退出CPU
            }
        }
    }
    1. 线程中断interrupt
      线程中断并不是立即退出,而是给线程发送一个通知,告知目标线程,希望退出。至于目标线程接收到通知后如何处理,则完全由目标线程自行决定。API如下:
    public void Thread.interrupt() //中断线程
    public bool Thread.isInterrupted() //判断是否中断
    public void Thread.interrupted() //判断是否被中断,并清楚当前中断状态
      在循环体中添加中断判断并添加中断的处理,如果在循环体中出现类似wait和sleep的操作,则只能通过中断来识别。因为sleep和wait会抛出InterruptedException中断异常。InterruptedException不是运行时异常,程序必须捕获并处理它,当线程处于sleep、状态时,如果被中断,就会触发该异常。
    public static native void sleep(long millis) throws InterruptedException;
    public class InterruptAndStopThread {
        public static void main(String args[]) throws InterruptedException {
            Thread thread = new Thread() {
                @Override
                public void run() {
                    while (true) {
                        if (Thread.currentThread().isInterrupted()) {
                            break;
                        }
     
                        try {
                            Thread.sleep(600);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            Thread.currentThread().interrupt(); //再次执行中断,置上中断标记位。异常后会清除该标记,如果不添加处理,则在下一个循环时无法捕获该中断
                        }
     
                        System.out.println("Running!");
                        Thread.yield();
                    }
                }
            };
            thread.start();
            Thread.sleep(2000);
            thread.interrupt();//中断
        }
    }
    1. 等待wait和通知notify
      为了支持多线程之间的协作,JDK在Object类中提供了wait和notify接口,任何对象都可以调用该方法。调用必须包含在对应的synchronizied语句中,无论是wait或者notify都需要首先获取目标对象的一个监视器。wait通过synchronizied获取监视器。调用wait后自动释放监视器;notify通过synchronizied获取监视器,完毕后释放监视器。
    public final void wait() throws InterruptedException
    public final native void notify();
    public class SimpleWaitAndNotify {
        final static Object object = new Object();
     
        public static class Thread1 extends Thread {
            public void run() {
                synchronized (object) {
                    System.out.println(System.currentTimeMillis() + ":thread1 start !");
     
                    try {
                        System.out.println(System.currentTimeMillis() + ":thread1 wait for object !");
                        object.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
     
                    System.out.println(System.currentTimeMillis() + ":thread1 end!");
                }
            }
        }
     
        public static class Thread2 extends Thread {
            public void run() {
                synchronized (object) {
                    System.out.println(System.currentTimeMillis() + ":thread2 start ! notify one thread");
                    object.notify();
     
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
     
                    System.out.println(System.currentTimeMillis() + ":thread2 end!");
                }
            }
        }
     
        public static void main(String args[]) {
            Thread thread1 = new Thread1();
            Thread thread2 = new Thread2();
            thread1.start();
            thread2.start();
        }
    }
      得到的结果是:thread1 start ! -> thread1 wait for object ! -> thread2 start ! notify one thread -> thread2 end! -> thread1 end!
      wait和sleep的区别是:wait会自动释放锁,而sleep不会。
    1. suspend和resume
      JDK已经废弃的接口,suspend在导致线程暂停的同时,不会去释放任何锁资源。如果resume操作意外在suspend之前执行,那么被挂起的线程可能很难有机会继续执行。
    1. 等待线程结束join和谦让yield
      join:线程需要等待依赖线程执行完毕。join的本质是让调用线程wait在当前线程对象实例上,线程执行完毕后,被等待的线程会在退出前notifyAll通知所有的等待线程继续执行。
      yield:是个静态方法,一旦执行,会使当前线程让出CPU。
    public final void join() throws InterruptedException
    public final synchronized void join(long millis) throws InterruptedException
    public static native void yield()
    public class JoinMain {
        public volatile static int i = 0;
        public static class AddThread extends Thread {
            public void run() {
                System.out.println("add!");
                for (i = 0; i < 1000000; i++) ;
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String args[]) throws InterruptedException {
            AddThread at = new AddThread();
            at.start();
            at.join(); //主线程等待at线程执行完毕
            System.out.println(i);
        }
    }
    1. volatile
      为了确保变量被修改后,应用程序范围的所有线程都能够“看到”这个改动,虚拟机就必须采用特殊的手段,保证变量的可见性特点。volatile对保证操作的原子性有非常大的帮助,但是不能代替锁,无法保证一些复核操作的原子性。
      如果没有把握,可以直接用synchronizied。例如:
    public class ThreadVolatile extends Thread {
        public static volatile int n = 0;
        public void run() {
            for (int i = 0; i < 10; i++)
                try {
                    n = n + 1;
                    sleep(3); // 为了使运行结果更随机,延迟3毫秒
                } catch (Exception e) {
                }
        }
     
        public static void main(String[] args) throws Exception {
            Thread threads[] = new Thread[100];
            for (int i = 0; i < threads.length; i++)
                // 建立100个线程
                threads[i] = new ThreadVolatile();
            for (int i = 0; i < threads.length; i++)
                // 运行刚才建立的100个线程
                threads[i].start();
            for (int i = 0; i < threads.length; i++)
                // 100个线程都执行完后继续
                threads[i].join();
            System.out.println("n=" + ThreadVolatile.n);
        }
    }
      如果对n的操作是原子级别的,最后输出的结果应该为n=1000,而在执行上面代码时,很多时侯输出的n都小于1000,这说明n=n+1不是原子级别的操作。原因是声明为volatile的简单变量如果当前值由该变量以前的值相关,那么volatile关键字不起作用,也就是说如下的表达式都不是原子操作:
    n = n + 1;
    n++;
      如果要想使这种情况变成原子操作,需要使用synchronized关键字,如上的代码可以改成如下的形式:
    public class ThreadVolatile2 extends Thread {
        public static volatile int n = 0;
        public static synchronized void inc() {
            n++;
        }
     
        public void run() {
            for (int i = 0; i < 10; i++)
                try {
                    inc(); // n = n + 1 改成了 inc();
                    sleep(3); // 为了使运行结果更随机,延迟3毫秒
                } catch (Exception e) {
                }
        }
     
        public static void main(String[] args) throws Exception {
            Thread threads[] = new Thread[100];
            for (int i = 0; i < threads.length; i++)
                // 建立100个线程
                threads[i] = new ThreadVolatile2();
            for (int i = 0; i < threads.length; i++)
                // 运行刚才建立的100个线程
                threads[i].start();
            for (int i = 0; i < threads.length; i++)
                // 100个线程都执行完后继续
                threads[i].join();
            System.out.println("n=" + ThreadVolatile2.n);
        }
    }
    1. 线程组
    public class ThreadGroupName implements Runnable {
        @Override
        public void run() {
            String groupAndName = Thread.currentThread().getThreadGroup().getName() + "-" + Thread.currentThread().getName();
            while (true) {
                System.out.println("I am " + groupAndName);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
     
        public static void main(String args[]) {
            ThreadGroup tg = new ThreadGroup("PrintGroup");
            Thread t1 = new Thread(tg, new ThreadGroupName(), "T1");
            Thread t2 = new Thread(tg, new ThreadGroupName(), "T2");
            t1.start();
            t2.start();
            System.out.println(tg.activeCount());
            Thread t3 = new Thread(tg, new ThreadGroupName(), "T3");
            t3.start();
            System.out.println(tg.activeCount());
            tg.list();
        }
    }
    1. 守护线程:setDaemon需要在start之前设置,否则抛异常。
    1. 线程优先级:setPriority
    /**
      * The minimum priority that a thread can have.
      */
    public final static int MIN_PRIORITY = 1;
     
    /**
      * The default priority that is assigned to a thread.
      */
    public final static int NORM_PRIORITY = 5;
     
    /**
      * The maximum priority that a thread can have.
      */
    public final static int MAX_PRIORITY = 10;
    1. 线程安全和synchronizied
      用法:
      (1)指定加锁对象:进入同步代码前要获得给定对象;
    synchronizied(instance) {}
      (2)直接作用于实例对象:相当于对当前实例加锁,进入同步代码前要获得当前实例的锁;
      注意:因为加锁于实例对象,所以创建线程时需要传入同一个对象。
     
    public class DemoThread extends Thread {
        private static DemoThread demoThread = new DemoThread();
        private  static int i = 0;
        
        private synchronized void increase() {
            i++;
        }
     
        @Override
        public void run() {
            for (int j = 0; j < 1000; j++) {
                increase();
            }
        }
        
        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(demoThread);
            Thread t2 = new Thread(demoThread);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(i);
        }
    }
      (3)直接作用于静态方法:相当于当前类加锁,进入同步代码前要获得当前类的锁;
    public class DemoThread extends Thread {
        private static int i = 0;
     
        private static synchronized void increase() {
            i++;
        }
     
        @Override
        public void run() {
            for (int j = 0; j < 1000; j++) {
                increase();
            }
        }
     
        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(new DemoThread());
            Thread t2 = new Thread(new DemoThread());
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(i);
        }
    }
    1. 并发下的ArrayList和HashMap
    public class ArrayListMultiThread {
        static ArrayList<Integer> arrayList = new ArrayList<Integer>(10);
        public static class AddThread implements Runnable {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    arrayList.add(i);
                }
            }
        }
     
        public static void main(String args[]) throws InterruptedException {
            Thread thread1 = new Thread(new AddThread());
            Thread thread2 = new Thread(new AddThread());
            thread1.start();
            thread2.start();
            thread1.join();
            thread2.join();
            System.out.println(arrayList.size());
        }
    }
      出现的结果可能有:
    • 正常;
    • ArrayList在扩容时,内部一致性被破坏,由于没有锁对象,另一个线程访问到了不一致的内部状态,导致越界问题;
    • 出现隐蔽错误,ArrayList的大小小于20000。
      最简单的方式是:使用线程安全的Vector代替ArrayList。
    public class HashMapMultiThread {
        static Map<String, String> map = new HashMap<String, String>();
        public static class AddThread implements Runnable {
            int start = 0;
            public AddThread(int start) {
                this.start = start;
            }
            @Override
            public void run() {
                for (int i = start; i < 100000; i += 2) {
                    map.put(Integer.toString(i), Integer.toBinaryString(i));
                }
            }
        }
     
        public static void main(String args[]) throws InterruptedException {
            Thread thread1 = new Thread(new HashMapMultiThread.AddThread(0));
            Thread thread2 = new Thread(new HashMapMultiThread.AddThread(1));
            thread1.start();
            thread2.start();
            thread1.join();
            thread2.join();
            System.out.println(map.size());
        }
    }
      结果可能是:
    • 正常;
    • 正常退出,小于10000,
    • 程序永远不能结束,当两个线程正在遍历HashMap的内部数据,put方法类似于链表的遍历,由于多线程的冲突,导致链表结构遭到破坏,当链表成环时,就出现了死循坏。死循坏在JDK8中已经不存在,但是在多线程使用HashMap依然会导致内部数据不一致。
      最简单的方式是:使用ConcurrentHashMap代替HashMap。
     
     JDK并发包
    1. 重入锁ReentrantLock
      重入锁可以完全那替代synchronized关键字。提供了支持中断响应的接口,ReentrantLock的几个重要方法整理:
    • lock():获得锁,如果锁已经被占用,则等待;
    • lockInterruptibly:获得锁,但优先响应中断;
    • tryLock():尝试获得锁,如果成功则返回true,失败则返回false。不等待锁;
    • tryLock(long time, TimeUnit unit):在给定的时间尝试获得锁;
    • unlock():释放锁;
    • ReentrantLock(bool fair):公平锁;
    public class IntLock implements Runnable {
        public static ReentrantLock lock1 = new ReentrantLock();
        public static ReentrantLock lock2 = new ReentrantLock();
        int lock;
        public IntLock(int lock) {
            this.lock = lock;
        }
     
        @Override
        public void run() {
            try {
                if (lock == 1) {
                    lock1.lockInterruptibly();
                    Thread.sleep(500);
                    lock2.lockInterruptibly();
                } else {
                    lock2.lockInterruptibly();
                    Thread.sleep(500);
                    lock1.lockInterruptibly();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (lock1.isHeldByCurrentThread()) {
                    lock1.unlock();
                }
                if (lock2.isHeldByCurrentThread()) {
                    lock2.unlock();
                }
                System.out.println(Thread.currentThread().getId() + ":线程退出");
            }
        }
     
        public static void main(String args[]) throws InterruptedException {
            IntLock r1 = new IntLock(1);
            IntLock r2 = new IntLock(2);
            Thread thread1 = new Thread(r1);
            Thread thread2 = new Thread(r2);
            thread1.start();
            thread2.start();
            Thread.sleep(1000);
            thread2.interrupt();
        }
      上述代码分析:t1和t2启动之后,t1先占用lock1,再占用lock2;t2先占用lock2,再占用lock1,容易导致t1和t2处于相互等待状态,导致死锁。最后t2调用中断,最后t1能顺利执行,t2中断退出释放资源。
    public class TryLock implements Runnable {
        public static ReentrantLock lock1 = new ReentrantLock();
        public static ReentrantLock lock2 = new ReentrantLock();
        int lock;
        public TryLock(int lock) {
            this.lock = lock;
        }
        @Override
        public void run() {
            if (lock == 1) {
                while (true) {
                    if (lock1.tryLock()) {
                        try {
                            try {
                                Thread.sleep(500);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            if (lock2.tryLock()) {
                                try {
                                    System.out.println(Thread.currentThread().getId() + ":My Job done;");
                                    return;
                                } finally {
                                    lock2.unlock();
                                }
                            }
                        } finally {
                            lock1.unlock();
                        }
                    }
                }
            } else {
                while (true) {
                    if (lock2.tryLock()) {
                        try {
                            try {
                                Thread.sleep(500);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            if (lock1.tryLock()) {
                                try {
                                    System.out.println(Thread.currentThread().getId() + ":My Job done;");
                                    return;
                                } finally {
                                    lock1.unlock();
                                }
                            }
                        } finally {
                            lock2.unlock();
                        }
                    }
                }
            }
        }
     
        public static void main(String args[]) {
            TryLock r1 = new TryLock(1);
            TryLock r2 = new TryLock(2);
            Thread thread1 = new Thread(r1);
            Thread thread2 = new Thread(r2);
            thread1.start();
            thread2.start();
        }
      上述代码分析:使用tryLock,由于线程不会傻傻等待,而是不断尝试,只要执行时间足够长,线程总是会得到所需要的资源,从而正常执行。
    1. 重入锁的好搭档:Condition条件
    • await:是当前的线程等待,同时释放当前锁,当其他线程使用signal或者signalAll方法时,线程会重新获得锁并继续执行,或者被线程中断时,也会跳出等待,和Object.wait()相似;
    • awaitUninterruptibly:和await相似,但是不会在等待过程中响应中断;
    • signal:用于唤醒一个等待中的线程,和Object.notify()类似;
    • signalAll:唤醒所有等待中的线程;
    public class ReenterLockCondition implements Runnable {
        public static ReentrantLock lock = new ReentrantLock();
        public static Condition condition = lock.newCondition();
     
        @Override
        public void run() {
            try {
                lock.lock();
                condition.await();
                System.out.println("Thread is going on");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
     
        public static void main(String args[]) throws InterruptedException {
            ReenterLockCondition reenterLockCondition = new ReenterLockCondition();
            Thread thread1 = new Thread(reenterLockCondition);
            thread1.start();
            Thread.sleep(2000);
            lock.lock();
            condition.signal();
            lock.unlock();
        }
    }
      代码分析:await调用后,线程释放锁,等待sign释放锁后,线程获得锁并执行剩下的代码。
    1. 信号量Semaphore
      信号量是对锁的扩展,无论是内部锁synchronized还是重入锁ReentrantLock,一次只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问一个资源。
    public Semaphore(int permits):
    public Semaphore(int permits):
    public void acquire(int permits):
    public void acquireUninterruptibly(int permits):
    public boolean tryAcquire():
    public boolean tryAcquire(long timeout, TimeUnit unit):
    public void release():
    public class SemapDemo implements Runnable {
        final Semaphore semp = new Semaphore(5);
     
        @Override
        public void run() {
            try {
                semp.acquire();
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getId() + ":done!");
                semp.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
     
        public static void main(String args[]) {
            ExecutorService executorService = Executors.newFixedThreadPool(20);
            final SemapDemo demo = new SemapDemo();
            for (int i = 0; i < 20; i++) {
                executorService.submit(demo);
            }
        }
    }
    1. ReadWriteLock读写锁
      ReadWriteLock是JDK5中提供的读写分离锁,读写分离锁有效地减少了锁竞争,以提升系统的性能。
    public class ReadWriteLockDemo {
        private static Lock lock = new ReentrantLock();
        private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        private static Lock readLock = reentrantReadWriteLock.readLock();
        private static Lock writeLock = reentrantReadWriteLock.writeLock();
        private int value;
     
        public Object handleRead(Lock lock) throws InterruptedException {
            try {
                lock.lock();
                Thread.sleep(1000);
                return value;
            } finally {
                lock.unlock();
            }
        }
     
        public void handleWrite(Lock lock, int index) throws InterruptedException {
            try {
                lock.lock();
                Thread.sleep(1000);
                value = index;
            } finally {
                lock.unlock();
            }
        }
        public static void main(String args[]) {
            final ReadWriteLockDemo demo = new ReadWriteLockDemo();
            Runnable readRunnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        demo.handleRead(readLock);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
     
            Runnable writeRunnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        demo.handleWrite(writeLock, new Random().nextInt(100));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
     
            for (int i = 0; i < 18; i++) {
                new Thread(readRunnable).start();
            }
     
            for (int i = 18; i < 20; i++) {
                new Thread(writeRunnable).start();
            }
        }
    }
      代码分析:读线程为并行执行,写线程为串行执行。读读不堵塞,读写和写写堵塞。
    1. 倒计时器:CountDownLatch
      用来控制线程等待,它可以让某个线程等待直到倒计时结束,再开始执行。
    public class CountDownLatchDemo implements Runnable {
        static final CountDownLatch end = new CountDownLatch(10); //构造等待的线程数
        static final CountDownLatchDemo demo = new CountDownLatchDemo();
     
        @Override
        public void run() {
            try {
                Thread.sleep(new Random().nextInt(3) * 1000);
                System.out.println("check complete");
                end.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public static void main(String args[]) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
     
            for (int i = 0; i < 10; i++) {
                executorService.submit(demo);
            }
     
            //等待10个线程执行完毕,再继续执行
            end.await();
            System.out.println("Fire!");
            executorService.shutdown();
        }
    }
    1. 线程池
    public static ExecutorService newFixedThreadPool(int nThreads):固定线程数的线程池
    public static ExecutorService newSingleThreadExecutor():单线程线程池
    public static ExecutorService newCachedThreadPool():根据实际情况调整线程数量的线程池
    public static ScheduledExecutorService newSingleThreadScheduledExecutor():单线程任务线程池
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):固定线程数的任务线程池
    /**
    * Creates a new {@code ThreadPoolExecutor} with the given initial
    * parameters.
    *
    * @param corePoolSize the number of threads to keep in the pool, even
    *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
    * @param maximumPoolSize the maximum number of threads to allow in the
    *        pool
    * @param keepAliveTime when the number of threads is greater than
    *        the core, this is the maximum time that excess idle threads
    *        will wait for new tasks before terminating.
    * @param unit the time unit for the {@code keepAliveTime} argument
    * @param workQueue the queue to use for holding tasks before they are
    *        executed.  This queue will hold only the {@code Runnable}
    *        tasks submitted by the {@code execute} method.
    * @param threadFactory the factory to use when the executor
    *        creates a new thread
    * @param handler the handler to use when execution is blocked
    *        because the thread bounds and queue capacities are reached
    * @throws IllegalArgumentException if one of the following holds:
     
    *         {@code corePoolSize < 0}
     
    *         {@code keepAliveTime < 0}
     
    *         {@code maximumPoolSize <= 0}
     
    *         {@code maximumPoolSize < corePoolSize}
    * @throws NullPointerException if {@code workQueue}
    *         or {@code threadFactory} or {@code handler} is null
    */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    • corePoolSize:线程数;
    • maximumPoolSize:最大线程数;
    • keepAliveTime:当线程数量大于corePoolSize时,多余的空闲线程的存活时间。
    • unit:keepAliveTime的单位时间;
    • workQueue:任务列表,被提交但尚未被执行的任务;
    • threadFactory:线程工厂,用于创建线程;
    • handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。
      根据任务队列的功能,ThreadPoolExecutor的构造函数中可以使用以下的几种BlockingQueue:
    (1)直接提交队列:SynchronousQueue,需要设置很大的maximumPoolSize值;
    (2)有界任务队列:ArrayBlockingQueue,线程数量为0~maximumPoolSize,当线程数量大于corePoolSize时,任务存放到任务队列,当总线程数大于maximumPoolSize时,则执行拒绝策略;
    (3)无界任务队列:LinkedBlockingQueue,线程数量最大值为corePoolSize,maximumPoolSize没有实际意义,当总线程数大于corePoolSize时,被存储到任务队列中;
    (4)优先任务队列:PriorityBlockingQueue,可以根据任务自身的优先级顺序先后执行。
     
    1. 线程异常信息的封装
    public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
        public TraceThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
     
        public void execute(Runnable task) {
            super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
        }
     
        private Runnable wrap(final Runnable task, final Exception clientTrace, String name) {
            return () -> {
                try {
                    task.run();
                } catch (Exception e) {
                    clientTrace.printStackTrace();
                    throw e;
                }
            };
        }
     
        private Exception clientTrace() {
            return new Exception("Client stack trace");
        }
     
        public static void main(String[] args) {
            TraceThreadPoolExecutor traceThreadPoolExecutor = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
     
            for (int i = 0; i< 5; i++) {
                traceThreadPoolExecutor.execute(new DivTask(100, i));
            }
        }
     
        public static class DivTask implements Runnable {
            int a, b;
     
            public DivTask(int a, int b) {
                this.a = a;
                this.b = b;
            }
     
            @Override
            public void run() {
                double re = a / b;
                System.out.println(re);
            }
        }
    }
    1. JDK并发容器
    • ConcurrentHashMap:高效并发的HashMap;
    • CopyOnWriteArrayList:和ArrayList是一族的,在读多写少的场合,List性能非常好,远远好于Vector;
    • ConcurrentLinkedQueue:高效并发队列,使用链表实现,线程安全的LinkedList;
    • BlockingQueue:堵塞队列,适合作为数据共享的通道;
    • ConcurrentSkipListMap:跳表的实现,有序Map,使用跳表的数据结构进行快速查找。
     
    锁优化
    1. 提高锁的性能
    • 减小锁持有时间:减少某个锁的占用时间,以减少线程间互斥的功能,提高系统的吞吐量;
    • 减小锁粒度:减小锁粒度是削弱多线程锁竞争的有效手段,ConcurrentHashMap添加一个新的表项,首先根据hashcode得到该表项应该存放到哪个段中,然后对该段加锁,并完成put操作。默认有16段。
    • 读写分离锁来替换独占锁:通过读写锁ReadWriteLock提高系统性能。
    • 锁分离:读写锁根据读写操作功能上的不同,进行有效的分离,使用类似的分离思想,从而对独占锁进行分离。例如LinkedBlockingQueue,take()和put()的获取和存储操作,由于是链表,两个操作分别作用于队列的前端和尾端。
    • 锁粗化:将一连串连续地对同一个锁不断进行请求和释放的操作时,便会把所有的锁操作整合成对锁的一次请求,减少对锁的请求同步次数。
    1. 无锁CAS
     
    并发模式与算法
    1. 单例模式 
    public class StaticSingleton {
        private StaticSingleton() {
            System.out.println("StaticSingle is create");
        }
     
        private static class SingletonHolder {
            private static StaticSingleton instance = new StaticSingleton();
        }
     
        public static StaticSingleton getInstance() {
            return SingletonHolder.instance;
        }
    }
    2. 不变模式 
    不变模式的使用场景需要满足条件:
    • 当对象创建之后,其内部状态和数据不再发生改变;
    • 对象需要被共享,被多线程频繁访问;
    实现不变模式的方式如下:
    • 去除setter方法以及所有修改自身属性的方法;
    • 将所有属性设置为私有,并用final标记,确保其不可修改;
    • 确保没有子类可以重载修改它的行为;
    • 有一个可以创建完整对象的构造函数;
    public final class Product {
        private final String no;
        private final String name;
        private final String price;
     
        public Product(String no, String name, String price) {
            super();
            this.no = no;
            this.name = name;
            this.price = price;
        }
     
        public String getNo() {
            return no;
        }
     
        public String getName() {
            return name;
        }
     
        public String getPrice() {
            return price;
        }
    }
    • 生产者-消费者模式
      使用BufferingQueue实现。
     
    • 无锁的缓存架构Disruptor
    public class LongEvent {
        private long value;
        public void set(long value) {
            this.value = value;
        }
     
        public long getValue() {
            return value;
        }
    }
     
    public class LongEventFactory implements EventFactory<LongEvent> {
        public LongEvent newInstance() {
            return new LongEvent();
        }
    }
     
    public class LongEventHandler implements EventHandler<LongEvent> {
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
            System.out.println("Event: " + event.getValue());
        }
    }
     
    public class LongEventProducer {
        private final RingBuffer<LongEvent> ringBuffer;
     
        public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
     
        /* 3.0之前的方法
        public void onData(ByteBuffer bb) {
            long sequence = ringBuffer.next();
            try {
                LongEvent event = ringBuffer.get(sequence);
                event.set(bb.getLong(0));
            } finally {
                ringBuffer.publish(sequence);
            }
        }
        */
     
        private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR_ONE_ARG = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
            @Override
            public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {
                longEvent.set(byteBuffer.getLong(0));
            }
        };
     
        public void onData(ByteBuffer byteBuffer) {
            ringBuffer.publishEvent(TRANSLATOR_ONE_ARG, byteBuffer);
        }
    }
     
    public class Main {
        public static void main(String[] args) throws Exception {
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(),
                    1024, //RingBuffer的大小
                    Executors.newCachedThreadPool(), //消费者线程池
                    ProducerType.MULTI, //支持多事件发布者
                    new BlockingWaitStrategy());
     
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            disruptor.handleEventsWith(
                    new LongEventHandler(),
                    new LongEventHandler(),
                    new LongEventHandler(),
                    new LongEventHandler()
            );
            
            disruptor.start();
     
            LongEventProducer producer = new LongEventProducer(ringBuffer);
     
            ByteBuffer bb = ByteBuffer.allocate(8);
            for (long l = 0; true; l++) {
                bb.putLong(0, l);
                producer.onData(bb);
                Thread.sleep(1000);
            }
        }
    }
  • 相关阅读:
    哈夫曼树及哈夫曼编码
    01背包问题
    Java IO
    Java对象的复制三种方式
    TCP三次握手和四次挥手
    轻量级Java Web框架的实现原理
    Java并发
    消息队列
    赋值、浅拷贝、深拷贝
    Python文件操作(txtxlsxcsv)及os操作
  • 原文地址:https://www.cnblogs.com/liguochun/p/8433262.html
Copyright © 2011-2022 走看看