Why Monix

In this short blog post, which takes 10 minutes or less to read, I’m going to present what the Monix library is and try to convince you that you really need to get to know it.

Formerly known as Monifu, Monix is a library for asynchronous programming in Scala and Scala.js.

It contains several useful abstractions which can sometimes be superior to their vanilla Scala or Akka counterparts. But this post is definitely not going to be about knocking the use of Akka actors or streams. Rather, it’s about another tool in the Scala programmer’s toolbox. I am going to present some of the abstractions that Monix provides and conclude with why they are invaluable.

Task vs Future

Monix comes with the Task abstraction, which can be seen as a replacement for Future. However, there are some major differences. A Future represents a computation that is already ongoing and possibly finished, whereas a Task is only a description of a computation.

So when using Task as a drop-in replacement for Future, one has to trigger its execution explicitly to start the computation. This means that merely defining a Task, even one whose body prints something, will not print anything.

To start execution, you have to do two things:

  • get an instance of monix.execution.Scheduler;
  • call one of runAsync, runOnComplete, runSyncMaybe or foreach to execute the computation.
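Here is a minimal sketch of both steps. It assumes Monix 2.x, where runAsync returns a CancelableFuture; in Monix 3.x the equivalent call is runToFuture. The counter is only there to show when the body actually runs.

```scala
// Assumes Monix 2.x (monix-eval and monix-execution) on the classpath.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global // the default Scheduler
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

val runs = new AtomicInteger(0) // how many times the task body actually ran

// Defining the Task only describes the computation: nothing is printed here.
val task: Task[Int] = Task.eval {
  runs.incrementAndGet()
  println("Computing...")
  21 * 2
}

val runsBeforeExecution = runs.get // still 0, the Task has not been executed

// runAsync needs an implicit Scheduler and returns a CancelableFuture[Int].
val future = task.runAsync
val result = Await.result(future, 5.seconds)
println(s"Result: $result")
```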

Scheduler is an ExecutionContext that additionally allows scheduling one-off or periodic execution of Runnables.
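A short sketch of the scheduling part follows. It assumes the scheduleOnce and scheduleAtFixedRate extension methods taking a FiniteDuration, which I believe Monix 2.x provides on Scheduler. Both methods return a Cancelable, so periodic work can be stopped.

```scala
// Assumes Monix 2.x (monix-execution) on the classpath.
import monix.execution.Scheduler.Implicits.global
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._

val onceDone = new CountDownLatch(1)
val threeTicks = new CountDownLatch(3)
val ticks = new AtomicInteger(0)

// One-off execution of a piece of code after a delay.
val once = global.scheduleOnce(100.millis) {
  println("Executed once")
  onceDone.countDown()
}

// Periodic execution: first run immediately, then every 50 ms.
val periodic = global.scheduleAtFixedRate(0.millis, 50.millis) {
  ticks.incrementAndGet()
  threeTicks.countDown()
}

val onceRan = onceDone.await(5, TimeUnit.SECONDS)
val tickedThreeTimes = threeTicks.await(5, TimeUnit.SECONDS)
periodic.cancel() // stop the periodic job
```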

runAsync returns a CancelableFuture, which allows the programmer to attempt to cancel the computation before it ends, when it needs to be stopped. This is the second major difference I’ve noticed, and it’s a really nice improvement in my opinion.
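The following sketch shows cancellation with the Monix 2.x API. The task would set a flag after a delay (delayExecution), but it is cancelled before the delay elapses, so its body never runs.

```scala
// Assumes Monix 2.x on the classpath.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration._

val executed = new AtomicBoolean(false)

// A task that would set the flag only after a 500 ms delay.
val slow: Task[Unit] = Task.eval(executed.set(true)).delayExecution(500.millis)

val running = slow.runAsync // CancelableFuture[Unit]
running.cancel()            // cancel well before the delay elapses

Thread.sleep(1000)          // wait longer than the delay
println(s"Executed: ${executed.get}")
```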

Monix allows a high level of control over Task execution. You can choose:

  • whether the task body is executed on each invocation of runAsync (Task.eval) or
  • whether the result should be memoized (Task.evalOnce or memoize) or
  • whether the task should always be executed asynchronously, on a separate logical thread (Task.fork).
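The difference between the first two options can be seen with two counters. This is a minimal sketch assuming the Monix 2.x API. Task.eval re-runs its body on every execution, while Task.evalOnce and memoize run it once and cache the result, much like a lazy val.

```scala
// Assumes Monix 2.x on the classpath.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

val evalRuns = new AtomicInteger(0)
val onceRuns = new AtomicInteger(0)
val memoRuns = new AtomicInteger(0)

// Task.eval: the body runs on every execution.
val everyTime = Task.eval(evalRuns.incrementAndGet())
// Task.evalOnce: the body runs once; later executions reuse the result.
val onlyOnce = Task.evalOnce(onceRuns.incrementAndGet())
// memoize: the same caching behaviour for an arbitrary existing Task.
val memoized = Task.eval(memoRuns.incrementAndGet()).memoize

def run[A](t: Task[A]): A = Await.result(t.runAsync, 5.seconds)

val evalResults = List(run(everyTime), run(everyTime))
val onceResults = List(run(onlyOnce), run(onlyOnce))
val memoResults = List(run(memoized), run(memoized))
```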

On top of that, Task has an API to choose the scheduler to run on.
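Here is a sketch of choosing the scheduler, assuming the Monix 2.x overload Task.fork(task, scheduler). The name "my-io" is just an illustrative choice. The I/O scheduler is a separate pool meant for blocking work, so it does not starve the default computation pool.

```scala
// Assumes Monix 2.x on the classpath.
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

// A separate Scheduler backed by an unbounded pool, meant for blocking I/O.
lazy val io: Scheduler = Scheduler.io(name = "my-io")

val threadName: Task[String] = Task.eval(Thread.currentThread.getName)

// Task.fork(task, scheduler): the task executes on the given Scheduler,
// whatever Scheduler is used to run the program as a whole.
val onIo: Task[String] = Task.fork(threadName, io)
// Task.fork(task): an async boundary on the Scheduler used by runAsync.
val onDefault: Task[String] = Task.fork(threadName)

val both: Task[(String, String)] = for {
  a <- onIo
  b <- onDefault
} yield (a, b)

val (ioThread, defaultThread) = Await.result(both.runAsync, 5.seconds)
println(s"io: $ioThread, default: $defaultThread")
```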

A short example

Let me show you just one contrived example, which illustrates some error handling and the choice of scheduler. The code takes a Picture and performs an analysis which tells whether the picture was painted by Picasso.

First, it runs an analysis locally, but if the local algorithm cannot decide, a remote analysis service is used. Finally, the original picture and the analysis results are stored in a database. Of course, the services are just mocks, so the results are hardcoded for the sake of simplicity.

The full source file is available in the GH repo.
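The repository code is not reproduced here. The following is my own minimal sketch of the same flow, not the author’s file. Picture, Verdict, analyzeLocally, analyzeRemotely, store and the "db-io" scheduler are all hypothetical names, and the services are hardcoded mocks.

```scala
// Assumes Monix 2.x on the classpath. All domain names are hypothetical.
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Picture(title: String)
final case class Verdict(picture: Picture, paintedByPicasso: Boolean)

lazy val dbScheduler: Scheduler = Scheduler.io(name = "db-io")
val database = new ConcurrentLinkedQueue[Verdict]() // mock database

// Local analysis (mock): None means "cannot decide".
def analyzeLocally(p: Picture): Task[Option[Boolean]] =
  Task.eval(if (p.title == "Guernica") Some(true) else None)

// Remote analysis service (mock); a real one could fail or hang.
def analyzeRemotely(p: Picture): Task[Boolean] =
  Task.eval(false)

// Blocking database write, shifted onto the dedicated I/O Scheduler.
def store(v: Verdict): Task[Unit] =
  Task.fork(Task.eval { database.add(v); () }, dbScheduler)

def analyze(p: Picture): Task[Verdict] =
  for {
    local <- analyzeLocally(p)
    decision <- local match {
      case Some(d) => Task.now(d)
      case None =>
        analyzeRemotely(p)
          .timeout(2.seconds) // do not wait forever for the remote service
          .onErrorRestart(3)  // retry on failure, including timeouts
    }
    verdict = Verdict(p, decision)
    _ <- store(verdict)
  } yield verdict
```

Running, for example, Await.result(analyze(Picture("Guernica")).runAsync, 5.seconds) decides locally. Any other title goes through the remote mock.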

I’ve already mentioned that Schedulers are like ExecutionContexts with the ability to schedule execution on them. But a Scheduler has one more important property: its ExecutionModel, which describes:

  • whether tasks should always be executed asynchronously, in separate logical threads,
  • whether a trampoline should be used (synchronous execution), or
  • whether a batch of tasks should be executed synchronously before execution forks to a new logical thread.

Just a few more words about Task.

It comes with plenty of combinators and constructors, counterparts of both Future and Scalaz Task combinators. I will mention just a few that caught my attention:

  • onErrorRestart – restarts the task on error until it completes successfully, up to a given maximum number of retries
  • restartUntil – restarts the task until its result satisfies a predicate
  • timeout – adds a timeout to an existing Task; Monix also gives you FutureUtils.timeout to time out Scala Futures.
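The three combinators above can be sketched as follows, assuming the Monix 2.x signatures onErrorRestart(maxRetries: Long), restartUntil(p: A => Boolean) and timeout(after: FiniteDuration). The flaky task and the counters are made up for illustration.

```scala
// Assumes Monix 2.x on the classpath.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

val attempts = new AtomicInteger(0)

// Fails on the first two attempts, succeeds on the third.
val flaky: Task[String] = Task.eval {
  if (attempts.incrementAndGet() < 3) throw new RuntimeException("boom")
  "ok"
}
val recovered = flaky.onErrorRestart(5) // at most 5 restarts

// Keep re-evaluating until the result is divisible by 10.
val counter = new AtomicInteger(0)
val divisibleBy10 = Task.eval(counter.incrementAndGet()).restartUntil(_ % 10 == 0)

// A task that never completes, bounded by a timeout.
val bounded = Task.never[Int].timeout(200.millis)

def run[A](t: Task[A]): Try[A] = Try(Await.result(t.runAsync, 5.seconds))
```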

To recap: Monix Task gives users a huge dose of control over the execution of asynchronous computations, through a rich set of builders and combinators, the ability to tune schedulers and execution models, and out-of-the-box ways to time out or cancel computations. In my opinion, these are huge pluses.

There are also two artifacts available, monix-cats and monix-scalaz72, with code that turns Task into an instance of Monad from Cats or Scalaz.

Coeval

Coeval is the synchronous counterpart of Task. It gives you control over evaluation, side effects and error handling, all at once, but for synchronous computations.

Like Task, it doesn’t run when defined, but only when value or run is called. You can define a Coeval to be evaluated on each value call with Coeval.eval, or to memoize its result (like a lazy val) with Coeval.evalOnce.
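Here is a minimal sketch of the two behaviours, assuming the Monix 2.x Coeval API. Nothing runs at definition time; evaluation happens on .value.

```scala
// Assumes Monix 2.x (monix-eval) on the classpath.
import monix.eval.Coeval

var evalRuns = 0
var onceRuns = 0

// Re-evaluated on every .value call.
val everyTime: Coeval[Int] = Coeval.eval { evalRuns += 1; evalRuns }
// Evaluated on the first .value call only, then memoized like a lazy val.
val onlyOnce: Coeval[Int] = Coeval.evalOnce { onceRuns += 1; onceRuns }

val a = everyTime.value // 1
val b = everyTime.value // 2
val c = onlyOnce.value  // 1
val d = onlyOnce.value  // still 1, memoized
```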

This level of control allows you to avoid stack overflows in deeply recursive code. Admittedly, @tailrec can do that too, but only for simple tail recursion within a single function, not, for example, for mutually recursive functions.
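A classic case @tailrec cannot handle is a pair of mutually recursive functions. The sketch below assumes Monix 2.x’s Coeval.defer, whose run-loop is trampolined. It relies on this to evaluate a million nested calls without growing the call stack.

```scala
// Assumes Monix 2.x (monix-eval) on the classpath.
import monix.eval.Coeval

// isEven and isOdd call each other, so @tailrec cannot be applied.
def isEven(n: Int): Coeval[Boolean] =
  if (n == 0) Coeval.now(true) else Coeval.defer(isOdd(n - 1))

def isOdd(n: Int): Coeval[Boolean] =
  if (n == 0) Coeval.now(false) else Coeval.defer(isEven(n - 1))

// Each step is suspended and run by Coeval's loop instead of the JVM stack.
val millionIsEven = isEven(1000000).value
```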

Coeval has many builders, combinators (zips, map, flatMap, sequence, traverse, restartUntil, error handling and others) and transformers (from/to Task, Attempt and Try).

Again, this is a somewhat made-up example, but this time the user is prompted to enter a number. If the user enters an invalid line 3 times, another prompt is used, and in the case of any other error the program falls back to 42.

The code is available in the repo.
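Here is a minimal sketch of that flow, not the repository code. To keep it testable, the console is replaced by a hypothetical nextLine() reading from a fixed iterator. The inputs "abc", "" and "4x" make the first prompt fail three times; onErrorRestart(2) means three attempts in total.

```scala
// Assumes Monix 2.x (monix-eval) on the classpath.
import monix.eval.Coeval

// Hypothetical input source standing in for the console.
val input = Iterator("abc", "", "4x", "7")
def nextLine(): String =
  if (input.hasNext) input.next() else throw new NoSuchElementException("no input")

def prompt(message: String): Coeval[Int] =
  Coeval.eval {
    println(message)
    nextLine().trim.toInt // throws NumberFormatException on invalid input
  }

val number: Coeval[Int] =
  prompt("Enter a number:")
    .onErrorRestart(2)                                        // 3 attempts in total
    .onErrorHandleWith(_ => prompt("Last chance, a number:")) // another prompt
    .onErrorHandle(_ => 42)                                   // any other error

val result = number.value
```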

The Cats and Scalaz integrations also contain Monad instances for Coeval (and the other tools previously mentioned here).

MVar

A disclaimer before this really short part about MVar: I have never written “real” code that used this or an analogous abstraction.

A thread can put something into an MVar or take something from it, but both operations block asynchronously, which is expressed in their return types: Task[Unit] and Task[A] respectively. When a thread takes from an empty MVar, it gets a Task[A] that completes only when some thread puts a value into it. put behaves similarly: it returns a Task which completes only after the put has actually been performed, which requires the MVar to be empty.

Here’s the obligatory example, this time a dummy producer-consumer problem.

Here is a more verbose version with some printlns to show the lazy behaviour of Task as well.
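Here is a minimal producer-consumer sketch, assuming the Monix 2.x monix.eval.MVar API (MVar.empty, put and take). MVar’s API changed in Monix 3.x. An Option is used so the producer can signal that it is done.

```scala
// Assumes Monix 2.x (monix-eval) on the classpath.
import monix.eval.{MVar, Task}
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

// None signals that the producer has finished.
type Channel[A] = MVar[Option[A]]

// put completes only once the MVar is empty again, i.e. after a take.
def producer(ch: Channel[Int], items: List[Int]): Task[Unit] =
  items match {
    case Nil          => ch.put(None)
    case head :: tail => ch.put(Some(head)).flatMap(_ => producer(ch, tail))
  }

// take completes only once some value has been put.
def consumer(ch: Channel[Int], sum: Long): Task[Long] =
  ch.take.flatMap {
    case Some(x) => consumer(ch, sum + x)
    case None    => Task.now(sum)
  }

val channel = MVar.empty[Option[Int]]

// Fork both sides so they run concurrently and meet via the MVar.
val program: Task[Long] =
  Task.mapBoth(
    Task.fork(producer(channel, (1 to 100).toList)),
    Task.fork(consumer(channel, 0L))
  )((_, total) => total)

val total = Await.result(program.runAsync, 10.seconds)
```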

MVar is inspired by Haskell’s MVar, but the original blocks the thread instead of returning an asynchronous computation (Task). That’s because threads in Haskell are lightweight and not nearly as scarce as JVM threads.

Back-pressured streams

The last part of the library I would like to mention in this article is Monix’s take on reactive streams.

I will describe it briefly, and only from the perspective of a user, not an implementer of new Observables, Observers or Consumers.

  • Observable is equivalent to org.reactivestreams.Publisher or an akka-streams Source.
  • Observer is like org.reactivestreams.Subscriber, and
  • Consumer, which specifies how to consume an Observable, resembles an akka-streams Sink.
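A minimal sketch of an Observable fed into a Consumer, assuming the Monix 2.x monix-reactive API, follows. consumeWith turns the pair into a Task, so nothing runs until that Task is executed.

```scala
// Assumes Monix 2.x (monix-reactive) on the classpath.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}
import scala.concurrent.Await
import scala.concurrent.duration._

// Observable, like a Source: a lazy, back-pressured stream of 1..10.
val numbers: Observable[Int] = Observable.range(1, 11).map(_.toInt)

// Consumer, like a Sink: sums everything it receives.
val sum: Consumer[Int, Long] = Consumer.foldLeft(0L)((acc: Long, x: Int) => acc + x)

// The sum of squares of the even numbers, as a not-yet-running Task.
val sumOfSquaresOfEvens: Task[Long] =
  numbers.filter(_ % 2 == 0).map(x => x * x).consumeWith(sum)

val result = Await.result(sumOfSquaresOfEvens.runAsync, 5.seconds)
```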

Observable has a wealth of combinators: for grouping, filtering, mapping, taking, sliding, giving control when the upstream or downstream are slow, attaching callbacks for upstream events such as stream end, error handling and more. I’m not going to enumerate these but I can assure you that the library author has put an extraordinary amount of effort into those matters!

The provided set of Consumers was sufficient: I was able to find a replacement for each Sink that I have used in a real project based on akka-streams.

Next, I will reveal what unfolded when I took an akka-stream used in a recent project I was working on and rewrote it using Monix. I have changed the actual business details to dummy ones.

First comes a natural-language description of the processing, then the akka-streams implementation, and lastly the Monix implementation.

  1. The starting point is the equivalent of a reactive streams publisher of Kafka “committable messages” (a combination of payload and the means to commit offset).
  2. The message is matched and acted upon. This part is very abstract in the following code.
  3. To avoid excessive offset commits (which are expensive I/O), offsets are committed only once per 100 messages, or once per second when traffic is lower.
  4. The stream then goes through an injectable trigger which can be used to cancel processing.
  5. There is no element or statistic about the stream that we are interested in, so we ignore what comes through the cancel switch.


A full example is available in the GH repo. The APIs look similar to some extent. The main difference is that Monix streams run on a Scheduler, while akka-streams requires a Materializer, which is most probably an ActorMaterializer, which in turn requires an ActorSystem. A Scheduler is much more lightweight than an ActorSystem.

I have also written and run some JMH benchmarks for a totally made-up set of operations, although I will not summarize them as a comparison, to avoid drawing possibly false conclusions. Monix streams are undoubtedly very efficient.

As with Task and Coeval, there is a glue library that provides Cats or Scalaz Monad instances for Observables.

So why should you get to know Monix?

Simply because it gives you a very good set of tools for asynchronous programming. There’s Task as an alternative to Future, along with plenty of combinators and helpers for writing your asynchronous code.

It provides a well-engineered and efficient implementation of reactive streams. The MVar type could also sometimes be the simplest solution to synchronization problems.

There are also several miscellaneous classes I have not written about here which could also be handy, such as Atomic, which could replace Java’s AtomicInteger and the like, CircuitBreaker, or FutureUtils.
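As a small taste of Atomic, here is a sketch assuming the monix.execution.atomic API. There, Atomic(0) builds an AtomicInt specialized for Int, and transform applies a function atomically.

```scala
// Assumes monix-execution (Monix 2.x) on the classpath.
import monix.execution.atomic.Atomic

// Atomic(0) picks an AtomicInt, specialized for Int.
val counter = Atomic(0)

counter.increment()       // +1, atomically
counter.transform(_ + 10) // apply a function atomically
val current = counter.get
```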


Author profile

I'm a JavaScript and frontend developer. After some time as a full-stack developer, I decided to focus more on the front end of projects, because it's what I enjoy most.