Introduction to Streams in Akka
A very common scenario in many kinds of software is input data that is potentially unlimited and can arrive at arbitrary intervals. The common way of handling such cases is the Observer pattern in its imperative form: callbacks.
But this approach creates what’s commonly called “Callback Hell”. It’s a concept basically identical to the better-known “GOTO Hell”, as both mean erratic jumps in the flow of control that can be very hard to reason about and work with. When writing an application we need to analyze all the callbacks to be sure that, e.g., we’re not using a value that a callback can change at a random point in time.
But there is a declarative approach to solving this problem that lets us reason about it in a much more predictable and less chaotic fashion: Streams.
Introduction to the concept of Streams
Basic concept and terminology
First and foremost – why use Streams? What kind of advantage do they give us over the standard ways (e.g. callbacks) of handling Codata (i.e. potentially unlimited data)?
The answer is simple: Streams abstract away the imperative nature of how the data enters the application, giving us a declarative way of describing and handling it while hiding the details that we don’t care about. This makes reasoning and design much easier.
Of course, as with all abstractions, we lose some control over how the input data is handled, control that we could otherwise use for more fine-grained handling and for optimizing how the application works.
Any non-cyclical Stream consists of at least 2 parts – the Source and the Sink. In between them can be any finite number of Flow elements. The Source and Sink are special cases of the Flow element, depending on whether only the output is open (Source) or only the input (Sink).
Here is an explanation of what each element is, using the terminology from Akka Streams:
- Source – as the name suggests, the source of the data; consists of exactly one output
- Flow – basically an ordered collection of transformations that act upon the data from the Source; consists of exactly one output and one input
- Sink – the receiver of the data after it’s transformed by the Flow; consists of exactly one input
You can think about a Source as a Publisher and the Sink as a Subscriber.
Stream composition and abstraction
A very important thing to notice is that we can combine these elements to get another part. E.g. we can combine a Source and a Flow to obtain a new Source, or n Flows to get another Flow. Pairing a Sink (as the input side) with a Source (as the output side) will also produce a Flow, as the result has one input and one output.
This gives us the ability to compose Streams just as we compose functions. It makes reasoning about and developing the system easier, as we operate on independent constructs that we “glue” together later on.
Basically, we can put together a few Streams, put them in a “box” to hide the details we don’t need, and use the result as an independent element with its own unique functionality. This lets us reason about them on a higher level, easing the development and design process.
An example of this would be combining a Source that emits natural numbers with a Flow that filters out non-prime numbers. By combining them both we get a Source that outputs primes, which we can use in further Stream composition. We don’t care from what Streams it’s built, just that it is a Source that emits primes.
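The prime-number example can be sketched in Scala roughly as follows. This is only an illustration, assuming the akka-stream 2.4.x API used later in this post; the names PrimeSourceSketch, isPrime, naturals, primesOnly and firstPrimes are made up for the sketch.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PrimeSourceSketch {
  // Naive primality check, good enough for an illustration.
  def isPrime(n: Int): Boolean =
    n > 1 && (2 to math.sqrt(n.toDouble).toInt).forall(n % _ != 0)

  // A Source of natural numbers: 1, 2, 3, ...
  val naturals: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.from(1))

  // A Flow that only lets primes through.
  val primesOnly: Flow[Int, Int, NotUsed] = Flow[Int].filter(isPrime)

  // Source + Flow = a new Source; its users need not know how it was built.
  val primes: Source[Int, NotUsed] = naturals.via(primesOnly)

  // Runs the composed Source and collects the first `count` primes.
  def firstPrimes(count: Int): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("primes")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(primes.take(count).runWith(Sink.seq[Int]), 5.seconds)
    finally system.terminate()
  }
}
```

From the outside, primes is just a Source[Int, NotUsed]; the fact that it is a number generator plus a filter stays hidden in the “box”.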
Back Pressure
A possible problematic scenario is when the Source produces values too fast for the Sink to handle and can possibly overwhelm it. As the Sink gets more data than it can process at the moment, it has to keep buffering it for processing in the future.
The biggest problem with this is that we handle Codata, which is possibly infinite, so the buffer would also need to be of infinite size (which is, of course, impossible, and eventually the system runs out of memory).
To combat this, the Sink needs to communicate with the Source to inform it that it should “slow down” with pushing new data until it finishes handling the current batch. This enables a constant-size buffer for the Sink, as it will inform the Source to stop sending new data when it’s not ready.
This kind of bilateral communication is called Back Pressure. Streams that employ this mechanism are often called Reactive Streams.
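As a rough illustration of the idea, the sketch below connects an unbounded producer to a deliberately slow consumer through a fixed-size buffer. It assumes the akka-stream 2.4.x API; BackPressureSketch, the buffer size of 4 and the 10 ms delay are arbitrary choices made for this example.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BackPressureSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("back-pressure")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // An unbounded, "fast" producer: it could emit numbers forever.
    val fastSource = Source.fromIterator(() => Iterator.from(1))

    val result = fastSource
      // Fixed-size buffer; when it is full, the Source is asked to wait.
      .buffer(4, OverflowStrategy.backpressure)
      // Simulated slow consumer (blocking only to keep the sketch short).
      .map { n => Thread.sleep(10); n }
      .take(10)
      .runWith(Sink.seq[Int])

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```

Although the producer is infinite, memory use stays bounded: elements are only requested from it as fast as the downstream stages can accept them.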
Streams in Akka
Introduction
Akka provides us with its own framework for working with Streams in its Akka Streams API. This text is introductory, as it would be impossible to cover everything in a single post while keeping it readable.
I will cover the basic project/dependency setup and show a simple code example along with details on how it works.
Example
I will now present a simple code snippet of how to use Akka Streams in Scala (the akka-streams version is 2.4.17):
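The snippet below is a reconstruction based on the step-by-step description that follows, not necessarily the exact original listing. The names numberSource, isEvenFlow, evenNumbersSource and consoleSink come from the text of this post; the object name, the number range 1 to 10 and the timeout are assumptions made for this sketch.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object EvenNumbersExample {
  // Static number sequence (the range itself is an assumption).
  val numbers = 1 to 10

  // Source based on the sequence's iterator.
  val numberSource: Source[Int, NotUsed] = Source.fromIterator(() => numbers.iterator)

  // Flow that only lets through even numbers.
  val isEvenFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0)

  // Source + Flow = new Source.
  val evenNumbersSource: Source[Int, NotUsed] = numberSource.via(isEvenFlow)

  // Sink that prints its input to the console.
  val consoleSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

  def main(args: Array[String]): Unit = {
    // Needed to run the Stream, which is backed by Akka's Actor model.
    implicit val system: ActorSystem = ActorSystem("even-numbers")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Connect the Source to the Sink and run the complete Stream.
    val done: Future[Done] = evenNumbersSource.runWith(consoleSink)
    Await.ready(done, 5.seconds)
    system.terminate()
  }
}
```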
Or presented as a diagram: [Figure: the Source, Flow and Sink of the example connected into one Stream]
In the example above we’ve created the:
- ActorSystem and ActorMaterializer instances that we will use to run the Stream. This is needed because Akka Streams is backed by Akka’s Actor model.
- Source based on the static number sequence’s iterator
- Flow that only lets through even numbers
- Sink that will print out its input to the console using println
- Complete Stream by connecting evenNumbers with consoleSink and running it by using runWith
In the following sections, I will try to explain in more detail what each of these elements does, what their API looks like, and how to use them.
Actor Materializer
The ActorMaterializer class instance is needed to materialize a Flow into a Processor, which represents a processing stage. The Processor is a construct from the Reactive Streams standard, which Akka Streams implements.
In fact, Akka Streams employs back-pressure as described in the Reactive Streams standard mentioned above. Source, Flow and Sink eventually get transformed into low-level Reactive Streams constructs via the process of materialization.
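One practical consequence is that a Source or Sink is only a description until it is materialized, so the same description can be run more than once. A minimal sketch, assuming the akka-stream 2.4.x API (MaterializationSketch and runTwice are names invented here):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MaterializationSketch {
  def runTwice(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("materialization")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Only descriptions so far; nothing is running yet.
    val numbers: Source[Int, NotUsed] = Source(1 to 10)
    val summing: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

    try {
      // Each runWith call materializes a fresh, independent stream.
      val first = Await.result(numbers.runWith(summing), 5.seconds)
      val second = Await.result(numbers.runWith(summing), 5.seconds)
      (first, second)
    } finally system.terminate()
  }
}
```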
Source
Source, as explained previously, represents a Stream (set of processing steps) that consists of only one open output.
Source takes two type parameters. The first one represents the type of data it emits and the second one is the type of the auxiliary value it can produce when run/materialized. If we don’t produce any, we use the NotUsed type provided by Akka.
The static methods to create a Source are provided by the Source companion object. I’ll provide a quick rundown of them:
- fromIterator – This is what we used in the example. It takes a function that returns an Iterator. It will take elements until the iterator is empty or next() fails.
- fromPublisher – Uses an object that provides the Publisher functionality to produce elements.
- fromFuture – Starts a new Source from the given Future. The stream will consist of one element when the Future is completed with a successful value.
- fromGraph – A Source-shaped Graph is of course also a source; this will simply type it as Source. Graphs in Akka Streams won’t be explained in this introductory post.
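Two of these factory methods in action, as a small sketch against the akka-stream 2.4.x API that this post uses (as far as I know, later Akka versions deprecate fromFuture in favour of another method, so treat this as version-specific). SourceFactoriesSketch and the sample values are invented for the illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SourceFactoriesSketch {
  def run(): (Seq[Int], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("source-factories")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // fromIterator takes a function that creates the Iterator.
    val fromIter: Source[Int, NotUsed] = Source.fromIterator(() => Iterator(1, 2, 3))

    // fromFuture emits the single value the Future completes with.
    val fromFut: Source[String, NotUsed] = Source.fromFuture(Future.successful("hello"))

    try {
      val ints = Await.result(fromIter.runWith(Sink.seq[Int]), 5.seconds)
      val strings = Await.result(fromFut.runWith(Sink.seq[String]), 5.seconds)
      (ints, strings)
    } finally system.terminate()
  }
}
```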
The Source object also provides us with an apply helper method that simply takes an Iterable, creating a Source from it. The main difference from fromIterator is that every subscriber to the Source will get an individual flow of elements, always starting from the beginning, regardless of when they subscribed to it.
Also, these three methods can be pretty useful in many use-cases:
- single – As the name suggests, it takes a single element and creates a Source with one element from it.
- repeat – Creates a Source that will continually emit the given element.
- empty – A Source with no elements; it’s immediately completed for any connected Sink.
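A quick sketch of apply, single, repeat and empty, again assuming the akka-stream 2.4.x API; SimpleSourcesSketch and the sample elements are placeholders. Note that repeat never completes on its own, so the sketch limits it with take.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SimpleSourcesSketch {
  def run(): (Seq[Int], Seq[String], Seq[String], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("simple-sources")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val fromIterable = Source(List(1, 2, 3))    // apply: Source from an Iterable
    val one = Source.single("only")             // exactly one element
    val ticks = Source.repeat("tick").take(3)   // endless, so we cut it off
    val nothing = Source.empty[Int]             // completes immediately

    try {
      (
        Await.result(fromIterable.runWith(Sink.seq[Int]), 5.seconds),
        Await.result(one.runWith(Sink.seq[String]), 5.seconds),
        Await.result(ticks.runWith(Sink.seq[String]), 5.seconds),
        Await.result(nothing.runWith(Sink.seq[Int]), 5.seconds)
      )
    } finally system.terminate()
  }
}
```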
To run the Stream we have three convenient methods, two of which spare us from creating a separate Sink instance:
- runWith – Convenience method to run a Source, Flow or Sink. To run a Source we need to supply a Sink, for the Sink we need to supply a Source, and for the Flow we need to supply both a Source and a Sink. When called on a Source, it returns the value materialized by the supplied Sink; for sinks such as Sink.foreach and Sink.fold this is a Future that, when the stream ends, holds the final value of the evaluation or a Failure.
- runFold – Shortcut method for running the Source with a Sink.fold(...)(...). It returns a Future that, when the stream ends, holds the final value of the evaluation or a Failure.
- runForeach – Shortcut for running this Source with a Sink.foreach(...). As above, it returns a Future.
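The three ways of running a Source can be compared side by side. This is a sketch under the assumption of the akka-stream 2.4.x API; RunMethodsSketch is a made-up name.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RunMethodsSketch {
  def run(): (Int, Int, Done) = {
    implicit val system: ActorSystem = ActorSystem("run-methods")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val source = Source(1 to 5)

    // runWith: we supply the Sink ourselves and get its materialized value.
    val viaRunWith: Future[Int] = source.runWith(Sink.fold[Int, Int](0)(_ + _))

    // runFold: the same fold without creating a Sink explicitly.
    val viaRunFold: Future[Int] = source.runFold(0)(_ + _)

    // runForeach: side effect per element; completes with Done at the end.
    val viaRunForeach: Future[Done] = source.runForeach(n => println(n))

    try {
      (
        Await.result(viaRunWith, 5.seconds),
        Await.result(viaRunFold, 5.seconds),
        Await.result(viaRunForeach, 5.seconds)
      )
    } finally system.terminate()
  }
}
```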
This should give you a brief overview of how to create and work with the Source class.
Flow
Flow represents a Stream (set of processing steps) with one open input and one open output.
Basically, a Flow is an ordered chain of transformations applied to its input, the cumulative effect of which it emits on its output. It takes three type parameters: the first for the input data type, the second for the output, and the last one for the auxiliary type.
The most straightforward way of creating a Flow class instance is by using the companion object’s apply method. It creates an “identity” Flow, i.e. it simply propagates its input to its output, and thus takes only one type parameter.
val uselessFlow: Flow[Int, Int, NotUsed] = Flow[Int]
The example above, as you may guess, is not very useful in real-world scenarios. So how do we create something more useful? By method cascading. A Flow consists of a finite number of transformations upon the Stream that we chain together:
val flowCascade: Flow[Int, Boolean, NotUsed] = Flow[Int].filter((num) => num > 2).map((num) => num % 2 == 0)
Here we simply discard all input smaller than 3 and on the output check if the number is even. We can chain as many transformations as we want when creating the flow, or simply combine two flows into one using the via method.
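As an illustration, the cascade above can also be built from two smaller flows appended to one another with via (the same method this post uses to attach a Flow to a Source). The sketch assumes the akka-stream 2.4.x API; FlowCompositionSketch, greaterThanTwo, isEven and combined are names chosen for this example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlowCompositionSketch {
  // Two small, independent flows...
  val greaterThanTwo: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ > 2)
  val isEven: Flow[Int, Boolean, NotUsed] = Flow[Int].map(_ % 2 == 0)

  // ...appended into one Flow with the same overall behaviour as the cascade.
  val combined: Flow[Int, Boolean, NotUsed] = greaterThanTwo.via(isEven)

  def run(): Seq[Boolean] = {
    implicit val system: ActorSystem = ActorSystem("flow-composition")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 6).via(combined).runWith(Sink.seq[Boolean]), 5.seconds)
    finally system.terminate()
  }
}
```

For the input 1 to 6, the values 1 and 2 are dropped and the remaining 3, 4, 5, 6 are mapped to false, true, false, true.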
We can also create a Source by combining an existing one and a Flow. This was used in the example code:
val evenNumbersSource: Source[Int, NotUsed] = numberSource.via(isEvenFlow)
The possible transformations are basically the same as with standard Scala collections, so we won’t go into much detail about them. For more detailed information, please consult the API documentation.
Sink
Sink represents a Stream (set of processing steps) with only one open input.
The Sink is the last element of our Stream puzzle. Basically, it’s a subscriber of the data sent/processed by a Source. Usually, it outputs its input to some system IO (TCP port, console, file, etc.), creating the side effects of our application.
It’s basically a Flow which uses a foreach or fold function to run a procedure (function with no return value, Unit) over its input elements and propagate the auxiliary value (e.g. a Future that will complete when it finishes writing the input to a file or console).
As with Source and Flow, the companion object provides methods for creating an instance of it. As mentioned above, the main methods of doing so are:
- foreach – Invokes the given procedure for each received element.
- foreachParallel – Same as above, except it invokes the procedure in parallel. Takes the number of concurrent executions to be used at the same time as the argument.
- fold – Invokes the given function for each received element, propagating the resulting value to the next iteration.
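A short sketch of foreach and fold sinks, plus a Sink built by prepending a Flow to another Sink. It assumes the akka-stream 2.4.x API; SinkSketch, printing, summing and doubledSum are illustrative names only.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SinkSketch {
  // foreach: runs a procedure per element; materializes a Future[Done].
  val printing: Sink[Int, Future[Done]] = Sink.foreach[Int](n => println(s"received $n"))

  // fold: carries an accumulator; materializes a Future with the final value.
  val summing: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Flow + Sink = a new Sink (Keep.right keeps the fold's Future).
  val doubledSum: Sink[Int, Future[Int]] = Flow[Int].map(_ * 2).toMat(summing)(Keep.right)

  def run(): (Done, Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("sinks")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val source = Source(1 to 3)
    try {
      (
        Await.result(source.runWith(printing), 5.seconds),
        Await.result(source.runWith(summing), 5.seconds),
        Await.result(source.runWith(doubledSum), 5.seconds)
      )
    } finally system.terminate()
  }
}
```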
For details about the rest of the possible methods for creating and operating on a Sink, please consult the documentation.
To run/materialize the Sink and the whole Stream we need to connect it to a Source, as mentioned previously in the Source section. With sinks such as foreach and fold, the runWith method produces a Future that will be completed when the Source is empty and the Sink has finished processing the elements it got from it. If the processing fails, the Future completes with a Failure.
We can also create a RunnableGraph instance and run it manually using toMat (viaMat is the counterpart for attaching a Flow), i.e.:
val streamGraph: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
Every stage of the Stream produces a value, but we only want the one produced by the Sink, thus we use Keep.right. Then just run it:
val materializedValue: Future[Int] = streamGraph.run()
Doing this manually makes sense when we have some reason to keep the values produced by the intermediate processing stages.
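To make the role of the Keep functions more concrete, here is a sketch that builds the same Source and Sink into three RunnableGraph variants. It assumes the akka-stream 2.4.x API; KeepSketch and the sample range are assumptions for this example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KeepSketch {
  val source: Source[Int, NotUsed] = Source(1 to 4)
  val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Keep.right: only the Sink's value (the Future with the sum).
  val keepRight: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
  // Keep.left: only the Source's value (here just NotUsed).
  val keepLeft: RunnableGraph[NotUsed] = source.toMat(sink)(Keep.left)
  // Keep.both: a pair of both values.
  val keepBoth: RunnableGraph[(NotUsed, Future[Int])] = source.toMat(sink)(Keep.both)

  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("keep")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val fromRight = Await.result(keepRight.run(), 5.seconds)
      val (_, sumFuture) = keepBoth.run()
      (fromRight, Await.result(sumFuture, 5.seconds))
    } finally system.terminate()
  }
}
```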
Summary
This more or less exhausts the basics of what Streams are and how to use Akka Streams to work with them. Of course, the example presented here was very simplistic and we only talked about a small part of the capabilities offered by Akka Streams.
An important and very useful feature provided by the library that we didn’t go into in this post is describing the Stream structure as a graph by using a pretty straightforward, declarative DSL. This lets us transform a design in graph/diagram form into clean and maintainable code much more seamlessly.
As this is a pretty big and important topic, I didn’t want to squeeze it in here, as it would most probably have a negative effect on the size and complexity of the post (which is supposed to be an introduction). Most probably I’ll try to write another post that explains the whole concept of using graphs and the Graph DSL, with more complex examples to show how it eases developing applications based on Stream processing.
If someone is interested in researching the topic further, I’ve added a link to the official documentation. The Quick Start Guide also contains a great introduction to using graphs.
Another pretty important concept is controlling the rate at which the elements are processed.
Also worth reading are the principles and ideas on which Akka Streams is designed and built. I’ve added a link to the documentation explaining that in the Links section.
I hope you’ve enjoyed this short introduction to the concept of Streams and that you’ll get the chance to use this knowledge in all kinds of interesting projects!
Links
- Stream Wikipedia
- Codata Wikipedia
- Quick Start Guide
- Working with Flows
- Reactive Streams
- Docs on Stream composition
- Working with Graphs
- Design Principles behind Akka Streams