Part III: ThreadPools and EventBus
The time has come: this is the article I have been writing the whole series for, the one that explains how this new approach works under the hood. If you still don't know how to USE coroutines, here is a place to start:
Coroutines - Kotlin Programming Language
Cool. Once you get used to coroutines, you may start wondering what enabled Kotlin to provide this feature, how they work, and so on. Please note that I cover only the compile-time part here: the runtime deserves a separate article.
The first thing to understand is that coroutines do not exist at runtime. The compiler converts a function with the suspend modifier into a function with an additional Continuation parameter. This interface has two methods:
The type T is the return type of your original suspend function. So what actually happens is that the suspend function is executed on some thread (hold on, we are covering that too), and its result is passed to the resume function of the continuation in whose context the suspend function was called. If the function fails to obtain a result and throws an exception, resumeWithException is called instead, propagating the error to the calling code.
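To make the callback shape concrete, here is a minimal hand-written sketch of that transformation. SketchContinuation, parseAnswerCps and describe are hypothetical names I made up for illustration, and the interface only models the two methods discussed above. The real compiler output is more involved (it generates a state machine), so treat this as a picture of the idea rather than what the compiler emits.

```kotlin
// Hypothetical model of a continuation with the two methods discussed above.
interface SketchContinuation<in T> {
    fun resume(value: T)
    fun resumeWithException(exception: Throwable)
}

// Roughly what "suspend fun parseAnswer(input: String): Int" turns into conceptually:
// the result no longer comes back as a return value, it travels through the continuation.
fun parseAnswerCps(input: String, continuation: SketchContinuation<Int>) {
    val value = try {
        input.trim().toInt()
    } catch (e: NumberFormatException) {
        continuation.resumeWithException(e) // failure path
        return
    }
    continuation.resume(value) // success path
}

// Runs the function and reports which of the two callbacks fired.
fun describe(input: String): String {
    var outcome = "not resumed"
    parseAnswerCps(input, object : SketchContinuation<Int> {
        override fun resume(value: Int) {
            outcome = "value: $value"
        }

        override fun resumeWithException(exception: Throwable) {
            outcome = "error: ${exception.javaClass.simpleName}"
        }
    })
    return outcome
}

fun main() {
    println(describe(" 42 "))      // prints "value: 42"
    println(describe("forty-two")) // prints "error: NumberFormatException"
}
```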
OK, but where does this continuation come from? From a coroutine builder, of course! Let's check the code of one of the builders, say launch:
Here, the builder creates a coroutine: an instance of the AbstractCoroutine class, which in turn implements the Continuation interface. The start method belongs to the Job interface, but the definition of the start method is hard to find, so let's start our journey from the other side. An attentive reader will notice that the first argument of the launch function is a CoroutineContext, which is set to DefaultDispatcher by default. Dispatchers are classes that manage the execution of coroutines, so they definitely contribute to understanding what actually happens. Let's see the DefaultDispatcher declaration:
So it's basically CommonPool, although the javadocs say this is subject to change. What's CommonPool?
CommonPool is a coroutine dispatcher that uses ForkJoinPool as its ExecutorService implementation. Yep, that's correct: in the end, all your coroutine lambdas are just Runnables submitted to an Executor, with a layer of sophisticated transformations on top. But the devil is in the details.
According to my Twitter poll, I need to explain a little about what FJP is :)
First of all, ForkJoinPool is a state-of-the-art executor that was introduced in Java 7 and is used by Java 8 parallel streams. It provides efficient parallelism for the Stream API, which basically forks tasks to process portions of the data and joins them once all the data has been processed. Simplifying things, suppose you have:
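A stream of that kind could look like the sketch below. The bounds (1 to 1,000,000) and the name parallelSum are my assumptions for illustration; any large range behaves the same way.

```kotlin
import java.util.stream.LongStream

// Sums the numbers 1..upTo with a parallel stream.
// Parallel streams run their work on the common ForkJoinPool by default.
fun parallelSum(upTo: Long): Long =
    LongStream.rangeClosed(1, upTo).parallel().sum()

fun main() {
    println(parallelSum(1_000_000)) // prints 500000500000
}
```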
The sum of such a stream will not be calculated in a single thread; instead, ForkJoinPool is going to recursively split the range into parts (500,000 and 500,000; then each part into 250,000 and 250,000, and so on), calculate the sum of each part, and then join the results into a single sum. The visualization of such a process can be seen here:
The effectiveness of FJP comes from the work-stealing algorithm: when a particular thread runs out of tasks to execute, it goes to the queues of other threads in the pool and steals their tasks. For a deeper understanding, please refer to this video by Aleksei Shipilev or the slides. Unfortunately it's in Russian, but the diagrams are useful.
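The recursive splitting described above can be sketched by hand with a RecursiveTask. This is not how the stream machinery is written internally; RangeSumTask, forkJoinSum and the threshold of 10,000 are hypothetical choices of mine, picked only to show the fork/join shape.

```kotlin
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

// Sums the inclusive range [from, to], splitting it in half until a chunk is small enough.
class RangeSumTask(private val from: Long, private val to: Long) : RecursiveTask<Long>() {
    public override fun compute(): Long {
        if (to - from < THRESHOLD) {
            var sum = 0L
            for (i in from..to) sum += i
            return sum
        }
        val middle = from + (to - from) / 2
        val left = RangeSumTask(from, middle)
        val right = RangeSumTask(middle + 1, to)
        left.fork()                    // queue the left half; an idle worker may steal it
        val rightSum = right.compute() // process the right half in the current worker
        return rightSum + left.join()  // wait for the left half and combine the results
    }

    companion object {
        // Arbitrary chunk size for this sketch.
        const val THRESHOLD = 10_000L
    }
}

fun forkJoinSum(upTo: Long): Long =
    ForkJoinPool.commonPool().invoke(RangeSumTask(1, upTo))

fun main() {
    println(forkJoinSum(1_000_000)) // prints 500000500000
}
```

Forking one half and computing the other in place keeps the current worker busy instead of parking it while both halves sit in a queue.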
OK, great, now we know what executes our coroutines! But how do they end up there?
The actual submission happens inside the CommonPool#dispatch method:
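As a rough picture of what such a dispatch boils down to, here is a toy version of mine. ToyPoolDispatcher and runOnPool are hypothetical names, the pool size of 2 is arbitrary, and the library's dispatch method also receives a CoroutineContext, which I leave out to keep the sketch small.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit

// Hypothetical pool-backed dispatcher: dispatching just hands the Runnable to the executor.
class ToyPoolDispatcher(private val pool: ExecutorService) {
    fun dispatch(block: Runnable) {
        pool.execute(block)
    }
}

// Dispatches a single block and returns the name of the thread that ran it.
fun runOnPool(): String {
    val dispatcher = ToyPoolDispatcher(ForkJoinPool(2))
    val done = CountDownLatch(1)
    var threadName = ""
    dispatcher.dispatch(Runnable {
        threadName = Thread.currentThread().name
        done.countDown()
    })
    done.await(5, TimeUnit.SECONDS) // wait until the pool thread has run the block
    return threadName
}

fun main() {
    println(runOnPool()) // a ForkJoinPool worker thread name, not "main"
}
```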
The dispatch method is called from the resume(value: T) method of a DispatchedContinuation. Hey, that sounds familiar! We remember that Continuation is an interface implemented by AbstractCoroutine. But how are they connected?
The trick is inside the CoroutineDispatcher class. It implements the ContinuationInterceptor interface with the following code:
You see? You submit a simple block to the coroutine builder. You are not obliged to implement any of the interfaces you don't give a damn about. You just need to submit a block, and the coroutines library takes care of the rest. It intercepts the execution, replaces the continuation with a DispatchedContinuation, which knows how to be dispatched, and submits it to a pool that aims to execute your code as efficiently as possible.
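The interception trick can be reproduced with nothing but the standard library. The sketch below assumes Kotlin 1.3 or newer, where Continuation exposes a single resumeWith(Result) method instead of the resume/resumeWithException pair. ToyDispatcher, ToyDispatchedContinuation and threadUsedByBlock are hypothetical names of mine, not the library's classes.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical wrapper: instead of resuming in place, it submits the resumption to the executor.
class ToyDispatchedContinuation<T>(
    private val executor: Executor,
    private val delegate: Continuation<T>
) : Continuation<T> {
    override val context: CoroutineContext get() = delegate.context

    override fun resumeWith(result: Result<T>) {
        executor.execute { delegate.resumeWith(result) }
    }
}

// Hypothetical dispatcher: as a ContinuationInterceptor it wraps every continuation it is given.
class ToyDispatcher(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        ToyDispatchedContinuation(executor, continuation)
}

// Starts a plain suspend block under the toy dispatcher and returns the thread it ran on.
fun threadUsedByBlock(): String {
    val block: suspend () -> String = { Thread.currentThread().name }
    val done = CountDownLatch(1)
    var threadName = ""
    val completion = object : Continuation<String> {
        override val context: CoroutineContext = ToyDispatcher(ForkJoinPool(2))

        override fun resumeWith(result: Result<String>) {
            threadName = result.getOrThrow()
            done.countDown()
        }
    }
    block.startCoroutine(completion) // the block knows nothing about pools or interceptors
    done.await(5, TimeUnit.SECONDS)
    return threadName
}

fun main() {
    println(threadUsedByBlock()) // a ForkJoinPool worker thread name, not "main"
}
```

Note that the block itself is an ordinary lambda: the only place the pool appears is the context of the completion continuation.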
At this point, the only thing left to figure out is how dispatch gets called from the start method. Let's fill the gap. The resume method is called from the block's extension function startCoroutine:
startCoroutine is in turn invoked by the `()` operator of the CoroutineStart enum. Your coroutine builder accepts it as its second parameter, which defaults to CoroutineStart.DEFAULT. So, that's it!
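The last hop, an enum value that can be called like a function and ends up in startCoroutine, can be sketched as follows. ToyStart, toyLaunch and startedResult are hypothetical names of mine, the onDone callback is an addition for the demo, and the sketch assumes Kotlin 1.3 or newer. With an empty context nothing is intercepted, so the block runs right on the calling thread.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical start modes; the invoke operator lets an enum value be called like a function.
enum class ToyStart {
    DEFAULT, LAZY;

    operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) {
        when (this) {
            DEFAULT -> block.startCoroutine(completion) // start right away
            LAZY -> Unit                                // this sketch simply never starts
        }
    }
}

// Hypothetical builder: creates the completion continuation and delegates to the start mode.
fun <T> toyLaunch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: ToyStart = ToyStart.DEFAULT,
    onDone: (Result<T>) -> Unit,
    block: suspend () -> T
) {
    val completion = Continuation<T>(context) { result -> onDone(result) }
    start(block, completion) // resolves to ToyStart.invoke
}

// Launches a trivial block with the given start mode and reports what happened.
fun startedResult(start: ToyStart): String {
    var outcome = "not started"
    toyLaunch(start = start, onDone = { outcome = "completed with ${it.getOrNull()}" }) { 6 * 7 }
    return outcome
}

fun main() {
    println(startedResult(ToyStart.DEFAULT)) // prints "completed with 42"
    println(startedResult(ToyStart.LAZY))    // prints "not started"
}
```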
This is actually why I admire the coroutines approach: it's not only the fancy syntax, but also an ingenious implementation. Good job, JetBrains!