zoukankan      html  css  js  c++  java
  • java:并发编程-Callable与Future模式

    自己对线程池的理解:

    coresize 3

    maxsize 5

    blockLinkedQuenue 3

    当提交的任务在<=3时,创建三个线程干活

    大于3时,把任务先加入阻塞式队列,当有空闲的核心线程便去执行他们,队列中的任务执行是实际运行的线程在复用执行

    如果后面有提交了很多任务,队列都放不下了,就赶紧创建新的线程去执行他们,如果任务已经大于了>队列+最大线程数,没有能力干活了,只能崩塌了,抛出拒绝异常

    一、合理配置线程池

    CPU密集

    CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。

    CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程,该任务都不可能得到加速,因为CPU总的运算能力就那些。

    IO密集

    IO密集型,即该任务需要大量的IO,即大量的阻塞。在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即时在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。

    接着上一篇探讨线程池留下的尾巴,如何合理的设置线程池大小。

    要想合理的配置线程池的大小,首先得分析任务的特性,可以从以下几个角度分析:

    1. 任务的性质:CPU密集型任务、IO密集型任务、混合型任务。

    2. 任务的优先级:高、中、低。

    3. 任务的执行时间:长、中、短。

    4. 任务的依赖性:是否依赖其他系统资源,如数据库连接等。

    性质不同的任务可以交给不同规模的线程池执行。

    对于不同性质的任务来说,CPU密集型任务应配置尽可能小的线程,如配置CPU个数+1的线程数,IO密集型任务应配置尽可能多的线程,因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量,如配置两倍CPU个数+1,而对于混合型的任务,如果可以拆分,拆分成IO密集型和CPU密集型分别处理,前提是两者运行的时间是差不多的,如果处理时间相差很大,则没必要拆分了。

    若任务对其他系统资源有依赖,如某个任务依赖数据库的连接返回的结果,这时候等待的时间越长,则CPU空闲的时间越长,那么线程数量应设置得越大,才能更好的利用CPU。

    当然具体合理线程池值大小,需要结合系统实际情况,在大量的尝试下比较才能得出,以上只是前人总结的规律。

    最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

    比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:

    最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

    可以得出一个结论:  线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。  以上公式与之前的CPU和IO密集型任务设置线程数基本吻合。

    CPU密集型时,任务可以少配置线程数,大概和机器的cpu核数相当,这样可以使得每个线程都在执行任务

    IO密集型时,大部分线程都阻塞,故需要多配置线程数,2*cpu核数

    操作系统之名称解释:

    某些进程花费了绝大多数时间在计算上,而其他则在等待I/O上花费了大多是时间,

    前者称为计算密集型(CPU密集型)computer-bound,后者称为I/O密集型,I/O-bound。

    wait:

    1)当一个线程处于wait()状态时,也即等待它之前所持有的object's monitor被释放,通过notify()方法可以让该线程重新处于活动状态,从而去抢夺object's monitor,唤醒该线程。
    (2)如果多个线程同时处于等待状态,那么调用notify()方法只能随机唤醒一个线程。
    (3)在同一时间内,只有一个线程能够获得object's monitor,执行完毕之后,则再将其释放供其它线程抢占。

    Callable

    Java中,创建线程一般有两种方式,一种是继承Thread类,一种是实现Runnable接口。然而,这两种方式的缺点是在线程任务执行结束后,无法获取执行结果。我们一般只能采用共享变量或共享存储区以及线程通信的方式实现获得任务结果的目的。 不过,Java中,也提供了使用Callable和Future来实现获取任务结果的操作。Callable用来执行任务,产生结果,而Future用来获得结果。

    Callable接口与Runnable接口是否相似,查看源码,可知Callable接口的定义如下:

    @FunctionalInterface

    public interface Callable<V> {

        /**

         * Computes a result, or throws an exception if unable to do so.

         *

         * @return computed result

         * @throws Exception if unable to compute a result

         */

        V call() throws Exception;

    }

    可以看到,与Runnable接口不同之处在于,call方法带有泛型返回值V。

    Future常用方法

    V get() :获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。

    V get(Long timeout , TimeUnit unit) :获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。

    boolean isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。

    boolean isCanceller() :如果任务完成前被取消,则返回true。

    boolean cancel(boolean mayInterruptRunning) :如果任务还没开始,执行cancel(...)方法将返回false;如果任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回true;当任务已经启动,执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回false;当任务已经完成,执行cancel(...)方法将返回false。mayInterruptRunning参数表示是否中断执行中的线程。

    通过方法分析我们也知道实际上Future提供了3种功能:(1)能够中断执行中的任务(2)判断任务是否执行完成(3)获取任务执行完成后额结果。

    我们通过简单的例子来体会使用Callable和Future来获取任务结果的用法。

    public class TestMain {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

    ExecutorService executor = Executors.newCachedThreadPool();

    Future<Integer> future = executor.submit(new AddNumberTask());

    System.out.println(Thread.currentThread().getName() + "线程执行其他任务");

    Integer integer = future.get();

    System.out.println(integer);

    // 关闭线程池

    if (executor != null)

    executor.shutdown();

    }

    }

    class AddNumberTask implements Callable<Integer> {

    public AddNumberTask() {

    }

    @Override

    public Integer call() throws Exception {

    System.out.println("####AddNumberTask###call()");

    Thread.sleep(5000);

    return 5000;

    }

    }

    Future模式

    Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑

    Futrure模式:对于多线程,如果线程A要等待线程B的结果,那么线程A没必要等待B,直到B有结果,可以先拿到一个未来的Future,等B有结果是再取真实的结果。

     在多线程中经常举的一个例子就是:网络图片的下载,刚开始是通过模糊的图片来代替最后的图片,等下载图片的线程下载完图片后在替换。而在这个过程中可以做一些其他的事情。

     

    首先客户端向服务器请求RealSubject,但是这个资源的创建是非常耗时的,怎么办呢?这种情况下,首先返回Client一个FutureSubject,以满足客户端的需求,于此同时呢,Future会通过另外一个Thread 去构造一个真正的资源,资源准备完毕之后,在给future一个通知。如果客户端急于获取这个真正的资源,那么就会阻塞客户端的其他所有线程,等待资源准备完毕。

    公共数据接口,FutureData和RealData都要实现。

    public interface Data {

    public abstract String getRequest();

    }

    FutureData,当有线程想要获取RealData的时候,程序会被阻塞。等到RealData被注入才会使用getReal()方法。

    public class FurureData implements Data {

    public volatile static boolean ISFLAG = false;

    private RealData realData;

    public synchronized void setRealData(RealData realData) {

    // 如果已经获取到结果,直接返回

    if (ISFLAG) {

    return;

    }

    // 如果没有获取到数据,传递真是对象

    this.realData = realData;

    ISFLAG = true;

    // 进行通知

    notify();

    }

    @Override

    public synchronized String getRequest() {

    while (!ISFLAG) {

    try {

    wait();

    } catch (Exception e) {

    }

    }

    // 获取到数据,直接返回

    return realData.getRequest();

    }

    }

    真实数据RealData

    public class RealData implements Data {

    private String result;

    public RealData(String data) {

    System.out.println("正在使用data:" + data + "网络请求数据,耗时操作需要等待.");

    try {

    Thread.sleep(3000);

    } catch (Exception e) {

    }

    System.out.println("操作完毕,获取结果...");

    result = "余胜军";

    }

    @Override

    public String getRequest() {

    return result;

    }

    FutureClient  客户端

    public class FutureClient {

    public Data request(String queryStr) {

    FurureData furureData = new FurureData();

    new Thread(new Runnable() {

    @Override

    public void run() {

    RealData realData = new RealData(queryStr);

    furureData.setRealData(realData);

    }

    }).start();

    return furureData;

    }

    }

    调用者:

    public class Main {

    public static void main(String[] args) {

    FutureClient futureClient = new FutureClient();

    Data request = futureClient.request("请求参数.");

    System.out.println("请求发送成功!");

    System.out.println("执行其他任务...");

    String result = request.getRequest();

    System.out.println("获取到结果..." + result);

    }

    }

    调用者请求资源,client.request("name"); 完成对数据的准备

    当要获取资源的时候,data.getResult() ,如果资源没有准备好isReady = false;那么就会阻塞该线程。直到资源获取然后该线程被唤醒。

  • 相关阅读:
    进程池,线程池,协程,gevent模块,协程实现单线程服务端与多线程客户端通信,IO模型
    线程相关 GIL queue event 死锁与递归锁 信号量l
    生产者消费者模型 线程相关
    进程的开启方式 进程的join方法 进程间的内存隔离 其他相关方法 守护进程 互斥锁
    udp协议 及相关 利用tcp上传文件 socketserver服务
    socket套接字 tcp协议下的粘包处理
    常用模块的完善 random shutil shevle 三流 logging
    day 29 元类
    Django入门
    MySQL多表查询
  • 原文地址:https://www.cnblogs.com/hejunhong/p/11546278.html
Copyright © 2011-2022 走看看