February 29, 2020

Forget RxJava: Kotlin Coroutines are all you need. Part 2/2 — Channels

Forget RxJava: Kotlin Coroutines are all you need. Part 2/2 — Channels

The series about RxJava — Coroutines replacement started here continues. Now we’re considering more complex case than having a bunch network calls.

RxJava is a powerful event processing library. But how we can do the same with Kotlin Coroutines wasn’t quite covered in a previous part, which the readers didn’t miss a chance to point it out:

Let’s consider such a case.

Imagine we’re implementing the Github repositories search in a mobile app, like this:

The RxJava code will be the following:

We’re definitely benefiting from RxJava here: the fact that we can debounce sending the network requests as we don’t want to do it on each letter type, and that we can save them in case the user added and removed a letter just with a couple lines of code is great.

Integration with the EditText is simple as well due to PublishSubject usage:

searchQuery.addTextChangedListener(object: TextWatcher {       override 	fun afterTextChanged(s: Editable?) {                   					publishSubject.onNext(s.toString())         
	}
}

Let’s take a look what Kotlin has to offer here.

In non-blocking world of coroutines there is an entity called “Channel”. You can think of a channel as a non-blocking variant of BlockingQueue.

Instead of blocking put and take methods of BlockingQueue the Channel has send and receive ones:

It appears that we can use this send-receive pair in order to mimik the PublishSubject from Rx world.

Although the RxJava code is rather short and clean we can do even better with Kotlin channels:

In order to send the queries there, we need to offer them to the channel, just as we did with PublishSubject:

searchQuery.addTextChangedListener(object: TextWatcher {    
	override fun afterTextChanged(s: Editable?) {       			        	     broadcast.send(s.toString())    
	}
}

We don’t see here any receive() calls, that’s because we’re using consumeEach extension function for it. What you need to know now is that channels support different capacity, so you can choose how many elements to accept before suspending.

Answering the question of full replacement for RxJava , we need to say 2 things:

  1. Of course channels don’t have such a huge amount of operators. For example you don’t have zip, switchMap and so on. But a) you not necessary need them due to the way you write the code b) if you do need it, it is easy to implement. Here in EPAM we’re working on a library of operators to close this gap, the code is already in review and will be published to github soon.
  2. However, this approach only supports hot streams, and the cold streams are only being discussed nowadays. You already have some kind of cold streams support with CoroutineStart parameter, but it doesn’t cover all the cases. This leads to the fact that channels API is still experimental and is subject to change in future.

The examples here are pretty simple, but you are free to experiment on substituting the current Rx chains you have in your application to a non-blocking paradigm.

Don’t forget to follow me on Twitter in order to stay tuned!