import android.util.Log;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
/**
 * Small RxJava 1.x demonstration.
 *
 * <p>{@link #call()} starts a worker thread that repeatedly subscribes a
 * {@code Subscriber} and an {@code Action1} to a fixed three-item observable,
 * and the same {@code Action1} to {@code Observable.just("just Object")}.
 * Everything that is received is written to the Android log under {@link #TAG}.
 *
 * <p>No scheduler is applied anywhere in this class, so emissions are expected
 * to be delivered on the worker thread itself (RxJava default).
 */
public class RXJavaDemo {
    private static final String TAG = RXJavaDemo.class.getSimpleName();

    /** Number of items that {@link #mSubscriber} has received so far. */
    private int count = 0;

    public RXJavaDemo() {
    }

    /**
     * Starts the demo on a new background thread and returns immediately.
     *
     * <p>The worker keeps re-subscribing until its thread is interrupted.
     */
    public void call() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Checking the interrupt flag gives the loop an exit condition;
                // mSubscriber.onNext restores the flag when its sleep is interrupted.
                // NOTE(review): the same Subscriber instance is re-subscribed on every
                // pass; RxJava 1.x subscribers are normally single-use — confirm this
                // is intended for the demo.
                while (!Thread.currentThread().isInterrupted()) {
                    mObservable.subscribe(mSubscriber);
                    mObservable.subscribe(action1);
                    Observable.just("just Object").subscribe(action1);
                }
            }
        }).start();
    }

    /** Emits "1", "2", "3" to each subscriber and then completes. */
    private final Observable<String> mObservable = Observable.create(
            new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("1");
                    subscriber.onNext("2");
                    subscriber.onNext("3");
                    subscriber.onCompleted();
                }
            });

    /**
     * Logs every received item together with the running counter, increments the
     * counter, and then sleeps for two seconds before returning.
     */
    private final Subscriber<String> mSubscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            Log.v(TAG, "onNext, string : " + s);
            Log.v(TAG, "onNext, count : " + count);
            count++;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Restore the interrupt status so the worker loop in call() can see it.
                Thread.currentThread().interrupt();
                Log.w(TAG, "onNext, sleep interrupted", e);
            }
        }

        @Override
        public void onCompleted() {
            Log.v(TAG, "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            Log.v(TAG, "onError, e : " + e.toString());
        }
    };

    /** Logs each received string, or a dedicated message when the item is null. */
    private final Action1<String> action1 = new Action1<String>() {
        @Override
        public void call(String s) {
            if (s == null) {
                Log.v(TAG, "Action1, object is null");
                return;
            }
            Log.v(TAG, "Action1, o : " + s);
        }
    };
}
使用 RxJava 之前:
// Plain-thread version: scan every folder on a background thread, decode each
// ".png" file into a Bitmap, and hand it to the view on the UI thread.
new Thread() {
    @Override
    public void run() {
        for (File folder : folders) {
            File[] files = folder.listFiles();
            if (files == null) {
                // listFiles() returns null when the path is not a directory
                // or an I/O error occurs; skip such entries instead of crashing.
                continue;
            }
            for (File file : files) {
                if (!file.getName().endsWith(".png")) {
                    continue;
                }
                final Bitmap bitmap = getBitmapFromFile(file);
                // View updates must be posted back to the UI thread.
                getActivity().runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        imageCollectorView.addImage(bitmap);
                    }
                });
            }
        }
    }
}.start();
使用 RxJava 之后:
// RxJava version of the same pipeline: folders -> files -> ".png" files ->
// bitmaps, produced on the io scheduler and consumed on the Android main thread.
Observable.from(folders)
        .flatMap(new Func1<File, Observable<File>>() {
            @Override
            public Observable<File> call(File folder) {
                File[] files = folder.listFiles();
                if (files == null) {
                    // listFiles() returns null when the path is not a directory
                    // or an I/O error occurs; emit nothing for such entries.
                    return Observable.<File>empty();
                }
                return Observable.from(files);
            }
        })
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                return file.getName().endsWith(".png");
            }
        })
        .map(new Func1<File, Bitmap>() {
            @Override
            public Bitmap call(File file) {
                return getBitmapFromFile(file);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                imageCollectorView.addImage(bitmap);
            }
        });
build.gradle
compile 'io.reactivex:rxjava:1.0.9'
compile 'io.reactivex:rxandroid:0.24.0'
compile 'com.squareup.retrofit:retrofit:1.9.0'
RxJava 提供四种不同的 Subject:PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject。
BehaviorSubject, 会首先向它的订阅者发送截至订阅前最新的一个数据对象(或初始值),然后正常发送订阅后的数据流。
ReplaySubject, 会缓存它所订阅的所有数据,向任意一个订阅它的观察者重发。
AsyncSubject, 当Observable完成时只会发布最后一个数据给已经订阅的每一个观察者。
PublishSubject, 没有发送数据,观察者只能等待,没有线程阻塞,没有资源消耗。在调用publishSubject.onNext时,才发送消息。 发送消息结束以后,publishSubject并没有结束,观察者等待消息再一次的发送。如果想关闭publishSubject,publishSubject需调用publishSubject.onCompleted方法关闭。此时,publishSubject再发送消息,观察者不能收到发送的消息。