Reactive State Mutations via CQRS

October 21, 2018

State-state-state. It surrounds us. Think hard enough and everything around will become either a state or a state mutation. The current time is a state and each passing second is a state mutation. A tree can be represented by a state and each drop of rain mutates it, increasing the water supply level and applying the pressure on leaves.

The concept is not new, but sometimes it becomes too hard to manage it. Even in software development, which was basically created to represent the world around us in strict terms.

Real Life Example

There is a brand-new project for book recommendations. The very first step is getting a list of books from a backend. This is enough forever and ever. Sounds good.

interface BookService {
    fun getBooks(): Single<List<Book>>
}

Suddenly — because Agile — we need to save books on the backend. All right.

interface BookService {
    fun getBooks(): Single<List<Book>>
    fun createBook(book: Book): Completable
}

New features, new screens! Unfortunately, it means that the books fetching progress should be preserved across screens. A property should do the trick…

interface BookService {
    fun getBooks(): Single<List<Book>>
    val getBooksProgress: Observable<Boolean>

    fun createBook(book: Book): Completable
}

Damn, the QA team brought up an issue at the very last minute before the release. Fetching books might fail, and we need to show it on all screens to give the ability to re-fetch them. Just a sec, another property and here we go.

interface BookService {
    fun getBooks(): Single<List<Book>>
    val getBooksProgress: Observable<Boolean>
    val getBooksFailure: Observable<Boolean>

    fun createBook(book: Book): Completable
}

The project hit the production! It works all right, but the very first customer has a complaint that the book the one created has a wrong name and there is no way to delete it. Sounds like creating a book, but some would say it is the reverse

interface BookService {
    fun getBooks(): Single<List<Book>>
    val getBooksProgress: Observable<Boolean>
    val getBooksFailure: Observable<Boolean>

    fun createBook(book: Book): Completable
    fun deleteBook(book: Book): Completable
}

And then someone brings up that the BookService should cache books…

This is Bad

The BookService is far from perfect.

BookService clients gradually became more and more complicated. Instead of a comfy stateless life they are forced to remember that creating a book should trigger re-fetching books from a backend. At the same time, this refresh operation should be done only on BookCreateResult.Success and not on BookCreateResult.Failure. The same goes to the delete operation. Most likely this logic will be distributed and copy-pasted across the client code.

Another distinct feature is how easily the BookService transformed from being stateless to be stateful. Essentially a pure getBooks produced getBooksProgress and getBooksFailure side effects. It is understandable — requirements have been changed, but the mistake is still there. The change in nature hadn’t been followed by the change in design. The burden of complications was transitioned to clients.

I’ve spared the details of the implementation since the resulting API is bad enough. Under the hood the BookService probably is juggling multiple Subject or Relay in combination with onNext. Forget about proper thread-safety — at this point it is on clients shoulders as well. The requirement to cache data (at least in memory) will complicate things even more.

Do not forget that the evolution above seems to be rapid but in reality these changes are applied gradually. Since no one has time to do a proper refactoring, the BookService has a pretty good chance to stay this way forever. Just like dinosaurs. Until the meteorite nuked them. You know how it goes.

CQRS

Each time I think there is something smart and fresh, a careful research reveals that the concept was there for years. CQRS is one of them.

CQRS stands for Command Query Responsibility Segregation. It is a variety of CQS — Command-Query Separation. Usually, it is connected to Event Sourcing, but it is a different story.

📖 This article will narrow down the concept. For further explanation I suggest reading the Martin Fowler peace and the Microsoft documentation.

Basically saying, CQRS replaces CRUD-like interactions with two separate entities.

This brings a couple of benefits on the table.

The Grand Refactoring

Let’s take CQRS, mix it with the reactive approach and apply it to the BookService.

API

First of all, we now know that the BookService is not stateless but stateful. The clear state representation will make the API much more explicit.

sealed class State {
    object Progress : State()
    data class Content(val books: List<Book>) : State()
    object Error : State()
}

val state: Observable<State>

This is a major step on the right course.

