A very common scenario in many kinds of software is input data that is potentially unlimited and can arrive at arbitrary intervals. The common way of handling such cases is the Observer pattern in its imperative form – callbacks.
But this approach creates what's commonly called "Callback Hell". The concept is essentially the same as the better-known "GOTO Hell": both involve erratic jumps in the flow of control that can be very hard to reason about and work with. When writing an application we need to analyze all the callbacks to be sure, e.g., that we're not using a value that can be changed by a callback at a random point in time.
But there exists a declarative approach to solving this problem that lets us reason about it in a much more predictable and less chaotic fashion – Streams.
Introduction to the concept of Streams
Basic concept and terminology
First and foremost – why use Streams? What kind of advantage do they give us over the standard ways (e.g. callbacks) of handling Codata (i.e. potentially unlimited data)?

The answer is simple – they abstract away from the imperative nature of how the data enters the application, giving us a declarative way of describing and handling it while hiding details we don't care about. This makes reasoning and design much easier.

Of course, as with all abstractions, we lose some control over how the data is handled – control we could otherwise use to, e.g., implement more fine-grained handling of the input data and optimize how the application works.
A Stream consists of at least two parts – the Source and the Sink. In between them there can be any finite number of Flow elements. The Source and the Sink are special cases of a Flow element, depending on whether only the output is open (Source) or only the input (Sink).

Here is an explanation of what each element is, using the terminology from Akka Streams:
- Source – as the name suggests, the source of the data; consists of exactly one output
- Flow – basically an ordered collection of transformations that act upon the data from the Source; consists of exactly one input and one output
- Sink – the receiver of the data after it's transformed by the Flow; consists of exactly one input
You can think about a Source as a Publisher and the Sink as a Subscriber, in Reactive Streams terminology.
Stream composition and abstraction
A very important thing to notice is that we can combine these elements to get a new one. E.g. if we combine a Source and a Flow we obtain a new Source, and combining n Flows gives us another Flow, as the result still has exactly one input and one output. Likewise, attaching a Flow to a Sink produces a new Sink, and connecting a Source directly to a Sink gives us a complete, runnable Stream.
This gives us the ability to compose Streams just as we compose functions, easing reasoning about and developing the system, as we start to operate on independent constructs that we "glue" together later on.

Basically, we can put together a few Streams, put them in a "box" to hide the details we don't need, and use the result as an independent element with its own unique functionality. This lets us reason about them on a higher level, easing the development and design process.
An example of this would be combining a Source that emits natural numbers with a Flow that filters out non-prime numbers. By combining them we get a Source that outputs primes, which we can use in further Stream composition. We don't care what Streams it's built from, just that it is a Source that emits primes.
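Sketched with Akka Streams (the library used later in this post), this composition could look as follows – note that `isPrime` is a hypothetical helper added here for illustration, not something the library provides:

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

object PrimeSourceExample {
  // Hypothetical helper: a naive primality test, for illustration only
  def isPrime(n: Int): Boolean =
    n > 1 && (2 to math.sqrt(n.toDouble).toInt).forall(n % _ != 0)

  // A Source emitting the natural numbers
  val naturalNumbers: Source[Int, NotUsed] =
    Source.fromIterator(() => Iterator.from(1))

  // A Flow that filters out non-prime numbers
  val primeFilter: Flow[Int, Int, NotUsed] =
    Flow[Int].filter(isPrime)

  // Combining them yields a new Source – consumers only see "a Source of primes"
  val primes: Source[Int, NotUsed] = naturalNumbers.via(primeFilter)
}
```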
A possible problematic scenario is when the Source produces values too fast for the Sink to handle, possibly overwhelming it. As the Sink gets more data than it can process at the moment, it has to keep buffering it for future processing.

The biggest problem with this is that we are handling Codata, which is possibly infinite, so the buffer would also need to be of infinite size – which is of course impossible, and eventually the system runs out of memory.

To combat this, the Sink needs to communicate with the Source to inform it that it should "slow down" with pushing new data until the Sink finishes handling the current batch. This enables a constant-size buffer for the Sink, as it will tell the Source to stop sending new data when it's not ready.
This kind of bilateral communication is called back-pressure. Streams that employ this mechanism are often called Reactive Streams.
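In Akka Streams (covered in the next section) this idea surfaces, for example, as an explicit bounded buffer whose overflow strategy back-pressures the upstream; a minimal sketch:

```scala
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

// A bounded buffer of 16 elements; when it fills up, the upstream
// Source is back-pressured instead of the buffer growing without limit
val bounded: Source[Int, NotUsed] =
  Source
    .fromIterator(() => Iterator.from(1))
    .buffer(16, OverflowStrategy.backpressure)
```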
Streams in Akka
Akka provides us with its own framework for working with Streams in its Akka Streams API. This text will be introductory, as it would be impossible to cover it all in a single post while keeping it readable.

I will cover the basic project/dependency setup and show a simple code example along with details of how it works.
I will now present a simple code snippet showing how to use Akka Streams in Scala.
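The original snippet isn't reproduced here, but based on the description below (and the names numberSource, isEvenFlow and consoleSink used later in the post), a matching sketch could look like this:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.{Done, NotUsed}

import scala.concurrent.Future

object AkkaStreamsExample extends App {
  // Akka Streams is backed by Akka's Actor model
  implicit val system: ActorSystem = ActorSystem("akka-streams-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Source based on a static number sequence's iterator
  val numberSource: Source[Int, NotUsed] =
    Source.fromIterator(() => Seq(1, 2, 3, 4, 5).iterator)

  // Flow that only lets even numbers through
  val isEvenFlow: Flow[Int, Int, NotUsed] =
    Flow[Int].filter((num) => num % 2 == 0)

  // New Source obtained by combining the two elements above
  val evenNumbersSource: Source[Int, NotUsed] = numberSource.via(isEvenFlow)

  // Sink that prints its input to the console
  val consoleSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

  // Connect the Source to the Sink and run the Stream
  evenNumbersSource.runWith(consoleSink)
}
```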
In the example above we've created:

- an ActorMaterializer instance that we will use to run the Stream – this is needed because Akka Streams is backed by Akka's Actor model
- a Source based on a static number sequence's iterator
- a Flow that only lets even numbers through
- a Sink that will print out its input to the console
- and finally we've connected the Source to the consoleSink and run the Stream using runWith
In the following sections I will try to explain in more detail what each of these elements does, what their APIs look like and how to use them.
The ActorMaterializer class instance is needed to materialize a Flow into a Processor, which represents a processing stage – a construct from the Reactive Streams standard, which Akka Streams implements.

Akka Streams employs back-pressure as described in the Reactive Streams standard mentioned above. The Source, Flow and Sink eventually get transformed into low-level Reactive Streams constructs via the process of materialization.
Source, as explained previously, represents a Stream (a set of processing steps) with only one open output.

Source takes two type parameters. The first one represents the type of data it emits and the second one is the type of the auxiliary value it can produce when run/materialized. If we don't produce any, we use the NotUsed type provided by Akka.
The static methods to create a Source are provided by the Source companion object. I'll provide a quick rundown of them:

- fromIterator – this is what we used in the example; it takes a function that returns an Iterator and emits its elements until the iterator is empty or the stream is cancelled
- fromPublisher – uses an object that provides the Reactive Streams Publisher functionality to produce elements
- fromFuture – creates a new Source from the given Future; the stream will consist of one element when the Future is completed with a successful value
- fromGraph – a Graph with only one open output is of course also a source, so this simply types it as a Source; Graphs in Akka Streams won't be explained in this introductory post
The Source object also provides us with an apply helper method that simply takes an Iterable, creating a Source from it. The main difference from fromIterator is that every subscriber to the Source will get an individual flow of elements, always starting from the beginning, regardless of when they subscribed to it.
Also, these three methods can be pretty useful in many use-cases:

- single – as the name suggests, it takes a single element and creates a one-element Source from it
- repeat – creates a Source that will continually emit the given element
- empty – creates a Source with no elements; it is immediately completed for any connected Sink
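A quick sketch of these constructors side by side (types spelled out for clarity):

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

import scala.concurrent.Future

// From a function producing a fresh Iterator per materialization
val iterated: Source[Int, NotUsed] = Source.fromIterator(() => Iterator(1, 2, 3))

// The apply helper: from an Iterable, replayed from the start per subscriber
val fromColl: Source[Int, NotUsed] = Source(List(1, 2, 3))

// One element, emitted once the Future completes successfully
val fromFut: Source[String, NotUsed] = Source.fromFuture(Future.successful("done"))

// Exactly one element
val justOne: Source[Int, NotUsed] = Source.single(42)

// The same element, emitted forever
val ticks: Source[String, NotUsed] = Source.repeat("tick")

// No elements; completes immediately for any connected Sink
val none: Source[Nothing, NotUsed] = Source.empty
```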
To run the Stream we have three convenient methods, two of which let us omit creating a separate Sink:

- runWith – convenience method for running a Stream. To run a Source we need to supply a Sink, for a Sink we need to supply a Source, and for a Flow we need to supply both – a Source and a Sink. It returns a Future that, when the stream ends, holds the final value of the evaluation, or a failure.
- runFold – shortcut for running the Stream with Sink.fold(...)(...). It returns a Future that, when the stream ends, holds the final value of the evaluation, or a failure.
- runForeach – shortcut for running the Stream with Sink.foreach(...). As above, it returns a Future.
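The three run methods can be sketched like this (the ActorSystem name is arbitrary):

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object RunMethodsExample extends App {
  implicit val system: ActorSystem = ActorSystem("run-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val numbers = Source(1 to 5)

  // runWith: supply an explicit Sink
  val sum1: Future[Int] = numbers.runWith(Sink.fold[Int, Int](0)(_ + _))

  // runFold: shortcut for runWith(Sink.fold(...)(...))
  val sum2: Future[Int] = numbers.runFold(0)(_ + _)

  // runForeach: shortcut for runWith(Sink.foreach(...))
  val done: Future[Done] = numbers.runForeach(println)
}
```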
This should give you a brief outlook on how to create and work with the Source.
Flow represents a Stream (a set of processing steps) with one open input and one open output.

A Flow is an ordered chain of transformations applied to its input, the cumulative effect of which it emits on its output. It takes three type parameters – the first for the input data type, the second for the output and the last one for the auxiliary type.
The most straightforward way of creating a Flow class instance is by using the companion object's apply method. It creates an "identity" Flow, i.e. one that simply propagates its input to its output, and thus takes only one type parameter:
val uselessFlow: Flow[Int, Int, NotUsed] = Flow[Int]
The example above, as you may guess, is not very useful in real-world scenarios. So how do we create something more useful? By method cascading. The Flow consists of a finite number of transformations upon the Stream that we put together using method cascading, as mentioned above:
val flowCascade: Flow[Int, Boolean, NotUsed] = Flow[Int].filter((num) => num > 2).map((num) => num % 2 == 0)
Here we simply discard all input smaller than 3 and, on the output, check whether the number is even. We can chain as many transformations as we want when creating the flow, or simply combine two Flows into one using the via method.
We can also create a Source by combining an existing one with a Flow. This was used in the example code:
val evenNumbersSource: Source[Int, NotUsed] = numberSource.via(isEvenFlow)
The possible transformations are basically the same as with standard Scala collections, thus we’ll omit going into much detail about them. For more detailed information about them please consult the API documentation.
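To illustrate, the familiar collection-style operators compose directly on a Flow:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// map, filter, take, etc. work just like on standard Scala collections
val pipeline: Flow[Int, String, NotUsed] =
  Flow[Int]
    .map(_ * 2)      // double each element
    .filter(_ > 2)   // drop small values
    .take(10)        // only let the first ten elements through
    .map(_.toString) // convert to String for output
```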
Sink represents a Stream (a set of processing steps) with only one open input.

The Sink is the last element of our Stream puzzle. Basically, it's a subscriber of the data sent/processed by a Source. Usually it outputs its input to some system IO (a TCP port, the console, a file, etc.), creating the side effects that make our application actually do its work.
It's basically a Flow that uses a fold function to run a procedure (a function with no return value, i.e. returning Unit) over its input elements and propagate an auxiliary value (e.g. a Future that will complete when it finishes writing the input to a file or the console).

As with Flow, the companion object provides methods for creating an instance. As mentioned above, the main ones are:
- foreach – invokes the given procedure for each received element
- foreachParallel – same as above, except it invokes the procedure in parallel; takes the number of concurrent executions as an argument
- fold – invokes the given function for each received element, propagating the resulting value to the next iteration
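A sketch of these three constructors (the ActorSystem is only needed here to supply an ExecutionContext for foreachParallel):

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

object SinkExamples {
  implicit val system: ActorSystem = ActorSystem("sink-example")
  import system.dispatcher // ExecutionContext for foreachParallel

  // foreach: run a procedure for every element
  val printSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

  // foreachParallel: as above, with up to 4 concurrent executions
  val parallelPrintSink: Sink[Int, Future[Done]] =
    Sink.foreachParallel[Int](4)(println)

  // fold: accumulate a value across all elements
  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
}
```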
For details about the rest of the possible methods for creating and operating on a Sink, please consult the documentation.
To run/materialize the Sink and the whole Stream we need to connect it to a Source, as mentioned previously in the Source section. The runWith method produces a Future that will be completed when the Source is empty and the Sink has finished processing the elements it got from it. If the processing fails, it will return a failed Future.
We can also create a RunnableGraph instance and run it manually using its run() method:
val streamGraph: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
Every stage of the Stream produces an auxiliary value, but we only want the one produced by the Sink, thus we use Keep.right. Then we just run it:
val materializedValue: Future[Int] = streamGraph.run()
Doing this manually has a point if we have some reason to persist the values produced by the intermediate processing stages.
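If we did care about another stage's auxiliary value, Keep.left or Keep.both could be used instead of Keep.right; for example:

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.Future

// Keep.both keeps the auxiliary values of both the Source and the Sink,
// so running the graph yields a tuple of them
val graphBoth: RunnableGraph[(NotUsed, Future[Int])] =
  Source(1 to 10).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both)
```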
This more or less exhausts the basics of what Streams are and how to use Akka Streams to work with them. Of course, the example presented here was very simplistic and we only talked about a small part of the capabilities the library offers us.
An important and very useful feature provided by the library that we didn't go into in this post is describing the Stream structure as a graph using a pretty straightforward, declarative DSL. This enables us to much more smoothly transform our design in graph/diagram form into clean and maintainable code.

As this is a pretty big and important topic, I didn't want to shove it in here, as it would most probably have a negative effect on the size and complexity of the post (it is supposed to be an introduction, after all). Most probably I'll try to write another post that explains the whole concept of using graphs and the Graph DSL, with more complex examples showing how it eases developing applications based on Streams.

If someone is interested in researching the topic further, I've added a link to the official documentation. Also, the Quick Start Guide contains a great introduction to using graphs.
Another pretty important concept is controlling the rate at which elements are processed. Also worth reading is the set of design principles and ideas on which Akka Streams is built; I've added a link to the documentation explaining them in the Links section.
I hope you've enjoyed this short introduction to the concept of Streams, and that you'll get the chance to utilize this knowledge in all kinds of interesting projects!

Links
- Stream Wikipedia
- Codata Wikipedia
- Quick Start Guide
- Working with Flows
- Reactive Streams
- Docs on Stream composition
- Working with Graphs
- Design Principles behind Akka Streams
Do you like this post? Let us know in the comments!