Queueing and messaging platforms have been gaining popularity in recent years. They solve numerous problems based on asynchronous message passing or the producer-consumer pattern. In this blog post, we’re going to build basic message broker functionality with ZIO for our internal clinic messaging system, specifically with ZIO Queues and ZIO Fibers.

In our clinic, we have x-ray rooms which produce x-ray photographs of hips and knees, which are sent via a messaging system. For any given body part, some physicians can perform a photographic analysis. Additionally, we want to be able to perform message logging for selected body parts.

This example accurately describes a message broker with topics: sending messages to defined topics, subscribing to them in two ways – the ‘one message one consumer’ type pattern and the multicast type pattern. We will be performing this subscribing via consumer groups to which consumers subscribe within any particular topic.

Each topic’s message is delivered to every consumer group (like multicast), but within each group, only one consumer can digest the message (like producers and consumers). Here’s an image showing this:

[Image: message broker built with ZIO Fibers and ZIO Queues]

Of course, there are plenty of distributed platforms that can achieve this, e.g. RabbitMQ provides us with a so-called exchange – a broker between a producer and queues that decides which queues to send the message to. Broadcast is supplied via a fanout exchange, as opposed to direct and topic exchange types, which require a match to the message’s topic.

So let’s try to implement this concept one more time, but this time with ZIO Queues and ZIO Fibers in an effectful way.

ZIO Queues & ZIO Fibers

But first things first – let’s briefly introduce Fibers and Queues in ZIO.

So Fibers are data types for expressing concurrent computations. Fibers are loosely related to threads – a single Fiber can be executed on multiple threads by shifting between them – all with full resource safety!

What makes Fibers stronger is the seamless setting in ZIO. Having some effect, e.g. IO.succeedLazy("work"), we only need to call .fork on it to make it run on a Fiber. Then it’s up to us what to do next: interrupt – stop the Fiber by force, join – block the current Fiber until it returns the result, or race with another Fiber – run two ZIO Fibers and return the first that succeeded.

I should mention that the underlying implementation of race is done via raceWith – a powerful method that allows you to provide any logic for managing two separate Fibers. raceWith is used not only in race but also zipPar – for running two Fibers in parallel and returning both results as a tuple.
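The fiber operations described above can be put together in a minimal sketch. It assumes ZIO 1.0.x (where `Runtime.default` is available; on the release candidate used later in this post the runtime entry point differs), and the `fast` and `slow` effects are hypothetical stand-ins for real work:

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object FiberSketch {
  // Two hypothetical effects that differ only in how long they take.
  val fast: URIO[Clock, String] = ZIO.succeed("fast").delay(50.millis)
  val slow: URIO[Clock, String] = ZIO.succeed("slow").delay(500.millis)

  val demo: URIO[Clock, (String, String, (String, String))] = for {
    fiber  <- fast.fork          // start `fast` on its own fiber
    joined <- fiber.join         // suspend the current fiber until `fast` is done
    doomed <- slow.fork          // start `slow` ...
    _      <- doomed.interrupt   // ... and stop it by force
    winner <- fast.race(slow)    // first success wins, the loser is interrupted
    both   <- fast.zipPar(slow)  // run in parallel and keep both results
  } yield (joined, winner, both)

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(demo))
}
```

Note that nothing runs until `unsafeRun` is called: `demo` is only a description of the computation.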

On the other hand, Queues in ZIO address issues that we can encounter while using BlockingQueue. The effectful, back-pressured ZIO Queue makes it easy to avoid blocked threads on the Queue's core operations such as offer and take.

Apart from a bounded back-pressured queue, ZIO Queues deliver other overflow behaviors such as sliding – for removing the oldest element to make room for a new one, or dropping – for discarding the newly received elements. All this in a non-blocking manner.

So the moment we use queue.offer(sth).fork on a full back-pressured queue, we can be sure that running the offer on a separate fiber keeps the main one from being blocked. Other ZIO Queue assets are interruptibility (as with fibers) and safe shutdown.
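To make the three overflow behaviors concrete, here is a small sketch, again assuming ZIO 1.0.x (where `takeAll` returns a `List`). The helper name `leftovers` and the capacity of 2 are choices made for this illustration only:

```scala
import zio._

object QueueSketch {
  // Offer 1, 2 and 3 to a queue of capacity 2 and report what is left inside.
  def leftovers(makeQueue: UIO[Queue[Int]]): UIO[List[Int]] = for {
    queue <- makeQueue
    _     <- queue.offer(1) *> queue.offer(2) *> queue.offer(3)
    items <- queue.takeAll
  } yield items

  // Sliding: the oldest element (1) is removed to make room for 3.
  val sliding: UIO[List[Int]] = leftovers(Queue.sliding[Int](2))

  // Dropping: the newest element (3) is discarded because the queue is full.
  val dropping: UIO[List[Int]] = leftovers(Queue.dropping[Int](2))

  // Back-pressure: the third offer suspends until a slot is free,
  // so we fork it to keep the main fiber moving.
  val backPressured: UIO[(Int, List[Int])] = for {
    queue   <- Queue.bounded[Int](2)
    _       <- queue.offer(1) *> queue.offer(2)
    pending <- queue.offer(3).fork
    first   <- queue.take      // frees a slot, letting the pending offer finish
    _       <- pending.join
    rest    <- queue.takeAll
  } yield (first, rest)

  def main(args: Array[String]): Unit = {
    println(Runtime.default.unsafeRun(sliding))
    println(Runtime.default.unsafeRun(dropping))
    println(Runtime.default.unsafeRun(backPressured))
  }
}
```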

Domain

We’ll start with defining our domain and request class with a topic field.

Additionally, we will implement RequestGenerator for generating Requests:

sealed trait Diagnostic

case object HipDiagnostic extends Diagnostic

case object KneeDiagnostic extends Diagnostic

case class Request[A](topic: Diagnostic, XRayImage: A)

trait RequestGenerator[A] {
  def generate(topic: Diagnostic): Request[A]
} 

Imports required by our project:

import zio._
import zio.console._
import zio.duration.Duration
import scala.util.Random
import java.util.concurrent.TimeUnit

For the sake of simplicity let’s assume our x-ray images are simply Ints:

case class IntRequestGenerator() extends RequestGenerator[Int] {
  override def generate(topic: Diagnostic): Request[Int] = Request(topic, Random.nextInt(1000))
}

Before getting started with the first part, let’s take a look at the architecture diagram. It might look strange at first so let’s leave it this way for now:

[Image: ZIO Fibers and ZIO Queues architecture diagram]

Consumer

The first component of our system is a Consumer[A]. Here we are providing two API methods – createM for constructing a consumer wrapped in UIO and run that starts a new fiber that continuously waits for elements in its queue to process. The processing is rather dull but following the console logs is definitely not!

It’s worth stressing that run returns a (Queue, Fiber) tuple inside an effect, so apart from connecting the consumer to the system we can also interrupt or join the consumer:

case class Consumer[A](title: String) {
  val queueM = Queue.bounded[A](10)

  def run = for {
    queue <- queueM
    loop = for {
      img <- queue.take
      _ <- putStrLn(s"[$title] worker: Starting analyzing task $img")
      _ <- ZIO.sleep(Duration(Random.nextInt(4), TimeUnit.SECONDS))
      _ <- putStrLn(s"[$title] worker: Finished task $img")
    } yield ()
    fiber <- loop.forever.fork
  } yield (queue, fiber)
}

object Consumer {
  def createM[A](title: String) = UIO.succeed(Consumer[A](title))
}

As we are more used to an imperative approach, let's focus for a moment on the advantages of using ZIO effects here.

Any potentially dangerous side effects here are kept inside the ZIO monad. This makes a unit println method more substantial and referentially transparent. Also, having a firm grasp on everything is really beneficial when it comes to parallelism.

Here, we were able to build an arbitrary chain of computations and make it run forever on a separate ZIO Fiber with a pleasing .forever.fork.

Topic Queue

TopicQueue is kind of the most complicated part. It's in charge of the proper distribution of messages among subscribers. The subscribeM method receives a subscriber's queue and the consumerGroup number. As you will no doubt recall, each message is passed to each consumerGroup and then to a random subscriber within each group. The run method follows the pattern from previous components - a continuous loop of acquiring messages and distributing them within the described scheme:

case class TopicQueue[A](queue: Queue[A], subscribers: Map[Int, List[Queue[A]]]) {
  def subscribeM(sub: Queue[A], consumerGroup: Int): UIO[TopicQueue[A]] = {
    val updatedMap = subscribers.get(consumerGroup) match {
      case Some(value) =>
        subscribers + (consumerGroup -> (value :+ sub))
      case None =>
        subscribers + (consumerGroup -> List(sub))
    }

    UIO.succeed(copy(subscribers = updatedMap))
  }

  def run = {
    def randomElement(list: List[Queue[A]]) = if (list.nonEmpty) {
      Some(list(Random.nextInt(list.length)))
    } else {
      None
    }
    val loop = for {
      elem <- queue.take
      mapped = subscribers.values.flatMap(randomElement(_).map(_.offer(elem)))
      _ <- ZIO.collectAll(mapped)
    } yield ()
    loop.forever.fork
  }
}

object TopicQueue {
  def createM[A](queue: Queue[A]): UIO[TopicQueue[A]] = UIO.succeed(TopicQueue(queue, Map.empty))
}

In this part, immutability is what strikes us first. No sneaky, side-effect-like modifications of the subscribers map can occur without our knowledge. Each modification returns an effect with a new TopicQueue, which we treat as the current one from then on.

It's worth mentioning that wrapping the constructor method in UIO is essential for consistency, as instantiating a new ZIO Queue should always be a part of our effect chain.
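The routing rule used in TopicQueue's run (every consumer group gets the message, and one random member per group receives it) can be isolated as a pure function. The following is a sketch in plain Scala without ZIO; `pickPerGroup` is a hypothetical helper written for this illustration, not part of the code above, and subscribers are represented by their names rather than by queues:

```scala
import scala.util.Random

object RoutingSketch {
  // For every consumer group, pick exactly one member at random.
  // Empty groups are skipped, so the result has at most one entry per group.
  def pickPerGroup[S](groups: Map[Int, List[S]], random: Random): Map[Int, S] =
    groups.collect {
      case (group, members) if members.nonEmpty =>
        group -> members(random.nextInt(members.length))
    }

  def main(args: Array[String]): Unit = {
    // Two groups with one member each: both always receive the message.
    val hip = Map(1 -> List("Hip"), 2 -> List("HIP_LOGGER"))
    // One group with two members: only one of them receives the message.
    val knee = Map(1 -> List("Knee1", "Knee2"))
    println(pickPerGroup(hip, new Random()))
    println(pickPerGroup(knee, new Random()))
  }
}
```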

Exchange

Our Exchange is pretty similar to the RabbitMQ exchange. The constructor simply creates three queues - the input queue for incoming jobs (jobQueue) and two output queues for routing (queueHip and queueKnee). What our exchange is also doing is unwrapping XRayImage from Request:

case class Exchange[A]() {
  val queueHipM = Queue.bounded[A](10)
  val queueKneeM = Queue.bounded[A](10)
  val jobQueueM = Queue.bounded[Request[A]](10)

  def run = for {
    jobQueue <- jobQueueM
    queueHip <- queueHipM
    queueKnee <- queueKneeM
    hipTopicQueue = TopicQueue(queueHip, Map.empty)
    kneeTopicQueue = TopicQueue(queueKnee, Map.empty)
    loop = for {
      job <- jobQueue.take
      _ <- job.topic match {
        case HipDiagnostic =>
          queueHip.offer(job.XRayImage)
        case KneeDiagnostic =>
          queueKnee.offer(job.XRayImage)
      }
    } yield ()
    fiber <- loop.forever.fork
  } yield (jobQueue, hipTopicQueue, kneeTopicQueue, fiber)
}

object Exchange {
  def createM[A] = ZIO.succeed(Exchange[A]())
}

Producer

Producing is simply done by supplying a provided queue with Requests. You might have noticed that the run method follows some patterns. Building asynchronous computations with self-explanatory schedules and a lazy execution is easy:

case class Producer[A](queue: Queue[Request[A]], generator: RequestGenerator[A]) {
  def run = {
    val loop = for {
      _ <- putStrLn("[XRayRoom] generating hip and knee request")
      _ <- queue.offer(generator.generate(HipDiagnostic))
      _ <- queue.offer(generator.generate(KneeDiagnostic))
      _ <- ZIO.sleep(Duration(2, TimeUnit.SECONDS))
    } yield ()
    loop.forever.fork
  }
}

object Producer {
  def createM[A](queue: Queue[Request[A]], generator: RequestGenerator[A]) = UIO.succeed(Producer(queue, generator))
}
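As an alternative to the sleep-then-loop pattern in Producer.run, the pause between messages could also be expressed with a Schedule. This is only a sketch of that option, assuming ZIO 1.0.x; the `produce` helper and its parameters are hypothetical, and the number of repeats is bounded here so that the example terminates:

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object ProducerScheduleSketch {
  // Offers `msg` once immediately, then `repeats` more times,
  // waiting `pause` between consecutive offers.
  def produce[A](queue: Queue[A], msg: A, repeats: Int, pause: Duration): URIO[Clock, Unit] =
    queue.offer(msg).repeat(Schedule.recurs(repeats) && Schedule.spaced(pause)).unit

  val demo: URIO[Clock, List[String]] = for {
    queue <- Queue.bounded[String](10)
    fiber <- produce(queue, "x-ray", repeats = 2, pause = 10.millis).fork
    _     <- fiber.join     // wait until the producer has finished
    items <- queue.takeAll  // one initial offer plus two repeats
  } yield items

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(demo))
}
```

Dropping `Schedule.recurs` and keeping only `Schedule.spaced` would give a producer that never stops on its own, like the one above.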

Program

Finally, the Program. Now we will combine all the previous components to assemble a fully operational clinic messaging system. First, we instantiate Consumers and launch them (reminder: ZIO effects are lazy, unlike Futures, so nothing runs until the program itself is executed). Then it’s time for Exchange and Producer. Notice that returning tuples makes it possible to ignore the fibers that we don't need. Finally, we subscribe Consumers to the output queues, importantly defining the ConsumerGroup for each of them, and launch the TopicQueues:

val program = for {

  physicianHip <- Consumer.createM[Int]("Hip")
  ctxPhHip <- physicianHip.run
  (phHipQueue, phHipFiber) = ctxPhHip

  loggerHip <- Consumer.createM[Int]("HIP_LOGGER")
  ctxLoggerHip <- loggerHip.run
  (loggerHipQueue, _) = ctxLoggerHip

  physicianKnee <- Consumer.createM[Int]("Knee1")
  ctxPhKnee <- physicianKnee.run
  (phKneeQueue, _) = ctxPhKnee

  physicianKnee2 <- Consumer.createM[Int]("Knee2")
  ctxPhKnee2 <- physicianKnee2.run
  (phKneeQueue2, _) = ctxPhKnee2


  exchange <- Exchange.createM[Int]
  ctxExchange <- exchange.run
  (inputQueue, outputQueueHip, outputQueueKnee, _) = ctxExchange


  generator = IntRequestGenerator()
  xRayRoom <- Producer.createM[Int](inputQueue, generator)
  _ <- xRayRoom.run


  outputQueueHip <- outputQueueHip.subscribeM(phHipQueue, consumerGroup = 1)
  outputQueueHip <- outputQueueHip.subscribeM(loggerHipQueue, consumerGroup = 2)

  outputQueueKnee <- outputQueueKnee.subscribeM(phKneeQueue, consumerGroup = 1)
  outputQueueKnee <- outputQueueKnee.subscribeM(phKneeQueue2, consumerGroup = 1)

  _ <- outputQueueHip.run
  _ <- outputQueueKnee.run

  _ <- phHipFiber.join

} yield ()

You might also have noticed that after launching TopicQueues, we can't subscribe to them anymore. But no worries! Since run returns the ZIO Fiber, we can just interrupt it, subscribe a new Consumer and relaunch it!
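The interrupt, subscribe and relaunch sequence could look roughly like the sketch below. It assumes ZIO 1.0.x and uses a trimmed-down copy of TopicQueue so that it compiles on its own; the `resubscribe` helper is hypothetical. One caveat of this approach: a message that the loop has already taken but not yet offered at the moment of interruption may be lost.

```scala
import zio._
import scala.util.Random

object ResubscribeSketch {
  // Trimmed-down copy of the post's TopicQueue, kept only to make the sketch self-contained.
  final case class TopicQueue[A](queue: Queue[A], subscribers: Map[Int, List[Queue[A]]]) {
    def subscribeM(sub: Queue[A], group: Int): UIO[TopicQueue[A]] =
      UIO.succeed(copy(subscribers = subscribers.updated(group, subscribers.getOrElse(group, Nil) :+ sub)))

    def run: UIO[Fiber[Nothing, Nothing]] = {
      val loop = for {
        elem <- queue.take
        // Groups are never empty here: subscribeM always adds a member.
        _    <- ZIO.foreach(subscribers.values.toList) { group =>
                  group(Random.nextInt(group.length)).offer(elem)
                }
      } yield ()
      loop.forever.fork
    }
  }

  // Stop the running loop, add a subscriber, and start a fresh loop on the updated copy.
  def resubscribe[A](
    running: Fiber[Any, Any],
    topic: TopicQueue[A],
    sub: Queue[A],
    group: Int
  ): UIO[(TopicQueue[A], Fiber[Nothing, Nothing])] =
    for {
      _       <- running.interrupt
      updated <- topic.subscribeM(sub, group)
      fiber   <- updated.run
    } yield (updated, fiber)

  val demo: UIO[(Int, Int, Int)] = for {
    source <- Queue.bounded[Int](10)
    subA   <- Queue.bounded[Int](10)
    subB   <- Queue.bounded[Int](10)
    topic  <- TopicQueue[Int](source, Map.empty).subscribeM(subA, 1)
    fiber  <- topic.run
    _      <- source.offer(1)
    first  <- subA.take                          // only subA is subscribed so far
    ctx    <- resubscribe(fiber, topic, subB, 2) // subB joins as a second group
    _      <- source.offer(2)
    a      <- subA.take                          // both groups now receive the message
    b      <- subB.take
    _      <- ctx._2.interrupt
  } yield (first, a, b)

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(demo))
}
```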

Running the program

Phew... that was a lot, let's put it into the ZIO application and run it:

object Main extends App {
  override def run(args: List[String]): ZIO[Main.Environment, Nothing, Int] = program.fold(_ => 1, _ => 0)
}

Looking into the logs we see that:

1. Multicast for all the ConsumerGroups within the hip topic works as expected - hip physician and HIP_LOGGER receive the same messages.

2. Within a single ConsumerGroup the messages are routed in a random manner (definitely room for improvement!):

[XRayRoom] generating hip and knee request
[Knee1] worker: Starting analyzing task 474
[Hip] worker: Starting analyzing task 345
[Hip] worker: Finished task 345
[HIP_LOGGER] worker: Starting analyzing task 345
[HIP_LOGGER] worker: Finished task 345
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 179
[HIP_LOGGER] worker: Starting analyzing task 179
[Hip] worker: Finished task 179
[Knee1] worker: Finished task 474
[Knee1] worker: Starting analyzing task 154
[Knee1] worker: Finished task 154
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 763
[Knee1] worker: Starting analyzing task 562
[HIP_LOGGER] worker: Finished task 179
[HIP_LOGGER] worker: Starting analyzing task 763
[Hip] worker: Finished task 763
[Knee1] worker: Finished task 562
[HIP_LOGGER] worker: Finished task 763
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 474
[Knee2] worker: Starting analyzing task 997
[HIP_LOGGER] worker: Starting analyzing task 474
[Hip] worker: Finished task 474
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 184
[Knee1] worker: Starting analyzing task 578
[Knee2] worker: Finished task 997
[HIP_LOGGER] worker: Finished task 474
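One possible improvement over random routing within a group is round-robin selection, which spreads messages evenly across the members. This is a different technique from the one used in this post, shown here only as a plain Scala sketch; the `RoundRobin` type is hypothetical, and in a ZIO program its state could be held in a Ref:

```scala
object RoundRobinSketch {
  // Immutable round-robin selector: `next` is the index used for the upcoming message.
  final case class RoundRobin[A](subscribers: Vector[A], next: Int = 0) {
    // Returns the chosen subscriber together with the selector for the following pick,
    // or None when there is nobody to pick from.
    def pick: Option[(A, RoundRobin[A])] =
      if (subscribers.isEmpty) None
      else Some((subscribers(next), copy(next = (next + 1) % subscribers.length)))
  }

  def main(args: Array[String]): Unit = {
    val start = RoundRobin(Vector("Knee1", "Knee2"))
    // Take three picks in a row, threading the updated selector through each step.
    val picks = Iterator
      .iterate(start.pick)(_.flatMap(_._2.pick))
      .take(3)
      .flatten
      .map(_._1)
      .toList
    println(picks)
  }
}
```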

Conclusion

Our simple, yet operational, program shows how to implement a message broker with direct and multicast behaviors.

Having chosen ZIO, we have managed to unearth only a fraction of its potential by using ZIO Queues and ZIO Fibers within effects. Out-of-the-box parallelism, immutability, referential transparency, and wrapped side-effect management are what made this example painless and very enjoyable to write.

For the complete example, see the gist linked below.

Links

https://gist.github.com/mtsokol/0d6ab5473c04583899e3ffdcb7812959

https://github.com/zio/zio

https://zio.dev/docs/datatypes/datatypes_queue

https://zio.dev/docs/datatypes/datatypes_fiber
