Queueing and messaging platforms have been gaining in popularity in recent years. They solve numerous problems based on asynchronous message passing or consumer and producer patterns. In this blog post, we’re going to build a basic message broker functionality with ZIO for our internal clinic messaging system, specifically with ZIO Queues and ZIO Fibers.

In our clinic, we have x-ray rooms which produce x-ray photographs of hips and knees, which are sent via a messaging system. For any given body part, some physicians can perform a photographic analysis. Additionally, we want to be able to perform message logging for selected body parts.

This example accurately describes a message broker with topics: sending messages to defined topics, subscribing to them in two ways – the ‘one message one consumer’ type pattern and the multicast type pattern. We will be performing this subscribing via consumer groups to which consumers subscribe within any particular topic.

Each topic’s message is delivered to every consumer group (like multicast), but within each group, only one consumer can digest the message (like producers and consumers). Here’s an image showing this:

ZIO Fibers ZIO Queues Message broker

Of course, there are plenty of distributed platforms that can achieve this, e.g. RabbitMQ provides us with a so-called exchange – a broker between a producer and queues that decides which queues to send the message to. Broadcast is supplied via a funout exchange, as opposed to direct and topic exchange types which require a match to the message’s topic.

So let’s try to implement this concept one more time, but this time with ZIO Queues and ZIO Fibers in an effectful way.

ZIO Queues & ZIO Fibers

But first things first – let’s briefly introduce Fibers and Queues in ZIO.

So Fibers are data types for expressing concurrent computations. Fibers are loosely related to threads – a single Fiber can be executed on multiple threads by shifting between them – all with full resource safety!

What makes Fibers stronger is the seamless setting in ZIO. Having some effect e.g. UIO("work") we only need to call .fork on it to make it run on Fiber. Then it’s up to us what to do next: interruptstop Fiber by force, join – block current Fiber until it returns the result or races with another Fiber – runs two ZIO Fibers and returns the first that succeeded.

I should mention that the underlying implementation of race is done via raceWith – a powerful method that allows you to provide any logic for managing two separate Fibers. raceWith is used not only in race but also zipPar – for running two Fibers in parallel and returning both results as a tuple.

On the other hand, Queues in ZIO addresses issues that we can encounter while using BlockingQueue. The effectful, back-pressured ZIO Queue makes it easy to avoid blocked threads on Queues core operations such as offer and take.

Apart from a bounded back-pressured queue, ZIO Queues delivers other overflow behaviors such as sliding – for removing the last inserted element, or dropping – for discarding the newly received elements. All this in a non-blocking manner.

So the moment we use queue.offer(sth).fork on a filled back-pressured queue, we are sure that running a separate fiber will make it non-blocking for the main one. Other ZIO Queue assets are interruption (as fibers are) and safe shutdown.


We’ll start with defining our domain and request class with a topic field.

Additionally, we will implement RequestGenerator for generating Requests:

sealed trait Diagnostic

case object HipDiagnostic extends Diagnostic

case object KneeDiagnostic extends Diagnostic

case class Request[A](topic: Diagnostic, XRayImage: A)

