//compose:对Observabl进行变换,加工处理
Observable.just(1, 2, 3, 4, 5)
.compose(new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> observable) { //比如给observable添加subscribeOn、observeOn
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
})
.subscribe();
//map:数据类型变换
observable.map(new Function<String, User>() {
@Override
public User apply(String,User) { //将入参String类型变换为出参User类型
User user = new User(String);
return user;
}
})
//zip:合并
Observable.zip(observable1, observable2,
new BiFunction<List<User>, List<User1>, List<User2>>() {
@Override
public List<User> apply(List<User1> cricketFans, List<User2> footballFans) {//合并多个入参为一个出参
List<User> userWhoLovesBoth = filterUserWhoLovesBoth(cricketFans, footballFans);
return userWhoLovesBoth;
}
})
//flatMap:遍历。相当于for循环。返回一个Observable。
observable.flatMap(new Function<List<User>, ObservableSource<User>>() { // flatMap - to return users one by one
@Override
public ObservableSource<User> apply(List<User> usersList) {//入参是一个列表,出参是一个ObservableSource
return Observable.fromIterable(usersList); // returning user one by one from usersList.
}
})
.filter(new Predicate<User>() { //过滤
@Override
public boolean test(User user) {
// filtering user who follows me.
return user.isFollowing;
}
})
//take:从开始位置起提取多少个。一般和skip配合使用。skip:跳过前面的多少个。
observable.flatMap(new Function<List<User>, ObservableSource<User>>() { // flatMap - to return users one by one
@Override
public ObservableSource<User> apply(List<User> usersList) {//Observable.just(usersList)
return Observable.fromIterable(usersList); // returning user one by one from usersList.
}
})
.take(4) // it will only emit first 4 users out of all
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe()
//reduce:累计操作,回调最后一次结果10。
//scan也可以实现累计操作,同时回调多次中间结果1,3,6,10。
Observable.just(1,2,3,4).reduce(new BiFunction<Integer, Integer, Integer>() {//T1,T2,R 。T1是上次的累计值,T2是当前值。
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
})
.subscribe();//最后的输出结果是10。
//takeUntil:在另一件事发生之前,才做自己的事情。一旦这件事发生了就不做自己的事情了。
Observable<Long> timerObservable = Observable.timer(10, TimeUnit.SECONDS);//timer:10秒后执行,只会执行一次
timerObservable.subscribe(getObserver());
getObservable().zipWith(Observable.interval(0, 3, TimeUnit.SECONDS), //interval:0秒开始,每3秒执行一次
(s, aLong) -> {
System.out.println("String:" + s + "," + aLong);
return aLong;
})
//Will receive the items from Strings observable until timerObservable doesn't start emitting data.
.takeUntil(timerObservable) //timerObservable触发后,interval不再继续,getObserver2()不再接收到结果。
//We need to observe on MainThread because delay works on background thread to avoid UI blocking.
.subscribe(getObserver2());
Thread.sleep(10000000);//这个只是为了不让程序退出。
//Take the items until the condition is met.与filter类似
.takeWhile(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return !s.toLowerCase().contains("honey");
}
}