Last time we talked about using thread pool executors for background work in Android. The problem we found with that approach is that the event sender has to know how the result should be handled. Let’s see what RxJava offers here.

Previous parts:

Part I: AsyncTasks

Part II: Loaders

Part III: ThreadPools and EventBus

Disclaimer: this is not an article about how to use RxJava in Android. There are tons of such articles out there. This story is about the implementation details.

Actually, RxJava 2 is not a tool for background work; it’s a tool for processing event streams.

Background execution is just one aspect of such processing. The core idea of RxJava’s approach to background work is to use Schedulers. Let’s jump right to the Scheduler declaration:
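As a rough orientation, here is a simplified plain-Java model of that shape. It is a sketch, not the library source: the names `SketchScheduler`, `SingleThreadSketchScheduler` and the thread name `sketch-worker` are made up for illustration, and it returns a `Future` where RxJava 2 returns a `Disposable`. The real class also offers periodic scheduling, which is omitted here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for RxJava 2's Scheduler; NOT the library source.
abstract class SketchScheduler {

    // A worker executes its tasks one after another and can be disposed as a unit.
    abstract static class Worker {
        abstract Future<?> schedule(Runnable task, long delay, TimeUnit unit);

        Future<?> schedule(Runnable task) {
            return schedule(task, 0L, TimeUnit.NANOSECONDS);
        }

        abstract void dispose();
    }

    // The only method a concrete scheduler has to provide in this sketch.
    abstract Worker createWorker();

    // Runs a single task on a fresh worker and releases the worker afterwards.
    Future<?> scheduleDirect(Runnable task) {
        Worker worker = createWorker();
        return worker.schedule(() -> {
            try {
                task.run();
            } finally {
                worker.dispose();
            }
        });
    }
}

// Hypothetical implementation backed by one named thread per worker.
final class SingleThreadSketchScheduler extends SketchScheduler {
    @Override
    Worker createWorker() {
        ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "sketch-worker"));
        return new Worker() {
            @Override
            Future<?> schedule(Runnable task, long delay, TimeUnit unit) {
                return executor.schedule(task, delay, unit);
            }

            @Override
            void dispose() {
                executor.shutdown();
            }
        };
    }
}

final class SchedulerDemo {
    // Returns the name of the thread that actually ran the task.
    static String taskThreadName() {
        String[] name = new String[1];
        try {
            new SingleThreadSketchScheduler()
                    .scheduleDirect(() -> name[0] = Thread.currentThread().getName())
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return name[0];
    }
}
```

Calling `SchedulerDemo.taskThreadName()` reports the worker thread rather than the caller’s thread, which is the whole point of a scheduler: the caller hands over a `Runnable` and does not care where it runs.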

Rather complex, right? The good news is that you don’t have to implement it! The library already ships a bunch of schedulers: Schedulers.io(), Schedulers.computation(), etc. All you need to do is pass the scheduler instance to the subscribeOn()/observeOn() method in your Rx chain:

Then RxJava will do the rest for you: it will take the lambdas you pass to the operators and run them on the scheduler.

So, for example, if you want your observers to change the UI, all you need to do is pass AndroidSchedulers.mainThread() to observeOn() and it’s done: no more coupling, no more platform-specific code, just brilliance. Of course, AndroidSchedulers is not part of the core RxJava library and comes from a separate one (RxAndroid), but that’s just one more line in your build.gradle.

So what is so confusing about threading here? The trick is that you can’t just place subscribeOn()/observeOn() anywhere in your Rx chain (that would be convenient, right?). Instead, you need to take into account how those operators get their schedulers. First of all, note that every time you call map, flatMap, filter, or any other operator, you get a new object. For example:

So basically almost every line here creates a new object:
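To make the idea concrete, here is a minimal plain-Java sketch of the wrapping. `MiniSingle`, `MiniSingleJust` and `MiniSingleMap` are hypothetical names that only loosely mirror RxJava’s `Single` and `SingleMap`; the sketch tracks how the chain is built and nothing else.

```java
import java.util.function.Function;

// Hypothetical, stripped-down model of a Single; it only tracks how the chain is built.
abstract class MiniSingle<T> {
    static <T> MiniSingle<T> just(T value) {
        return new MiniSingleJust<>(value);
    }

    // Like the real operators, map() does not modify `this`; it wraps it in a new object.
    <R> MiniSingle<R> map(Function<? super T, ? extends R> mapper) {
        return new MiniSingleMap<>(this, mapper);
    }

    // Describes the chain from the outermost operator down to the original source.
    abstract String describe();
}

final class MiniSingleJust<T> extends MiniSingle<T> {
    final T value;

    MiniSingleJust(T value) {
        this.value = value;
    }

    @Override
    String describe() {
        return "Just";
    }
}

final class MiniSingleMap<T, R> extends MiniSingle<R> {
    final MiniSingle<T> source;                      // the operator preceding this one
    final Function<? super T, ? extends R> mapper;   // the lambda passed to map()

    MiniSingleMap(MiniSingle<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    String describe() {
        return "Map(" + source.describe() + ")";
    }
}

final class OperatorObjectsDemo {
    // Builds a three-step chain and reports its nesting.
    static String chainShape() {
        MiniSingle<Integer> first = MiniSingle.just(1);
        MiniSingle<Integer> second = first.map(x -> x + 1);
        MiniSingle<String> third = second.map(x -> "value " + x);
        return third.describe();
    }

    // Checks that map() returned a different object than the one it was called on.
    static boolean mapReturnsNewObject() {
        MiniSingle<Integer> first = MiniSingle.just(1);
        MiniSingle<Integer> mapped = first.map(x -> x + 1);
        return mapped != first;
    }
}
```

Each operator call leaves the previous object untouched and keeps a reference to it as `source`, so the finished chain is a nested structure with the original source at the center.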

So, for example, SingleMap will get its scheduler through a chain of calls, which starts with the .subscribe() call at the end of our chain:

subscribeActual is implemented for each Single operator like this:

source.subscribe()

where source is the operator preceding the current one. Each subscribe() call therefore walks one step up the chain we are working with, until it reaches the first Single we created in this chain.
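The call chain described above can be modeled in plain Java. This is a sketch under assumptions, not RxJava’s code: `MiniSingle`, `MiniMap`, `MiniFromCallable`, `MiniObserver` and the `LOG` list are hypothetical, and the real `SingleObserver` also has an `onSubscribe` callback that is left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Function;

interface MiniObserver<T> {
    void onSuccess(T value);
    void onError(Throwable error);
}

abstract class MiniSingle<T> {
    // Records the order of calls so the walk up the chain becomes visible.
    static final List<String> LOG = new ArrayList<>();

    static <T> MiniSingle<T> fromCallable(Callable<? extends T> callable) {
        return new MiniFromCallable<>(callable);
    }

    <R> MiniSingle<R> map(Function<? super T, ? extends R> mapper) {
        return new MiniMap<>(this, mapper);
    }

    // subscribe() is the public entry point; the operator-specific work is in subscribeActual().
    final void subscribe(MiniObserver<? super T> observer) {
        LOG.add(getClass().getSimpleName() + ".subscribe");
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(MiniObserver<? super T> observer);
}

final class MiniFromCallable<T> extends MiniSingle<T> {
    private final Callable<? extends T> callable;

    MiniFromCallable(Callable<? extends T> callable) {
        this.callable = callable;
    }

    @Override
    protected void subscribeActual(MiniObserver<? super T> observer) {
        T value;
        try {
            value = callable.call();   // the user's lambda runs here, at the top of the chain
        } catch (Exception e) {
            observer.onError(e);
            return;
        }
        observer.onSuccess(value);
    }
}

final class MiniMap<T, R> extends MiniSingle<R> {
    private final MiniSingle<T> source;
    private final Function<? super T, ? extends R> mapper;

    MiniMap(MiniSingle<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(MiniObserver<? super R> observer) {
        // Subscribe to the preceding operator, wrapping the downstream observer.
        source.subscribe(new MiniObserver<T>() {
            @Override
            public void onSuccess(T value) {
                observer.onSuccess(mapper.apply(value));
            }

            @Override
            public void onError(Throwable error) {
                observer.onError(error);
            }
        });
    }
}

final class ChainDemo {
    static List<String> run() {
        MiniSingle.LOG.clear();
        MiniSingle.fromCallable(() -> 20)
                .map(x -> x + 1)
                .map(x -> x * 2)
                .subscribe(new MiniObserver<Integer>() {
                    @Override
                    public void onSuccess(Integer value) {
                        MiniSingle.LOG.add("result " + value);
                    }

                    @Override
                    public void onError(Throwable error) {
                        MiniSingle.LOG.add("error " + error);
                    }
                });
        return new ArrayList<>(MiniSingle.LOG);
    }
}
```

`ChainDemo.run()` logs the two `MiniMap.subscribe` calls first, then `MiniFromCallable.subscribe`, and only then the result: subscription travels from the last operator up to the source, and the value travels back down.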

In our case it is Single.fromCallable:

Inside that lambda we’re doing our network calls.

But where is our scheduler? Here, inside SingleSubscribeOn:

Here the scheduler is the one we passed into the subscribeOn() method.

All this code shows how the scheduler we pass into the chain ends up running the code we pass into the operators’ lambdas.
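A simplified way to picture what a subscribeOn-style operator does: it moves the upstream `subscribe()` call onto the scheduler. The sketch below uses a plain `java.util.concurrent.Executor` as a stand-in for the scheduler; `MiniSubscribeOn` and the thread name `sketch-io` are hypothetical, and disposal handling is ignored.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

interface MiniObserver<T> {
    void onSuccess(T value);
    void onError(Throwable error);
}

abstract class MiniSingle<T> {
    static <T> MiniSingle<T> fromCallable(Callable<? extends T> callable) {
        return new MiniSingle<T>() {
            @Override
            protected void subscribeActual(MiniObserver<? super T> observer) {
                T value;
                try {
                    value = callable.call();
                } catch (Exception e) {
                    observer.onError(e);
                    return;
                }
                observer.onSuccess(value);
            }
        };
    }

    MiniSingle<T> subscribeOn(Executor scheduler) {
        return new MiniSubscribeOn<>(this, scheduler);
    }

    final void subscribe(MiniObserver<? super T> observer) {
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(MiniObserver<? super T> observer);
}

final class MiniSubscribeOn<T> extends MiniSingle<T> {
    private final MiniSingle<T> source;
    private final Executor scheduler;   // stand-in for the Scheduler passed to subscribeOn()

    MiniSubscribeOn(MiniSingle<T> source, Executor scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(MiniObserver<? super T> observer) {
        // The upstream subscription itself is handed to the scheduler,
        // so everything above this operator runs on the scheduler's thread.
        scheduler.execute(() -> source.subscribe(observer));
    }
}

final class SubscribeOnDemo {
    // Returns the name of the thread on which the callable was executed.
    static String callableThread() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        MiniSingle.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(io)
                .subscribe(new MiniObserver<String>() {
                    @Override
                    public void onSuccess(String name) {
                        seen.set(name);
                        done.countDown();
                    }

                    @Override
                    public void onError(Throwable error) {
                        done.countDown();
                    }
                });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return seen.get();
    }
}
```

Because the subscription is what gets scheduled, the callable at the top of the chain reports the `sketch-io` thread even though `subscribe()` was called from another thread.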

Another point is the observeOn() method. It creates an instance of the SingleObserveOn class (in our case), whose subscribeActual already looks trivial to us:

The ObserveOnSingleObserver here is much more interesting.

Calling observeOn causes the downstream observer to be called on the scheduler’s thread, which makes it possible to switch threads right in the Rx chain: you can obtain data from the server on Schedulers.io(), then do some heavy computation on Schedulers.computation(), update the UI, calculate something else, and only then proceed to the actual subscribers.
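The thread hop can be sketched in plain Java as well. The observer wrapper below stores the result and re-posts itself to the scheduler, which is the general idea behind an observeOn-style observer; `MiniObserveOn`, `HopObserver` and the thread name `sketch-ui` are hypothetical, an `Executor` again stands in for the scheduler, and disposal is ignored.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

interface MiniObserver<T> {
    void onSuccess(T value);
    void onError(Throwable error);
}

abstract class MiniSingle<T> {
    // Emits the value synchronously on whichever thread calls subscribe().
    static <T> MiniSingle<T> just(T value) {
        return new MiniSingle<T>() {
            @Override
            protected void subscribeActual(MiniObserver<? super T> observer) {
                observer.onSuccess(value);
            }
        };
    }

    MiniSingle<T> observeOn(Executor scheduler) {
        return new MiniObserveOn<>(this, scheduler);
    }

    final void subscribe(MiniObserver<? super T> observer) {
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(MiniObserver<? super T> observer);
}

final class MiniObserveOn<T> extends MiniSingle<T> {
    private final MiniSingle<T> source;
    private final Executor scheduler;

    MiniObserveOn(MiniSingle<T> source, Executor scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(MiniObserver<? super T> observer) {
        // Subscribing stays on the current thread; only the delivery is moved.
        source.subscribe(new HopObserver<T>(observer, scheduler));
    }

    // Stores the upstream signal, then delivers it downstream from the scheduler.
    static final class HopObserver<T> implements MiniObserver<T>, Runnable {
        private final MiniObserver<? super T> downstream;
        private final Executor scheduler;
        private T value;
        private Throwable error;

        HopObserver(MiniObserver<? super T> downstream, Executor scheduler) {
            this.downstream = downstream;
            this.scheduler = scheduler;
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            scheduler.execute(this);
        }

        @Override
        public void onError(Throwable error) {
            this.error = error;
            scheduler.execute(this);
        }

        @Override
        public void run() {
            if (error != null) {
                downstream.onError(error);
            } else {
                downstream.onSuccess(value);
            }
        }
    }
}

final class ObserveOnDemo {
    // Returns the name of the thread on which the final observer was called.
    static String observerThread() {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-ui"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        MiniSingle.just("payload")
                .observeOn(ui)
                .subscribe(new MiniObserver<String>() {
                    @Override
                    public void onSuccess(String value) {
                        seen.set(Thread.currentThread().getName());
                        done.countDown();
                    }

                    @Override
                    public void onError(Throwable error) {
                        done.countDown();
                    }
                });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ui.shutdown();
        }
        return seen.get();
    }
}
```

Unlike the subscribeOn case, nothing upstream changes threads here. Only what happens below the operator is affected, which is why the position of observeOn in the chain matters and why it can be applied several times.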

Although rather complex under the hood, RxJava is a very flexible and powerful tool for event processing and thus for managing background work. But IMO this approach still has its caveats:

  1. Learning RxJava is time-consuming.
  2. The number of operators to learn is relatively high, and the differences between them are not obvious.
  3. The stack traces for RxJava calls are almost unrelated to the code you write yourself.

So, what’s next? Kotlin coroutines of course! See you next time.
