一、说明
rxjava 是基于事件的异步编程。在写代码之前,我们首先要了解几个概念。 (如果有什么错误之处,还请指正)
- Observable(被观察者,可观察对象,就是要进行什么操作,相当于生产者)
- bscriber 负责处理事件,他是事件的消费者
- Operator 是对 Observable 发出的事件进行修改和变换
- subscribeOn(): 指定 subscribe() 所发生的线程
- observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程
RxJava 3具有几个基础类
io.reactivex.rxjava3.core.Flowable
:0..N流量,支持反应流和背压io.reactivex.rxjava3.core.Observable
:0..N流动,无背压,io.reactivex.rxjava3.core.Single
:正好是1个项目的流或一个错误,io.reactivex.rxjava3.core.Completable
:没有项目但只有完成或错误信号的流程,io.reactivex.rxjava3.core.Maybe
:没有项目,恰好一项或错误的流程。
二、代码示例
1. 一个简单代码示例
public class Main { public static void main(String[] args) { Flowable.just("hello world ").subscribe(System.out::println); System.out.println("结束"); } } // 结果: hello world 结束
2. 结束是打印在后面
public static void main(String[] args) { Flowable.range(0,5).map(x->x * x).subscribe(x->{ Thread.sleep(1000); System.out.println(x); }); System.out.println("结束"); }
3. 可设置生产者和消费者使用的线程模型 。
public static void main(String[] args) throws Exception{ Flowable<String> source = Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return "Done"; }); Flowable<String> runBackground = source.subscribeOn(Schedulers.io()); Flowable<String> showForeground = runBackground.observeOn(Schedulers.single()); showForeground.subscribe(System.out::println, Throwable::printStackTrace); System.out.println("------"); Thread.sleep(2000); source.unsubscribeOn(Schedulers.io());//事件发送完毕后,及时取消发送 System.out.println("结束"); }
4. 发现异步效果 是并行执行的结果
Flowable.range(1, 10) .observeOn(Schedulers.single()) .map(v -> v * v) .subscribe(System.out::println); System.out.println("结束");
5. 无异步效果
Flowable.range(1, 10) .observeOn(Schedulers.single()) .map(v -> v * v) .blockingSubscribe(System.out::println); System.out.println("结束");