Kafka consulting

How to write a (completely lock-free) concurrent LRU Cache with ZIO STM


Introduction

Writing concurrent data structures using traditional tools, like everything under java.util.concurrent, is generally a very complicated task. You really need to think about what you are doing to avoid typical concurrency issues, such as deadlocks and race conditions. And let’s be honest, predicting all the possible scenarios that could arise is not just hard, but sometimes infeasible.

Therefore, in this article, we are going to see how ZIO STM makes our lives a lot easier when it comes to writing concurrent data structures, such as a concurrent LRU Cache, in a completely lock-free fashion that is a lot simpler to reason about.

Requirements of an LRU Cache

A cache is a structure that stores data (which might be the result of an earlier computation or obtained from external sources such as databases) so that future requests for that data can be served faster.

Now, a Least Recently Used (LRU) Cache must fulfill these requirements:

  • Fixed capacity: This is for limiting memory usage.
  • Fast access: Insert and lookup operations should be executed in O(1) time.
  • An efficient eviction algorithm: The idea is that, when the cache capacity is reached and a new entry needs to be inserted into the cache, the Least Recently Used entry gets replaced.

More concretely, an LRU cache should support the two following operations:

  • get(key): Get the value of a given key if it exists in the cache, otherwise return an error.
  • put(key, value): Put the given (key, value) into the cache. When the cache reaches its capacity, it should evict the Least Recently Used entry before inserting a new one.

That means, for implementing an efficient LRU Cache (meaning get and put operations are executed in O(1) time), we could use two data structures:

  • Hash Map: containing (key, value) pairs.
  • Doubly linked list: which will contain the history of the referenced keys. The Most Recently Used key will be at the start of the list, and the Least Recently Used will be at the end.

In the following image, we can see an example of the status of an LRU Cache (with a capacity of 4) at a given moment:

[Image: concurrent LRU Cache with ZIO]

That means that the history of the referenced keys (1, 3, 2 and 4) shows that Key 1 is the Most Recently Used (because it’s at the start of the list), and that Key 4 is the Least Recently Used (because it’s at the end of the list). So if a new item needs to be stored into the cache, Item 4 would have to be replaced.
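To make the expected get/put/evict behavior concrete before bringing in ZIO, here is a minimal single-threaded sketch. It is not the implementation developed in this article: it swaps the hand-rolled hash map plus doubly linked list for the standard library's mutable LinkedHashMap (which keeps insertion order), and the name SimpleLRU is hypothetical.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded sketch (NOT the article's implementation).
// LinkedHashMap keeps insertion order, so its head is the Least Recently Used key.
final class SimpleLRU[K, V](capacity: Int) {
  require(capacity > 0, "Capacity must be a positive number!")

  private val entries = mutable.LinkedHashMap.empty[K, V]

  // A successful lookup removes and re-inserts the key,
  // which moves it to the Most Recently Used position.
  def get(key: K): Option[V] =
    entries.remove(key).map { value =>
      entries.put(key, value)
      value
    }

  // Inserting a new key into a full cache evicts the Least Recently Used key first.
  def put(key: K, value: V): Unit = {
    if (entries.remove(key).isEmpty && entries.size >= capacity)
      entries.remove(entries.head._1)
    entries.put(key, value)
  }

  // Keys ordered from Least to Most Recently Used.
  def keysLeastToMostRecent: List[K] = entries.keys.toList
}
```

This sketch is not safe to share between threads or fibers, which is exactly the problem the rest of the article addresses.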


Quick introduction to ZIO

According to the ZIO documentation page, ZIO is a library for “Type-safe, composable asynchronous and concurrent programming for Scala”. This means ZIO allows us to build applications that are:

  • Highly composable: Because ZIO is based on functional programming principles, such as using pure functions and immutable values, it allows us to easily compose solutions to complex problems from simple building blocks.
  • 100% asynchronous and non-blocking.
  • Highly performant and concurrent: ZIO implements Fiber-based concurrency. By the way, you can read more about ZIO Fibers in this really nice article written by Mateusz Sokół.
  • Type-safe: ZIO leverages the Scala Type System so it can catch more bugs at compile time.

The most important data type in ZIO (and also the basic building block of ZIO applications) is also called ZIO:

ZIO[-R, +E, +A]

The ZIO data type is called a functional effect, which means it is a lazy, immutable value which contains a description of a series of interactions with the outside world (database interactions, calling external APIs, etc.). A nice mental model of the ZIO data type is the following:

ZEnvironment[R] => Either[E, A]

That means a ZIO effect needs an environment of type ZEnvironment[R] to run (the environment could contain anything: a database connection, a REST client, a configuration object, etc.), and it can either fail with an error of type E or succeed with a value of type A.

Finally, it’s worth mentioning that ZIO provides some type aliases for the ZIO effect type which are very useful to represent some common use cases:

  • Task[+A] = ZIO[Any, Throwable, A]: This means a Task[A] is a ZIO effect that:
    • Doesn’t require an environment to run (that’s why the R type is replaced by Any, meaning the effect will run no matter what we provide to it as an environment)
    • Can fail with a Throwable
    • Can succeed with an A
  • UIO[+A] = ZIO[Any, Nothing, A]: This means a UIO[A] is a ZIO effect that:
    • Doesn’t require an environment to run.
    • Can’t fail
    • Can succeed with an A
  • RIO[-R, +A] = ZIO[R, Throwable, A]: This means a RIO[R, A] is a ZIO effect that:
    • Requires an environment R to run
    • Can fail with a Throwable
    • Can succeed with an A
  • IO[+E, +A] = ZIO[Any, E, A]: This means a IO[E, A] is a ZIO effect that:
    • Doesn’t require an environment to run.
    • Can fail with an E
    • Can succeed with an A
  • URIO[-R, +A] = ZIO[R, Nothing, A]: This means a URIO[R, A] is a ZIO effect that:
    • Requires an environment R to run
    • Can’t fail
    • Can succeed with an A
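As a small illustration of the aliases above, the following sketch builds one value per alias. The object and value names (AliasSketch, answer, parse, nonEmpty, next) are made up for this example; only the ZIO constructors are real API.

```scala
import zio._

object AliasSketch {
  // UIO: no environment needed, cannot fail.
  val answer: UIO[Int] = ZIO.succeed(42)

  // Task: no environment needed, may fail with any Throwable
  // (here, a NumberFormatException thrown by toInt).
  def parse(s: String): Task[Int] = ZIO.attempt(s.toInt)

  // IO: no environment needed, fails with a typed error of our choosing.
  def nonEmpty(s: String): IO[String, String] =
    if (s.nonEmpty) ZIO.succeed(s) else ZIO.fail("empty input")

  // URIO: needs an Int in the environment, cannot fail.
  val next: URIO[Int, Int] = ZIO.service[Int].map(_ + 1)

  // Providing the environment turns the URIO into a UIO.
  val nextFrom41: UIO[Int] = next.provideEnvironment(ZEnvironment(41))
}
```

None of these values does anything until it is run, which is what "lazy description" means in practice.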

Implementing the LRU Cache with ZIO Ref

First, we need to add some ZIO dependencies to our build.sbt:

val scalaVer = "2.13.10"

val zioVersion = "2.0.13"

lazy val compileDependencies = Seq(
  "dev.zio" %% "zio"        % zioVersion,
  "dev.zio" %% "zio-macros" % zioVersion
) map (_ % Compile)

lazy val testDependencies = Seq(
  "dev.zio" %% "zio-test"     % zioVersion,
  "dev.zio" %% "zio-test-sbt" % zioVersion
) map (_ % Test)

lazy val settings = Seq(
  name := "zio-lru-cache",
  version := "2.0.0",
  scalaVersion := scalaVer,
  scalacOptions += "-Ymacro-annotations",
  libraryDependencies ++= compileDependencies ++ testDependencies,
  testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)

lazy val root = (project in file("."))
  .settings(settings)

Now, we can begin with the implementation of the LRU Cache. An initial model could be:

final class LRUCache[K, V](
  private val capacity: Int,
  private var items: Map[K, CacheItem[K, V]],
  private var start: Option[K],
  private var end: Option[K]
)

Now, the LRUCache should have:

  • A capacity (which should be a positive integer set on creation and shouldn’t change anymore, that’s why it’s modeled as a val).
  • A Map containing the items. This will change all the time, which is why we are modeling it as a var. By the way, the model of a CacheItem would be like this:
final case class CacheItem[K, V](value: V, left: Option[K], right: Option[K])

This means each CacheItem should not just contain a value to be stored, but also references to the left and right keys in the history of referenced keys (remember we’ll use a doubly linked list for keeping a history of referenced keys). These are modeled as Option because, if an item is at the start of the history (meaning it’s the Most Recently Used item), there won’t be any item on its left. Something similar happens when an item is at the end of the history (meaning it’s the Least Recently Used item): there won’t be any item on its right.

  • References to the start and end keys. These will also change all the time, which is why they are vars.
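To see how the left and right keys encode a doubly linked list inside a plain Map, here is a small sketch of the state from the earlier image (history 1, 3, 2, 4). The stored values "a" to "d" and the history helper are illustrative assumptions, not part of the article's code.

```scala
final case class CacheItem[K, V](value: V, left: Option[K], right: Option[K])

object HistorySketch {
  // History of referenced keys: 1 (Most Recently Used), 3, 2, 4 (Least Recently Used).
  val items: Map[Int, CacheItem[Int, String]] = Map(
    1 -> CacheItem("a", None, Some(3)),
    3 -> CacheItem("c", Some(1), Some(2)),
    2 -> CacheItem("b", Some(3), Some(4)),
    4 -> CacheItem("d", Some(2), None)
  )
  val start: Option[Int] = Some(1)
  val end: Option[Int]   = Some(4)

  // Hypothetical helper: walks the list from a starting key by following `right`.
  def history[K, V](items: Map[K, CacheItem[K, V]], from: Option[K]): List[K] =
    from match {
      case Some(key) => key :: history(items, items.get(key).flatMap(_.right))
      case None      => Nil
    }
}
```

Calling HistorySketch.history(HistorySketch.items, HistorySketch.start) walks the keys from Most to Least Recently Used.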

There’s a problem with this implementation though: the fact we are resorting to vars. In functional programming, we should model everything as immutable values, and also using vars will make it harder to use the LRUCache in concurrent scenarios (using mutability in our applications instantly makes them prone to race conditions!).

So, what can we do? Well, ZIO has the answer! We can use its Ref[A] data type, which is a purely functional description of a mutable reference. The fundamental operations of a Ref are get and set, and both of them return ZIO effects which describe the operations of reading from and writing to the Ref.
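A minimal sketch of how a Ref is used, assuming nothing beyond the ZIO core library (the names RefSketch and counter are made up for this example):

```scala
import zio._

object RefSketch extends ZIOAppDefault {
  // Each Ref operation is a description of a read or write; nothing runs
  // until the resulting effect is executed.
  val program: UIO[Int] =
    for {
      counter <- Ref.make(0)           // create a Ref holding 0
      _       <- counter.set(10)       // overwrite the value
      _       <- counter.update(_ + 1) // atomically apply a function to it
      value   <- counter.get           // read the current value
    } yield value

  def run = program.flatMap(value => Console.printLine(s"Counter: $value"))
}
```

Running this prints "Counter: 11".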

Then, a better (and purely functional) version of our LRUCache would be:

import zio._

final case class LRUCacheRef[K, V](
  private val capacity: Int,
  private val itemsRef: Ref[Map[K, CacheItem[K, V]]],
  private val startRef: Ref[Option[K]],
  private val endRef: Ref[Option[K]]
)

In addition, let’s define the interface that LRUCacheRef should implement:

import zio._
import zio.macros.accessible

@accessible
trait LRUCache[K, V] {
  def get(key: K): IO[NoSuchElementException, V]

  def put(key: K, value: V): UIO[Unit]

  def getStatus: UIO[(Map[K, CacheItem[K, V]], Option[K], Option[K])]
}

By the way, you can see we are using the @accessible annotation from the zio-macros library, which just generates the following accessor methods in the LRUCache companion object:

object LRUCache {
  def get[K, V](key: K): ZIO[LRUCache[K, V], NoSuchElementException, V]

  def put[K, V](key: K, value: V): URIO[LRUCache[K, V], Unit]

  def getStatus[K, V]: URIO[LRUCache[K, V], (Map[K, CacheItem[K, V]], Option[K], Option[K])]
}

If you want to understand more about accessors (and also ZLayers), you can take a look at the “Mastering Modularity in ZIO with ZLayer” ebook on the Scalac blog.
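For readers who prefer to see what an accessor amounts to without the macro, here is a hand-written sketch on a toy service. The Greeter service and its layer are hypothetical and exist only for this illustration; the accessor follows the usual ZIO.serviceWithZIO pattern.

```scala
import zio._

// Hypothetical service, used only to illustrate the accessor pattern.
trait Greeter {
  def greet(name: String): UIO[String]
}

object Greeter {
  // Hand-written accessor: looks up a Greeter in the environment and delegates to it.
  def greet(name: String): URIO[Greeter, String] =
    ZIO.serviceWithZIO[Greeter](_.greet(name))

  // A trivial implementation wrapped in a layer.
  val live: ULayer[Greeter] =
    ZLayer.succeed[Greeter](new Greeter {
      def greet(name: String): UIO[String] = ZIO.succeed(s"Hello, $name!")
    })
}

object AccessorSketch extends ZIOAppDefault {
  // The accessor lets us write the program against the environment,
  // and the layer satisfies that requirement at the edge.
  val program: UIO[String] = Greeter.greet("ZIO").provideLayer(Greeter.live)

  def run = program.flatMap(greeting => Console.printLine(greeting))
}
```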

Now we can do the following:

  • Make the constructor private.
  • Make LRUCacheRef extend the LRUCache trait.
  • Create a ZLayer in the LRUCacheRef companion object that will contain the description of how to instantiate an LRUCacheRef.
import zio._

final class LRUCacheRef[K, V] private (
  private val capacity: Int,
  private val itemsRef: Ref[Map[K, CacheItem[K, V]]],
  private val startRef: Ref[Option[K]],
  private val endRef: Ref[Option[K]]
) extends LRUCache[K, V]

object LRUCacheRef {
  def layer[K: Tag, V: Tag](capacity: Int): ULayer[LRUCache[K, V]] =
    ZLayer {
      if (capacity > 0) {
        for {
          itemsRef <- Ref.make(Map.empty[K, CacheItem[K, V]])
          startRef <- Ref.make(Option.empty[K])
          endRef   <- Ref.make(Option.empty[K])
        } yield new LRUCacheRef[K, V](capacity, itemsRef, startRef, endRef)
      } else ZIO.die(new IllegalArgumentException("Capacity must be a positive number!"))
    }
}

We can see the LRUCacheRef.layer method expects to receive a capacity, and it returns a ZLayer which can die with an IllegalArgumentException (when a non-positive capacity is provided) or can succeed with an LRUCache[K, V]. We also know that the LRUCacheRef constructor expects to receive not just the capacity, but also the initial values for itemsRef, startRef and endRef. For creating these Refs, we can use the Ref.make function, which receives the initial value for the Ref and returns a UIO[Ref[A]], and because ZIO effects are monads (meaning they have map and flatMap methods), we can combine the results of calling Ref.make using for-comprehension syntax, for yielding a new LRUCacheRef.

Next, we can implement the get and put methods for LRUCacheRef. Let’s start with the get method first:

def get(key: K): IO[NoSuchElementException, V] =
 for {
   items <- self.itemsRef.get
   item  <- ZIO.from(items.get(key)).orElseFail(new NoSuchElementException(s"Key does not exist: $key"))
   _     <- removeKeyFromList(key) *> addKeyToStartOfList(key)
 } yield item.value

As you can see, the method implementation looks really nice and simple: it’s practically just a description of what to do when getting an element from the cache:

  • Firstly, we need to get the items Map from the itemsRef
  • Next, we need to obtain the requested key from the items map. This key may or may not exist: if it exists, the flow just continues; if it doesn’t, the method fails with a NoSuchElementException and the flow execution stops.
  • After the item is obtained from the Map, we need to update the history of referenced keys, because the requested key becomes the Most Recently Used. That’s why we need to call the auxiliary functions removeKeyFromList and addKeyToStartOfList.
  • Finally, the item value is returned.

Now, let’s see the put method implementation. Again, it’s really simple:

def put(key: K, value: V): UIO[Unit] =
  ZIO
    .ifZIO(self.itemsRef.get.map(_.contains(key)))(
      updateItem(key, value),
      addNewItem(key, value)
    )

We can see that the method checks whether the provided key is already present in the items map (again, we are accessing the Map calling the get method on itemsRef):

  • If the key is already present, the updateItem auxiliary function is called.
  • Otherwise, a new item is added, by calling the addNewItem auxiliary function.

Now we can take a look at some auxiliary functions. We won’t go into the details of every one of them; for more details, you can take a look at the complete source code in the jorge-vasquez-2301/zio-lru-cache repository. First off, we have the removeKeyFromList function:

private def removeKeyFromList(key: K): UIO[Unit] =
  for {
    cacheItem      <- getExistingCacheItem(key)
    optionLeftKey  = cacheItem.left
    optionRightKey = cacheItem.right
    _ <- (optionLeftKey, optionRightKey) match {
          case (Some(l), Some(r)) =>
            updateLeftAndRightCacheItems(l, r)
          case (Some(l), None) =>
            setNewEnd(l)
          case (None, Some(r)) =>
            setNewStart(r)
          case (None, None) =>
            clearStartAndEnd
        }
  } yield ()

As you can see, the implementation is pretty straightforward, and it considers all the possible cases when removing a key from the history of referenced keys:

  • When the key to be removed has other keys to its left and right, the corresponding cache items have to be updated so they point to each other.
  • When the key to be removed has another key to its left, but not to its right, it means the key to be removed is at the end of the list, so the end has to be updated.
  • When the key to be removed has another key to its right, but not to its left, it means the key to be removed is at the start of the list, so the start has to be updated.
  • When the key to be removed has no keys to its left or right, that means the key to be removed is the only one, so the start and end references have to be cleared.
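The four cases above can be sketched as a pure function over an immutable snapshot of the cache. This is only an illustration: the State type and the unlink function are hypothetical, and the bodies of the second and third cases are an assumption about what setNewEnd and setNewStart do, since those helpers are not shown in this article.

```scala
final case class CacheItem[K, V](value: V, left: Option[K], right: Option[K])

// Hypothetical immutable snapshot of the three pieces of cache state.
final case class State[K, V](items: Map[K, CacheItem[K, V]], start: Option[K], end: Option[K])

object UnlinkSketch {
  // Detaches `key` from the history; the item itself stays in the map.
  // Assumes `key` and its neighbours exist in `items`.
  def unlink[K, V](s: State[K, V], key: K): State[K, V] = {
    val item = s.items(key)
    (item.left, item.right) match {
      case (Some(l), Some(r)) => // in the middle: neighbours point to each other
        val relinked = s.items
          .updated(l, s.items(l).copy(right = Some(r)))
          .updated(r, s.items(r).copy(left = Some(l)))
        s.copy(items = relinked)
      case (Some(l), None) => // at the end: left neighbour becomes the new end
        s.copy(items = s.items.updated(l, s.items(l).copy(right = None)), end = Some(l))
      case (None, Some(r)) => // at the start: right neighbour becomes the new start
        s.copy(items = s.items.updated(r, s.items(r).copy(left = None)), start = Some(r))
      case (None, None) => // the only key: the history becomes empty
        s.copy(start = None, end = None)
    }
  }

  // History 1, 3, 2, 4 from the earlier example; the values are placeholders.
  val sample: State[Int, String] = State(
    Map(
      1 -> CacheItem("a", None, Some(3)),
      3 -> CacheItem("c", Some(1), Some(2)),
      2 -> CacheItem("b", Some(3), Some(4)),
      4 -> CacheItem("d", Some(2), None)
    ),
    Some(1),
    Some(4)
  )
}
```

Note that the unlinked item keeps its stale left and right keys; in the article's flow they are overwritten right afterwards by addKeyToStartOfList.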

And here is the getExistingCacheItem function implementation:

private def getExistingCacheItem(key: K): UIO[CacheItem[K, V]] =
  self.itemsRef.get.map(_.get(key)).someOrElseZIO(ZIO.dieMessage(s"Key does not exist: $key, but it should!"))

This function is named like this because the idea is that, when we use it, we are expecting that the cache item we want to get exists. If the item does not exist, it means there’s some kind of problem, and we are signaling that with ZIO.dieMessage.

Another interesting function to look at is updateLeftAndRightCacheItems, because it shows a use case of ZIO Ref#update, which atomically modifies a Ref with the specified function.

private def updateLeftAndRightCacheItems(l: K, r: K): UIO[Unit] =
 for {
   leftCacheItem  <- getExistingCacheItem(l)
   rightCacheItem <- getExistingCacheItem(r)
   _              <- self.itemsRef.update(_.updated(l, leftCacheItem.copy(right = Some(r))))
   _              <- self.itemsRef.update(_.updated(r, rightCacheItem.copy(left = Some(l))))
 } yield ()

Finally, let’s take a look at the addKeyToStartOfList function, which is also pretty straightforward. Something to notice is we are using Ref#updateSome for updating the endRef value, just when it’s empty.

private def addKeyToStartOfList(key: K): UIO[Unit] =
 for {
   oldOptionStart <- self.startRef.get
   _ <- getExistingCacheItem(key).flatMap { cacheItem =>
         self.itemsRef.update(_.updated(key, cacheItem.copy(left = None, right = oldOptionStart)))
       }
   _ <- oldOptionStart match {
         case Some(oldStart) =>
           getExistingCacheItem(oldStart).flatMap { oldStartCacheItem =>
             self.itemsRef.update(_.updated(oldStart, oldStartCacheItem.copy(left = Some(key))))
           }
         case None => ZIO.unit
       }
   _ <- self.startRef.set(Some(key))
   _ <- self.endRef.updateSome { case None => Some(key) }
 } yield ()

Testing implementation with a single fiber

It’s time to put our LRUCacheRef under test! Firstly, we are going to test it under a single-fiber scenario. The testing code is the following (by the way, this testing code reflects the example shown on this link):

object UseLRUCacheRefWithOneFiber extends ZIOAppDefault {
  lazy val run: UIO[Unit] =
    (for {
      _ <- put(1, 1)
      _ <- put(2, 2)
      _ <- get(1)
      _ <- put(3, 3)
      _ <- get(2)
      _ <- put(4, 4)
      _ <- get(1)
      _ <- get(3)
      _ <- get(4)
    } yield ()).provideLayer(LRUCacheRef.layer(capacity = 2))

  private def get(key: Int): URIO[LRUCache[Int, Int], Unit] =
    (for {
      v <- Console.printLine(s"Getting key: $key") *> LRUCache.get[Int, Int](key)
      _ <- Console.printLine(s"Obtained value: $v")
    } yield ()).catchAll(ex => Console.printLine(ex.getMessage).orDie)

  private def put(key: Int, value: Int): URIO[LRUCache[Int, Int], Unit] =
    Console.printLine(s"Putting ($key, $value)").orDie *> LRUCache.put(key, value)
}

So now we are running the application with an LRUCacheRef, with a capacity of 2. After executing the above program, the following result is obtained:

Putting (1, 1)
Putting (2, 2)
Getting key: 1
Obtained value: 1
Putting (3, 3)
Getting key: 2
Key does not exist: 2
Putting (4, 4)
Getting key: 1
Key does not exist: 1
Getting key: 3
Obtained value: 3
Getting key: 4
Obtained value: 4

As we can see, the behavior is correct! That means our implementation looks good so far.

Testing implementation with multiple concurrent fibers

Now, let’s test the LRUCacheRef again, but against multiple concurrent fibers this time. The testing code is the following:

object UseLRUCacheWithMultipleFibers extends ZIOAppDefault {
 lazy val run =
   (for {
     fiberReporter  <- reporter.forever.fork
     fiberProducers <- startWorkers(producer)
     fiberConsumers <- startWorkers(consumer)
     _              <- Console.readLine.orDie *> (fiberReporter <*> fiberProducers <*> fiberConsumers).interrupt
   } yield ()).provideLayer(layer)

 lazy val layer = LRUCacheRef.layer[Int, Int](capacity = 3)

 def startWorkers(worker: URIO[LRUCache[Int, Int], Unit]) =
   ZIO.forkAll {
     ZIO.replicate(100) {
       worker.forever.catchAllCause(cause => Console.printLineError(cause.prettyPrint))
     }
   }

 lazy val producer: URIO[LRUCache[Int, Int], Unit] =
   for {
     number <- Random.nextIntBounded(100)
     _      <- Console.printLine(s"Producing ($number, $number)").orDie *> LRUCache.put(number, number)
   } yield ()

 lazy val consumer: URIO[LRUCache[Int, Int], Unit] =
   (for {
     key   <- Random.nextIntBounded(100)
     value <- Console.printLine(s"Consuming key: $key") *> LRUCache.get[Int, Int](key)
     _     <- Console.printLine(s"Consumed value: $value")
   } yield ()).catchAll(ex => Console.printLine(ex.getMessage).orDie)

 lazy val reporter: URIO[LRUCache[Int, Int], Unit] =
   for {
     status                          <- LRUCache.getStatus[Int, Int]
     (items, optionStart, optionEnd) = status
     _                               <- Console.printLine(s"Items: $items, Start: $optionStart, End: $optionEnd").orDie
   } yield ()
}

We can see that a layer which builds an LRUCacheRef with a capacity of 3 is provided. In addition, 100 producers and 100 consumers of random integers are started in different fibers, and we have a reporter that will just print the cache’s current status to the console (stored items, start and end keys of the recently used items history). When we execute this, some ugly stuff happens:

  • Firstly, more items than the defined capacity (a lot more) are being stored! And also, the stored items have a lot of inconsistencies. For example, you can see below that, at a given moment, the end key (the Least Recently Used key) is 97, but looking at the corresponding CacheItem we see it has other keys to its left and right (58 and 9 respectively), but… if 97 is at the end of the list, it shouldn’t have an item to its right! Besides this, there are a lot more discrepancies between CacheItems:

Items: HashMap(5 -> CacheItem(5,Some(45),Some(6)), 84 -> CacheItem(84,Some(51),Some(91)), 69 -> CacheItem(69,Some(83),Some(36)), 0 -> CacheItem(0,None,Some(37)), 88 -> CacheItem(88,Some(82),Some(94)), 10 -> CacheItem(10,Some(37),Some(45)), 56 -> CacheItem(56,Some(54),Some(42)), 42 -> CacheItem(42,Some(6),Some(60)), 24 -> CacheItem(24,Some(30),Some(18)), 37 -> CacheItem(37,Some(0),Some(10)), 52 -> CacheItem(52,Some(70),Some(91)), 14 -> CacheItem(14,Some(72),Some(1)), 20 -> CacheItem(20,None,Some(46)), 46 -> CacheItem(46,Some(28),Some(70)), 93 -> CacheItem(93,Some(40),Some(6)), 57 -> CacheItem(57,Some(12),Some(45)), 78 -> CacheItem(78,None,Some(41)), 61 -> CacheItem(61,None,Some(26)), 1 -> CacheItem(1,Some(14),Some(2)), 74 -> CacheItem(74,None,Some(33)), 6 -> CacheItem(6,Some(5),Some(42)), 60 -> CacheItem(60,Some(42),Some(80)), 85 -> CacheItem(85,None,Some(99)), 70 -> CacheItem(70,Some(46),Some(52)), 21 -> CacheItem(21,None,Some(65)), 33 -> CacheItem(33,Some(77),Some(32)), 28 -> CacheItem(28,None,Some(46)), 38 -> CacheItem(38,Some(98),Some(68)), 92 -> CacheItem(92,Some(63),Some(0)), 65 -> CacheItem(65,Some(21),Some(51)), 97 -> CacheItem(97,Some(58),Some(9)), 9 -> CacheItem(9,Some(97),Some(99)), 53 -> CacheItem(53,None,Some(91)), 77 -> CacheItem(77,Some(27),Some(33)), 96 -> CacheItem(96,Some(3),Some(58)), 13 -> CacheItem(13,Some(14),Some(28)), 41 -> CacheItem(41,Some(78),Some(90)), 73 -> CacheItem(73,None,Some(41)), 2 -> CacheItem(2,Some(1),Some(92)), 32 -> CacheItem(32,Some(33),Some(98)), 45 -> CacheItem(45,Some(10),Some(5)), 64 -> CacheItem(64,None,Some(34)), 17 -> CacheItem(17,None,Some(35)), 22 -> CacheItem(22,None,Some(7)), 44 -> CacheItem(44,Some(79),Some(92)), 59 -> CacheItem(59,Some(15),Some(68)), 27 -> CacheItem(27,Some(4),Some(77)), 71 -> CacheItem(71,Some(46),Some(19)), 12 -> CacheItem(12,Some(75),Some(57)), 54 -> CacheItem(54,None,Some(56)), 49 -> CacheItem(49,None,Some(63)), 86 -> CacheItem(86,None,Some(43)), 81 -> CacheItem(81,Some(98),Some(1)), 76 -> CacheItem(76,None,Some(35)), 7 -> CacheItem(7,Some(22),Some(33)), 39 -> CacheItem(39,None,Some(4)), 98 -> CacheItem(98,Some(32),Some(81)), 91 -> CacheItem(91,Some(52),Some(75)), 66 -> CacheItem(66,None,Some(27)), 3 -> CacheItem(3,Some(94),Some(96)), 80 -> CacheItem(80,Some(60),Some(84)), 48 -> CacheItem(48,None,Some(9)), 63 -> CacheItem(63,Some(49),Some(3)), 18 -> CacheItem(18,Some(24),Some(26)), 95 -> CacheItem(95,None,Some(65)), 50 -> CacheItem(50,Some(68),Some(58)), 67 -> CacheItem(67,None,Some(21)), 16 -> CacheItem(16,None,Some(82)), 11 -> CacheItem(11,Some(5),Some(73)), 72 -> CacheItem(72,Some(99),Some(14)), 43 -> CacheItem(43,Some(86),Some(3)), 99 -> CacheItem(99,Some(9),Some(72)), 87 -> CacheItem(87,Some(36),Some(46)), 40 -> CacheItem(40,Some(11),Some(93)), 26 -> CacheItem(26,Some(18),Some(16)), 8 -> CacheItem(8,Some(3),Some(0)), 75 -> CacheItem(75,Some(91),Some(12)), 58 -> CacheItem(58,Some(96),Some(97)), 82 -> CacheItem(82,Some(16),Some(88)), 36 -> CacheItem(36,Some(69),Some(87)), 30 -> CacheItem(30,Some(11),Some(24)), 51 -> CacheItem(51,Some(65),Some(84)), 19 -> CacheItem(19,None,Some(83)), 4 -> CacheItem(4,Some(62),Some(27)), 79 -> CacheItem(79,None,Some(44)), 94 -> CacheItem(94,Some(88),Some(3)), 47 -> CacheItem(47,Some(35),Some(37)), 15 -> CacheItem(15,Some(68),Some(59)), 68 -> CacheItem(68,Some(38),Some(50)), 62 -> CacheItem(62,None,Some(4)), 90 -> CacheItem(90,Some(41),Some(33)), 83 -> CacheItem(83,Some(19),Some(69))), Start: Some(16), End: Some(97)

  • On top of that, because of the issues mentioned above, we see fibers dying because of unexpected errors. For example:

Exception in thread "zio-fiber-102" java.lang.RuntimeException: Key does not exist: 54, but it should!
  at com.example.cache.LRUCacheRef.getExistingCacheItem(LRUCacheRef.scala:107)
  at com.example.cache.LRUCacheRef.removeKeyFromList(LRUCacheRef.scala:69)
  at com.example.cache.LRUCacheRef.replaceEndCacheItem(LRUCacheRef.scala:47)
  at com.example.UseLRUCacheRefWithMultipleFibers.producer(Main.scala:46)
  at com.example.UseLRUCacheRefWithMultipleFibers.run(Main.scala:36)
  at com.example.UseLRUCacheRefWithMultipleFibers.run(Main.scala:34)

And so, it seems our current LRUCacheRef implementation just works correctly in a single-fiber scenario, but not in a concurrent scenario, which is really bad. So now, let’s reflect on what’s happening.

Why doesn’t our implementation work in a concurrent scenario?

It may seem weird that our current implementation is not working as expected when multiple fibers use it concurrently. We have used immutable values everywhere, pure functions, purely functional mutable references (Ref[A]) that provide atomic operations on them. But wait a moment, Ref[A] provides atomic operations on SINGLE VALUES, but what happens if we need to keep consistency across MULTIPLE VALUES? Remember that in our LRUCacheRef implementation, we have three Refs: itemsRef, startRef and endRef. So it seems using Ref[A] is not a powerful enough solution for our use case:

  • Refs allow atomic operations on single values only.
  • Refs don’t compose! So you can’t compose two Refs to get a resulting Ref.
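The following sketch illustrates the first point on a toy problem rather than on the cache itself. Two Refs hold account balances (the names transfer, total, a and b are made up for this example). Each update is atomic on its own, but a reader running between the two updates can see money that has left one Ref and not yet arrived in the other. Whether that actually shows up on a given run depends on scheduling, so no particular output is claimed.

```scala
import zio._

object TwoRefsSketch extends ZIOAppDefault {
  // Two separate atomic steps: there is a window in which the amount
  // has been withdrawn from `from` but not yet deposited into `to`.
  def transfer(from: Ref[Int], to: Ref[Int], amount: Int): UIO[Unit] =
    from.update(_ - amount) *> to.update(_ + amount)

  // Two separate atomic reads: a transfer may happen in between them.
  def total(a: Ref[Int], b: Ref[Int]): UIO[Int] =
    a.get.zipWith(b.get)(_ + _)

  // 100 concurrent transfers racing against 100 concurrent readers.
  val totals: UIO[List[Int]] =
    for {
      a      <- Ref.make(1000)
      b      <- Ref.make(0)
      result <- ZIO.foreachParDiscard(List.range(0, 100))(_ => transfer(a, b, 10)) &>
                  ZIO.foreachPar(List.range(0, 100))(_ => total(a, b))
    } yield result

  def run =
    totals.flatMap(ts => Console.printLine(s"Distinct totals observed: ${ts.distinct}"))
}
```

If the Refs behaved as one consistent unit, the only total ever observed would be 1000; nothing in this code guarantees that.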

So what can we do now?

Enter ZIO STM!

The solution to our problem is to use ZIO STM (Software Transactional Memory). For that, ZIO provides two basic data types:

  • ZSTM[-R, +E, +A]: Represents an effect that can be performed transactionally, that requires an environment ZEnvironment[R] to run and that may fail with an error E or succeed with a value A. Also, ZSTM has type aliases very similar to those for ZIO:
TaskSTM[+A]   = ZSTM[Any, Throwable, A]
USTM[+A]      = ZSTM[Any, Nothing, A]
RSTM[-R, +A]  = ZSTM[R, Throwable, A]
STM[+E, +A]   = ZSTM[Any, E, A]
URSTM[-R, +A] = ZSTM[R, Nothing, A]
  • TRef[A]: Represents a Transactional Ref, meaning a purely functional mutable reference that can be used in the context of a transaction.

Therefore, basically, a ZSTM describes a bunch of operations across several TRefs.

Important things to note:

  • ZSTMs are composable (we can use them in for-comprehensions!)
  • All those methods in TRef are very similar to those in Ref, but they return ZSTM effects instead of ZIO effects.
  • To convert a ZSTM effect to a ZIO effect, you need to commit the transaction. When you commit a transaction, all of its operations are performed in an atomic, consistent and isolated fashion, very similar to how relational databases work.
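As a minimal sketch of these points, consider two TRefs holding account balances (a toy problem; the names transfer, total, a and b are made up for this example). The transfer touches both TRefs inside one transaction, and readers sum both balances inside another, so a reader should never observe a half-finished transfer.

```scala
import zio._
import zio.stm._

object StmTransferSketch extends ZIOAppDefault {
  // One transaction that updates two TRefs; nothing happens until it is committed.
  def transfer(from: TRef[Int], to: TRef[Int], amount: Int): USTM[Unit] =
    for {
      _ <- from.update(_ - amount)
      _ <- to.update(_ + amount)
    } yield ()

  // One transaction that reads both TRefs consistently.
  def total(a: TRef[Int], b: TRef[Int]): USTM[Int] =
    a.get.zipWith(b.get)(_ + _)

  // 100 concurrent transfers racing against 100 concurrent readers.
  val totals: UIO[List[Int]] =
    for {
      a      <- TRef.make(1000).commit
      b      <- TRef.make(0).commit
      result <- ZIO.foreachParDiscard(List.range(0, 100))(_ => transfer(a, b, 10).commit) &>
                  ZIO.foreachPar(List.range(0, 100))(_ => total(a, b).commit)
    } yield result

  def run =
    totals.flatMap(ts => Console.printLine(s"Distinct totals observed: ${ts.distinct}"))
}
```

Because each commit is atomic and isolated, every observed total is 1000, no matter how the fibers interleave.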

It’s also worth mentioning that we could use other classic concurrency structures from java.util.concurrent such as Locks and Semaphores for solving concurrency issues. However, that’s really complicated, low-level and error-prone, and race conditions or deadlocks are very likely to happen. Instead, ZIO STM replaces all of this low-level machinery with a high-level concept: transactions in memory, and we have no race conditions and no deadlocks!

Finally, ZIO STM provides other nice data structures that can participate in transactions (all of them are based on TRef):

  • TArray
  • TDequeue
  • TEnqueue
  • THub
  • TMap
  • TPriorityQueue
  • TPromise
  • TQueue
  • TRandom
  • TReentrantLock
  • TRef
  • TSemaphore
  • TSet

Our LRU Cache goes concurrent! Moving from Ref to TRef

The concurrent version of our LRUCache will be very similar to what we had before, but we are going to make some changes to use ZIO STM:

import zio._
import zio.stm._

final case class LRUCacheSTM[K, V] private (
  private val capacity: Int,
  private val items: TMap[K, CacheItem[K, V]],
  private val startRef: TRef[Option[K]],
  private val endRef: TRef[Option[K]]
) extends LRUCache[K, V]

As you can see, we are changing Refs to TRefs, and instead of having items: TRef[Map[K, CacheItem[K, V]]], we are using the more convenient and efficient TMap data type that ZIO STM provides.
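For readers who have not used TMap before, here is a small sketch of the operations that matter for the cache (the object name TMapSketch and the sample entries are made up for this example). All the steps form a single transaction that is committed once at the end.

```scala
import zio._
import zio.stm._

object TMapSketch extends ZIOAppDefault {
  val program: UIO[(Option[String], Boolean, Int)] =
    (for {
      tmap   <- TMap.empty[Int, String]
      _      <- tmap.put(1, "one")
      _      <- tmap.put(2, "two")
      _      <- tmap.delete(2)     // remove a key
      one    <- tmap.get(1)        // lookup returns an Option
      hasTwo <- tmap.contains(2)
      size   <- tmap.size
    } yield (one, hasTwo, size)).commit

  def run = program.flatMap(result => Console.printLine(result.toString))
}
```

Running this prints "(Some(one),false,1)".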
The LRUCacheSTM.layer method will also be very similar to the LRUCacheRef.layer:

object LRUCacheSTM {
  def layer[K: Tag, V: Tag](capacity: Int): ULayer[LRUCache[K, V]] =
    ZLayer {
      if (capacity > 0) {
        (for {
          itemsRef <- TMap.empty[K, CacheItem[K, V]]
          startRef <- TRef.make(Option.empty[K])
          endRef   <- TRef.make(Option.empty[K])
        } yield LRUCacheSTM[K, V](capacity, itemsRef, startRef, endRef)).commit
      } else ZIO.die(new IllegalArgumentException("Capacity must be a positive number!"))
    }
}

The biggest difference in this method is that the for-comprehension returns a ZSTM effect, and we need to commit it to get a ZIO effect, which is what we want to return.

Next, we have the get and put methods for the LRUCacheSTM:

def get(key: K): IO[NoSuchElementException, V] =
  (for {
    optionItem <- self.items.get(key)
    item       <- STM.fromOption(optionItem).mapError(_ => new NoSuchElementException(s"Key does not exist: $key"))
    _          <- removeKeyFromList(key) *> addKeyToStartOfList(key)
  } yield item.value).commitEither

def put(key: K, value: V): UIO[Unit] =
  STM.ifSTM(self.items.contains(key))(updateItem(key, value), addNewItem(key, value)).commitEither

Again, you can see these methods are very similar to the ones we had before! The only difference is that both methods now build values of type ZSTM, so we need to commit the transactions (we are using commitEither in this case, so transactions are always committed despite errors, and failures are handled at the ZIO level).

Now, we can take a look at the same auxiliary functions we’ve seen before, but this time with ZIO STM. Firstly, we have the removeKeyFromList function:

private def removeKeyFromList(key: K): USTM[Unit] =
  for {
    cacheItem      <- getExistingCacheItem(key)
    optionLeftKey  = cacheItem.left
    optionRightKey = cacheItem.right
    _ <- (optionLeftKey, optionRightKey) match {
          case (Some(l), Some(r)) =>
            updateLeftAndRightCacheItems(l, r)
          case (Some(l), None) =>
            setNewEnd(l)
          case (None, Some(r)) =>
            setNewStart(r)
          case (None, None) =>
            clearStartAndEnd
        }
  } yield ()

As you may realize, the implementation is practically the same! The difference is that the function is returning a ZSTM effect instead of a ZIO effect. In this case (and the same happens for all private methods) we are not committing the transaction yet, that’s because we want to use these private functions in combination with others, to form bigger transactions that are committed in the get and put methods.

And so here is the getExistingCacheItem function implementation. Again, it’s very similar to the one we had before, but now a ZSTM effect is returned, and also getting an element from the items Map is easier now, thanks to TMap:

private def getExistingCacheItem(key: K): USTM[CacheItem[K, V]] =
  self.items.get(key).someOrElseSTM(STM.dieMessage(s"Key $key does not exist, but it should!"))

And for updateLeftAndRightCacheItems, putting elements into the items Map is a lot easier now too:

private def updateLeftAndRightCacheItems(l: K, r: K): USTM[Unit] =
  for {
    leftCacheItem  <- getExistingCacheItem(l)
    rightCacheItem <- getExistingCacheItem(r)
    _              <- self.items.put(l, leftCacheItem.copy(right = Some(r)))
    _              <- self.items.put(r, rightCacheItem.copy(left = Some(l)))
  } yield ()

We also have addKeyToStartOfList, which again is very similar to the previous version:

private def addKeyToStartOfList(key: K): USTM[Unit] =
  for {
    oldOptionStart <- self.startRef.get
    _ <- getExistingCacheItem(key).flatMap { cacheItem =>
          self.items.put(key, cacheItem.copy(left = None, right = oldOptionStart))
        }
    _ <- oldOptionStart match {
          case Some(oldStart) =>
            getExistingCacheItem(oldStart).flatMap { oldStartCacheItem =>
              self.items.put(oldStart, oldStartCacheItem.copy(left = Some(key)))
            }
          case None => STM.unit
        }
    _ <- self.startRef.set(Some(key))
    _ <- self.endRef.updateSome { case None => Some(key) }
  } yield ()

Testing implementation with multiple fibers

Now that we have our LRUCacheSTM, let’s put it under test with the testing code we already have. The only difference is that we will provide an LRUCacheSTM.layer now:

object UseLRUCacheWithMultipleFibers extends ZIOAppDefault {
 lazy val run = …

 lazy val layer = LRUCacheSTM.layer[Int, Int](capacity = 3)

 …
}

When we run this, everything works as it should, and the best part is that we didn’t need to use Locks at all! No more unexpected errors happen, and the reporter shows that our cache keeps its internal consistency. Here is an example of what is printed to the console for two executions of the reporter:

Items: Map(43 -> CacheItem(43,Some(16),None), 16 -> CacheItem(16,Some(32),Some(43)), 32 -> CacheItem(32,None,Some(16))), Start: Some(32), End: Some(43)
Items: Map(30 -> CacheItem(30,None,Some(69)), 53 -> CacheItem(53,Some(69),None), 69 -> CacheItem(69,Some(30),Some(53))), Start: Some(30), End: Some(53)
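One way to make "internal consistency" concrete is to check a snapshot like the ones printed above: starting from Start and following the right links, we should visit every key exactly once, finish at End, and find that every left link points back to the previous key. The following is only a sketch using plain Scala collections. The CacheItem case class mirrors the shape seen in the output (value, left, right), and isConsistent is a hypothetical helper, not part of the cache itself:

```scala
// Assumed shape, mirroring the printed output: CacheItem(value, left, right).
final case class CacheItem[K, V](value: V, left: Option[K], right: Option[K])

object CacheSnapshotCheck {
  // Hypothetical helper: true when the snapshot is one well-linked list from start to end.
  def isConsistent[K, V](
    items: Map[K, CacheItem[K, V]],
    start: Option[K],
    end: Option[K]
  ): Boolean = {
    // Follow the `right` links from `start`, collecting keys in order.
    // Gives up on a dangling key, or when the walk outgrows the map (a cycle).
    @annotation.tailrec
    def walk(current: Option[K], visited: Vector[K]): Option[Vector[K]] =
      current match {
        case None                                  => Some(visited)
        case Some(_) if visited.size >= items.size => None
        case Some(k) =>
          items.get(k) match {
            case None       => None
            case Some(item) => walk(item.right, visited :+ k)
          }
      }

    walk(start, Vector.empty) match {
      case None => false
      case Some(order) =>
        // The first key must have no left neighbour; every other key points to its predecessor.
        val expectedLefts: Vector[Option[K]] = None +: order.map(k => Some(k))
        val leftsMatch = order.zip(expectedLefts).forall {
          case (k, expectedLeft) => items(k).left == expectedLeft
        }
        order.toSet == items.keySet && order.lastOption == end && leftsMatch
    }
  }

  def main(args: Array[String]): Unit = {
    // First snapshot from the reporter output above.
    val snapshot = Map[Int, CacheItem[Int, Int]](
      43 -> CacheItem(43, Some(16), None),
      16 -> CacheItem(16, Some(32), Some(43)),
      32 -> CacheItem(32, None, Some(16))
    )
    println(isConsistent(snapshot, Some(32), Some(43))) // true

    // Same snapshot with one left link broken.
    val broken = snapshot.updated(16, CacheItem(16, None, Some(43)))
    println(isConsistent(broken, Some(32), Some(43))) // false
  }
}
```

Both snapshots printed by the reporter pass this kind of check, which is what we would expect when every update to the map and to the start/end references happens inside a single transaction.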

Writing unit tests for the Concurrent LRU Cache using ZIO Test

Finally, we can write some unit tests for our LRUCacheSTM using zio-test:

import zio._
import zio.test._

object LRUCacheSTMSpec extends ZIOSpecDefault {
 def spec =
   suite("LRUCacheSTM")(
     test("can't be created with non-positive capacity") {
       for {
         result <- LRUCache
                     .put(1, 1)
                     .provideLayer(LRUCacheSTM.layer(-2))
                     .absorb
                     .either
                     .left
                     .map(_.getMessage)
       } yield assertTrue(result == "Capacity must be a positive number!")
     },
     test("works as expected") {
       val expectedOutput = Vector(
         "Putting (1, 1)\n",
         "Putting (2, 2)\n",
         "Getting key: 1\n",
         "Obtained value: 1\n",
         "Putting (3, 3)\n",
         "Getting key: 2\n",
         "Key does not exist: 2\n",
         "Putting (4, 4)\n",
         "Getting key: 1\n",
         "Key does not exist: 1\n",
         "Getting key: 3\n",
         "Obtained value: 3\n",
         "Getting key: 4\n",
         "Obtained value: 4\n"
       )

       for {
         _      <- put(1, 1)
         _      <- put(2, 2)
         _      <- get(1)
         _      <- put(3, 3)
         _      <- get(2)
         _      <- put(4, 4)
         _      <- get(1)
         _      <- get(3)
         _      <- get(4)
         output <- TestConsole.output
       } yield assertTrue(output == expectedOutput)
     }.provideLayer(LRUCacheSTM.layer[Int, Int](2))
   ) @@ TestAspect.silent

 private def get(key: Int): URIO[LRUCache[Int, Int], Unit] =
   (for {
     v <- Console.printLine(s"Getting key: $key") *> LRUCache.get[Int, Int](key)
     _ <- Console.printLine(s"Obtained value: $v")
   } yield ()).catchAll(ex => Console.printLine(ex.getMessage).orDie)

 private def put(key: Int, value: Int): URIO[LRUCache[Int, Int], Unit] =
   Console.printLine(s"Putting ($key, $value)").orDie *> LRUCache.put(key, value)
}

The first test asserts that trying to create an LRUCacheSTM with a non-positive capacity results in a failure.

The second test asserts that the cache works as expected. For that, we use the TestConsole module provided by zio-test to check that the expected messages are printed to the console.

I won’t go into more details about how zio-test works, but you can read about it on the ZIO documentation page.

Summary

In this article, we’ve seen a concrete example of writing concurrent data structures with ZIO: a concurrent LRU cache. Because ZIO is based on functional programming principles, such as pure functions and immutable values, it was easy and painless to evolve our initial implementation, which didn’t support concurrency and was based on Ref, into a fully concurrent version based on TRef from ZIO STM. We did this without the complications that come with lower-level concurrency structures such as Locks, and with no deadlocks or race conditions at all.

This was just one specific example of what you can do with ZIO STM for writing concurrent data structures. There’s a lot more you can do with it in your own projects, in a totally async, non-blocking and thread-safe fashion, so make sure to give it a try!

Authors

Jorge Vasquez

I'm a software developer, mostly focused on the backend. I've had the chance to work with several technologies and programming languages across different industries, such as Telco, AdTech, and Online Education. I'm always looking to improve my skills, finding new and better ways to solve business problems. I love functional programming, I'm convinced it can help to make better software, and I'm excited about new libraries like ZIO that are making Scala FP more accessible to developers. Besides programming, I enjoy photography, and I'm trying to get better at it.
