Build your own Kafka in ZIO – Queues & Fibers
Last update: 20 January 2021
Queueing and messaging platforms have been gaining popularity in recent years. They solve numerous problems based on asynchronous message passing or producer-consumer patterns. In this blog post, we’re going to build basic message broker functionality with ZIO for our internal clinic messaging system, specifically with ZIO Queues and ZIO Fibers.
In our clinic, we have x-ray rooms that produce x-ray photographs of hips and knees, which are sent via a messaging system. For each body part, there are physicians who can analyze the photographs. Additionally, we want to be able to log messages for selected body parts.
This scenario maps directly onto a message broker with topics. Messages are sent to defined topics, and consumers subscribe to them in two ways: following the ‘one message, one consumer’ pattern or the multicast pattern. We will implement this with consumer groups, to which consumers subscribe within a particular topic. Each message on a topic is delivered to every consumer group (like multicast), but within each group, only one consumer processes the message (like producers and consumers). Here’s an image showing this:

Of course, there are plenty of distributed platforms that can achieve this. For example, RabbitMQ provides a so-called exchange: a broker between producers and queues that decides which queues each message is sent to. Broadcasting is handled by a fanout exchange, as opposed to the direct and topic exchange types, which route a message only to queues whose binding matches the message’s routing key.
So let’s implement this concept once more, this time with ZIO Queues and ZIO Fibers.
ZIO Queues & ZIO Fibers
But first things first: let’s briefly introduce Fibers and Queues in ZIO.
Fibers are data types for expressing concurrent computations. They are loosely related to threads: a single Fiber can be executed on multiple threads by shifting between them, all with full resource safety!
What makes Fibers powerful is how seamlessly they are integrated into ZIO. Given some effect, e.g. UIO("work"), we only need to call .fork on it to run it on a new Fiber. Then it’s up to us what to do next: interrupt the Fiber to stop it by force, join it to suspend the current Fiber until the forked one returns its result, or race an effect against another one, which runs both on separate ZIO Fibers, returns the result of the first to succeed and interrupts the other.
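Here is a minimal sketch of these operations, assuming ZIO 1.x (the version this post’s imports target); the object and value names are only illustrative. Each value is merely a description until a runtime executes it:

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object FiberBasics {
  // fork starts the effect on a new fiber; join suspends the current fiber
  // (without blocking a thread) until the forked one completes
  val forkJoin: UIO[String] =
    for {
      fiber  <- UIO("work").fork
      result <- fiber.join
    } yield result

  // interrupt stops a fiber that would otherwise never finish and returns its Exit
  val wasInterrupted: UIO[Boolean] =
    for {
      fiber <- ZIO.never.fork
      exit  <- fiber.interrupt
    } yield exit.interrupted

  // race runs both effects on separate fibers, returns the first success
  // and interrupts the loser
  val raced: URIO[Clock, String] =
    (ZIO.sleep(500.millis) *> UIO("slow")).race(UIO("fast"))
}
```

Running raced should yield "fast"; the slower effect is interrupted instead of being left to finish in the background.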
I should mention that race is implemented with raceWith, a powerful method that lets you provide any logic for managing two separate Fibers. raceWith is used not only in race but also in zipPar, which runs two effects in parallel on separate Fibers and returns both results as a tuple.
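To illustrate zipPar, here is a small sketch, again assuming ZIO 1.x; hip and knee are hypothetical stand-ins for two independent analyses:

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object ZipParSketch {
  // Two independent "analyses" that each take about 200 ms
  val hip: URIO[Clock, String]  = ZIO.sleep(200.millis) *> UIO("hip done")
  val knee: URIO[Clock, String] = ZIO.sleep(200.millis) *> UIO("knee done")

  // zipPar runs both effects on separate fibers and waits for both results;
  // if either side fails, the other one is interrupted
  val both: URIO[Clock, (String, String)] = hip.zipPar(knee)
}
```

Because the two sleeps overlap, the pair should be ready after roughly 200 ms rather than 400 ms.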
Queues in ZIO, on the other hand, address issues that we can encounter while using a BlockingQueue. The back-pressured ZIO Queue avoids blocking threads in core operations such as offer and take: a fiber that has to wait on a full or empty queue is suspended, and its thread is free to run other fibers.
Apart from the bounded back-pressured queue, ZIO Queues offer other overflow strategies, such as sliding, which evicts the oldest element to make room for a new one, and dropping, which discards newly offered elements when the queue is full. All of this happens in a non-blocking manner.
So when we call queue.offer(sth).fork on a full back-pressured queue, only the forked fiber is suspended until space frees up, while the current fiber carries on without blocking. Other assets of ZIO Queues are interruptibility (a fiber suspended on offer or take can be interrupted, like any other fiber) and safe shutdown.
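The overflow strategies and the forked offer described above can be sketched as follows (ZIO 1.x assumed; fillAndDrain and the other names are hypothetical helpers for this illustration):

```scala
import zio._

object QueueStrategies {
  // Offers 1, 2, 3 to a queue of capacity 2 and then drains it
  private def fillAndDrain(makeQueue: UIO[Queue[Int]]): UIO[List[Int]] =
    for {
      q     <- makeQueue
      _     <- ZIO.foreach_(List(1, 2, 3))(q.offer)
      items <- q.takeAll
    } yield items

  // sliding: when full, the oldest element is evicted to make room for the new one
  val sliding: UIO[List[Int]] = fillAndDrain(Queue.sliding[Int](2))

  // dropping: when full, the newly offered element is discarded (offer returns false)
  val dropping: UIO[List[Int]] = fillAndDrain(Queue.dropping[Int](2))

  // bounded (back-pressured): offering to a full queue suspends the offering fiber,
  // so we fork the offer and let the current fiber carry on with a take
  val backPressured: UIO[(Int, Int)] =
    for {
      q      <- Queue.bounded[Int](1)
      _      <- q.offer(1)
      fiber  <- q.offer(2).fork // suspends because the queue is full
      first  <- q.take          // frees a slot, so the forked offer can complete
      _      <- fiber.join
      second <- q.take
    } yield (first, second)
}
```

Note that fillAndDrain must not be given a bounded queue of capacity 2: its third offer would suspend forever, which is exactly why the back-pressured case forks its offer.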
Domain
We’ll start by defining our domain and a request class with a topic field. Additionally, we will define a RequestGenerator for generating Requests:
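// The body parts (topics) a request can be routed to
sealed trait Diagnostic
case object HipDiagnostic extends Diagnostic
case object KneeDiagnostic extends Diagnostic

// A request carries its topic and the x-ray image itself
case class Request[A](topic: Diagnostic, XRayImage: A)

// R is the environment the generator needs, A is the image type
trait RequestGenerator[R, A] {
  def generate(topic: Diagnostic): URIO[R, Request[A]]
}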
Imports required by our project (the code uses the ZIO 1.x API):
import zio._
import zio.random._
import zio.console._
import zio.duration._
For the sake of simplicity, let’s assume our x-ray images are simply Ints:
case class IntRequestGenerator() extends RequestGenerator[Random, Int] {
  // A random "image" in the range [0, 1000)
  override def generate(topic: Diagnostic): URIO[Random, Request[Int]] =
    nextIntBounded(1000).map(n => Request(topic, n))
}
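The R type parameter lets each generator declare the environment it needs: IntRequestGenerator asks for Random. As a sketch of that design choice, here is a hypothetical CountingRequestGenerator that needs no environment at all (R = Any) and numbers images 1, 2, 3 and so on, which could make tests reproducible. The domain types are repeated so the block compiles on its own; ZIO 1.x is assumed:

```scala
import zio._

object GeneratorSketch {
  sealed trait Diagnostic
  case object HipDiagnostic  extends Diagnostic
  case object KneeDiagnostic extends Diagnostic
  case class Request[A](topic: Diagnostic, XRayImage: A)

  trait RequestGenerator[R, A] {
    def generate(topic: Diagnostic): URIO[R, Request[A]]
  }

  // Hypothetical deterministic generator: no environment required,
  // images are consecutive numbers kept in a Ref
  final case class CountingRequestGenerator(counter: Ref[Int]) extends RequestGenerator[Any, Int] {
    override def generate(topic: Diagnostic): UIO[Request[Int]] =
      counter.updateAndGet(_ + 1).map(n => Request(topic, n))
  }

  val demo: UIO[List[Request[Int]]] =
    for {
      counter   <- Ref.make(0)
      generator  = CountingRequestGenerator(counter)
      hip       <- generator.generate(HipDiagnostic)
      knee      <- generator.generate(KneeDiagnostic)
    } yield List(hip, knee)
}
```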
Before building the first component, let’s take a look at the architecture diagram. It might look strange at first, but it should become clearer once all the components are in place:

Consumer
The first component of our system is a Consumer[A]. It provides two API methods: create, for constructing a consumer wrapped in UIO, and run, which starts a new fiber that continuously waits for elements in its queue and processes them. The processing itself is rather dull, but the console logs are definitely not!
It’s worth stressing that run returns a (Queue, Fiber) pair inside an effect, so apart from connecting the consumer to the system, we can also interrupt or join the consumer:
case class Consumer[A](title: String) {
  // Creates the consumer's inbox and starts processing it on a separate fiber
  def run = for {
    queue <- Queue.bounded[A](10)
    loop = for {
      img  <- queue.take // suspends the fiber while the queue is empty
      _    <- putStrLn(s"[$title] worker: Starting analyzing task $img")
      rand <- nextIntBounded(4)
      _    <- ZIO.sleep(rand.seconds) // simulate 0 to 3 seconds of analysis
      _    <- putStrLn(s"[$title] worker: Finished task $img")
    } yield ()
    fiber <- loop.forever.fork
  } yield (queue, fiber)
}

object Consumer {
  def create[A](title: String) = UIO(Consumer[A](title))
}
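To see the run pattern in isolation, here is a stripped-down sketch under a few assumptions: the queue holds Ints, and instead of printing and sleeping, the hypothetical analysis doubles each image and offers it to an output queue, so the result can be checked. It keeps the same shape as Consumer.run, a bounded queue plus a loop started with .forever.fork, and returns the (Queue, Fiber) pair so the caller can interrupt it (ZIO 1.x assumed):

```scala
import zio._

object ConsumerSketch {
  // Same shape as Consumer.run; the "analysis" is a hypothetical doubling step
  def run(out: Queue[Int]): UIO[(Queue[Int], Fiber[Nothing, Nothing])] =
    for {
      queue <- Queue.bounded[Int](10)
      loop   = queue.take.flatMap(img => out.offer(img * 2))
      fiber <- loop.forever.fork
    } yield (queue, fiber)

  val demo: UIO[(List[Int], Boolean)] =
    for {
      out     <- Queue.unbounded[Int]
      ctx     <- run(out)
      _       <- ctx._1.offerAll(List(1, 2, 3))
      results <- ZIO.collectAll(List.fill(3)(out.take))
      exit    <- ctx._2.interrupt // the consumer loop never ends on its own
    } yield (results, exit.interrupted)
}
```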
As many of us are more used to an imperative approach, let’s focus for a moment on the advantages of using ZIO effects here.
Any potentially dangerous side effects are kept inside the ZIO effect type. Even a Unit-returning println becomes a value that describes printing rather than performing it, which makes it referentially transparent. Having every computation available as a value is also really beneficial when it comes to parallelism.
Here, we were able to build an arbitrary chain of computations and make it run forever on a separate ZIO Fiber with a pleasing .forever.fork.
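A tiny sketch of what referential transparency buys us, assuming ZIO 1.x: tick below is an ordinary value describing an increment, so it can be named once and run several times, and nothing happens until the runtime executes it. The names are illustrative:

```scala
import zio._

object EffectsAsValues {
  val demo: UIO[(Int, Int)] =
    for {
      counter <- Ref.make(0)
      tick     = counter.update(_ + 1) // only a description: nothing has run yet
      before  <- counter.get           // still 0
      _       <- tick *> tick *> tick  // the same value, executed three times
      after   <- counter.get
    } yield (before, after)
}
```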
Topic Queue
TopicQueue is the most complicated part. It’s in charge of distributing messages properly among subscribers. The subscribe method receives a subscriber’s queue and a consumerGroup number. As you will no doubt recall, each message is passed to each consumerGroup and then to a random subscriber within each group. The run method follows the pattern of the previous components: a continuous loop that takes messages and distributes them according to this scheme:
case class TopicQueue[A](queue: Queue[A], subscribers: Ref[Map[Int, List[Queue[A]]]]) {
  // Appends the subscriber's queue to the given consumer group, creating the group if needed
  def subscribe(sub: Queue[A], consumerGroup: Int): UIO[Unit] =
    subscribers.update { map =>
      map.get(consumerGroup) match {
        case Some(value) =>
          map + (consumerGroup -> (value :+ sub))
        case None =>
          map + (consumerGroup -> List(sub))
      }
    }

  // Every group receives each element, but only one randomly chosen member of each group
  private val loop =
    for {
      elem <- queue.take
      subs <- subscribers.get
      _ <- ZIO.foreach_(subs.values) { group =>
        for {
          idx <- nextIntBounded(group.length)
          _   <- group(idx).offer(elem)
        } yield ()
      }
    } yield ()

  def run = loop.forever.fork
}

object TopicQueue {
  def create[A](queue: Queue[A]): UIO[TopicQueue[A]] =
    Ref.make(Map.empty[Int, List[Queue[A]]]) >>= (map => UIO(TopicQueue(queue, map)))
}
In this part, immutability is what strikes us first: the subscribers map cannot be modified by hidden side effects without our knowledge. Instead, we use a ZIO Ref to store the map and update it atomically.
It’s worth mentioning that wrapping the constructor method in UIO is essential for consistency: creating a Ref, just like creating a new ZIO Queue, allocates mutable state, so it should always be part of our effect chain.
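The delivery rules can be checked without any forked loop. The sketch below (ZIO 1.x assumed; TopicSketch, distribute and the Int element type are hypothetical) keeps the same Ref-based map, writes subscribe with getOrElse instead of a match, and runs one iteration of the loop body per element. With subscribers a and b in group 1 and c in group 2, c should receive every element, while a and b together should receive each element exactly once:

```scala
import zio._
import zio.random._

object TopicSketch {
  type Groups = Map[Int, List[Queue[Int]]]

  // Same update as TopicQueue.subscribe, written with getOrElse
  def subscribe(subs: Ref[Groups], sub: Queue[Int], group: Int): UIO[Unit] =
    subs.update(m => m + (group -> (m.getOrElse(group, Nil) :+ sub)))

  // One iteration of TopicQueue's loop body for a single element:
  // every group gets it, but only one randomly chosen member per group
  def distribute(subs: Ref[Groups], elem: Int): URIO[Random, Unit] =
    subs.get.flatMap { groups =>
      ZIO.foreach_(groups.values) { group =>
        nextIntBounded(group.length).flatMap(idx => group(idx).offer(elem))
      }
    }

  val demo: URIO[Random, (Int, Int)] =
    for {
      subs  <- Ref.make(Map.empty[Int, List[Queue[Int]]])
      a     <- Queue.unbounded[Int]
      b     <- Queue.unbounded[Int]
      c     <- Queue.unbounded[Int]
      _     <- subscribe(subs, a, group = 1)
      _     <- subscribe(subs, b, group = 1)
      _     <- subscribe(subs, c, group = 2)
      _     <- ZIO.foreach_(1 to 5)(n => distribute(subs, n))
      sizeA <- a.size
      sizeB <- b.size
      sizeC <- c.size
    } yield (sizeA + sizeB, sizeC)
}
```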
Exchange
Our Exchange is pretty similar to a RabbitMQ exchange. Its run method creates three queues: the input queue for incoming jobs (jobQueue) and two output queues for routing (queueHip and queueKnee), each wrapped in a TopicQueue. It then routes every job to the output queue matching its topic, unwrapping the XRayImage from the Request along the way:
case class Exchange[A]() {
  def run = for {
    jobQueue       <- Queue.bounded[Request[A]](10)
    queueHip       <- Queue.bounded[A](10)
    queueKnee      <- Queue.bounded[A](10)
    hipTopicQueue  <- TopicQueue.create(queueHip)
    kneeTopicQueue <- TopicQueue.create(queueKnee)
    // Route each job by its topic and pass on only the image
    loop = for {
      job <- jobQueue.take
      _ <- job.topic match {
        case HipDiagnostic =>
          queueHip.offer(job.XRayImage)
        case KneeDiagnostic =>
          queueKnee.offer(job.XRayImage)
      }
    } yield ()
    fiber <- loop.forever.fork
  } yield (jobQueue, hipTopicQueue, kneeTopicQueue, fiber)
}

object Exchange {
  def create[A] = UIO(Exchange[A]())
}
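The routing step of the Exchange loop can be factored out and tested on its own. Here is a sketch, assuming ZIO 1.x, with the domain types repeated so it compiles in isolation; route and ExchangeSketch are hypothetical names. Because Diagnostic is sealed, the Scala compiler warns about a non-exhaustive match if a new body part is added but not routed:

```scala
import zio._

object ExchangeSketch {
  sealed trait Diagnostic
  case object HipDiagnostic  extends Diagnostic
  case object KneeDiagnostic extends Diagnostic
  case class Request[A](topic: Diagnostic, XRayImage: A)

  // One routing step: choose the output queue by topic and unwrap the image
  def route[A](job: Request[A], hip: Queue[A], knee: Queue[A]): UIO[Boolean] =
    job.topic match {
      case HipDiagnostic  => hip.offer(job.XRayImage)
      case KneeDiagnostic => knee.offer(job.XRayImage)
    }

  val demo: UIO[(List[Int], List[Int])] =
    for {
      hip   <- Queue.unbounded[Int]
      knee  <- Queue.unbounded[Int]
      jobs   = List(Request(HipDiagnostic, 1), Request(KneeDiagnostic, 2), Request(HipDiagnostic, 3))
      _     <- ZIO.foreach_(jobs)(job => route(job, hip, knee))
      hips  <- hip.takeAll
      knees <- knee.takeAll
    } yield (hips, knees)
}
```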
Producer
Producing is simply a matter of feeding Requests into the provided queue. You might have noticed that the run methods all follow the same pattern: describe one iteration as an effect, then repeat it with .forever and start it on a new fiber with .fork. Building asynchronous computations with self-explanatory schedules and lazy execution is easy:
case class Producer[R, A](queue: Queue[Request[A]], generator: RequestGenerator[R, A]) {
  def run = {
    // One iteration: a hip request, a knee request, then a 2-second pause
    val loop = for {
      _    <- putStrLn("[XRayRoom] generating hip and knee request")
      hip  <- generator.generate(HipDiagnostic)
      _    <- queue.offer(hip)
      knee <- generator.generate(KneeDiagnostic)
      _    <- queue.offer(knee)
      _    <- ZIO.sleep(2.seconds)
    } yield ()
    loop.forever.fork
  }
}

object Producer {
  def create[R, A](queue: Queue[Request[A]], generator: RequestGenerator[R, A]) =
    UIO(Producer(queue, generator))
}
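The Producer paces itself with ZIO.sleep inside a loop that runs forever. A possible alternative, sketched here under the assumption of ZIO 1.x Schedule combinators, keeps a single iteration free of timing concerns and lets a Schedule describe the pacing. A production version could use Schedule.spaced(2.seconds) on its own; the sketch intersects it with Schedule.recurs(2) so it stops after three iterations, and uses a hypothetical counter instead of the random generator:

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object ProducerSketch {
  // One iteration without any timing: produce the next number
  def produce(queue: Queue[Int], counter: Ref[Int]): UIO[Unit] =
    counter.updateAndGet(_ + 1).flatMap(n => queue.offer(n)).unit

  val demo: URIO[Clock, List[Int]] =
    for {
      queue   <- Queue.unbounded[Int]
      counter <- Ref.make(0)
      // run once, then repeat twice more, 10 ms apart
      _       <- produce(queue, counter).repeat(Schedule.spaced(10.millis) && Schedule.recurs(2))
      items   <- queue.takeAll
    } yield items
}
```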
Program
Finally, the Program. Now we will combine all the previous components to assemble a fully operational clinic messaging system. First, we instantiate the Consumers and launch them (reminder: ZIO effects are lazy descriptions, unlike Futures, which start running as soon as they are created). Then it’s time for the Exchange and the Producer. Notice that returning tuples lets us ignore the fibers we don’t need. Finally, we subscribe the Consumers to the output topic queues, specifying each one’s consumer group, and launch the topic queues:
val program = for {
  // Consumers: each run returns its inbox queue and its fiber
  physicianHip   <- Consumer.create[Int]("Hip")
  ctxPhHip       <- physicianHip.run
  (phHipQueue, phHipFiber) = ctxPhHip
  loggerHip      <- Consumer.create[Int]("HIP_LOGGER")
  ctxLoggerHip   <- loggerHip.run
  (loggerHipQueue, _) = ctxLoggerHip
  physicianKnee  <- Consumer.create[Int]("Knee1")
  ctxPhKnee      <- physicianKnee.run
  (phKneeQueue, _) = ctxPhKnee
  physicianKnee2 <- Consumer.create[Int]("Knee2")
  ctxPhKnee2     <- physicianKnee2.run
  (phKneeQueue2, _) = ctxPhKnee2
  // Exchange and Producer
  exchange    <- Exchange.create[Int]
  ctxExchange <- exchange.run
  (inputQueue, outputQueueHip, outputQueueKnee, _) = ctxExchange
  generator = IntRequestGenerator()
  xRayRoom <- Producer.create(inputQueue, generator)
  _        <- xRayRoom.run
  // Hip: physician and logger are in different groups, so both get every message
  _ <- outputQueueHip.subscribe(phHipQueue, consumerGroup = 1)
  _ <- outputQueueHip.subscribe(loggerHipQueue, consumerGroup = 2)
  // Knee: both physicians share one group, so each message goes to only one of them
  _ <- outputQueueKnee.subscribe(phKneeQueue, consumerGroup = 1)
  _ <- outputQueueKnee.subscribe(phKneeQueue2, consumerGroup = 1)
  _ <- outputQueueHip.run
  _ <- outputQueueKnee.run
  // Keep the program alive: the consumer fiber never completes
  _ <- phHipFiber.join
} yield ()
Also, after launching the TopicQueues with run, we can still subscribe to them, although a group only receives the messages taken from the topic after it has subscribed.
Running the program
Phew… that was a lot. Let’s put it into a ZIO application and run it:
object Main extends App {
  override def run(args: List[String]): URIO[ZEnv, ExitCode] = program.exitCode
}
Looking into the logs, we see that:
1. Multicast to all the ConsumerGroups within the hip topic works as expected: the hip physician and HIP_LOGGER receive the same messages.
2. Within a single ConsumerGroup, messages are routed randomly (definitely room for improvement!):
[XRayRoom] generating hip and knee request
[Knee1] worker: Starting analyzing task 474
[Hip] worker: Starting analyzing task 345
[Hip] worker: Finished task 345
[HIP_LOGGER] worker: Starting analyzing task 345
[HIP_LOGGER] worker: Finished task 345
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 179
[HIP_LOGGER] worker: Starting analyzing task 179
[Hip] worker: Finished task 179
[Knee1] worker: Finished task 474
[Knee1] worker: Starting analyzing task 154
[Knee1] worker: Finished task 154
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 763
[Knee1] worker: Starting analyzing task 562
[HIP_LOGGER] worker: Finished task 179
[HIP_LOGGER] worker: Starting analyzing task 763
[Hip] worker: Finished task 763
[Knee1] worker: Finished task 562
[HIP_LOGGER] worker: Finished task 763
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 474
[Knee2] worker: Starting analyzing task 997
[HIP_LOGGER] worker: Starting analyzing task 474
[Hip] worker: Finished task 474
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 184
[Knee1] worker: Starting analyzing task 578
[Knee2] worker: Finished task 997
[HIP_LOGGER] worker: Finished task 474
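One way to address the random routing within a group is round-robin delivery. The sketch below is a hypothetical variant of TopicQueue’s distribution step, assuming ZIO 1.x: each group stores its members in a Vector together with a cursor, and Ref#modify picks the current member and advances the cursor atomically. Subscribing and the forked loop are left out for brevity:

```scala
import zio._

object RoundRobinSketch {
  // Hypothetical replacement for TopicQueue's List[Queue[A]] per group:
  // the members plus the index of the member whose turn is next
  final case class Group[A](members: Vector[Queue[A]], next: Int)

  // Picks the member whose turn it is and advances the cursor in one atomic modify
  def pick[A](groups: Ref[Map[Int, Group[A]]], id: Int): UIO[Option[Queue[A]]] =
    groups.modify { m =>
      m.get(id) match {
        case Some(g) if g.members.nonEmpty =>
          val size = g.members.size
          (Option(g.members(g.next % size)), m.updated(id, g.copy(next = (g.next + 1) % size)))
        case _ =>
          (Option.empty[Queue[A]], m)
      }
    }

  // Same delivery rule as TopicQueue (every group gets the element, one member
  // per group), but members take turns instead of being chosen at random
  def distribute[A](groups: Ref[Map[Int, Group[A]]], elem: A): UIO[Unit] =
    groups.get.flatMap { m =>
      ZIO.foreach_(m.keys) { id =>
        pick(groups, id).flatMap {
          case Some(queue) => queue.offer(elem).unit
          case None        => ZIO.unit
        }
      }
    }

  val demo: UIO[(List[Int], List[Int])] =
    for {
      a      <- Queue.unbounded[Int]
      b      <- Queue.unbounded[Int]
      groups <- Ref.make(Map(1 -> Group(Vector(a, b), 0)))
      _      <- ZIO.foreach_(1 to 4)(n => distribute(groups, n))
      fromA  <- a.takeAll
      fromB  <- b.takeAll
    } yield (fromA, fromB)
}
```

With members a and b in a single group, elements 1 to 4 should alternate between them.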
Conclusion
Our simple, yet operational, program shows how to implement a message broker with direct and multicast behaviors.
By choosing ZIO, we have only scratched the surface of its potential, using just ZIO Queues and ZIO Fibers within effects. Out-of-the-box parallelism, immutability, referential transparency, and controlled side effects are what made this example painless and really very enjoyable to write.
The complete example is available in the gist linked below.
Links
https://gist.github.com/mtsokol/0d6ab5473c04583899e3ffdcb7812959