Buffer
-
设定收集n个元素为一组,以下方代码为例,三个为一组,则当组满三个元素时,返回一次List数据 |
-
没组满三个元素时,如果调用onComplete,直接发送剩余元素,没调用onComplete,一直等待 |
PublishSubject<String> subject = PublishSubject.create();
Disposable disposable = subject
.buffer(3)//获取三个为一组发送
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> s) throws Exception {
StringBuilder content = new StringBuilder();
for (String index : s) {
content.append(index).append(",");
}
LogUtils.e("rxJavaBuffer==" + content);
}
});
subject.onNext("1");
subject.onNext("2");
subject.onNext("3");
subject.onNext("4");
subject.onNext("5");
subject.onNext("6");
subject.onNext("7");
subject.onNext("8");
subject.onNext("9");
subject.onNext("10");
subject.onComplete();
复制代码
GroupBy
-
很有意思的操作符。先将获取的元素分组(自己分配key),生成对应的GroupedObservable |
-
GroupedObservable有点类似HashMap,包含key(自己分配的)和元素 |
-
应该注意的是,当每组GroupedObservable首次订阅新的订阅者后,后续同组元素直接将数据发送给新的订阅者。看代码 |
PublishSubject<String> subject = PublishSubject.create();
Disposable disposable = subject
.groupBy(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
// ? 第一步。 数据分类,分配不同的key
if (Integer.valueOf(s) < 4) {
return "one";
} else if (Integer.valueOf(s) < 7) {
return "two";
}
if (Integer.valueOf(s) < 10) {
return "three";
}
return "other";
}
})
.subscribe(new Consumer<GroupedObservable<String, String>>() {
@Override
public void accept(GroupedObservable<String, String> sub) throws Exception {
// ? 第二步。 根据不同的key,绑定新的订阅者。
// 如果改组已经订阅了新的订阅者,直接发送给新的订阅者
LogUtils.e(sub.getKey());
switch (sub.getKey()) {
case "one":
sub.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e("GroupedObservable==one" + s);
}
});
break;
case "two":
sub.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e("GroupedObservable==two" + s);
}
});
break;
case "three":
sub.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e("GroupedObservable==three" + s);
}
});
break;
default:
sub.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e("GroupedObservable==other" + s);
}
});
break;
}
}
});
subject.onNext("1");
subject.onNext("2");
subject.onNext("3");
subject.onNext("4");
subject.onNext("5");
subject.onNext("6");
subject.onNext("7");
subject.onNext("8");
subject.onNext("9");
subject.onNext("10");
subject.onComplete();
复制代码