zoukankan      html  css  js  c++  java
  • Java 多线程高并发编程 笔记(二)

    1. 单例模式(在内存之中永远只有一个对象)

    1.1 多线程安全单例模式——不使用同步锁

    1 public class Singleton {
    2     private static Singleton sin=new Singleton();    ///直接初始化一个实例对象
    3     private Singleton(){    ///private类型的构造函数,保证其他类对象不能直接new一个该对象的实例
    4     }
    5     public static Singleton getSin(){    ///该类唯一的一个public方法    
    6         return sin;
    7     }
    8 }

      上述代码中的一个缺点是该类加载的时候就会直接new 一个静态对象出来,当系统中这样的类较多时,会使得启动速度变慢 。现在流行的设计都是讲“延迟加载”,我们可以在第一次使用的时候才初始化第一个该类对象。所以这种适合在小系统。

    1.2 多线程安全单例模式——使用同步方法

     1 public class Singleton {  
     2      private static Singleton instance;  
     3      private Singleton (){
     4          
     5      }   
     6      public static synchronized Singleton getInstance(){    //对获取实例的方法进行同步
     7        if (instance == null)     
     8          instance = new Singleton(); 
     9        return instance;
    10      }
    11  }

      上述代码中的一次锁住了一个方法, 这个粒度有点大 ,改进就是只锁住其中的new语句就OK。就是所谓的“双重锁”机制。

    1.3 多线程安全单例模式——使用双重同步锁

     1 public class Singleton {  
     2      private static Singleton instance;  
     3      private Singleton (){
     4      }   
     5      public static Singleton getInstance(){    //对获取实例的方法进行同步
     6        if (instance == null){
     7            synchronized(Singleton.class){
     8                if (instance == null)
     9                    instance = new Singleton(); 
    10            }
    11        }
    12        return instance;
    13      }
    14      
    15  }

    1.4 多线程安全单例模式——使用内部类的单例模式

      既不用加锁,也能实现懒加载

     1 public class Singleton {
     2     private Singleton(){
     3         System.out.println("single");
     4     }
     5     private static class Inner{
     6         private static Singleton s = new Singleton();
     7     }
     8     //无论有多少次,有多少个线程在调用getsingle的时候拿到的都是同一个对象
     9     private static Singleton getSingle(){
    10         return Inner.s;
    11     }
    12     public static void main(String[] agrs){
    13         Thread[] ths = new Thread[200];
    14         for(int i=0; i<ths.length;i++){
    15             ths[i]=new Thread(()->{
    16                 Singleton.getSingle();
    17             });
    18         }
    19         Arrays.asList(ths).forEach(o->o.start());
    20     }
    21 }

    2. 高并发——容器

    2.1 有N张火车票,每张票都有一个编号,同时有10个窗口对外售票,写一个模拟程序,分析可能会产生哪些问题?重复销售,超量销售;

     1 public class TicketSeller1 {
     2     static List<String> tickets = new ArrayList<>();
     3     //初始化,放票
     4     static{
     5         for(int i=0; i<10000; i++)
     6             tickets.add("票编号:"+i);
     7     }
     8 
     9     public static void main(String[] args){
    10         //启动10个线程不断往外卖票
    11         for(int i=0;i<10;i++){
    12             new Thread(()->{
    13                 while(tickets.size()>0){
    14                     System.out.println("销售了--"+tickets.remove(0));
    15                 }
    16             }).start();
    17         }
    18     }
    19 }

    改成下面的代码还有问题吗?

     1 public class TicketSeller2 {
     2     //vector本身就是一个同步容器,它所有的方法都是加锁的
     3     static Vector<String> tickets = new Vector<>();
     4     static{
     5         for(int i=0; i<10000; i++)
     6             tickets.add("票编号:"+i);
     7     }
     8     public static void main(String[] args){
     9         for(int i=0; i<10; i++){
    10             new Thread(()->{
    11                 while(tickets.size()>0){
    12                     /*
    13                     try{
    14                         TimeUnit.SECONDS.sleep(10);
    15                     }catch(InterruptedException e){
    16                         e.printStackTrace();
    17                     }*/
    18                     System.out.println("销售了--"+tickets.remove(0));
    19                 }
    20             }).start();
    21         }
    22     }
    23 }

    仍有问题,判断与操作分离了(虽然在vector中size和remove方法都是原子的)

    再改进:将判断和操作放到一个原子操作里面去

     1 public class TicketSeller3 {
     2     static List<String> tickets = new ArrayList<>();
     3     static{
     4         for(int i=0; i<10000; i++)
     5             tickets.add("票编号:"+i);
     6     }
     7 
     8     public static void main(String[] args){
     9         //启动10个线程不断往外卖票
    10         for(int i=0;i<10;i++){
    11             new Thread(()->{
    12                 while(true){
    13                     synchronized (tickets){
    14                         if(tickets.size()<=0) break;
    15                         try{
    16                             TimeUnit.SECONDS.sleep(10);
    17                         }catch(InterruptedException e){
    18                             e.printStackTrace();
    19                         }
    20 
    21                         System.out.println("销售了--"+tickets.remove(0));
    22                     }
    23                 }
    24             }).start();
    25         }
    26     }
    27 }

    加锁效率不高,尤其是每销售一张票都要把整个队列给锁定;

    引入并发容器

     1 public class TicketSeller4 {
     2     //并发容器
     3     static Queue<String> tickets = new ConcurrentLinkedQueue<>();
     4     static{
     5         for(int i=0; i<1000; i++){
     6             tickets.add("票编号:"+i);
     7         }
     8     }
     9     public static void main(String[] args){
    10         for(int i=0; i<10; i++){
    11             new Thread(()->{
    12                 while(true){
    13                     //poll从头往外拿一个数据,是同步的
    14                     String s = tickets.poll();
    15                     if(s==null) break;
    16                     else System.out.println("销售了--"+s);
    17                 }
    18             }).start();
    19         }
    20     }
    21 }
    if(s==null) break;虽然不是原子性的,但是我们判断以后没有对队列作修改操作,所以这里不会出错。

    2.2 并发容器————ConcurrentMap
    在多线程的情况下,什么样的容器效率比较高?
     1 public class T_ConcurrentMap {
     2     public static void main(String[] args){
     3         //Map<String, String> map = new ConcurrentHashMap<>();
     4         //Map<String, String> map = new ConcurrentSkipListMap<>();
     5         //HashTable 默认加锁,但是效率比较低
     6         Map<String, String> map = new Hashtable<>();
     7         //使用HashMap,自己往上加锁:Collection.synchronizedXXX
     8         //Map<String, String> map = new HashMap<>();
     9         //Map<String, String> map = new TreeMap<>();
    10 
    11         Random r = new Random();
    12         Thread[] ths = new Thread[100];
    13         CountDownLatch latch = new CountDownLatch(ths.length);
    14         long start = System.currentTimeMillis();
    15         for(int i=0; i<ths.length; i++){
    16             ths[i] = new Thread(()->{
    17                 for(int j=0; j<10000; j++)
    18                     map.put("a"+r.nextInt(100000),"a"+r.nextInt(100000));
    19                 latch.countDown();
    20             });
    21         }
    22         Arrays.asList(ths).forEach(o->o.start());
    23         try{
    24             latch.await();
    25         }catch (InterruptedException e){
    26             e.printStackTrace();
    27         }
    28 
    29         long end = System.currentTimeMillis();
    30         System.out.println(end-start);
    31     }
    32 }

    HashTable:669;

    ConcurrentHashMap:391;

    why? HashTable 往里加任何一个数据的时候都是要锁定整个对象,而HashMap,ConcurrentHashMap默认把容器分成16段,每次往容器里插数据只锁定16段里面的一段(把锁细化),两个线程往里插不同的段的数据,那么这两个线程就能并发的插入;

    ConcurrentSkipListMap:649  高并发并且排序

    往里插数据效率低一些,因为要排序,查数据方便很多;

    总结:

      1. 对于map/set的选择使用

        不加锁:hashmap;treemap;linkedhashmap;

        加锁:hashtable;Collection.sychronizedXXX(传一个不加锁map,返回一个加了锁的map),在并发性不是特别高的情况下可以使用上面两种;如果并发性比较高,用concurrenthashmap,如果还需要排序,就用concurrentskiplistmap;

    附:Collection.sychronizedXXX用法:

    1 public class T_SynchronizedList {
    2     public static void main(String[] args){
    3         List<String> strs = new ArrayList<>();
    4         List<String> strsSync = Collections.synchronizedList(strs);
    5     }
    6 }

     2.3 并发容器——CopyOnWrite 写时复制容器

    多线程环境下,写时效率低,读时效率高,适合写少读多的环境;

    比较容器效率 :

     1 public class T_CopyOnWrite {
     2     public static void main(String[] args){
     3         List<String> lists =
     4                 //new ArrayList<>();//这个会出并发问题
     5                 //new Vector<>();
     6                 new CopyOnWriteArrayList<>();
     7         Random r = new Random();
     8         Thread[] ths = new Thread[100];
     9         for(int i=0; i<ths.length; i++){
    10             Runnable task = new Runnable() {
    11                 @Override
    12                 public void run() {
    13                     for(int j=0;j<1000;j++)
    14                         lists.add("a"+r.nextInt(10000));
    15                 }
    16             };
    17             ths[i] = new Thread(task);
    18         }
    19         runAndComputeTime(ths);
    20         System.out.println(lists.size());
    21     }
    22     static void runAndComputeTime(Thread[] ths){
    23         long s1 = System.currentTimeMillis();
    24         Arrays.asList(ths).forEach(o->o.start());
    25         Arrays.asList(ths).forEach(o->{
    26             try{
    27                 o.join();
    28             }catch(InterruptedException e){
    29                 e.printStackTrace();
    30             }
    31         });
    32         long s2 = System.currentTimeMillis();
    33         System.out.println(s2-s1);
    34     }
    35 }
    CopyOnWriteArrayList:4853 100000
    Vector:114 100000
    ArrayList:97 86864(有错)

    2.4 并发容器——ConcurrentLinkedQueue

    常用方法:

     1 public class T_ConcurrentLinkedQueue {
     2     public static void main(String[] args){
     3         Queue<String> strs = new ConcurrentLinkedQueue<>();
     4         for(int i=0; i<10; i++){
     5             //类似于add,有boolean返回值
     6             strs.offer("a"+i);
     7         }
     8         System.out.println(strs);
     9         System.out.println(strs.size());
    10         System.out.println(strs.poll());//取值并删除
    11         System.out.println(strs.size());
    12         System.out.println(strs.peek());//只取值不删
    13         System.out.println(strs.size());
    14     }
    15 }

    2.5 并发容器——BlockingQueue

    在高并发的情况下可以使用两种队列:

    ConcurrentLinkedQueue:加锁式

    BlockingQueue:阻塞式

    LinkedBlockingQueue:

     1 public class T_LinkedBlockingQueue {
     2     static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
     3     static Random r = new Random();
     4     public static void main(String[] args){
     5         //生产者线程
     6         new Thread(()->{
     7             for(int i=0; i<100; i++){
     8                 try{
     9                     strs.put("a"+i);//使用put,如果满了,就会等待
    10                 }catch(InterruptedException e){
    11                     e.printStackTrace();
    12                 }
    13             }
    14         },"p1").start();
    15         //5个消费者线程
    16         for(int i=0; i<5; i++){
    17             new Thread(()->{
    18                 for(;;){
    19                     try{
    20                         System.out.println(Thread.currentThread().getName()+"task-"+strs.take());//take如果空了,就会等待
    21                     }catch(InterruptedException e){
    22                         e.printStackTrace();
    23                     }
    24                 }
    25             },"c"+i).start();
    26         }
    27     }
    28 }

    ArrayBlockingQueue:

     1 public class T_ArrayBlockingQueue {
     2     //有界队列
     3     static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);
     4     static Random r = new Random();
     5     public static void main(String[] args) throws Exception{
     6         for(int i=0; i<10; i++){
     7             strs.put("a"+i);
     8         }
     9         //strs.put("aaa");//满了就会等待,程序阻塞
    10         //strs.add("aaa");//队列满了会报异常
    11         strs.offer("aaa");//队列满了不会报异常,也加不进去
    12         //strs.offer("aaa",1, TimeUnit.SECONDS);//隔一段时间之内加不进去就不往里面加了
    13         System.out.println(strs);
    14     }
    15 }

    DelayQueue:

     1 public class T_DelayQueue {
     2     //加入队列的元素只有等一定的时间之后才能被消费者拿走
     3     //默认按等待时间排序
     4     //DelayQueue 需要实现接口
     5     static BlockingQueue<MyTask> tasks = new DelayQueue<>();
     6     static Random r = new Random();
     7     static class MyTask implements Delayed{
     8         long runningTime;
     9         MyTask(long rt){
    10             this.runningTime = rt;
    11         }
    12         public int compareTo(Delayed o){
    13             if(this.getDelay(TimeUnit.MILLISECONDS)<o.getDelay(TimeUnit.MILLISECONDS))
    14                 return -1;
    15             else if(this.getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS))
    16                 return 1;
    17             else
    18                 return 0;
    19         }
    20 
    21         @Override
    22         public long getDelay(TimeUnit unit) {
    23             return unit.convert(runningTime-System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    24         }
    25 
    26         public String toString(){
    27             return ""+runningTime;
    28         }
    29     }
    30 
    31     public static void main(String[] agrs) throws InterruptedException{
    32         long now = System.currentTimeMillis();
    33         MyTask t1 = new MyTask(now+1000);
    34         MyTask t2 = new MyTask(now+2000);
    35         MyTask t3 = new MyTask(now+1500);
    36         MyTask t4 = new MyTask(now+2500);
    37         MyTask t5 = new MyTask(now+500);
    38 
    39         tasks.put(t1);
    40         tasks.put(t2);
    41         tasks.put(t3);
    42         tasks.put(t4);
    43         tasks.put(t5);
    44 
    45         System.out.println(tasks);
    46 
    47         for(int i=0; i<5; i++){
    48             System.out.println(tasks.take());
    49         }
    50     }
    51 }

    [1558935218208, 1558935218708, 1558935219208, 1558935220208, 1558935219708]
    1558935218208
    1558935218708
    1558935219208
    1558935219708
    1558935220208

    可以用来做定时执行任务

    TransferQueue:

     1 public class T_TransferQueue {
     2     public static void main(String[] agrs) throws InterruptedException{
     3         LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
     4         new Thread(()->{
     5             try{
     6                 System.out.println(strs.take());
     7             }catch(InterruptedException e){
     8                 e.printStackTrace();
     9             }
    10         }).start();
    11         //先启动几个消费者线程,生产者生产出一个产品的时候不往队列里加,
    12         //首先去找有没有消费者,有消费者直接给消费者消费,没有就阻塞
    13         //用在更高并发的情况下
    14         strs.transfer("aaa");
    15     }
    16 }

    SynchronusQueue:

     1 public class SynchronusQueue {//没有容量的队列
     2     public static void main(String[] agrs) throws InterruptedException{
     3         //同步队列是一种特殊的transferQueue
     4         //
     5         BlockingQueue<String> strs = new SynchronousQueue<>();
     6         new Thread(()->{
     7             try{
     8                 System.out.println(strs.take());
     9             }catch(InterruptedException e){
    10                 e.printStackTrace();
    11             }
    12         }).start();
    13 
    14         //strs.put("aaa");//不报错,阻塞等待消费者消费
    15         strs.add("aaa");//报错queue full
    16         System.out.println(strs.size);
    17     }
    18 }

    3. 线程池

    3.1 Executor

      执行器,这是一个接口,内部维护了一个方法execute负责执行一项任务,参数为Runnable,方法具体实现有我们执行,如下面的代码,既可以使用单纯的方法调用也可以新砌一个新的线程去执行Runnable的run方法;
     1 public class T_MyExecutor implements Executor {
     2     public static void main(String[] agrs){
     3         new T_MyExecutor().execute(()->System.out.println("hello executor"));
     4     }
     5 
     6     public void execute(Runnable commend){
     7         commend.run();
     8         //new Thread(commend).run();
     9     }
    10 }

    3.2 ExecutorService

       代表着启动一系列的线程为用户提供服务(本质上也是一个执行器),Java8官方文档就举了一个网络接受连接池的例子(代码如下)。

     1 class NetworkService implements Runnable {
     2    private final ServerSocket serverSocket;
     3    private final ExecutorService pool;
     4  
     5    public NetworkService(int port, int poolSize)
     6        throws IOException {
     7      serverSocket = new ServerSocket(port);
     8      pool = Executors.newFixedThreadPool(poolSize);
     9    }
    10  
    11    public void run() { // run the service
    12      try {
    13        for (;;) {
    14          pool.execute(new Handler(serverSocket.accept()));
    15        }
    16      } catch (IOException ex) {
    17        pool.shutdown();
    18      }
    19    }
    20  }
    21  
    22  class Handler implements Runnable {
    23    private final Socket socket;
    24    Handler(Socket socket) { this.socket = socket; }
    25    public void run() {
    26      // read and service request on socket
    27    }
    28  }

      在这里ExecutorService就代表着一个线程池对外提供接受网络请求的服务,同时它也是一系列线程池的接口,如,RorkJoinPool、ScheduledThreadPoolExecutor,、ThreadPoolExecutor等。同时,它可以提交Callable与Runnable的对象返回一个未来的执行结果对象Future。Callable是一个增强版的Runnable,它的call方法可以抛出异常可以有返回值,返回值放在Future对象中,我们可以使用Future对象的get方法来获得返回值。

      除了以上方法来创建一个ExecutorService还可以使用Executors这个工具类来创建它,在这里我们可以把Executors理解为就像utils,collections的工具类,是操作Executor的一个工具类

    2.3 ThreadPool 线程池

      

     1 public class T_ThreadPool {
     2     public static void main(String[] agrs) throws InterruptedException{
     3         //Executors有一些工厂方法,newFixedThreadPool创建一个个数为5的线程池
     4         //ExecutorService接口是可以往里面扔任务(execute,submit)的,
     5         ExecutorService service = Executors.newFixedThreadPool(5);
     6         for(int i=0; i<6;i++){
     7             //5个线程,6个任务
     8             service.execute(()->{
     9                 try{
    10                     TimeUnit.MILLISECONDS.sleep(500);
    11                 }catch(InterruptedException e){
    12                     e.printStackTrace();
    13                 }
    14                 System.out.println(Thread.currentThread().getName());
    15             });
    16         }
    17         System.out.println(service);
    18         //ExecutorService 常用方法
    19         service.shutdown();//所有任务执行完关闭
    20         System.out.println(service.isTerminated());//所有任务是否执行完了
    21         System.out.println(service.isShutdown());//关了并一定是执行完了,代表正在关闭的过程中
    22         System.out.println(service);
    23     }
    24 }

    执行结果:

    java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 5, active threads = 5, queued tasks = 1(排队的任务), completed tasks = 0]
    false
    true
    java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    pool-1-thread-1
    pool-1-thread-3
    pool-1-thread-2
    pool-1-thread-5
    pool-1-thread-4
    pool-1-thread-1

    2.4 Future

     1 public class T_Future {
     2     public static void main(String[] agrs) throws InterruptedException, ExecutionException{
     3         //RunnableTask 不产生任何返回值
     4         //new了个Callable对象并把它包装成FutureTask
     5         FutureTask<Integer> task = new FutureTask<>(()->{
     6             TimeUnit.MILLISECONDS.sleep(500);
     7             return 1000;
     8         });//相当于创建一个匿名类:new Callable(){Integer call();}}
     9         //启动一个线程
    10         new Thread(task).start();
    11         System.out.println(task.get());//阻塞,任务执行完了返回值
    12 
    13         ExecutorService service = Executors.newFixedThreadPool(5);
    14         Future<Integer> f = service.submit(()->{
    15             TimeUnit.MILLISECONDS.sleep(500);
    16             return 1;
    17         });
    18         System.out.println(f.get());
    19         System.out.println(f.isDone());//任务执行完没有啊
    20         System.out.println(f.get());
    21         System.out.println(f.isDone());
    22     }
    23 }

    2.5 线程池——newFixedThreadPool

    小程序:计算1-200000之间的质数

    比较一个线程和多个线程的效率:

     1 public class T_ParallelComputing {
     2     public static void main(String[] agrs) throws InterruptedException, ExecutionException {
     3         long start = System.currentTimeMillis();
     4         //计算1-200000之间所有的质数
     5         List<Integer> resulrs = getPrime(1,200000);
     6         //方法1:使用一个线程来计算
     7         long end = System.currentTimeMillis();
     8         System.out.println(end-start);
     9 
    10         //方法2:使用线程池
    11         final int cpuCoreNum = 4;
    12         ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
    13         //创建4个任务,继承callable接口(有返回值)
    14         MyTask t1 = new MyTask(1,80000);
    15         MyTask t2 = new MyTask(80000,130000);
    16         MyTask t3 = new MyTask(130000,170000);
    17         MyTask t4 = new MyTask(170000,200000);
    18         //将4个任务扔到线程池
    19         Future<List<Integer>> f1 = service.submit(t1);
    20         Future<List<Integer>> f2 = service.submit(t2);
    21         Future<List<Integer>> f3 = service.submit(t3);
    22         Future<List<Integer>> f4 = service.submit(t4);
    23 
    24         start = System.currentTimeMillis();
    25         f1.get();
    26         f2.get();
    27         f3.get();
    28         f4.get();
    29         end = System.currentTimeMillis();
    30         System.out.println(end-start);
    31     }
    32 
    33     static class MyTask implements Callable<List<Integer>> {
    34         int startPos,endPos;
    35 
    36         MyTask(int s,int e){
    37             this.startPos = s;
    38             this.endPos = e;
    39         }
    40         public List<Integer> call() throws Exception{
    41             List<Integer> r = getPrime(startPos, endPos);
    42             return r;
    43         }
    44     }
    45     static boolean isPrime(int num){
    46         for(int i=2;i<=num/2;i++){
    47             if(num%i==0) return false;
    48         }
    49         return true;
    50     }
    51     static List<Integer> getPrime(int start,int end){
    52         List<Integer> results = new ArrayList<>();
    53         for(int i=start;i<=end;i++){
    54             if(isPrime(i)) results.add(i);
    55         }
    56         return results;
    57     }
    58 }

    输出:

    2513
    786

    2.6 线程池——CacheThreadPool

      刚开始一个线程也没有,来一个任务起一个线程,如果来一个新的任务,线程池里刚好有一个线程空闲,直接让空闲线程执行任务,否则,起一个新线程;默认情况下,线程空闲超过60s自动销毁;

     1 public class T_CacheThreadPool {
     2     public static void main(String[] agrs) throws InterruptedException{
     3         ExecutorService service = Executors.newCachedThreadPool();
     4         System.out.println(service);
     5 
     6         for(int i=0;i<2;i++){
     7             service.execute(()->{
     8                 try{
     9                     TimeUnit.MILLISECONDS.sleep(500);
    10                 }catch(InterruptedException e){
    11                     e.printStackTrace();
    12                 }
    13                 System.out.println(Thread.currentThread().getName());
    14             });
    15         }
    16         System.out.println(service);
    17         TimeUnit.SECONDS.sleep(80);
    18         System.out.println(service);
    19     }
    20 }

    运行结果:

    java.util.concurrent.ThreadPoolExecutor@1540e19d[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    java.util.concurrent.ThreadPoolExecutor@1540e19d[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
    pool-1-thread-2
    pool-1-thread-1
    java.util.concurrent.ThreadPoolExecutor@1540e19d[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]

    2.7 线程池——SingleThreadPool

      线程池里就一个线程,保证任务顺序执行

     1 public class SingleThreadPool {
     2     public static void main(String[] agrs){
     3         ExecutorService service = Executors.newSingleThreadExecutor();
     4         for(int i=0;i<5;i++){
     5             final int j = i;
     6             service.execute(()->{
     7                 System.out.println(j+" "+Thread.currentThread().getName());
     8             });
     9         }
    10     }
    11 }

    输出:

    0 pool-1-thread-1
    1 pool-1-thread-1
    2 pool-1-thread-1
    3 pool-1-thread-1
    4 pool-1-thread-1

    2.8 线程池——SchedulePool

      与DelayQueue相对应,执行定时的任务,线程池里的线程可以复用

     1 public class SchedulePool {
     2     public static void main(String[] agrs){
     3         ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
     4         service.scheduleAtFixedRate(()->{
     5             try{
     6                 TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
     7             }catch(InterruptedException e){
     8                 e.printStackTrace();
     9             }
    10             System.out.println(Thread.currentThread().getName());
    11         },0,500, TimeUnit.MILLISECONDS);
    12     }
    13 }

    2.9 线程池——WorkStealingPool

      工作窃取:有一堆任务和一堆线程,每个线程都维护一个自己的任务队列,当一个线程执行完自己队列里的任务,会去别的线程队列中“偷”未执行的任务继续执行;

     1 public class WorkStealPool {
     2     public static void main(String[] agrs) throws IOException {
     3         ExecutorService service = Executors.newWorkStealingPool();
     4         //打印cup多少核
     5         System.out.println(Runtime.getRuntime().availableProcessors());
     6 
     7         service.execute(new R(1000));
     8         service.execute(new R(2000));
     9         service.execute(new R(2000));
    10         service.execute(new R(2000));
    11         service.execute(new R(2000));
    12         //由于产生的是守护线程,主线程不阻塞看不到输出
    13         System.in.read();
    14     }
    15     public static class R implements Runnable{
    16         int time;
    17         R(int t){
    18             this.time = t;
    19         }
    20         public void run(){
    21             try{
    22                 TimeUnit.MILLISECONDS.sleep(time);
    23             }catch (InterruptedException e){
    24                 e.printStackTrace();
    25             }
    26             System.out.println(time+" "+Thread.currentThread().getName());
    27         }
    28     }
    29 }

    2.10 线程池——ForkJoinPool

      如果有一项特别难以完成的大任务,可以把大任务切分成小的任务(Fork),最后合并小任务(Join);

     1 public class ForkJoinPool2 {
     2     static int[] nums = new int[1000000];
     3     static final int MAX_NUM = 50000;
     4     static Random r = new Random();
     5     //求和
     6     static {
     7         for(int i=0; i<nums.length;i++){
     8             nums[i]=r.nextInt(100);
     9         }
    10         System.out.println(Arrays.stream(nums).sum());
    11     }
    12 
    13     static class AddTask extends RecursiveAction{
    14         int start,end;
    15         AddTask(int s, int e){
    16             start = s;
    17             end = e;
    18         }
    19         protected void compute(){
    20             if(end-start<=MAX_NUM){
    21                 long sum = 0L;
    22                 for(int i=start;i<end;i++){
    23                     sum+=nums[i];
    24                 }
    25                 System.out.println("from:"+start+"to:"+end+"="+sum);
    26             }else{
    27                 int middle = start+(end-start)/2;
    28                 AddTask subTask1 = new AddTask(start,middle);
    29                 AddTask subTask2 = new AddTask(middle,end);
    30                 subTask1.fork();
    31                 subTask2.fork();
    32             }
    33         }
    34     }
    35     public static void main(String[] agrs) throws IOException{
    36         ForkJoinPool fjp = new ForkJoinPool();
    37         AddTask task = new AddTask(0,nums.length);
    38 
    39         fjp.execute(task);
    40         System.in.read();
    41     }
    42 }

    49508417
    from:593750to:625000=1543717
    from:468750to:500000=1548821
    from:843750to:875000=1548865
    from:968750to:1000000=1543900
    from:718750to:750000=1539386
    from:437500to:468750=1547593
    from:812500to:843750=1538471
    from:687500to:718750=1540626
    from:562500to:593750=1545775
    from:937500to:968750=1546144
    from:406250to:437500=1550529
    from:781250to:812500=1540313
    from:218750to:250000=1556540
    from:656250to:687500=1547825
    from:93750to:125000=1545292
    from:156250to:187500=1546528
    from:531250to:562500=1544129
    from:750000to:781250=1541918
    from:500000to:531250=1551838
    from:906250to:937500=1549404
    from:625000to:656250=1550692
    from:375000to:406250=1544616
    from:62500to:93750=1554626
    from:875000to:906250=1548556
    from:343750to:375000=1554709
    from:281250to:312500=1545881
    from:125000to:156250=1550330
    from:312500to:343750=1548005
    from:250000to:281250=1552814
    from:31250to:62500=1547229
    from:187500to:218750=1545491
    from:0to:31250=1547854

    没有计算总的sun,因为RecursiveAction是没有返回值的,改为继承RecursiveTask,如下

     1 public class ForkJoinPool2 {
     2     static int[] nums = new int[1000000];
     3     static final int MAX_NUM = 50000;
     4     static Random r = new Random();
     5     //求和
     6     static {
     7         for(int i=0; i<nums.length;i++){
     8             nums[i]=r.nextInt(100);
     9         }
    10         System.out.println(Arrays.stream(nums).sum());
    11     }
    12     static class AddTask extends RecursiveTask<Long> {
    13         int start,end;
    14         AddTask(int s, int e){
    15             start = s;
    16             end = e;
    17         }
    18         protected Long compute(){
    19             if(end-start<=MAX_NUM){
    20                 long sum = 0L;
    21                 for(int i=start;i<end;i++){
    22                     sum+=nums[i];
    23                 }
    24                 return sum;
    25             }
    26             int middle = start+(end-start)/2;
    27             AddTask subTask1 = new AddTask(start,middle);
    28             AddTask subTask2 = new AddTask(middle,end);
    29             subTask1.fork();
    30             subTask2.fork();
    31             return subTask1.join()+subTask2.join();
    32         }
    33     }
    34     public static void main(String[] agrs) throws IOException{
    35         ForkJoinPool fjp = new ForkJoinPool();
    36         AddTask task = new AddTask(0,nums.length);
    37         fjp.execute(task);
    38         long result = task.join();
    39         System.out.println(result);
    40         //System.in.read();
    41     }
    42 }

    2.11 线程池——ThreadPoolExecutor

      线程池的底层实现:

    发现它们都基于ThreadPoolExectorWorkStealingPool与ForkJoinPool的底层都是ForkJoinPool

    2530
    570

    2.12——parallelStreamAPI

     1 public class T_ParallelStreamAPI {
     2     public static void main(String[] agrs){
     3         List<Integer> nums = new ArrayList<>();
     4         Random r = new Random();
     5         for(int i=0;i<10000;i++){
     6             nums.add(1000000+r.nextInt(1000000));
     7         }
     8         long start = System.currentTimeMillis();
     9         nums.forEach(v->isPrime(v));
    10         long end = System.currentTimeMillis();
    11         System.out.println(end-start);
    12 
    13         //使用parallelStream api
    14         start = System.currentTimeMillis();
    15         //默认使用多线程
    16         nums.parallelStream().forEach(T_ParallelStreamAPI::isPrime);
    17         end = System.currentTimeMillis();
    18         System.out.println(end-start);
    19     }
    20     static boolean isPrime(int num){
    21         for(int i=2;i<num/2;i++){
    22             if(num%i==0)
    23                 return false;
    24         }
    25         return true;
    26     }
    27 }

    2530
    570

  • 相关阅读:
    定时器应用-最终版
    定时器应用-点击按钮,div向右移动
    通过js读取元素的样式
    延时调用
    定时器应用-切换图片的练习
    BOM对象属性定时器的调用
    BOM浏览器对象模型
    键盘移动
    Python-字符串方法
    Python实现注册和登录
  • 原文地址:https://www.cnblogs.com/PJQOOO/p/10933799.html
Copyright © 2011-2022 走看看