trait RequestGenerator[R, A] {
  def generate(topic: Diagnostic): URIO[R, Request[A]]

Imports required by our project:

import zio._
import zio.random._
import zio.console._
import zio.duration._

For the sake of simplicity let’s assume our x-ray images are simply Ints:

case class IntRequestGenerator() extends RequestGenerator[Random, Int] {
  override def generate(topic: Diagnostic): URIO[Random, Request[Int]] =
    nextIntBounded(1000) >>= (n => UIO(Request(topic, n)))

Before getting started with the first part, let’s take a look at the architecture diagram. It might look strange at first so let’s leave it this way for now:

`ZIO Fibers ZIO Queues architecture diagram


The first component of our system is a Consumer[A]. Here we are providing two API methods – create for constructing a consumer wrapped in UIO and run that starts a new fiber that continuously waits for elements in its queue to process. The processing is rather dull but following console logs are definitely not!

It’s worth stressing that run returns (Queue, Fiber) in effect so apart from connecting the consumer to the system we can also interrupt or join the customer:

case class Consumer[A](title: String) {
  def run = for {
    queue <- Queue.bounded[A](10)
    loop = for {
      img  <- queue.take
      _    <- putStrLn(s"[$title] worker: Starting analyzing task $img")
      rand <- nextIntBounded(4)
      _    <- ZIO.sleep(rand.seconds)
      _    <- putStrLn(s"[$title] worker: Finished task $img")
    } yield ()
    fiber <- loop.forever.fork
  } yield (queue, fiber)

object Consumer {
  def create[A](title: String) = UIO(Consumer[A](title))

As we are more used to an imperative approach, let's focus for a moment on the advantages of using ZIO effects here.

Any potentially dangerous side effects here are kept inside the ZIO monad. This makes a unit println method more substantial and, referentially transparent. Also, having a physical grasp on everything is really beneficial when it comes to parallelism.

Here, we were able to build an arbitrary chain of computations and make it run forever on a separate ZIO Fiber with a pleasing .forever.fork.

Topic Queue

TopicQueue is kind of the most complicated part. It's in charge of the proper distribution of messages among subscribers. The subscribe method receives a subscriber's queue and the consumerGroup number. As you will no doubt recall, each message is passed to each consumerGroup and then to a random subscriber within each group. The run method follows the pattern from previous components - a continuous loop of acquiring messages and distributing them within the described scheme:

case class TopicQueue[A](queue: Queue[A], subscribers: Ref[Map[Int, List[Queue[A]]]]) {
  def subscribe(sub: Queue[A], consumerGroup: Int): UIO[Unit] =
    subscribers.update { map =>
      map.get(consumerGroup) match {
        case Some(value) =>
          map + (consumerGroup -> (value :+ sub))
        case None =>
          map + (consumerGroup -> List(sub))

  private val loop =
    for {
      elem <- queue.take
      subs <- subscribers.get
      _    <- ZIO.foreach(subs.values) { group =>
        for {
          idx <- nextIntBounded(group.length)
          _   <- group(idx).offer(elem)
        } yield ()
    } yield ()

  def run = loop.forever.fork

object TopicQueue {
  def create[A](queue: Queue[A]): UIO[TopicQueue[A]] =
    Ref.make(Map.empty[Int, List[Queue[A]]]) >>= (map => UIO(TopicQueue(queue, map)))

In this part, immutability is what strikes us first. No explicit, side-effect modifications of a subscribers map can occur without our knowledge. Here we're using Ref from ZIO to store the map and perform updates.

It's worth mentioning that wrapping the constructor method in UIO is essential for consistency, as instantiating a new ZIO Queue should always be a part of our effect chain.


Our Exchange is pretty similar to the RabbitMQ exchange. The constructor simply creates three queues - the input queue for incoming jobs (jobQueue) and two output queues for routing (queueHip and queueKnee). What our exchange is also doing is unwrapping XRayImage from Request:

case class Exchange[A]() {
  def run = for {
    jobQueue       <- Queue.bounded[Request[A]](10)
    queueHip       <- Queue.bounded[A](10)
    queueKnee      <- Queue.bounded[A](10)
    hipTopicQueue  <- TopicQueue.create(queueHip)
    kneeTopicQueue <- TopicQueue.create(queueKnee)
    loop = for {
      job <- jobQueue.take
      _   <- job.topic match {
        case HipDiagnostic =>
        case KneeDiagnostic =>
    } yield ()
    fiber <- loop.forever.fork
  } yield (jobQueue, hipTopicQueue, kneeTopicQueue, fiber)

object Exchange {
  def create[A] = UIO(Exchange[A]())


Producing is simply done by supplying a provided queue with Requests. You might have noticed that the run method follows some patterns. Building asynchronous computations with self-explanatory schedules and a lazy execution is easy:

case class Producer[R, A](queue: Queue[Request[A]], generator: RequestGenerator[R, A]) {
  def run = {
    val loop = for {
      _    <- putStrLn("[XRayRoom] generating hip and knee request")
      hip  <- generator.generate(HipDiagnostic)
      _    <- queue.offer(hip)
      knee <- generator.generate(KneeDiagnostic)
      _    <- queue.offer(knee)
      _    <- ZIO.sleep(2.seconds)
    } yield ()

object Producer {
  def create[R, A](queue: Queue[Request[A]], generator: RequestGenerator[R, A]) = UIO(Producer(queue, generator))


Finally, the Program. Now we will combine all the previous components to assemble a fully operational clinic messaging system. First, we instantiate Consumers and launch them (reminder: ZIO Fibers are lazy, unlike Futures). Then it’s time for Exchange and Producer. Notice that returning tuples gives a  possibility to ignore the fibers that we don't need. Finally, we subscribe Consumers for the output queues and, importantly, define the ConsumerGroup with the launch:

val program = for {

  physicianHip             <- Consumer.create[Int]("Hip")
  ctxPhHip                 <- physicianHip.run
  (phHipQueue, phHipFiber) = ctxPhHip

  loggerHip           <- Consumer.create[Int]("HIP_LOGGER")
  ctxLoggerHip        <- loggerHip.run
  (loggerHipQueue, _) = ctxLoggerHip

  physicianKnee    <- Consumer.create[Int]("Knee1")
  ctxPhKnee        <- physicianKnee.run
  (phKneeQueue, _) = ctxPhKnee

  physicianKnee2    <- Consumer.create[Int]("Knee2")
  ctxPhKnee2        <- physicianKnee2.run
  (phKneeQueue2, _) = ctxPhKnee2

  exchange                                         <- Exchange.create[Int]
  ctxExchange                                      <- exchange.run
  (inputQueue, outputQueueHip, outputQueueKnee, _) = ctxExchange

  generator = IntRequestGenerator()
  xRayRoom  <- Producer.create(inputQueue, generator)
  _         <- xRayRoom.run

  _ <- outputQueueHip.subscribe(phHipQueue, consumerGroup = 1)
  _ <- outputQueueHip.subscribe(loggerHipQueue, consumerGroup = 2)

  _ <- outputQueueKnee.subscribe(phKneeQueue, consumerGroup = 1)
  _ <- outputQueueKnee.subscribe(phKneeQueue2, consumerGroup = 1)

  _ <- outputQueueHip.run
  _ <- outputQueueKnee.run

  _ <- phHipFiber.join

} yield ()

Also after launching TopicQueues with run, we can still subscribe to them.

Running the program

Phew... that was a lot, let's put it into the ZIO application and run it:

object Main extends App {
  override def run(args: List[String]) = program.as(0)

Looking into the logs we see that:

1. Multicast for all the ConsumerGroups within the hip topic works as expected - hip physician and HIP_LOGGER receive the same messages.

2. Within a single ConsumerGroup the messages are routed in a random manner (definitely field for improvement!):

[XRayRoom] generating hip and knee request
[Knee1] worker: Starting analyzing task 474
[Hip] worker: Starting analyzing task 345
[Hip] worker: Finished task 345
[HIP_LOGGER] worker: Starting analyzing task 345
[HIP_LOGGER] worker: Finished task 345
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 179
[HIP_LOGGER] worker: Starting analyzing task 179
[Hip] worker: Finished task 179
[Knee1] worker: Finished task 474
[Knee1] worker: Starting analyzing task 154
[Knee1] worker: Finished task 154
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 763
[Knee1] worker: Starting analyzing task 562
[HIP_LOGGER] worker: Finished task 179
[HIP_LOGGER] worker: Starting analyzing task 763
[Hip] worker: Finished task 763
[Knee1] worker: Finished task 562
[HIP_LOGGER] worker: Finished task 763
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 474
[Knee2] worker: Starting analyzing task 997
[HIP_LOGGER] worker: Starting analyzing task 474
[Hip] worker: Finished task 474
[XRayRoom] generating hip and knee request
[Hip] worker: Starting analyzing task 184
[Knee1] worker: Starting analyzing task 578
[Knee2] worker: Finished task 997
[HIP_LOGGER] worker: Finished task 474


Our simple, yet operational, program shows how to implement a message broker with direct and multicast behaviors.

Having chosen ZIO we have managed to unearth only a fraction of its potential - by using ZIO Queues and ZIO Fibers within effects. Out of the box parallelism, immutability, referential transparency, and wrapped side effect managing are what has made this example painless and really very enjoyable to write.

To see complete example see gist link below.






More about ZIO on our BLOG:

Cryptonomic NYC Hackathon part 2

The idea

It was the first time I’d ever taken part in a hackathon. I hadn’t been to any of these events before because I was very skeptical about them. I thought: how can we make anything useful in only two days? Well, it turns out that small, but handy tools can be created, even without sacrificing code quality. The key is to choose the right project; not too big or too complicated so you can complete it within a weekend. In our case, it was a Micheline Michelson translator that I’m going to tell you more about in this article.

The hackathon

Our hackathon took place on the first weekend of August (03-04.08). Cryptonomic is a startup which provides tools and smart contracts for decentralized and consortium applications. We had to use the Cryptonomic technology stack, tools such as ConceilJS (https://github.com/Cryptonomic/ConseilJS) during the hackathon. We decided to create a Google-like translator between Michelson and Micheline; two formats of source files used in Tezos software development. 

What are Tezos and Michelson?

According to the Tezos website:

“Tezos is a new decentralized blockchain that governs itself by establishing a true digital commonwealth.”

“Tezos addresses key barriers facing blockchain adoption to date: smart contract safety, long-term upgradability, and open participation”

Michelson is a domain-specific language that we use to write smart contracts on the Tezos blockchain. Unlike Solidity or Viper which must be compiled to EVM (Ethereum Virtual Machine) byte code to be executed on EVM, Michelson code itself gets to run in the Tezos VM.

Micheline vs. Michelson

First of all, Michelson is the specification and Micheline is the concrete language syntax of Michelson encoded in JSON. Before deployment to Tezos VM, Michelson is transformed into Micheline. 

For example, here is the same program in Michelson and Micheline representation:


parameter int;
storage int;
code {CAR;                      # Get the parameter
      PUSH int 1;               # We're adding 1, so we need to put 1 on the stack
      ADD;                      # Add the two numbers
      NIL operation;            # We put an empty list of operations on the stack


      "prim": "parameter",
      "args": [
          "prim": "int"
      "prim": "storage",
      "args": [
          "prim": "int"
      "prim": "code",
      "args": [
            "prim": "CAR"
            "prim": "PUSH",
            "args": [
                "prim": "int"
                "int": "1"
            "prim": "ADD"
            "prim": "NIL",
            "args": [
                "prim": "operation"
            "prim": "PAIR"

As you can see, there is a clear correspondence between Michelson and Micheline representation. Despite this, many people still find it challenging to understand the difference between Michelson and Micheline. That’s why our team decided to create this translator. Above all, we hope it is going to help other developers to learn smart contracts development in Tezos.

The Technology stack

To create our Micheline Michelson translator, we agreed to use Scala and Akka http for the backend side. At the frontend side, we used React, Redux, and Typescript.

The Coding

A conversion between the two formats is already a part of Cryptonomic tools. So, we needed to extract it from the base source code. After that, we decided to create a separate module for conversion which, we could then import into other projects, so as not to duplicate code.

A Micheline to Michelson translation has already been implemented in Scala using Circe, so it was quite easy to integrate it into our Scala-based project. However, the Michelson to Micheline conversion code is JavaScript. We tried to come up with our own Scala parser for Michelson. Unfortunately, it was too time-consuming, and we finally decided to use a parser from ConceilJS. We also chose Node.js for running the JavaScript code.

Michelin Michelson Translator

The Solution

In short, our solution consists of one frontend and two backend modules. 

Frontend module:


Translation module:


Console backend:


(There’s more detailed information about the modules in the Readme files, so there’s no point in duplicating the text)

Also, our team selected Heroku as the deployment platform.

Final application

You can try out our solution here: https://smart-contracts-micheline-michelson-translator-for-tezos.scalac.io/

On the left side, you paste the Micheline code and click translate to see the result. That simple!

The experience

In conclusion, it turns out that over only two days, it’s possible to create a small yet beneficial application. It was also an excellent opportunity to learn about some Tezos development tools. 

The Micheline Michelson translator was one of two projects that by Scalac. Check out the Frontend data visualization app that the other team made. 

Cryptonomic NYC Hackathon part 1

The Idea

We live in an era of cryptocurrencies, and there’s no escaping it. It’s already happening, and if there’s one thing that represents well what’s going to happen next, it’s blockchain. That’s why during the Cryptonomic NYC Hackathon we decided to try and visualize tezos transactions by creating a frontend data visualization app. What is tezos, you ask? As I’ve mentioned before, it’s definitely part of our future. Here’s why. Tezos is a decentralized platform based on blockchain technology with its cryptocurrency (XTZ). It launched in June 2018 and uses the Proof of Stake algorithm. Too much information? Yeah, the cryptocurrencies topic may seem a little bit complicated at first and needs some time to get into but don’t worry, let’s go over it together. 

Frontend data visualization app – Time of day chart
Frontend data visualisation

What exactly did we build?

tAs I’ve already mentioned, we’ve built a frontend data visualization app. We wanted to visualize tezos transactions on the frontend/client-side. We’ve split the application into several sections such as Home, Transactions, Charts, and Live Chart. The Home section presents general information about our project, such as the tech stack, project description, etc. We used the Charts section to display the tezos transactions in charts. Users can choose the date that they want to see from the charts as well as the type of data (Transactions, Currency, Buyers, Sellers). There is also a second chart which presents the time of day when the transaction was committed, such as morning, afternoon, evening, night. The transactions section shows a list of the last 100 operations, including all connected data such as Source, Destination, and Timestamp. The previous section is the Live Chart which displays transactions and updates them every 10 seconds. We’ve built the chart in the form of colorful circles, with size depending on the number of transactions completed by each of the tezos.

Frontend data visualization app – List of transactions
Frontend data visualisation
Frontend data visualization app – Transactions charts

Frontend data visualisation

The Project

The tech stack we chose was React with Hooks, Redux-Saga, TypeScript, and React Chart.js for data visualization. We picked this stack because our teammates were familiar with the React library. For the communication between our frontend application and the server, we used ConseilJS RESTFUL API. Storing 100000 transactions on the frontend side was quite challenging, but we finally decided to store the data in LocalStorage and redux state. We wanted to focus on the UX/UI side to make sure our application is eye-catching and user-friendly, so no-one would have to run away, screaming, “my eyes are bleeding!”.

You can check out our project here: https://tezos-data-analytics-dashboard.scalac.io/

Live charts
Fronted Data visualization Live Charts

The Hackathon

Happy Hack Fest ! And may the odds be ever in your favor. 

We started at 9 AM on both Sunday and Saturday in our office. On day one, we split our work into roles. Damian was responsible for the Live Chart visualization, managing the redux state and request optimization, I was responsible for the Charts data visualization and Mariusz our leader was responsible for making the base of the application, transactions, server communication and making sure everything worked properly. We also joined the Cryptonomic channel via riot application so we could chat with other participants. 

The Experience

The hackathon was an enjoyable experience, one that I would recommend to everyone. It was a real pleasure to meet and work with such talented people. Sure, there was pressure and moments that we thought we wouldn’t make it, but we never gave up! Things I’ve learned from this experience are the experience itself, but also some new development knowledge as well as some blockchain stuff that might be very important and useful in the future. Thanks Cryptonomic for a great adventure!

During the hackathon, Scalac had two teams to develop their solutions. See how we made a Micheline Michelson translator.