Observable novel = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("连载1"); emitter.onNext("连载2"); emitter.onNext("连载3"); emitter.onComplete(); } });
//Observer who observe something intresested
Observer<String> reader=new Observer<String>() { @Override public void onSubscribe(Disposable d) { mDisposable=d; Log.e(TAG,"onSubscribe"); } @Override public void onNext(String value) { if ("2".equals(value)){ mDisposable.dispose(); return; } Log.e(TAG,"onNext:"+value); } @Override public void onError(Throwable e) { Log.e(TAG,"onError="+e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"onComplete()"); } };
//something observable is now subscribed by observer
novel.subscribe(reader);//一行代码搞定
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); } });