zoukankan      html  css  js  c++  java
  • Java.Util.Concurrent包中的一些接口和类

    1.Callable<V> :接口,多线程的一种实现方式,实现类重写方法,重写的call()方法有返回值或者抛出异常,需要配合着FutureTask类(实现了Runnable接口)使用:

     1 public class CallableTest {
     2     public static void main(String[] args) {
     3         Test t=new Test();
     4         FutureTask<Integer> f=new FutureTask<>(t);  //获取结果,与callable配合着使用  
    //FutureTask类的构造函数必须使用Runnable,或者Callable接口的实例
    5 new Thread(f).start(); //FutureTask也实现了Runnable接口 6 Integer i= null; 7 try { 8 i = f.get(); //FutureTask的方法,获得个、返回的结果 9 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } catch (ExecutionException e) { 12 e.printStackTrace(); 13 } 14 System.out.println(i); 15 16 } 17 } 18 class Test implements Callable<Integer>{ //实现Runnable接口,重写call()方法,有返回值 19 int sum=0; 20 @Override 21 public Integer call() throws Exception { 22 for (int i = 0; i <=100; i++) { 23 sum=sum+i; 24 } 25 return sum; 26 } 27 }

    2.Semaphore类:信号量,直接new对象,semaphore.acquire():获取信号量,如果获取失败如阻塞,semaphore.relase():释放信号量,semaphore.availablePermits():获取信号量的编号。

     Semaphore s2=new Semaphore(5,true);
     Semaphore s1=new Semaphore(5);  
    //两种构造方法,(int,boolean) int:资源的访问量 boolean:是否是公平竞争 //公平竞争:等的越久越先执行
    try {
    s1.acquire();//请求资源,如果资源不可用该线程阻塞,直到有可用资源为止;
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    s1.release();//释放资源
     

     一个信号量的停车场案例:

    public class Parking2 implements Runnable{
    Semaphore semaphore;
    int id;

    public Parking2(Semaphore semaphore, int id) {
    this.semaphore = semaphore;
    this.id = id;
    }

    public static void main(String[] args) {
    Semaphore semaphore=new Semaphore(4); //4个车位
    ExecutorService executorService= Executors.newCachedThreadPool();//一个线程池
    for (int i = 1; i <= 25; i++) {
    executorService.execute(new Parking2(semaphore,i));
    }
    executorService.shutdown();

    }

    @Override
    public void run() {
    try {
    push();
    Thread.sleep(1000);
    pop();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    //停
    public void push() throws InterruptedException {
    if(semaphore.availablePermits()>0){
    System.out.println(this.id+"可以停车!");
    }else{
    System.out.println(this.id+"请等待,没有车位!");
    }
    semaphore.acquire();
    System.out.println(this.id+"停车成功!");
    }
    //取
    public void pop(){
    semaphore.release();
    System.out.println(this.id+"取车成功!");
    }
    }

    3.ReentrantLock和Condition:

    ReentrantLock:可重入排他锁,乐观锁,直接new对象,需要手动的加和释放锁,但比较灵活。

    public class ReentrantTest implements Runnable{

    Test t;
    int id;

    public ReentrantTest(Test t, int id) {
    this.t = t;
    this.id = id;
    }

    @Override
    public void run() {
    try {
    t.get();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    public static void main(String[] args) {
    ExecutorService executorService= Executors.newCachedThreadPool();//线程池
    Test t=new Test();
    //保证每一个线程访问同一个线程池,和同一把锁
    for (int i = 1; i <= 30; i++) {
    executorService.execute(new ReentrantTest(t,i));
    }
    executorService.shutdown();//关闭
    }
    }
    class Test{
    ReentrantLock reentrantLock=new ReentrantLock();//new一个锁

    public void get() throws InterruptedException {
    reentrantLock.lock(); //上锁
    System.out.println(Thread.currentThread().getName()+"_启动");
    Thread.sleep(1000);
    System.out.println(Thread.currentThread().getName()+"_结束");
    //释放锁
    reentrantLock.unlock();
    }
    }

    Condition:是一个接口,创建Condition对象需要ReentrantLock.newCondition();主要有await():导致当前线程等待,singal():唤醒一个等待线程。如下代码实例证明:1.await(),signal()只能在当前对象获得锁(ReentrantLock)是被调用;2.当对象调用await()方法后,会交出锁,让其他线程对象争用;。在调用signal()后,锁对象又回到await()调用处,继续执行。

    public class ConditionTest {
        static ReentrantLock lock=new ReentrantLock();
        static Condition condition=lock.newCondition();  //condition是一个接口,有await(),singal()方法
        static int count=1;
    
        public static void main(String[] args) throws InterruptedException {
            Runnable r1= () -> {
                lock.lock();
                System.out.println("r1带锁启动。。。");
                try {
                    System.out.println("r1即将被锁住");
                    condition.await();   //等待,并且交出锁
                    System.out.println("r1又获新生!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("r1执行完毕");
                    lock.unlock();
                }
            };
    
            Runnable r2= () -> {
                lock.lock();
                System.out.println("r2带锁启动。。。");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("r1即将被唤醒!");
                condition.signal();  //唤醒r1
                System.out.println("r2执行完毕");
                lock.unlock();
    
            };
            new Thread(r1).start();  //让r1先执行,但条件不足,只能先等待
    
            new Thread(r2).start();
        }
    }

    4.BlockingQueue:是一个阻塞队列的超类接口,此接口的实现类有:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 。每个类的具体使用可以参考API。以下的实例使用LinkedBlockingQueue做的。

     1 //设计一个阻塞队列
     2 //每个多线程对象都会往queue(为对象共享static)中放元素会涉及:
     3 //      --->如果队列已满就阻塞等待,--->如果队列空就阻塞等待
     4 //本例中一共有三个执行:  Main主线程, 往blockingqueue放元素的线程 从blockingqueue中取元素的线程
     5          //三者一起执行
     6 public class TestBlockingQueue implements Runnable{
     7     //属性:
     8     int id;  //每个对象编号
     9     static BlockingQueue<String> queue=new LinkedBlockingQueue<>(3);
    10 
    11     //构造:
    12     public TestBlockingQueue(int id) {
    13         this.id = id;
    14     }
    15 
    16     @Override
    17     public void run() {
    18         //每个线程元素都会把自己的ID放到队列中
    19         try {
    20             queue.put(String.valueOf(this.id));  //put():会抛异常
    21             System.out.println(this.id+"已被放进队列");
    22         } catch (InterruptedException e) {
    23             e.printStackTrace();
    24         }
    25     }
    26 
    27     public static void main(String[] args) {
    28         //定义一个线程池:
    29         ExecutorService executorService= Executors.newCachedThreadPool();
    30         for (int i = 0; i < 15; i++) {
    31             //线程池执行线程对象
    32             executorService.execute(new TestBlockingQueue(i));
    33         }
    34         Thread t=new Thread(()->{
    35            //run()的方法体
    36             while (true){
    37                 try {
    38                     Thread.sleep(100);
    39                     if(queue.isEmpty()){//静态属性可以类名直接调
    40                         break;
    41                     }
    42                     //拿出队列中的元素
    43                     String string=TestBlockingQueue.queue.take();//拿出并删除队列头的元素
    44                     System.out.println(string+"被拿出");
    45                 } catch (InterruptedException e) {
    46                     e.printStackTrace();
    47                 }
    48             }
    49         });
    50 
    51         //让线程池去执行线程对象t
    52         executorService.execute(t);
    53 
    54         //关闭线程池
    55         executorService.shutdown();
    56     }
    57 }

    5.CompletionService:是一个接口,创建实现类对象,是将生产新的异步任务和使用已完成任务结果分离开来的服务。有实现类:ExecutorCompletionService,方法:submit(),take(),具体用法如下实例:

     1 //测试: CompletionService = new ExecutorCompletionService(Executor实现类)--->接口,实现类,让线程池来执行任务
     2 //方法: 1.completionService.submit(Callable<V>的实现类)-->提交执行线程对象
     3    //   2.completionService.take().get()-->检索并删除下一个已完成的任务,get()获得该线程的执行结果
     4 //会阻塞等待
     5 public class CompletionServiceTest implements Callable<String> {
     6    int id;
     7 
     8     public CompletionServiceTest(int id) {
     9         this.id = id;
    10     }
    11 
    12     @Override
    13     public String call() throws Exception {
    14         //每个线程打印,并返回自己的编号
    15         System.out.println(this.id+"_已启动");
    16         Thread.sleep(100);
    17         System.out.println(this.id+"_结束");
    18         return this.id+"_thread,call()方法的返回值";
    19     }
    20 
    21     public static void main(String[] args) {
    22         //线程池
    23         ExecutorService executorService= Executors.newCachedThreadPool();
    24                                                                               //放的是executor的实现类(线程池对象)
    25         CompletionService <String>completionService= new ExecutorCompletionService<String>(executorService);
    26                     //使用ExecutorCompletionService作为执行对象,LinkedBlockingQueue:作为完成队列
    27 
    28         for (int i = 0; i < 15; i++) {
    29             completionService.submit(new CompletionServiceTest(i));
    30              //提交执行
    31         }
    32         for (int i = 0; i < 15; i++) {
    33             try {
    34                 try {
    35                     System.out.println(completionService.take().get());
    36                        //并打印返回值
    37                       //take():获取已完成的任务的结果,并从完成队列中删除该任务
    38                 } catch (ExecutionException e) {
    39                     e.printStackTrace();
    40                 }
    41             } catch (InterruptedException e) {
    42                 e.printStackTrace();
    43             }
    44         }
    45         //线程池关闭
    46         executorService.shutdown();
    47     }
    48 }

    6.CountDonwLatch:是继承自Object的类,允许一个或一组线程等待其他线程完成的辅助类

    经典案例:10个人一起赛跑,得同时出发。

     1 //CountDownLatch是继承自Object的类
     2 //1.构造方法,直接new: countdownlatck=new CountDownLatch(int)
     3 //2.主要方法:countdownlatch.await():等待计数到达0,后开始线程执行,没有就阻塞等待
     4 //           countdownlatch.countDown():计数减一
     5 //一个10一起赛跑的案例: 要求:10个线程一起开始运行,等10个线程都运行结束后输出比赛结束的信息
     6 
     7 import java.util.concurrent.CountDownLatch;
     8 import java.util.concurrent.ExecutorService;
     9 import java.util.concurrent.Executors;
    10 
    11 public class CountDownLatchTest {
    12     //比赛开始倒计时:
    13     static CountDownLatch start=new CountDownLatch(1);
    14 
    15     //比赛结束倒计时:
    16     static CountDownLatch end=new CountDownLatch(10);//10名选手,每来一个就减少一个
    17 
    18     public static void main(String[] args) {
    19         //线程池
    20         ExecutorService pool= Executors.newFixedThreadPool(10);
    21         System.out.println("这是一场10人制比赛~~~");
    22         for (int i = 0; i < 10; i++) {  //创建10个线程
    23             int id=i+1;
    24             Runnable runner= () -> {
    25                 System.out.println("运动员:"+id+"准备就绪!");
    26                 try {
    27                     start.await();  //等待开始的指令
    28 
    29                     Thread.sleep(1000); //跑步中
    30 
    31                     System.out.println("运动员"+id+"已到达!");
    32                 } catch (InterruptedException e) {
    33                     e.printStackTrace();
    34                 }finally {
    35                     end.countDown();  //每跑完一个人就让计数减一
    36                 }
    37             };
    38             pool.execute(runner);  //执行10个线程,都阻塞在开始指令之前
    39         }
    40 
    41         start.countDown();
    42 
    43         try {
    44             end.await();  //等待比赛结束
    45         } catch (InterruptedException e) {
    46             e.printStackTrace();
    47         }
    48         System.out.println("全场比赛结束!");
    49         //关闭线程池
    50         pool.shutdown();
    51     }
    52 }

    7.CyclicBarrier:是一个类,作用是:允许一组线程互相等待到达一个屏障点前的同步辅助,这个屏障在所有等待的线程被释放后可以重新使用,这是该屏障称为循环(CountDownLatch不同之处)。

     1 //一个旅游团的例子
     2 //构造方法:cyclicBarrier=new CyclicBarrier(int)-->int:要等待的线程数
     3 //重要方法:await()-->当所有线程都执行了该方法,就一起继续向前执行,否则就等待
     4 public class CyclicBarrierTest {
     5     //属性: 三种策略到达5个城市的时间数组: 西安 北京 重庆 成都 杭州
     6     static int [] bycar={1,2,3,4,5};
     7     static int [] bybus={2,3,4,5,6};
     8     static int [] onfoot={3,4,5,6,7};
     9 
    10     static String getNow(){
    11         //获取现在的时间
    12         SimpleDateFormat sdf=new SimpleDateFormat("hh:mm:ss");
    13         return sdf.format(new Date());
    14     }
    15     static class Tour implements Runnable{
    16         CyclicBarrier cyclicBarrier=new CyclicBarrier(3); //三家旅行公司,采用三种celue
    17         String tourName; //旅行店的名字
    18         int [] time;
    19 
    20         public Tour(CyclicBarrier cyclicBarrier, String tourName,int [] time) {
    21             this.cyclicBarrier = cyclicBarrier;
    22             this.tourName = tourName;
    23             this.time=time;
    24         }
    25 
    26         @Override
    27         public void run() {
    28             try {
    29                 Thread.sleep(time[0]*1000);
    30                 System.out.println(getNow()+":"+this.tourName+"到达-->西安");
    31                 try {
    32                     cyclicBarrier.await();   //等待其他线程执行到此处
    33 
    34                     //继续执行:
    35                     Thread.sleep(time[1]*1000);
    36                     System.out.println(getNow()+":"+this.tourName+"到达-->北京");
    37                     cyclicBarrier.await();  //等待其他两个团到达北京,循环使用
    38 
    39                     Thread.sleep(time[2]*1000);
    40                     System.out.println(getNow()+":"+this.tourName+"到达-->重庆");
    41                     cyclicBarrier.await();  //等待其他两个团到达北京,循环使用
    42 
    43                     Thread.sleep(time[3]*1000);
    44                     System.out.println(getNow()+":"+this.tourName+"到达-->成都");
    45                     cyclicBarrier.await();  //等待其他两个团到达北京,循环使用
    46 
    47                     Thread.sleep(time[4]*1000);
    48                     System.out.println(getNow()+":"+this.tourName+"到达-->杭州");
    49                     //cyclicBarrier.await();  //等待其他两个团到达北京,循环使用
    50                     
    51                 } catch (BrokenBarrierException e) {
    52                     e.printStackTrace();
    53                 }
    54             } catch (InterruptedException e) {
    55                 e.printStackTrace();
    56             }
    57         }
    58     }
    59     public static void main(String[] args) {
    60         //定义线程池
    61         ExecutorService pool= Executors.newFixedThreadPool(3);
    62 
    63         //定义CyclicBarrier
    64         CyclicBarrier cyclicBarrier=new CyclicBarrier(3);
    65 
    66         //线程执行线程,分别执行
    67         pool.execute(new Tour(cyclicBarrier,"T_Walk",onfoot));
    68         pool.execute(new Tour(cyclicBarrier,"T_Car",bycar));
    69         pool.execute(new Tour(cyclicBarrier,"T_Bus",bybus));
    70 
    71         //关闭线程池
    72         pool.shutdown();
    73     }
    74 }

    8.Future<V>:是一个接口,实现类:FutureTask,CompletableFuture等,get()方法将返回V类型的数据。提供方法来检查计算是否完成,等待其完成,并检索计算结果。 结果只能在计算完成后使用方法get进行检索,如有必要,阻塞,直到准备就绪。 取消由cancel方法执行。 提供其他方法来确定任务是否正常完成或被取消。 计算完成后,不能取消计算。 如果您想使用Future ,以便不可撤销,但不提供可用的结果,则可以声明Future<?>表格的类型,并返回null作为基础任务的结果。

    9.ScheduledExecutorService:接口:(实现类:ScheduledExecutorThreadPool),作用可以让一个EexcutorService在给定延迟后执行,或者是定期执行。

     1 //ScheduledExecutorService:接口:(实现类:ScheduledExecutorThreadPool)
     2 //作用可以让一个EexcutorService在给定延迟后执行,或者是定期执行。
     3 // 方法1:pool.ScheduledFuture<?> scheduleAtFixedRate(Runnable var1, long var2, long var4, TimeUnit var6);
     4 //参数含义:command - 要执行的任务
     5 //         initialDelay - 延迟第一次执行的时间
     6 //         period - 连续执行之间的时期
     7 //         unit - initialDelay和period参数的时间单位
     8 
     9 //方法2:ScheduledFuture<?> schedule(Runnable command,
    10 //                            long delay,
    11 //                            TimeUnit unit)创建并执行在给定延迟后启用的单次操作。
    12 //参数
    13 //command - 要执行的任务
    14 //delay - 从现在开始延迟执行的时间
    15 //unit - 延时参数的时间单位
    16 public class ScheduledExecutorServiceTest {
    17     public static void main(String[] args) {
    18         //创建线程池
    19         final ScheduledExecutorService pool= Executors.newScheduledThreadPool(2);
    20                        //创建一个可以在给定延迟后运行,或者定期执行的线程池
    21         final Runnable runnable=new Runnable() {
    22             int count=0;
    23             @Override
    24             public void run() {
    25                 System.out.println(new Date()+"runnable"+(++count));
    26             }
    27         };
    28 
    29         //1秒后运行,每隔2秒运行一次
    30         ScheduledFuture<?> scheduledFuture = pool.scheduleAtFixedRate(runnable,1,2,SECONDS);
    31         //2秒后运行,往后每隔5秒运行一次
    32         ScheduledFuture<?> scheduledFuture1 = pool.scheduleAtFixedRate(runnable, 2, 5, SECONDS);
    33         //30秒后结束关闭任务,并关闭Scheduled
    34         pool.schedule(new Runnable() {
    35             @Override
    36             public void run() {
    37                 //关闭操作
    38                 scheduledFuture.cancel(true);
    39                 scheduledFuture1.cancel(true);
    40                 //关闭线程池
    41                 pool.shutdown();
    42             }
    43         },30,SECONDS);
    44 
    45     }
    46 }

    以上内容参考来自Java ApI文档;

    以及推荐博客

     
  • 相关阅读:
    pytest学习(2)
    pytest学习(1)
    facets学习(1):什么是facets
    window10下部署flask系统(apache和wsgi)
    python tips:列表推导
    python的一致性(1)sorted和len
    THULAC:一个高效的中文词法分析工具包(z'z)
    EAScript 2016的新增语法(1)
    python的property的用法
    python算法:LinkedList(双向线性链表)的实现
  • 原文地址:https://www.cnblogs.com/xbfchder/p/10975146.html
Copyright © 2011-2022 走看看