zoukankan      html  css  js  c++  java
  • Java多线程与并发库高级应用 学习笔记 10-16课

    Callable与Future的介绍

    package Thread;
    
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class CallableAndFuture {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newSingleThreadExecutor();
            Future<String> future = threadPool.submit(new Callable<String>() {
    
                @Override
                public String call() throws Exception {
                    Thread.sleep(2000);
                    return "hi~";
                }
            });
            System.out.println("wait for result");
            try {
                System.out.println("result:" + future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            ExecutorService threadPool2 = Executors.newFixedThreadPool(10);
            CompletionService<String> completionService = new ExecutorCompletionService<String>(
                    threadPool2);
    
            //2333 传说中的睡眠排序。
            for (int i = 0; i < 10; i++) {
                final int seq = i;
                completionService.submit(new Callable<String>() {
    
                    @Override
                    public String call() throws Exception {
                        int wait = new Random().nextInt(5000);
                        Thread.sleep(wait);
                        return seq + "等待时间:" + wait;
                    }
                });
            }
            
            //异步提交结果
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println(completionService.take().get());
                } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                }
            }
        }
    }

    Java并发编程:Lock

    package Thread;
    
    import java.util.Random;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    //这例子有点渣
    public class ReadWriteLockTest {
    
        public static void main(String[] args) {
            final Queue3 queue3 = new Queue3();
            for (int i = 0; i < 3; i++) {
                new Thread(new Runnable() {
    
                    @Override
                    public void run() {
                        while (true) {
                            queue3.get();
                        }
                    }
                }).start();
    
                new Thread(new Runnable() {
    
                    @Override
                    public void run() {
                        while (true) {
                            queue3.put(new Random().nextInt(10000));
                        }
                    }
                }).start();
            }
        }
    }
    
    class Queue3 {
        private Object data = null;
        ReadWriteLock rwl = new ReentrantReadWriteLock();
    
        public void get() {
            rwl.readLock().lock();
    
            try {
                System.out.println(Thread.currentThread().getName()
                        + " be ready to read data!");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(Thread.currentThread().getName()
                        + "have read data :" + data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                rwl.readLock().unlock();
            }
        }
    
        public void put(Object data) {
    
            rwl.writeLock().lock();
            try {
                System.out.println(Thread.currentThread().getName()
                        + " be ready to write data!");
                Thread.sleep((long) (Math.random() * 1000));
                this.data = data;
                System.out.println(Thread.currentThread().getName()
                        + " have write data: " + data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                rwl.writeLock().unlock();
            }
    
        }
    }

      

    Java线程(九):Condition-线程通信更高效的方式

    package Thread;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConditionCommunication {
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            
            final Business business = new Business();
            new Thread(
                    new Runnable() {
                        
                        @Override
                        public void run() {
                        
                            for(int i=1;i<=50;i++){
                                business.sub(i);
                            }
                            
                        }
                    }
            ).start();
            
            for(int i=1;i<=50;i++){
                business.main(i);
            }
            
        }
    
        static class Business {
                Lock lock = new ReentrantLock();
                Condition condition = lock.newCondition();
              private boolean bShouldSub = true;
              public /* synchronized*/ void sub(int i){
                  lock.lock();
                  try{
                      while(!bShouldSub){
                          try {
                              //this.wait();
                            condition.await();
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                      }
                        for(int j=1;j<=10;j++){
                            System.out.println("sub thread sequence of " + j + ",loop of " + i);
                        }
                      bShouldSub = false;
                      //this.notify();
                      condition.signal();
                  }finally{
                      lock.unlock();
                  }
              }
              
              public  void main(int i){
                  lock.lock();
                  try{
                     while(bShouldSub){
                              try {
                                condition.await();
                            } catch (Exception e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                          }
                        for(int j=1;j<=100;j++){
                            System.out.println("main thread sequence of " + j + ",loop of " + i);
                        }
                        bShouldSub = true;
                        condition.signal();
              }finally{
                  lock.unlock();
              }
          }
        
        }
    }
    package Thread;
    
    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BoundedBuffer {
    
        final Lock lock = new ReentrantLock();
        final Condition write = lock.newCondition();
        final Condition read = lock.newCondition();
        final static int capacity = 100;
        final static Queue<Integer> queue = new ArrayBlockingQueue<Integer>(
                capacity);
    
        public void put(int x) {
            lock.lock();
            try {
                while (queue.size() == capacity) {// 写数据缓存满了
                    write.await();
                }
                queue.add(x);
                read.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public int get() {
            lock.lock();
            int x = 0;
            try {
                while (queue.isEmpty()) {
                    read.await();
                }
                x = queue.remove();
                read.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.lock();
                return x;
            }
        }
    /*
     * 假设缓存队列中已经存满,那么阻塞的肯定是写线程,唤醒的肯定是读线程,相反,阻塞的肯定是读线程,唤醒的肯定是写线程,
     * 那么假设只有一个Condition会有什么效果呢,缓存队列中已经存满,这个Lock不知道唤醒的是读线程还是写线程了,
     * 如果唤醒的是读线程,皆大欢喜,如果唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这时又去唤醒,这样就浪费了很多时间。
     */
    }

    Java多线程-新特征-信号量Semaphore

    package Thread;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    /**
     * 信号量
     * 
     */
    public class SemaphoreTest {
        public static void main(String[] args) {
            // 线程池
            ExecutorService exec = Executors.newCachedThreadPool();
            // 只能5个线程同时访问
            final Semaphore semp = new Semaphore(5);
            // 模拟20个客户端访问
            for (int index = 0; index < 20; index++) {
                final int NO = index;
                Runnable run = new Runnable() {
                    public void run() {
                        try {
                            // 获取许可
                            semp.acquire();
                            System.out.println("Accessing: " + NO);
                            Thread.sleep((long) (Math.random() * 10000));
                            // 访问完后,释放
                            semp.release();
                            // availablePermits()指的是当前信号灯库中有多少个可以被使用
                            System.out.println("-----------------"
                                    + semp.availablePermits());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                exec.execute(run);
            }
            // 退出线程池
            exec.shutdown();
        }
    }

    CyclicBarrier介绍

    package Thread;
    
    import java.util.Random;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    /*
     * 等待子线程全部完成后,执行主线程任务
     */
    public class CyclicBarrierTest {
    
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
    
                @Override
                public void run() {
                    System.out.println("main thread start");
                }
            });
            
            for (int i = 0; i < 3; i++) {
                new ThreadTest(cyclicBarrier).start();
            }
        }
    }
    
    class ThreadTest extends Thread {
        private CyclicBarrier cyclicBarrier;
    
        public ThreadTest(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
    
        @Override
        public void run() {
            try {
                Thread.sleep(new Random().nextInt(500));
                System.out.println(Thread.currentThread().getName()
                        + " sub thread have done!");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    
    }

    CountDownLatch浅析

    作用和Cyclicbarrier没有什么本质区别。

  • 相关阅读:
    SQL性能优化:如何定位网络性能问题
    ORACLE 10升级到10.2.0.5 Patch Set遇到的内核参数检测失败问题
    Linux 僵尸进程查杀
    Linux 虚拟机网络适配器从E1000改为VMXNET3
    v$session中server为none与shared值解析
    SQL SERVER导出特殊格式的平面文件
    XtraBackup出现 Can't connect to local MySQL server through socket '/tmp/mysql.sock'
    CentOS 6.6安装Xtrabackup RPM提示缺少libev.so.4()
    SQL Server Replication 中关于视图的点滴
    ORA-00988: missing or invalid password(s)
  • 原文地址:https://www.cnblogs.com/linkarl/p/4817383.html
Copyright © 2011-2022 走看看