It is clear that the State class represents a CQRS Query. What about Commands?

sealed class Command {
    object Refresh : Command()
    data class Create(val book: Book) : Command()
    data class Delete(val book: Book) : Command()
}

val command: Consumer<Command>

Notice the Refresh command which explicitly declares the re-fetch action instead of an implicit getBooks behavior.

This is a bit idealistic API though. In the future we might want to receive a command result outside of the State — which will become handy for error handling. Potentially it can be solved with a syntax sugar.

interface BookService {

    enum class ResultCreate { Success, Failure }
    enum class ResultDelete { Success, Failure }

    val refresh: Action

    fun create(book: Book): Single<ResultCreate>
    fun delete(book: Book): Single<ResultDelete>

    class Impl : BookService {

        private sealed class Command {
            object Refresh : Command()
            data class Create(val book: Book) : Command()
            data class Delete(val book: Book) : Command()
        }
    }
}

Implementation

First of all, we’ll need stateless commands stream and stateful state one.

class Impl(api: BooksApi) : BookService {
    override val state = BehaviorRelay.create<State>().toSerialized()
    override val command = PublishRelay.create<Command>().toSerialized()
}

Next, we are going to react to commands and produce states based on results. The refresh command is pretty straightforward.

val refreshState = command
    .ofType<Command.Refresh>()
    .map { State.Progress }

val refreshResultState = command
    .ofType<Command.Refresh>()
    .switchMap { api.getBooks() }
    .map {
        when (it) {
            is BooksResponse.Success -> State.Content(it.books)
            is BooksResponse.Failure -> State.Error
        }
    }

Create and delete commands are a bit more tricky since the implementation depends on our needs.

In this implementation I’m gonna go with the first one.

val createResultCommand = command
    .ofType<Command.Create>()
    .switchMap { api.createBook(it.book) }
    .switchMapSingle {
        when (it) {
            is BookCreateResponse.Success -> Single.just(Command.Refresh)
            is BookCreateResponse.Failure -> Single.never()
        }
    }

val deleteResultCommand = TODO("Basically the same as the create one.")

And now it is time to combine commands and states.

disposable += Observable
    .merge(refreshState, refreshResultState)
    .subscribe(state)

disposable += Observable
    .merge(createResultCommand, deleteResultCommand)
    .subscribe(command)

Done!

interface BookService {

    sealed class State {
        object Progress : State()
        data class Content(val books: List<Book>) : State()
        object Error : State()
    }

    sealed class Command {
        object Refresh : Command()
        data class Create(val book: Book) : Command()
        data class Delete(val book: Book) : Command()
    }

    val state: Observable<State>
    val command: Consumer<Command>

    class Impl(disposable: CompositeDisposable, api: BooksApi) : BookService {

        override val state = BehaviorRelay.create<State>().toSerialized()
        override val command = PublishRelay.create<Command>().toSerialized()

        init {
            val refreshState = command
                .ofType<Command.Refresh>()
                .map { State.Progress }

            val refreshResultState = command
                .ofType<Command.Refresh>()
                .switchMap { api.getBooks() }
                .map {
                    when (it) {
                        is BooksResponse.Success -> State.Content(it.books)
                        is BooksResponse.Failure -> State.Error
                    }
                }

            val createResultCommand = command
                .ofType<Command.Create>()
                .switchMap { api.createBook(it.book) }
                .switchMapSingle {
                    when (it) {
                        is BookCreateResponse.Success -> Single.just(Command.Refresh)
                        is BookCreateResponse.Failure -> Single.never()
                    }
                }

            val deleteResultCommand = TODO("Basically the same as the create one.")

            disposable += Observable
                .merge(refreshState, refreshResultState)
                .subscribe(state)

            disposable += Observable
                .merge(createResultCommand, deleteResultCommand)
                .subscribe(command)
        }
    }
}

Lessons Learned

CQRS-like reactive APIs for state mutations can be very useful.

It isn’t a silver bullet, but I can definetly suggest it when dealing with the state.


Thanks to Artem Zinnatullin for the review!