public class ConcurrencyWithSchedulersDemoFragment
extends BaseFragment {
@Bind(R.id.progress_operation_running)
ProgressBar _progress;
@Bind(R.id.list_threading_log)
ListView _logsList;
private LogAdapter _adapter;
private List<String> _logs;
private Subscription _subscription;
@Override
public void onDestroy() {
super.onDestroy();
RxUtils.unsubscribeIfNotNull(_subscription);
ButterKnife.unbind(this);
}
@Override
public void onActivityCreated(@Nullable Bundle savedInstanceState) {
super.onActivityCreated(savedInstanceState);
_setupLogger();
}
@Override
public View onCreateView(LayoutInflater inflater,
@Nullable ViewGroup container,
@Nullable Bundle savedInstanceState) {
View layout = inflater.inflate(R.layout.fragment_concurrency_schedulers, container, false);
ButterKnife.bind(this, layout);
return layout;
}
@OnClick(R.id.btn_start_operation)
public void startLongOperation() {
_progress.setVisibility(View.VISIBLE);
_log("Button Clicked");
_subscription = _getObservable()//
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(_getObserver()); // Observer
}
private Observable<Boolean> _getObservable() {
return Observable.just(true).map(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean aBoolean) {
_log("Within Observable");
_doSomeLongOperation_thatBlocksCurrentThread();
return aBoolean;
}
});
}
/**
* Observer that handles the result through the 3 important actions:
* <p/>
* 1. onCompleted
* 2. onError
* 3. onNext
*/
private Observer<Boolean> _getObserver() {
return new Observer<Boolean>() {
@Override
public void onCompleted() {
_log("On complete");
_progress.setVisibility(View.INVISIBLE);
}
@Override
public void onError(Throwable e) {
Timber.e(e, "Error in RxJava Demo concurrency");
_log(String.format("Boo! Error %s", e.getMessage()));
_progress.setVisibility(View.INVISIBLE);
}
@Override
public void onNext(Boolean bool) {
_log(String.format("onNext with return value "%b"", bool));
}
};
}
// -----------------------------------------------------------------------------------
// Method that help wiring up the example (irrelevant to RxJava)
private void _doSomeLongOperation_thatBlocksCurrentThread() {
_log("performing long operation");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Timber.d("Operation was interrupted");
}
}
private void _log(String logMsg) {
if (_isCurrentlyOnMainThread()) {
_logs.add(0, logMsg + " (main thread) ");
_adapter.clear();
_adapter.addAll(_logs);
} else {
_logs.add(0, logMsg + " (NOT main thread) ");
// You can only do below stuff on main thread.
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
_adapter.clear();
_adapter.addAll(_logs);
}
});
}
}
private void _setupLogger() {
_logs = new ArrayList<String>();
_adapter = new LogAdapter(getActivity(), new ArrayList<String>());
_logsList.setAdapter(_adapter);
}
private boolean _isCurrentlyOnMainThread() {
return Looper.myLooper() == Looper.getMainLooper();
}
private class LogAdapter
extends ArrayAdapter<String> {
public LogAdapter(Context context, List<String> logs) {
super(context, R.layout.item_log, R.id.item_log, logs);
}
}
}