Custom GraphStage in Akka Streams
In this post I will try to present what GraphStage is in Akka Streams. My goal is to describe when it's useful and how to use it correctly. I will start by outlining key terminology, then proceed with a simple example, and after that cover the main use case. For the latter, the most upvoted issue of akka-http will serve.
At the end, I will show how to properly test a GraphStage. Besides learning the API, you'll gain a deeper understanding of how backpressure works.
When reading various materials about Akka Streams, I see that a common understanding of some terms is taken for granted. I think that's not always the case, so I want to build up our terminology step by step. I just assume that you are already familiar with the concepts of Source, Flow and Sink as they were explained in the previous post.
Downstream and upstream
Let me explain downstream and upstream with the following graph:
Downstream and upstream are relative, so you need to know what your reference point is. Let's say we are talking about flow1: then its upstream is source, while its downstream is flow2 and anything "on the right-hand side" of flow2.
Basically, it's about the direction in which data flows: places where messages are seen earlier are called upstream and places where they're seen later are called downstream. That being said, we can proceed to a fundamental fact:
- Upstream may complete or fail downstream. When upstream completes, it means that it has no more data to send, and by emitting such a message it informs its downstream about this fact. Conversely, when upstream fails, it means that it was forced to abort its processing due to an error, and it propagates this error to its downstream.
- Downstream may cancel upstream. Cancellation means that downstream is no longer interested in incoming data, and by emitting such a cancellation it asks upstream not to send more data.
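The three signals can be observed directly with the probes from akka-stream-testkit. The following is only a sketch: it assumes the akka-stream-testkit dependency and an Akka 2.6-style setup where the implicit ActorSystem provides the materializer, and the names SignalsDemo, up1, down1 and so on are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

object SignalsDemo {
  def run(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("signals-demo")
    try {
      // Upstream completes: downstream learns that no more data will come.
      val (up1, down1) = TestSource.probe[Int].toMat(TestSink.probe[Int])(Keep.both).run()
      down1.request(1)
      up1.sendNext(1)
      down1.expectNext(1)
      up1.sendComplete()
      down1.expectComplete()

      // Upstream fails: the error is propagated to downstream.
      val boom = new RuntimeException("boom")
      val (up2, down2) = TestSource.probe[Int].toMat(TestSink.probe[Int])(Keep.both).run()
      down2.request(1)
      up2.sendError(boom)
      down2.expectError(boom)

      // Downstream cancels: upstream is asked not to send more data.
      val (up3, down3) = TestSource.probe[Int].toMat(TestSink.probe[Int])(Keep.both).run()
      down3.request(1)
      down3.cancel()
      up3.expectCancellation()
      true // reached only if every expectation above held
    } finally system.terminate()
  }
}
```

Note that completion and failure travel in the direction of data, while cancellation travels against it.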
ProcessingStage is a common name for all the building blocks that make up a graph.
Shape is determined by the number of inlets (input ports) and outlets (output ports). Besides that, it reveals nothing about processing semantics. Let's take a look at two exemplary shapes:
Shape1 described above is called FlowShape. It has one inlet and one outlet. Simple combinators (e.g. filter) are of shape FlowShape.
A Shape in general may have an arbitrary number of inlets and outlets, so the following one is an example of a different Shape:
Shape2 from above has two inlets and one outlet.
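To make the port counts concrete, here is a small sketch that builds both kinds of shape by hand and counts their ports. FlowShape and FanInShape2 are real Akka classes (FanInShape2 is, for example, the shape of Zip), but treating FanInShape2 as the "Shape2" from the figure is my assumption, and the port names are arbitrary.

```scala
import akka.stream.{FanInShape2, FlowShape, Inlet, Outlet}

object ShapesDemo {
  // One inlet, one outlet: the shape of map, filter and similar combinators.
  val flowShape: FlowShape[Int, String] =
    FlowShape(Inlet[Int]("demo.in"), Outlet[String]("demo.out"))

  // Two inlets, one outlet.
  val fanIn: FanInShape2[Int, String, (Int, String)] =
    new FanInShape2[Int, String, (Int, String)]("demo.fanIn")

  // (number of inlets, number of outlets) for each shape
  def portCounts: Seq[(Int, Int)] =
    Seq(flowShape, fanIn).map(s => (s.inlets.size, s.outlets.size))
}
```

The shape says nothing about what happens between the ports; that is the job of the stage logic.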
What is GraphStage
You can think of GraphStage as the implementation block behind any node of a RunnableGraph. Such a broad definition implies that, among others, any Flow may be implemented as a GraphStage. We will try to do that in the next section.
Example of simple GraphStage
Let's try to implement a GraphStage responsible for filtering. Here's the minimal code to get it done (the full code with all imports and implicits may be found in the accompanying repo for this and all following snippets):
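The snippet below is a sketch of what such a filtering stage might look like, not necessarily the code from the repo. The port names, the predicate parameter and the FilterStageDemo wrapper are assumptions, and it targets an Akka 2.6-style setup where the implicit ActorSystem provides the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

final class FilterStage[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("FilterStage.in")
  val out: Outlet[T] = Outlet[T]("FilterStage.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val element = grab(in)
          if (predicate(element)) push(out, element) // pass it on
          else pull(in)                              // drop it and ask for the next one
        }
      })
      setHandler(out, new OutHandler {
        // Downstream demand is simply forwarded upstream.
        override def onPull(): Unit = pull(in)
      })
    }
}

object FilterStageDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("filter-demo")
    try Await.result(
      Source(1 to 10).via(new FilterStage[Int](_ % 2 == 0)).runWith(Sink.seq),
      3.seconds)
    finally system.terminate()
  }
}
```

The else branch matters: when an element is rejected, downstream demand is still unsatisfied, so the stage has to pull again or the stream would stall.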
The above implementation may be run with the following code (full code):
OK, you pasted some code and it works as expected; now let's analyze how we actually achieved that.
Implementing a GraphStage boils down to implementing two methods. The first one is:
def shape: S <: Shape (each element of the execution graph has a defined Shape).
Another method is:
def createLogic(inheritedAttributes: Attributes): GraphStageLogic
Here we implement the logic of the GraphStage. You should put all state variables within GraphStageLogic, as this is the only part that is not reused between different materializations of the stream. Our simple FilterStage is stateless: its processing logic depends solely on input elements. Therefore all we see in its GraphStageLogic are two calls of setHandler.
There are 2 types of handlers:
Handler for inlets. You have to implement one method: def onPush(): Unit. There are other methods you may override, but let's ignore them for a while.
onPush is called when a new element has been received on the inlet. It is usually implemented in terms of grab and push:
grab(inlet) returns the element most recently pushed to the inlet.
push(outlet, el) pushes an element to the given outlet.
Handler for outlets. You have to implement one method: def onPull(): Unit. There is another method you may override, but we'll leave it out for a while.
onPull is called when demand for an element has been signalled for this outlet. It is usually implemented in terms of pull, which allows demand to be propagated upstream.
All inlets and outlets may be represented as finite state machines. Consequently, some operations are not valid in some states. When you call an operation that is invalid for the current state, an exception will be thrown.
More information about it and detailed FSM diagrams may be found in the Port states section of the Akka documentation. To understand why the API looks like this, and consequently to be able to use it correctly, we need to think about its relation to backpressure.
Excerpted from Akka documentation:
Backpressure, in general, is a method where a consumer is able to regulate the rate of incoming data from a corresponding producer.
It nicely sums up the ultimate goal of backpressure. Such a statement also suggests that the consumer needs some means to "regulate the rate of incoming data". It can do so by sending control messages up to the producer through a special channel. Now you should start to feel why the API needs to have pull and onPull.
Those methods refer not to data elements themselves but to this special channel that is used to propagate demand up to the producer. Therefore we can say that demand flows from downstream to upstream, as opposed to data, which flows from upstream to downstream. It can be illustrated with a diagram (taken from the Akka docs, original):
GraphStageLogic callbacks are thread safe in the same way as an Actor's receive method is thread safe. That means they are never called concurrently and they can safely modify the state of GraphStageLogic. On the other hand, a restriction similar to the Actor one applies: Future callbacks should not close over state.
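One way to respect that restriction is to route the result of a Future back into the stage through getAsyncCallback, so that state is only touched from inside stage callbacks. The stage below is a hypothetical example written for this purpose (AsyncLookupStage, lookup and inFlight are made-up names); it handles one element at a time and assumes an Akka 2.6-style setup.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

final class AsyncLookupStage(lookup: Int => Future[String])
    extends GraphStage[FlowShape[Int, String]] {
  val in: Inlet[Int] = Inlet[Int]("AsyncLookup.in")
  val out: Outlet[String] = Outlet[String]("AsyncLookup.out")
  override val shape: FlowShape[Int, String] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var inFlight = false // stage state, touched only from stage callbacks

      // The handler runs inside the stage, never concurrently with onPush/onPull.
      private val onResult = getAsyncCallback[Try[String]] {
        case Success(value) =>
          inFlight = false
          push(out, value)
          if (isClosed(in)) completeStage()
        case Failure(error) => failStage(error)
      }

      override def onPush(): Unit = {
        inFlight = true
        // The Future callback only hands the result over; it does not touch stage state.
        lookup(grab(in)).onComplete(onResult.invoke)(materializer.executionContext)
      }
      override def onPull(): Unit = pull(in)
      // Do not complete while a lookup is still running.
      override def onUpstreamFinish(): Unit = if (!inFlight) completeStage()

      setHandlers(in, out, this)
    }
}

object AsyncLookupDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("async-demo")
    import system.dispatcher
    try Await.result(
      Source(1 to 3).via(new AsyncLookupStage(i => Future(s"v$i"))).runWith(Sink.seq),
      3.seconds)
    finally system.terminate()
  }
}
```

Had the Future callback set inFlight or called push directly, it would run on an arbitrary thread and race with the stage's own callbacks.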
Real world Filter GraphStage
At the beginning I wrote that any node of the execution graph may be implemented as a GraphStage. That's actually an understatement, because a lot of built-in combinators really are implemented as GraphStage. (I lack the knowledge to say whether it's the majority of built-in combinators or even all of them.) Let's take a look at Akka's own filter:
Well, it differs a little from our naive implementation, but the essence remains the same. The main difference is that Akka's implementation takes care of applying the proper Supervision policy in case of failure. That's advisable here because filter executes code injected by the user, which may throw exceptions.
I find the fact that most built-in combinators are implemented as GraphStage quite useful, as it means that you can easily inspect how they work underneath. Also, you can use them as a reference when writing your own GraphStage.
When you should consider to write your own GraphStage
The pragmatic answer to this question is: when the thing you need to do is impossible to achieve with built-in combinators. Here are some signs that you should think about a custom GraphStage:
- you need to maintain state in order to process incoming messages. This is not a sufficient reason by itself, as you can do stateful computations using e.g. statefulMapConcat, but this does not scale well as complexity grows and therefore should be used just for simple cases
- you don't pair input and output elements in a one-to-one fashion. A simple example here may be filter: you don't know how many elements of the initial Source the Sink will receive. More complicated GraphStages may buffer incoming messages in internal state and push them downstream based on some criteria. It's possible to do that because at the GraphStage level we have access to the push and pull methods and callbacks
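For comparison, here is what a simple stateful computation looks like with statefulMapConcat, without a custom stage. The example (dropping consecutive duplicates) and the names DedupDemo and dedup are my own illustration; it assumes an Akka 2.6-style setup.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object DedupDemo {
  // The outer function runs once per materialization, so every running
  // stream gets its own `last` variable.
  val dedup: Flow[Int, Int, NotUsed] = Flow[Int].statefulMapConcat[Int] { () =>
    var last: Option[Int] = None
    element =>
      if (last.contains(element)) Nil // same as the previous element: emit nothing
      else {
        last = Some(element)
        element :: Nil
      }
  }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("dedup-demo")
    try Await.result(Source(List(1, 1, 2, 2, 2, 3, 1)).via(dedup).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }
}
```

This works well for a single variable, but the function only ever sees one incoming element at a time and has no access to demand, which is where a custom GraphStage starts to pay off.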
More involved example
Now that I have introduced the basics of GraphStage, we can move on to a more involved example. I came across it when working on akka-http#192. The goal is to establish a TCP connection to a proxy host. Then we need to send a CONNECT message, and after a successful response from the proxy we can send and receive actual requests.
To keep our focus on GraphStage, I will skip some HTTP-specific things like parsing requests. Also, instead of presenting the final version of the code at the beginning, we will go step by step, so I will also present flawed versions of the code. I believe it may be beneficial for the reader to see the whole process instead of just the end result.
Where to plug ProxyStage
Let's start by finding the place where we will put the not-yet-built ProxyStage. It currently looks like this:
You may wonder what a BidiFlow is. This is an element of the execution graph with two input and two output ports. Data elements arrive at the first input (let's call it I1), are processed and flow out at the first output (O1); then the response comes back at the second input (I2) and, after being processed, is yielded at the second output (O2). Thus tlsStage can be illustrated with the following diagram:
BidiFlow is especially useful when subsequent stages of your graph represent consecutive layers of communication. That's why it was used for TLSStage and is commonly used in akka-http.
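A tiny layered example may help here. The sketch below is unrelated to the akka-http code: it builds a hypothetical codec layer with BidiFlow.fromFunctions and joins it with a stand-in transport flow, so the names codec and transport and the upper-casing behaviour are purely illustrative. An Akka 2.6-style setup is assumed.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object BidiCodecDemo {
  // I1 -> O1 encodes strings to bytes, I2 -> O2 decodes bytes back to strings.
  val codec: BidiFlow[String, ByteString, ByteString, String, NotUsed] =
    BidiFlow.fromFunctions((s: String) => ByteString(s), (b: ByteString) => b.utf8String)

  // Stand-in for the lower layer: answers every message with its upper-cased copy.
  val transport: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].map(b => ByteString(b.utf8String.toUpperCase))

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("bidi-demo")
    try Await.result(
      Source(List("ping", "pong")).via(codec.join(transport)).runWith(Sink.seq),
      3.seconds)
    finally system.terminate()
  }
}
```

Joining a BidiFlow with a Flow closes the lower pair of ports (O1 and I2) and leaves a plain Flow from I1 to O2; stacking several layers is done with atop.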
Now it's clear that our ultimate goal can be achieved with the following topology:
ProxyStage does not exist yet; we need to implement it. The only interesting thing that ProxyStage does happens at the beginning of communication. After the HTTPS tunnel is established, it will serve just as a transparent proxy that forwards what it receives. Let's start with this easy part, which we'll call NeutralStage. It may look like this:
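As a sketch of such a pass-through stage (not necessarily identical to the one in the repo), the code below forwards elements and demand in both directions. The port names sslIn, bytesOut and bytesIn follow the names used later in this post; sslOut, the port labels and the demo transport are assumptions, as is the Akka 2.6-style setup.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, BidiShape, Inlet, Outlet}
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

final class NeutralStage
    extends GraphStage[BidiShape[ByteString, ByteString, ByteString, ByteString]] {
  val sslIn: Inlet[ByteString] = Inlet[ByteString]("OutgoingSSL.in")
  val bytesOut: Outlet[ByteString] = Outlet[ByteString]("OutgoingTCP.out")
  val bytesIn: Inlet[ByteString] = Inlet[ByteString]("IncomingTCP.in")
  val sslOut: Outlet[ByteString] = Outlet[ByteString]("IncomingSSL.out")
  override val shape: BidiShape[ByteString, ByteString, ByteString, ByteString] =
    BidiShape(sslIn, bytesOut, bytesIn, sslOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Outgoing direction: data goes sslIn -> bytesOut, demand goes the other way.
      setHandler(sslIn, new InHandler {
        override def onPush(): Unit = push(bytesOut, grab(sslIn))
      })
      setHandler(bytesOut, new OutHandler {
        override def onPull(): Unit = pull(sslIn)
      })
      // Incoming direction: data goes bytesIn -> sslOut, demand goes the other way.
      setHandler(bytesIn, new InHandler {
        override def onPush(): Unit = push(sslOut, grab(bytesIn))
      })
      setHandler(sslOut, new OutHandler {
        override def onPull(): Unit = pull(bytesIn)
      })
    }
}

object NeutralStageDemo {
  def run(): ByteString = {
    implicit val system: ActorSystem = ActorSystem("neutral-demo")
    // Stand-in transport: echoes each message with "!" appended.
    val transport = Flow[ByteString].map(_ ++ ByteString("!"))
    // Source.maybe keeps upstream open, so completion cannot overtake the reply.
    val source = Source.single(ByteString("a")).concat(Source.maybe[ByteString])
    try Await.result(
      source.via(BidiFlow.fromGraph(new NeutralStage).join(transport)).runWith(Sink.head),
      3.seconds)
    finally system.terminate()
  }
}
```

With the default handlers, completion of any inlet completes the whole stage, which is why the demo keeps the source open instead of letting it finish right after the first element.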
It can be run with:
OK, it simply forwards messages in both directions. That's a good starting point; now we need to send a CONNECT message before we forward any other message. We can do it in onPull, which will be called when the transport layer asks for data:
We also introduced a state variable of type State. It helps us track the current state of the GraphStage. It is defined as:
We also need to take care of incoming data elements on input I1. We append them to bufferedMsgs. As soon as we are in the Connected state, we can simply push what we received.
The last piece we need to add is validating the proxy response for CONNECT. If proxyResponseValid returns true, we consider it a success and change the internal state to Connected. Besides that, we need to remember to send the buffered messages. We can do it with
The whole code may be found in HttpsProxyStage0.scala in the repo. We can run this code for a sample
This code will hang. Do you have any idea why?
The problem is that we don't pull bytesIn enough. The only place where we pull bytesIn is inside onPull. It was called once (the Sink signals that it wants to consume some data), and then in bytesIn's onPush we don't push the element out, as the "OK" response is a technical message and the Sink doesn't care about it.
In other words, our HttpsProxyStage will see one more data element on the ports communicating with transport than on the ports communicating with the outside world. (That's an oversimplification in the case of akka-http. Because of its streaming nature, we cannot be sure that the response to CONNECT will come as a single data element. For simplicity, in this article we assume that it always does.)
After this change it seems to work.
Reduce state variables
We gained some deeper knowledge about the relation between the numbers of data elements. Now let's analyze whether we really need bufferedMsgs. You may think that we need to do something with incoming messages in onPush, and the only reasonable thing to do seems to be saving them to internal state.
And that's right, but the key observation here is that we can prevent onPush from being called until the connection is established. We can do it by not pulling sslIn: as long as it's not pulled, no one is able to push anything to it.
Here is the stage after bufferedMsgs removal:
When we run this code it will throw an exception:
java.lang.IllegalArgumentException: requirement failed: Cannot pull port (OutgoingSSL.in) twice. The exception message is quite clear, and in the stack trace we can even find the faulty line.
It happens when bytesIn.onPush is called before bytesOut.onPull, which results in calling pull(sslIn) twice in a row. We shouldn't code against any specific order, as we don't control the layer behind it.
What we should do is test the code against both cases. The problem is that with the current testing code, namely transport being a simple flow created with Flow[ByteString].statefulMapConcat, we don't have enough control over data and demand elements. Moreover, the logic of ProxyStage is getting quite complex and we should do some proper testing. Having tests in place beforehand will make it easier to fix the bug.
We can test a custom GraphStage, and akka-streams in general, with akka-stream-testkit. In simple cases it's enough to run the flow you want to test (named flowUnderTest in the following example) in this fashion:
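A minimal sketch of such a test could look as follows. The flowUnderTest shown here (multiplying by 3) is a made-up stand-in, and the code assumes the akka-stream-testkit dependency and an Akka 2.6-style setup.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.testkit.scaladsl.TestSink

object SimpleProbeDemo {
  // Hypothetical flow under test.
  val flowUnderTest: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 3)

  def run(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("simple-probe-demo")
    try {
      Source(1 to 3)
        .via(flowUnderTest)
        .runWith(TestSink.probe[Int])
        .requestNext(3) // each call signals demand for one element and asserts it
        .requestNext(6)
        .requestNext(9)
        .expectComplete()
      true // reached only if every expectation above held
    } finally system.terminate()
  }
}
```

In a real test suite the same chain would sit inside a test case instead of a run method; a failed expectation throws an AssertionError.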
I want to stress that akka-stream-testkit gives us huge control over all kinds of messages used in the Akka Reactive Streams implementation. Besides data elements and completions, we can also send and check errors or subscriptions. Let's see how we can rewrite the last code snippet:
Here a single call that both requests and asserts is replaced by request(1) with a subsequent expectNext(9). That way we decoupled pulling from asserting the receipt of the expected message. Let's take a look at the most used sink methods:
def request(n: Long) – pulls n data elements from upstream
def expectNext(element: I) – expects the arrival of element (does not pull)
def expectNext(): I – expects the arrival of any element, which is returned as the result
def requestNext(element: T) – does what request(1) followed by expectNext(element) does
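The methods listed above can be combined with a TestSource probe to drive both ends of a flow by hand. This is a sketch under the same assumptions as before (akka-stream-testkit on the classpath, Akka 2.6-style setup, a made-up flowUnderTest that multiplies by 3).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

object DecoupledProbeDemo {
  // Hypothetical flow under test.
  val flowUnderTest: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 3)

  def run(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("decoupled-probe-demo")
    try {
      val (upstream, downstream) = TestSource.probe[Int]
        .via(flowUnderTest)
        .toMat(TestSink.probe[Int])(Keep.both)
        .run()

      downstream.request(1)    // demand only, no assertion yet
      upstream.sendNext(3)     // data, sent at a moment we choose
      downstream.expectNext(9) // assertion only, does not pull

      upstream.sendComplete()
      downstream.expectComplete()
      true // reached only if every expectation above held
    } finally system.terminate()
  }
}
```

Because demand, data and assertions are separate calls, the test decides in which order the stage under test sees them.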
Going back to HttpsProxyStage: we want to test a stage of BidiShape, which is a little more complicated. We can construct transport in this manner:
In that way we have access to transportOutFlow, which will allow us to test different transport layer behaviors.
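One possible way to build a fully controllable transport is to back it with a TestSubscriber and a TestPublisher probe. The sketch below is an assumption about the general approach rather than the code from the repo: transportOutProbe is a made-up name, and BidiFlow.identity stands in for the stage under test to keep the example short.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BidiFlow, Flow, Keep, Sink, Source}
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.util.ByteString

object ProbeTransportDemo {
  def run(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("probe-transport-demo")
    try {
      // What the stage sends to the transport ends up in transportInProbe;
      // what the transport answers is sent through transportOutProbe.
      val transportInProbe = TestSubscriber.probe[ByteString]()
      val transportOutProbe = TestPublisher.probe[ByteString]()
      val transport = Flow.fromSinkAndSource(
        Sink.fromSubscriber(transportInProbe),
        Source.fromPublisher(transportOutProbe))

      // Stand-in for the stage under test.
      val stageUnderTest = BidiFlow.identity[ByteString, ByteString]

      val (source, sink) = TestSource.probe[ByteString]
        .via(stageUnderTest.join(transport))
        .toMat(TestSink.probe[ByteString])(Keep.both)
        .run()

      transportInProbe.request(1) // the transport layer asks for data
      source.sendNext(ByteString("ping"))
      transportInProbe.expectNext(ByteString("ping"))

      sink.request(1) // the outside world asks for a response
      transportOutProbe.sendNext(ByteString("pong"))
      sink.expectNext(ByteString("pong"))
      true // reached only if every expectation above held
    } finally system.terminate()
  }
}
```

With this setup the test controls when the transport signals demand and when it responds, independently of each other.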
What we wanted to achieve is to have bytesOut.onPull called before bytesIn.onPush. We can do this with the following code:
We cannot replace the plain pull with transportInProbe.requestNext(), as it would hang the test. We want to just pull at that place, and only after the "OK" response has been sent will HttpsProxyStage send anything more. More tests can be found in the repo.
You can look at the final implementation in CorrectHttpsProxyStage in the repo.
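For readers without the repo at hand, here is a heavily simplified sketch of the same idea. It is not the CorrectHttpsProxyStage code: the class name, the Starting and Connecting states, the sslOut port, the CONNECT payload and the "OK" check are all assumptions, and error transparency is left out. It does show the two points made above: sslIn is not pulled before the tunnel is up, and no port is pulled twice regardless of callback order. An Akka 2.6-style setup is assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, BidiShape, Inlet, Outlet}
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object ProxySketchStage {
  sealed trait State
  case object Starting extends State   // CONNECT not sent yet
  case object Connecting extends State // CONNECT sent, waiting for the proxy
  case object Connected extends State  // tunnel established, just forward
}

final class ProxySketchStage(connectMsg: ByteString)
    extends GraphStage[BidiShape[ByteString, ByteString, ByteString, ByteString]] {
  import ProxySketchStage._
  val sslIn = Inlet[ByteString]("OutgoingSSL.in")
  val bytesOut = Outlet[ByteString]("OutgoingTCP.out")
  val bytesIn = Inlet[ByteString]("IncomingTCP.in")
  val sslOut = Outlet[ByteString]("IncomingSSL.out")
  override val shape = BidiShape(sslIn, bytesOut, bytesIn, sslOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var state: State = Starting
      // Simplification used in this post: the proxy answers with a single "OK".
      private def proxyResponseValid(response: ByteString) = response.utf8String == "OK"

      setHandler(sslIn, new InHandler {
        // Reachable only when Connected, because sslIn is not pulled earlier.
        override def onPush(): Unit = push(bytesOut, grab(sslIn))
      })
      setHandler(bytesOut, new OutHandler {
        override def onPull(): Unit = state match {
          case Starting =>
            push(bytesOut, connectMsg)
            state = Connecting
            if (!hasBeenPulled(bytesIn)) pull(bytesIn) // we need the proxy response
          case Connecting => () // demand stays on the port, see isAvailable below
          case Connected  => pull(sslIn)
        }
      })
      setHandler(bytesIn, new InHandler {
        override def onPush(): Unit = state match {
          case Connected => push(sslOut, grab(bytesIn))
          case Connecting =>
            if (proxyResponseValid(grab(bytesIn))) {
              state = Connected
              // Serve demand that arrived while we were connecting.
              if (isAvailable(bytesOut)) pull(sslIn)
              if (isAvailable(sslOut)) pull(bytesIn)
            } else failStage(new IllegalStateException("proxy rejected CONNECT"))
          case Starting =>
            throw new IllegalStateException("data received before CONNECT was sent")
        }
      })
      setHandler(sslOut, new OutHandler {
        override def onPull(): Unit = if (!hasBeenPulled(bytesIn)) pull(bytesIn)
      })
    }
}

object ProxySketchDemo {
  def run(): ByteString = {
    implicit val system: ActorSystem = ActorSystem("proxy-sketch-demo")
    // Fake transport: answers "OK" to the first message, echoes the rest.
    val transport = Flow[ByteString].statefulMapConcat[ByteString] { () =>
      var first = true
      bytes => if (first) { first = false; ByteString("OK") :: Nil } else bytes :: Nil
    }
    val stage = new ProxySketchStage(ByteString("CONNECT example.org:443 HTTP/1.1\r\n\r\n"))
    val source = Source.single(ByteString("hello")).concat(Source.maybe[ByteString])
    try Await.result(source.via(BidiFlow.fromGraph(stage).join(transport)).runWith(Sink.head), 3.seconds)
    finally system.terminate()
  }
}
```

The isAvailable checks after the state change are what make the stage independent of whether bytesOut.onPull or bytesIn.onPush arrives first.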
We also override onDownstreamFinish to achieve transparency to errors. When a stage fails, it first cancels all its inputs and only after that sends the error to all its outputs. This ordering is problematic in the case of BidiFlow, because the default implementation of onDownstreamFinish is to complete the stage. So if transport calls failStage, it will first trigger onDownstreamFinish in ProxyStage, and onUpstreamFailure will never be called for ProxyStage, as it will no longer be running when transport propagates the error to its downstream. That's why we need to do this:
failStage vs throwing exception
We can see that both styles are used in the final implementation, but there's a slight difference between them.
failStage should be used when something that was anticipated happened and we want to cancel upstream, fail downstream and then stop the stage. The error should flow down the stream and may be, e.g., materialized to a failed Future.
In our case it's the server responding with something different than "OK". It's a situation that may happen, is out of our control, and we should be prepared for it.
On the other hand, there are situations when some assertions on which we rely may fail. In that case it's advisable to throw an exception. The effect will be similar to failStage and the error will also flow down the stream, but the situation will also be logged to the Akka log as an error.
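The failStage path can be seen in a small stand-alone sketch. PositiveOnlyStage is a hypothetical stage invented for this illustration: a non-positive element is treated as an anticipated error and reported with failStage, which shows up as a failed materialized Future. An Akka 2.6-style setup is assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

final class PositiveOnlyStage extends GraphStage[FlowShape[Int, Int]] {
  val in: Inlet[Int] = Inlet[Int]("PositiveOnly.in")
  val out: Outlet[Int] = Outlet[Int]("PositiveOnly.out")
  override val shape: FlowShape[Int, Int] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        val element = grab(in)
        if (element > 0) push(out, element)
        // Anticipated problem: cancel upstream, fail downstream, stop the stage.
        else failStage(new IllegalArgumentException(s"non-positive element: $element"))
      }
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
}

object FailStageDemo {
  def run(): Try[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("fail-stage-demo")
    try Try(Await.result(
      Source(List(1, -1, 2)).via(new PositiveOnlyStage).runWith(Sink.seq),
      3.seconds))
    finally system.terminate()
  }
}
```

Replacing the failStage call with a plain throw would fail the stream in a similar way, with the additional error entry in the log described above.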
- GraphStage is useful when you run out of possibilities to get what you want with built-in combinators
- GraphStage is especially useful when you want precise control over how data elements and demand flow
- It takes great diligence to properly balance pushes and pulls
- akka-stream-testkit is there to help you test your GraphStage and flows in general. It's a very powerful tool which gives you great control over how data elements and demand flow
- When writing custom GraphStages, take failure into account: how you want to propagate errors and whether you want to adapt it to a Supervision policy
- BidiFlow is useful for modelling layered protocols
- The GraphStage API is a nice abstraction over Reactive Streams. Understanding this API may help you gain a deeper understanding of Reactive Streams and backpressure in general
- Most built-in combinators are implemented with GraphStage. You can study existing implementations and their tests
- Repository for this article
- Implementing a Custom Akka Streams Graph Stage
- Mastering GraphStages part 1 and part 2
- Akka docs on GraphStage
- akka-stream-contrib with quite a lot of
Thanks to James Roper for his suggestions in PR review.