zoukankan      html  css  js  c++  java
  • Netty中的Future

    先看下Future的整个继承体系,还有一个ChannelFuture不在里面;

        在并发编程中,我们通常会用到一组非阻塞的模型:Promise,Future 和 Callback。其中的 Future 表示一个可能还没有实际完成的异步任务的结果,针对这个结果可以添加 Callback 以便在任务执行成功或失败后做出对应的操作,而 Promise 交由任务执行者,任务执行者通过 Promise 可以标记任务完成或者失败。 可以说这一套模型是很多异步非阻塞架构的基础。
        这一套经典的模型在 Scala、C# 中得到了原生的支持,但 JDK 中暂时还只有无 Callback 的 Future 出现,当然也并非在 JAVA 界就没有发展了,比如 Guava 就提供了ListenableFuture 接口,而 Netty 4+ 更是提供了完整的 Promise、Future 和 Listener 机制,在 Netty 的官方文档 Using as a generic library 中也介绍了将 Netty 作为一个 lib 包依赖,并且使用 Listenable futures 的示例。在实际的项目使用中,发现 Netty 的 EventLoop 机制不一定适用其他场景,因此想去除对 EventLoop 的依赖,实现一个简化版本。
        Netty自己实现了一套并发库,Future是其中的一块,下篇文章讲下他的并发库的线程池实现。Netty的Future的特性
    • Future<V>的V为异步结果的返回类型
    • getNow 是无阻塞调用,返回异步执行结果,如果未完成那么返回null
    • await 是阻塞调用,等到异步执行完成
    • isSuccess 执行成功是否成功
    • sync  阻塞调用,等待这个future直到isDone(可能由于正常终止、异常或取消而完成)返回true; 如果该future失败,重新抛出失败的原因。 和await区别就是返回结果不同,它返回一个Future对象,通过这个Future知道任务执行结果。
    • 添加GenericFutureListener, 执行完成(future可能由于正常终止、异常或取消而完成)后调用该监听器。
    如果我实现这个Future怎么实现:1:任务执行器,这样我们可以控制任务的执行了。2:执行结果,用于返回。3:监听器集合。4:执行结果状态
    5:触发监听器的函数。那么怎么执行完成后自定触发监听器,应该在Future里面又启动了一个线程去执行这个触发机制。这些都是我的猜想。看下他的子类怎么实现的:
     
    AbstractFuture实现了JDK中的get方法,调用netty中future的方法,一个模板模式出现了。所有的Future都会继承该类
    Promise:任务执行者可以标记任务执行成功或失败,添加了setFailure(Throwable)可以知道Promise的子类需要有个成员变量来保存异常,添加了setSuccess(V) 方法,setUncancellable()方法, 和tryFailure和trySuccess方法。这样所有的操作都可以返回一个Promise,你自己检测是否执行成功,好处是啥,接口统一吗?
    ScheduledFuture:一个定时任务的执行结果
    ProgressiveFuture:可以跟踪任务的执行进度。
     
    下面看下几个类的具体实现:
    DefaultPromise:成员变量如下:
    1. privatefinalEventExecutor executor; //任务执行器
    2. privatevolatileObject result;//不仅仅是结果,也有可能是异常
    3. * 一个或多个监听器,可能是GenericFutureListener或者DefaultFutureListeners。如果是NULL有两种可能
      * 1:没有添加触发器
      * 2:已经出发了
    4. privateObject listeners;
    5. privateLateListeners lateListeners;
    6. privateshort waiters;
    看来异常也是结果。
    看一个重要方法的实现:
    isDone:可能由于正常终止、异常或取消而完成
    1. privatestaticboolean isDone0(Object result){
    2. return result !=null&& result != UNCANCELLABLE;
    3. }
    isSuccess: 任务执行成功
    1. publicboolean isSuccess(){
    2. Object result =this.result;
    3. if(result ==null|| result == UNCANCELLABLE){
    4. returnfalse;
    5. }
    6. return!(result instanceofCauseHolder);
    7. }
    getNow:返回执行结果
    1. public V getNow(){
    2. Object result =this.result;
    3. if(result instanceofCauseHolder|| result == SUCCESS){
    4. returnnull;
    5. }
    6. return(V) result;
    7. }
    sync:同步阻塞方法
    1. @Override
    2. publicPromise<V> sync()throwsInterruptedException{
    3. await();
    4. rethrowIfFailed();
    5. returnthis;
    6. }
    看来比await多了抛出异常。
    await:等待任务执行完成
    1. @Override
    2. publicPromise<V> await()throwsInterruptedException{
    3. if(isDone()){
    4. returnthis;
    5. }
    6. if(Thread.interrupted()){
    7. thrownewInterruptedException(toString());
    8. }
    9. synchronized(this){
    10. while(!isDone()){
    11. checkDeadLock();//判断当前线程是否是执行线程。如果是抛出异常。
    12. incWaiters();//添加等待个数
    13. try{
    14. wait();//释放锁,等待唤醒,阻塞该线程
    15. }finally{
    16. decWaiters();
    17. }
    18. }
    19. }
    20. returnthis;
    21. }
    执行器所在的线程不能调用await(),只能是调用者所在的线程才可以,waiters有什么用呢?
    cancle方法使用:
    1. @Override
    2. publicboolean cancel(boolean mayInterruptIfRunning){
    3. Object result =this.result;
    4. if(isDone0(result)|| result == UNCANCELLABLE){
    5. returnfalse;
    6. }
    7. synchronized(this){
    8. // Allow only once.
    9. result =this.result;
    10. if(isDone0(result)|| result == UNCANCELLABLE){
    11. returnfalse;
    12. }
    13. this.result = CANCELLATION_CAUSE_HOLDER;
    14. if(hasWaiters()){
    15. notifyAll();
    16. }
    17. }
    18. notifyListeners();
    19. returntrue;
    20. }
    当我们修改DefaultPromise的状态时,要触发监听器。
    notifyListeners:
    1. /**
    2. * 该方法不需要异步,为啥呢
    3. * 1:这个方法在同步代码块里面调用,因此任何监听器列表的改变都happens-before该方法
    4. * 2:该方法只有isDone==true的时候调用,一但 isDone==true 那么监听器列表将不会改变
    5. */
    6. privatevoid notifyListeners(){
    7. Object listeners =this.listeners;
    8. if(listeners ==null){
    9. return;
    10. }
    11. EventExecutor executor = executor();
    12. if(executor.inEventLoop()){
    13. finalInternalThreadLocalMap threadLocals =InternalThreadLocalMap.get();
    14. finalint stackDepth = threadLocals.futureListenerStackDepth();
    15. if(stackDepth < MAX_LISTENER_STACK_DEPTH){
    16. threadLocals.setFutureListenerStackDepth(stackDepth +1);
    17. try{
    18. if(listeners instanceofDefaultFutureListeners){
    19. notifyListeners0(this,(DefaultFutureListeners) listeners);
    20. }else{
    21. finalGenericFutureListener<?extendsFuture<V>> l =
    22. (GenericFutureListener<?extendsFuture<V>>) listeners;
    23. notifyListener0(this, l);
    24. }
    25. }finally{
    26. this.listeners =null;
    27. threadLocals.setFutureListenerStackDepth(stackDepth);
    28. }
    29. return;
    30. }
    31. }
    32. if(listeners instanceofDefaultFutureListeners){
    33. finalDefaultFutureListeners dfl =(DefaultFutureListeners) listeners;
    34. execute(executor,newRunnable(){
    35. @Override
    36. publicvoid run(){
    37. notifyListeners0(DefaultPromise.this, dfl);
    38. DefaultPromise.this.listeners =null;
    39. }
    40. });
    41. }else{
    42. finalGenericFutureListener<?extendsFuture<V>> l =
    43. (GenericFutureListener<?extendsFuture<V>>) listeners;
    44. execute(executor,newRunnable(){
    45. @Override
    46. publicvoid run(){
    47. notifyListener0(DefaultPromise.this, l);
    48. DefaultPromise.this.listeners =null;
    49. }
    50. });
    51. }
    52. }
    任务永远不能在主线程中执行,需要放到执行器所在的线程执行。DefaultFutureListeners和GenericFutureListener,一个是容器,一个是元素
    还有几个修改状态的方法:
    1. @Override
    2. publicboolean setUncancellable(){
    3. Object result =this.result;
    4. if(isDone0(result)){
    5. return!isCancelled0(result);
    6. }
    7. synchronized(this){
    8. // Allow only once.
    9. result =this.result;
    10. if(isDone0(result)){
    11. return!isCancelled0(result);
    12. }
    13. this.result = UNCANCELLABLE;
    14. }
    15. returntrue;
    16. }
    17. privateboolean setFailure0(Throwable cause){
    18. if(cause ==null){
    19. thrownewNullPointerException("cause");
    20. }
    21. if(isDone()){
    22. returnfalse;
    23. }
    24. synchronized(this){
    25. // Allow only once.
    26. if(isDone()){
    27. returnfalse;
    28. }
    29. result =newCauseHolder(cause);
    30. if(hasWaiters()){
    31. notifyAll();
    32. }
    33. }
    34. returntrue;
    35. }
    36. privateboolean setSuccess0(V result){
    37. if(isDone()){
    38. returnfalse;
    39. }
    40. synchronized(this){
    41. // Allow only once.
    42. if(isDone()){
    43. returnfalse;
    44. }
    45. if(result ==null){
    46. this.result = SUCCESS;
    47. }else{
    48. this.result = result;
    49. }
    50. if(hasWaiters()){
    51. notifyAll();
    52. }
    53. }
    54. returntrue;
    55. }
    CompleteFuture的几个子类是状态Promise
     
    PromiseTask:该类继承了RunnableFuture接口,该类表示异步操作的结果也可以异步获得,类似JDK中的FutureTask,实例化该对象时候需要传一个Callable的对象,如果没有该对象可以传递一个Runnable和一个Result构造一个Callable对象。
    1. privatestaticfinalclassRunnableAdapter<T>implementsCallable<T>{
    2. finalRunnable task;
    3. final T result;
    4. RunnableAdapter(Runnable task, T result){
    5. this.task = task;
    6. this.result = result;
    7. }
    8. @Override
    9. public T call(){
    10. task.run();
    11. return result;
    12. }
    13. @Override
    14. publicString toString(){
    15. return"Callable(task: "+ task +", result: "+ result +')';
    16. }
    17. }
    看下他的run方法。
    1. @Override
    2. publicvoid run(){
    3. try{
    4. if(setUncancellableInternal()){
    5. V result = task.call();
    6. setSuccessInternal(result);
    7. }
    8. }catch(Throwable e){
    9. setFailureInternal(e);
    10. }
    该类的setFailure,setSuccess等方法都会抛出一异常,而加了internal的方法会成功执行,他们是protected,子类或者同一个package中可以调用。
    ScheduledFutureTask:该类是定时任务返回的ChannelFuture是这个结构中最重要的类,从名称可以知道是通道异步执行的结果:在netty中所有的IO操作都是异步的。这意味这所有的IO调用都会立即返回,且不保证IO操作完成。
    IO调用会返回一个ChannelFuture的实例,通过该实例可以查看IO操作的结果和状态,
    ChannelFuture有完成和未完成两种状态,当IO操作开始,就会创建一个ChannelFuture的实例,该实例初始是未完成状态,它不是成功,失败,或者取消,因为IO操作还没有完成,如果IO操作完成了那么将会有成功,失败,和取消状态,
    *                                      +---------------------------+
    * | Completed successfully |
    * +---------------------------+
    * +----> isDone() = <b>true</b> |
    * +--------------------------+ | | isSuccess() = <b>true</b> |
    * | Uncompleted | | +===========================+
    * +--------------------------+ | | Completed with failure |
    * | isDone() = <b>false</b> | | +---------------------------+
    * | isSuccess() = false |----+----> isDone() = <b>true</b> |
    * | isCancelled() = false | | | cause() = <b>non-null</b> |
    * | cause() = null | | +===========================+
    * +--------------------------+ | | Completed by cancellation |
    * | +---------------------------+
    * +----> isDone() = <b>true</b> |
    * | isCancelled() = <b>true</b> |
    * +---------------------------+
     该类提供了很多方法用来检查IO操作是否完成,等待完成,和接受IO操作的结果。还可以添加ChannelFutureListener的监听器,这样IO操作完成时就可以得到提醒
     * 强烈建议使用addListener而不是await。
     * addListener是非阻塞的,它简单的添加指定的ChannelFutureListener到ChannelFuture中,
     * IO线程将在当绑定在这个future的IO操作完成时,触发这个触发器,优点是提高效率和资源的利用率
     * await()是一个阻塞方法,一旦调用,调用线程将会阻塞直到IO操作完成。优点是容易实现顺序逻辑

     
     
     
     
     
     
     
     





  • 相关阅读:
    CentOS搭建LAMP环境
    CentOS下为Apache部署Symantec SSL证书并实现强制https访问
    [Raspi]树莓派4B装机使用上手指南
    [FPGA]Verilog利用PWM调制完成RGB三色彩虹呼吸灯
    [FPGA]浅谈LCD1602字符型液晶显示器(Verilog)
    [FPGA]Verilog实现JK触发器组成的8421BCD码十进制计数器
    [FPGA]Verilog实现可自定义的倒计时器(24秒为例)
    [FPGA]Verilog实现8位串并转换器HC595
    [FPGA]Verilog实现寄存器LS374
    STM32F407之USART
  • 原文地址:https://www.cnblogs.com/gaoxing/p/4401794.html
Copyright © 2011-2022 走看看