Reactive Pipelines in Action

November 7, 2018

Human beings are reactive by nature, fortunately or not. The reason is mostly physiology. Dopamine helps us feel comfortable and secure while we do familiar things. Eating a sandwich sounds and feels far better than gardening, doesn’t it? Essentially, it is a fight between psychology (the mind, proactive actions) and physiology (the body, reactive actions).

The same thing happens in CS. Object-oriented programming (OOP) is the king of the hill and functional programming (FP) is on the outskirts. It is this way because OOP is more comfortable for the majority. We, as a society, made it this way. The educational system includes a mandatory OOP course and only rarely an FP one. And then there is reactive programming, which, together with FP, forms a wild beast called functional-reactive programming (FRP)…

Taking everything above into account makes it easy… to give up. Is it even worth maintaining a consistent reactive system? Let’s see how it might look and decide.


Don’t worry, I’m not going to explain FP and FRP all over again. We’ll need only two terms.

The success of our enterprise (not to be confused with the USS one) depends on providing enough abstractions to connect producers and consumers, forming Pipelines.

📖 Suggestions about abstractions are available in Reactive Abstractions in Android World.

Honestly, I find reactive pipelines beautiful. There is something deeply satisfying in knowing that a complete flow can be tracked as a single stream from a producer to a consumer.

These actions are done without ad-hoc solutions and concepts. The flow is consistent.

The formula has infinite depth in its efficacy and application but it’s staggeringly simple and completely consistent.

Revolver (2005)


📖 We’ll use Data-Domain-Presentation multitier architecture. Please refer to Martin Fowler for details.


Network-related data sources (especially on Android) most likely use Retrofit or something similar. However, there is often in-house handling for common tasks.

Storage-related data sources are easier.

interface BooksStorageSource {
    fun getBooksPageSize(): Single<Int>
    fun setBooksPageSize(size: Int): Completable

    class Impl(
        private val context: AndroidContext,
        private val ioScheduler: Scheduler
    ): BooksStorageSource { /* ... */ }
}

Notice that setBooksPageSize returns a Completable instead of being exposed as a Consumer. A Consumer makes more sense as an interaction, since it is an input after all. In real life, however, the write needs to be asynchronous so that it does not block the caller (most likely the UI thread). There are also use cases where it is necessary to ensure that changes were applied before proceeding with another action. A classic example is a sign-out procedure: everything needs to be cleaned up before a different account is signed in. A Consumer gives no such guarantee.
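To make the ordering guarantee concrete, here is a minimal sketch of a sign-out style flow. It assumes RxJava 2. InMemoryStorage, signIn and switchAccount are hypothetical names made up for this illustration and are not part of the sources above.

```kotlin
import io.reactivex.Completable

// Hypothetical in-memory stand-in for a storage source.
class InMemoryStorage {
    val log = mutableListOf<String>()

    // Completes only after the cleanup has actually run.
    fun clear(): Completable = Completable.fromAction { log += "storage cleared" }
}

// Hypothetical sign-in step that must not start before the cleanup is done.
fun signIn(account: String, log: MutableList<String>): Completable =
    Completable.fromAction { log += "signed in as $account" }

// andThen subscribes to the second Completable only after the first one completes.
fun switchAccount(storage: InMemoryStorage, account: String): Completable =
    storage.clear().andThen(signIn(account, storage.log))

fun main() {
    val storage = InMemoryStorage()
    switchAccount(storage, "alice").blockingAwait()
    println(storage.log) // [storage cleared, signed in as alice]
}
```

With a plain Consumer the caller would have no completion signal to chain on, so the second step could only hope that the first one had finished.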

Both sources receive a worker Scheduler as a constructor argument. It is done this way for two reasons.
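Testability is one benefit that is easy to demonstrate. The sketch below assumes RxJava 2 and uses a hypothetical in-memory PageSizeStorage instead of real Android storage. The default page size of 20 is an arbitrary value picked for the example.

```kotlin
import io.reactivex.Completable
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Hypothetical storage source: the value lives in memory, the scheduler is injected.
class PageSizeStorage(private val ioScheduler: Scheduler) {
    @Volatile private var pageSize = 20

    fun getBooksPageSize(): Single<Int> = Single
        .fromCallable { pageSize }
        .subscribeOn(ioScheduler)

    fun setBooksPageSize(size: Int): Completable = Completable
        .fromAction { pageSize = size }
        .subscribeOn(ioScheduler)
}

fun main() {
    // Production code would likely pass Schedulers.io().
    // Schedulers.trampoline() keeps the work on the calling thread, which suits tests.
    val storage = PageSizeStorage(Schedulers.trampoline())

    storage.setBooksPageSize(50).blockingAwait()
    println(storage.getBooksPageSize().blockingGet()) // 50
}
```

The source never decides which thread pool it runs on. The caller that wires the dependencies does.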


We’ll use a stateful example, but essentially this level is a mediator between the data and the presentation. Business-related decisions are made here.

📖 Suggestions about state mutations are available in Reactive State Mutations via CQRS.

interface BooksService {

    sealed class State {
        object Progress : State()
        data class Content(val books: List<Book>) : State()
        object Error : State()
    }

    enum class Command { Refresh }

    val state: Observable<State>
    val command: Consumer<Command>

    class Impl(
        private val networkSource: BooksNetworkSource,
        private val storageSource: BooksStorageSource
    ): BooksService { /* ... */ }
}

Notice that the service does not receive a Scheduler.
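A rough sketch of how such a service could turn commands into states without touching a Scheduler itself is shown below. It assumes RxJava 2. BooksServiceSketch and its loadBooks parameter are hypothetical: loadBooks stands in for the network and storage sources, which are expected to pick their own worker threads. The types are redeclared at top level only to keep the example self-contained.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.functions.Consumer
import io.reactivex.subjects.PublishSubject

data class Book(val title: String)

sealed class State {
    object Progress : State()
    data class Content(val books: List<Book>) : State()
    object Error : State()
}

enum class Command { Refresh }

// Hypothetical sketch, not the actual BooksService.Impl.
class BooksServiceSketch(loadBooks: () -> Single<List<Book>>) {
    private val commands = PublishSubject.create<Command>()

    // Input: every accepted command is pushed into the pipeline.
    val command = Consumer<Command> { commands.onNext(it) }

    // Output: each Refresh emits Progress first, then Content or Error.
    val state: Observable<State> = commands
        .switchMap {
            loadBooks()
                .map<State> { State.Content(it) }
                .onErrorReturnItem(State.Error)
                .toObservable()
                .startWith(State.Progress)
        }
}

fun main() {
    val service = BooksServiceSketch { Single.just(listOf(Book("Dune"))) }
    val observer = service.state.test()

    service.command.accept(Command.Refresh)

    observer.assertValues(State.Progress, State.Content(listOf(Book("Dune"))))
}
```

This sketch does not replay the latest state to late subscribers. A real implementation would probably need that, for example via a BehaviorSubject or replay(1).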


MVWhatever will do the trick, but I highly suggest giving MVI a shot.

I see presentation components as consumers, but it would be ignorant to forget that user actions are actually producers. This is not a bad thing: embracing the reactive approach turns it into a benefit.
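Treating a user action as a producer can be as small as wrapping a listener into an Observable. The sketch below assumes RxJava 2. FakeButton and the clicks() extension are hypothetical stand-ins for a real widget and for whatever binding library a project uses.

```kotlin
import io.reactivex.Observable

// Hypothetical listener-based button standing in for a real UI widget.
class FakeButton {
    private var listener: (() -> Unit)? = null

    fun setOnClickListener(l: (() -> Unit)?) { listener = l }
    fun click() { listener?.invoke() }
}

// Exposes clicks as a stream. The listener is removed when the stream is disposed.
fun FakeButton.clicks(): Observable<Unit> = Observable.create<Unit> { emitter ->
    setOnClickListener { emitter.onNext(Unit) }
    emitter.setCancellable { setOnClickListener(null) }
}

fun main() {
    val button = FakeButton()
    val observer = button.clicks().test()

    button.click()
    button.click()

    observer.assertValueCount(2)
}
```

Once clicks are a stream, they can be merged, mapped to commands, and handed to a consumer like any other producer.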

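interface View {
    enum class State { Progress, Content, Error }

    val stateSwitcher: ViewAnimator<State>
    val refreshButton: Button
    val errorRefreshButton: Button

    val books: Consumer<List<Book>>
}

class ViewModel(
    private val booksService: BooksService,
    private val mainScheduler: Scheduler
) {
    private val disposable = CompositeDisposable()

    fun bind(view: View) {
        // User actions are producers: clicks become Refresh commands.
        disposable += Observable
            .merge(/* clicks of view.refreshButton and view.errorRefreshButton */)
            .map { BooksService.Command.Refresh }
            .subscribe(booksService.command)

        // Service states are mapped to view states and delivered on the main thread.
        disposable += booksService.state
            .map {
                when (it) {
                    is BooksService.State.Progress -> View.State.Progress
                    is BooksService.State.Content -> View.State.Content
                    is BooksService.State.Error -> View.State.Error
                }
            }
            .observeOn(mainScheduler)
            .subscribe { /* switch view.stateSwitcher to the mapped state */ }

        // Only the Content state carries books.
        disposable += booksService.state
            .ofType(BooksService.State.Content::class.java)
            .map { it.books }
            .observeOn(mainScheduler)
            .subscribe(view.books)
    }

    fun unbind() = disposable.clear()
}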

class ViewImpl(view: AndroidView) : View { /* ... */ }

Levels Combined

What have we got as a result?

Is It Worth It?

I cannot speak for everyone, but my answer is a definite Yes.

Thinking about reproducing the same interactions using callbacks and listeners gives me a headache. Replicating reactive feedback without a reactive approach will most likely lead to an unscalable mess. Eventually, nobody on the team will know what is going on.

There are a number of concepts FRP brings to the table which are hard to beat or even replace.

It is a no-brainer. Embracing the concept and unifying the codebase behind it brings benefits on a conceptual level in the long run. Isn’t that what we want as developers and human beings?

Be proactive about being reactive!

Thanks to Artem Zinnatullin for the review!