一、什么是 RxJava?
RxJava 是一个响应式编程框架,采用观察者设计模式。所以自然少不了 Observable 和 Subscriber 这两个东东了。
RxJava 是一个开源项目,地址:https://github.com/ReactiveX/RxJava
还有一个RxAndroid,用于 Android 开发,添加了 Android 用的接口。地址:https://github.com/ReactiveX/RxAndroid
二、例子
通过请求openweathermap 的天气查询接口返回天气数据
1、增加编译依赖
dependencies { compile fileTree(dir: 'libs', include: ['*.jar']) compile 'com.android.support:appcompat-v7:22.0.0' compile 'io.reactivex:rxjava:1.0.9' compile 'io.reactivex:rxandroid:0.24.0' compile 'com.squareup.retrofit:retrofit:1.9.0' }
retrofit 是一个 restful 请求客户端。详见:http://square.github.io/retrofit/
2、服务器接口
/** * 接口 * Created by Hal on 15/4/26. */ public class ApiManager { private static final String ENDPOINT = "http://api.openweathermap.org/data/2.5"; /** * 服务接口 */ private interface ApiManagerService { @GET("/weather") WeatherData getWeather(@Query("q") String place, @Query("units") String units); } private static final RestAdapter restAdapter = new RestAdapter.Builder().setEndpoint(ENDPOINT).setLogLevel(RestAdapter.LogLevel.FULL).build(); private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class); /** * 将服务接口返回的数据,封装成{@link rx.Observable} * @param city * @return */ public static Observable<WeatherData> getWeatherData(final String city) { return Observable.create(new Observable.OnSubscribe<WeatherData>() { @Override public void call(Subscriber<? super WeatherData> subscriber) { //订阅者回调 onNext 和 onCompleted subscriber.onNext(apiManager.getWeather(city, "metric")); subscriber.onCompleted(); } }).subscribeOn(Schedulers.io()); } }
订阅者的回调有三个方法,onNext,onError,onCompleted
/** * 多个 city 请求 * map,flatMap 对 Observable进行变换 */ Observable.from(CITIES).flatMap(new Func1<String, Observable<WeatherData>>() { @Override public Observable<WeatherData> call(String s) { return ApiManager.getWeatherData(s); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(/*onNext*/new Action1<WeatherData>() { @Override public void call(WeatherData weatherData) { Log.d(LOG_TAG, weatherData.toString()); } }, /*onError*/new Action1<Throwable>() { @Override public void call(Throwable throwable) { } }); /** * 单个 city 请求 */ ApiManager.getWeatherData(CITIES[0]).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<WeatherData>() { @Override public void call(WeatherData weatherData) { Log.d(LOG_TAG, weatherData.toString()); ((TextView) findViewById(R.id.text)).setText(weatherData.toString()); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { Log.e(LOG_TAG, throwable.getMessage(), throwable); } }); /** * Android View 事件处理 */ ViewObservable.clicks(findViewById(R.id.text), false).subscribe(new Action1<OnClickEvent>() { @Override public void call(OnClickEvent onClickEvent) { } });
subscribeOn(Schedulers.io())与observeOn(AndroidSchedulers.mainThread())分别定义了这两个动作的线程。Android UI 更新需要在主线程。
4、retrofit 支持 rxjava 整合
/** * 服务接口 */ private interface ApiManagerService { @GET("/weather") WeatherData getWeather(@Query("q") String place, @Query("units") String units); /** * retrofit 支持 rxjava 整合 * 这种方法适用于新接口 */ @GET("/weather") Observable<WeatherData> getWeatherData(@Query("q") String place, @Query("units") String units); }