zoukankan      html  css  js  c++  java
  • java框架---->RxJava的使用(一)

      RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的observer。今天我们就来学习一下rxJava,并分析一下它源码感受一下它的观察者模式。

    RxJava的简单使用

    一、mavan的pom.xml中增加rxjava的依赖

    这里我们用的是rxjava1.3.0,目前最新的已经更新到2了。

    <dependency>
        <groupId>io.reactivex</groupId>
        <artifactId>rxjava</artifactId>
        <version>1.3.0</version>
    </dependency>

    二、测试的用类

    import rx.Observable;
    import rx.Subscriber;
    /**
     * @author huhx
     */
    public class RxJavaTest {
        public static void main(String[] args) {
            Observable<String> observable = Observable.just("One", "Two", "Three");
            observable.subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }
    
                @Override
                public void onError(Throwable item) {
                    System.out.println("onError");
                }
    
                @Override
                public void onNext(String item) {
                    System.out.println("Item is " + item);
                }
            });
        }
    }

    三、打印的结果如下:

    Item is One
    Item is Two
    Item is Three
    onCompleted

    RxJava源码的简单分析

    根据上述的代码,我们简单分析一下程序的流程。

    一、首先Observable.just("One", "Two", "Three")代码:

    just是一个工厂方法,用心构建一个Observable对象。

    public static <T> Observable<T> just(T t1, T t2, T t3) {
        return from((T[])new Object[] { t1, t2, t3 });
    }

    from方法的代码如下:

    public static <T> Observable<T> from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        } else
        if (n == 1) {
            return just(array[0]);
        }
        return unsafeCreate(new OnSubscribeFromArray<T>(array));
    }

    二、我们重点是看unsafeCreate方法:

    public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

    对于RxJavaHooks.onCreate()方法,第一次会先执行RxJavaHooks静态块的代码。

    static {
        init(); // 初始化了很多RxJavaHooks跟事件有着的变量
    }

    onCreate方法中,这个没有怎么看懂后续再会。

    public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

    三、对于observable.subscribe()代码

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    Observable的subscribe方法代码如下:

     1 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     2     if (subscriber == null) {
     3         throw new IllegalArgumentException("subscriber can not be null");
     4     }
     5     if (observable.onSubscribe == null) {
     6         throw new IllegalStateException("onSubscribe function can not be null.");
     7     }
     8 
     9     subscriber.onStart();
    10 
    11     if (!(subscriber instanceof SafeSubscriber)) {
    12         subscriber = new SafeSubscriber<T>(subscriber);
    13     }
    14 
    15     try {
    16         RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    17         return RxJavaHooks.onObservableReturn(subscriber);
    18     } catch (Throwable e) {
    19         Exceptions.throwIfFatal(e);
    20         if (subscriber.isUnsubscribed()) {
    21             RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
    22         } else {
    23             try {
    24                 subscriber.onError(RxJavaHooks.onObservableError(e));
    25             } catch (Throwable e2) {
    26                 Exceptions.throwIfFatal(e2);
    27                 RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
    28                 RxJavaHooks.onObservableError(r);
    29                 throw r; // NOPMD
    30             }
    31         }
    32         return Subscriptions.unsubscribed();
    33     }
    34 }

    四、整个的正常流程会走到16行的代码,这个是我们重点的分析地方。如果是异常情况,则只会执行onError方法

    对于这个例子中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)得到的结果OnSubscribeFromArray这个类。调用它的call方法。

    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

    setProducer方法执行的代码如下:

     1 public void setProducer(Producer p) {
     2     long toRequest;
     3     boolean passToSubscriber = false;
     4     synchronized (this) {
     5         toRequest = requested;
     6         producer = p;
     7         if (subscriber != null) {
     8             // middle operator ... we pass through unless a request has been made
     9             if (toRequest == NOT_SET) {
    10                 // we pass through to the next producer as nothing has been requested
    11                 passToSubscriber = true;
    12             }
    13         }
    14     }
    15     // do after releasing lock
    16     if (passToSubscriber) {
    17         subscriber.setProducer(producer);
    18     } else {
    19         // we execute the request with whatever has been requested (or Long.MAX_VALUE)
    20         if (toRequest == NOT_SET) {
    21             producer.request(Long.MAX_VALUE);
    22         } else {
    23             producer.request(toRequest);
    24         }
    25     }
    26 }

    这里会走到21行的代码,跟进去FromArrayProducer的request方法:

    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        if (n == Long.MAX_VALUE) {
            if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                fastPath();
            }
        } else
        if (n != 0) {
            if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                slowPath(n);
            }
        }
    }

    对于fastPath()的代码如下:

    void fastPath() {
        final Subscriber<? super T> child = this.child;
    
        for (T t : array) {
            if (child.isUnsubscribed()) {
                return;
            }
            child.onNext(t);
        }
        if (child.isUnsubscribed()) {
            return;
        }
        child.onCompleted();
    }

    执行onNext()方法,其中t就是Observable.just()方法的参数。child就是observable.subscribe的定义的参数。

    友情链接

  • 相关阅读:
    完成端口CreateIoCompletionPort编写高性能的网络模型程序
    offsetof的使用
    __attribute__
    nn_slow和nn_fast
    完成端口(Completion Port)详解(转)
    等待
    win8.1磁盘使用率100解决方法
    ubuntu 14.04 修改PS1提示符
    ubuntu14.04 开启root登陆
    Linux下彻底卸载LibreOffice方法
  • 原文地址:https://www.cnblogs.com/huhx/p/baseusejavarxjava1.html
Copyright © 2011-2022 走看看