It is inevitable that COVID-19 will affect not only health but also the economy.

Companies need to lock their businesses down and big decisions are being put aside while waiting for better times to come. However, putting the brakes on some projects may not be the way to go, since no one knows when the dust will settle.

The coronavirus outbreak is just a recent example, but any crisis is a potential threat to a project if the concerned parties are not prepared to persevere.

The crisis doesn’t have to fully disturb work, though. While we can’t forget about what’s happening all around us, we’re still able to accomplish tasks and take on new projects with the help of a few adjustments.

One of them is remote work. 

Today, we’ll talk a bit more about remote working and why it might save your business during these turbulent times, even if you assume otherwise.

What are some issues with running a business in challenging times?

KPMG have identified a few common contractual issues for businesses in troubling times, which you can see below. 

Source: KPMG

While all of the aforementioned issues could have created problems a few years ago, today they can easily be solved through online, remote project management.

How does remote working deal with these issues?

Well, it can probably solve them all. For many businesses, a lack of on-site work definitely leads to problems with contracts, but this does not have to apply to technology projects.

While there is still plenty of uncertainty going forward, the work must go on. Even those businesses that are on complete lockdown need to handle the situation somehow. The environment is challenging, but companies must do their best to run their business as usual. The same should apply to internal and external collaborations.

Being prepared to cater for this new environment is not a choice any longer, especially when nobody knows when the turbulent times will end and business will return to normal. 

If work progresses as usual, except for being managed and reported remotely, then there’s no reason to worry about signing, terminating or breaching contracts. 

The current situation shows why this is a challenge for many, even though it shouldn't be.

Those who had already adapted to remote working don’t necessarily win today, but more importantly they don’t lose. 

Work can’t stop

It’s understandable that the crisis may dampen the mood for launching, investing or spending resources. Of course, some businesses that are under financial pressure might face problems when it comes to upholding contracts and continuing work. However, if nothing changes except for the method of working and the economic lockdown, then putting technological projects on hold is very often not necessary. Actually, in certain cases, doing so could even be considered irresponsible.

Any projects that were launched before the outbreak are likely to be continued now, so those that were scheduled to launch and are ready to go shouldn’t be put on hold either. 

What’s more, and what might be a tad controversial: this may be a really good time to work on certain projects while the world is slowing down a bit, especially if nothing really changes except for the type of work. This is where the following question arises: how to make it work?

Finding the right partner to run a project is crucial. Nearly everyone is working remotely nowadays, but not many software houses know how to properly sort out their remote workflows. 

Change your approach

Facing a crisis isn’t a perfect situation for either side of a project, and the atmosphere can be tense, it’s true. What’s most important is to stay safe, but also to be proactive, productive and keen to find workarounds to deliver.

Many businesses facing an external crisis need to quickly adjust to remote working.

Regardless of whether or not they have previously been remote working evangelists, they’re often now being thrown headfirst into a brand new situation that they need to handle.

There’s a harmful misconception that remote working means a lack of productivity and missed deadlines. The problem doesn’t lie with remote working itself, but rather with business partners who don’t live up to expectations and put more groups off the idea as a result.

It’s a test for collaboration

While external collaboration (with software houses or agencies) is important, the crisis situation also puts internal collaboration to the test. This is another reason why it’s better to persevere on projects with those for whom remote working is their bread and butter. They may share their best practices and help you survive the transition to remote working, while making sure the project stays up and running.

It won’t happen overnight, though. Above all, it takes a lot of work and understanding.

Long story short: being surrounded by business partners who know the nature of remote working can save a whole project.

Keep growing, regardless

In turbulent times, you need to find a way of running your projects and also evaluating any potential risks. Of course you may be concerned about how the whole situation could affect your business, but to survive and be successful you need to grow and, eventually, scale.

If your team is not ready for this, your first thought might be to hire more people to accomplish some tasks. But how can you safely hire someone and conduct appropriate onboarding now, when you have been thrown in at the deep end in terms of working remotely and trying to sort out project management?

Well, you should think about contracting a software development company that is able to complete the project for you. That way, it can be done without you hiring extra people, or spending time on the searching, screening and onboarding processes that go with it. Not to mention the additional, never-ending costs that you would eventually have to cover.

With contractors, you don’t have such strings attached: if all goes well, you can scale the collaboration. If not, it’s fine to part ways. What’s more, contracting can be much cheaper than hiring someone full-time, since you will hand a whole project over instead of providing benefits and business expenses for a particular employee.

Contractors can therefore help you grow and adjust your activities to any current situation, while also reducing costs. So why not jump on a call and talk about it?

To wrap up

The outbreak may stop some businesses for a while, and it will probably have an impact on operations. Does this mean that you need to stop too? Not if you don’t have a valid reason to do so, no. Running a business as usual may be one step too far, but if your team has the capabilities to work remotely then make the most of it. If not, provide them with the necessary tools to make remote working easier. In terms of external collaboration, choose those companies that know how to organize their work remotely and have already been doing so for quite a while.

There are a lot of things to worry about right now, but remote working shouldn’t be one of them. Not with us, anyway.

Writing concurrent data structures using traditional tools – such as the ones under java.util.concurrent – is generally very complicated. You need to think carefully about what you are doing to avoid typical concurrency issues, such as deadlocks and race conditions. And let’s be honest, thinking about all the possible scenarios that could arise is not just hard, but sometimes infeasible.

So, in this article, we are going to see how ZIO STM can make our lives a lot easier when it comes to writing concurrent data structures – such as a concurrent LRU Cache – in a completely lock-free fashion that is a lot simpler to reason about.

Requirements of an LRU Cache

A cache is a structure that stores data (which might be the result of an earlier computation or obtained from external sources such as databases) so that future requests for this data can be served faster.

Now, a Least Recently Used (LRU) Cache must fulfill these requirements:

  • Fixed capacity: This is for limiting memory usage.
  • Fast access: Insert and lookup operations should be executed in O(1) time.
  • An efficient eviction algorithm: The idea is that, when the cache capacity is reached and a new entry needs to be inserted into the cache, the Least Recently Used entry gets replaced.

More concretely, an LRU cache should support these two operations (sketched in code right after this list):

  • get(key): Get the value of a given key if it exists in the cache, otherwise return an error.
  • put(key, value): Put the given (key, value) into the cache. When the cache reaches its capacity, it should evict the Least Recently Used entry before inserting a new one.
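
Before getting to the implementation, here is a minimal, effect-free sketch of that interface in plain Scala. The trait name and signatures are illustrative only; the ZIO-based version we build later in this article returns effects instead of plain values:

// An illustrative sketch of the LRU cache interface (hypothetical names):
trait LRUCacheApi[K, V] {
  def get(key: K): Option[V]      // O(1) lookup; None if the key is absent
  def put(key: K, value: V): Unit // O(1) insert; evicts the LRU entry when at capacity
}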

So, for implementing an efficient LRU Cache (meaning get and put operations are executed in O(1) time), we could use two data structures:

  • Hash Map: containing (key, value) pairs.
  • Doubly linked list: which will contain the history of referenced keys. The Most Recently Used key will be at the start of the list, and the Least Recently Used one will be at the end.

In the following image, we can see an example of the status of an LRU Cache (with a capacity of 4) at a given moment:

Example of the status of an LRU Cache

So, the history of referenced keys (1, 3, 2 and 4) shows that Key 1 is the Most Recently Used one (because it’s at the start of the list), and that Key 4 is the Least Recently Used one (because it’s at the end of the list). So, if a new item needs to be stored into the cache, Item 4 would have to be replaced.

Quick introduction to ZIO

According to the ZIO documentation page, ZIO is a library for “Type-safe, composable asynchronous and concurrent programming for Scala”. This means ZIO allows us to build applications that are:

  • Highly composable: Because ZIO is based on functional programming principles, such as using pure functions and immutable values, it allows us to easily compose solutions to complex problems from simple building blocks.
  • 100% asynchronous and non-blocking.
  • Highly performant and concurrent: ZIO implements Fiber-based concurrency, and by the way, you can read more about ZIO Fibers in this really nice article written by Mateusz Sokół here.
  • Type-safe: ZIO leverages the Scala Type System so it can catch more bugs at compile time.

The most important data type in ZIO (and also the basic building block of ZIO applications) is also called ZIO:

ZIO[-R, +E, +A]

The ZIO data type is called a functional effect, which means it is a lazy, immutable value that contains a description of a series of interactions with the outside world (database interactions, calling external APIs, etc.). A nice mental model of the ZIO data type is the following:

R => Either[E, A]

This means that a ZIO effect needs an environment of type R to run (the environment could be anything: a database connection, a REST client, a configuration object, etc.), and it can either fail with an error of type E or succeed with a value of type A.
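
To make this concrete, here’s a small, contrived effect that uses all three type parameters (this example is not part of the cache code):

import zio._

// Reads an Int from the environment, fails with a String when it's negative,
// and otherwise succeeds with the doubled value: ZIO[Int, String, Int].
val doubled: ZIO[Int, String, Int] =
  ZIO.environment[Int].flatMap { n =>
    if (n < 0) ZIO.fail(s"negative input: $n")
    else ZIO.succeed(n * 2)
  }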

Finally, it’s worth mentioning that ZIO provides some type aliases for the ZIO effect type, which are very useful for representing some common use cases (you’ll find a couple of short examples after the list):

  • Task[+A] = ZIO[Any, Throwable, A]: This means a Task[A] is a ZIO effect that:
    • Doesn’t require an environment to run (that’s why the R type is replaced by Any, meaning the effect will run no matter what we provide to it as environment)
    • Can fail with a Throwable
    • Can succeed with an A
  • UIO[+A] = ZIO[Any, Nothing, A]: This means a UIO[A] is a ZIO effect that:
    • Doesn’t require an environment to run.
    • Can’t fail
    • Can succeed with an A
  • RIO[-R, +A] = ZIO[R, Throwable, A]: This means a RIO[R, A] is a ZIO effect that:
    • Requires an environment R to run
    • Can fail with a Throwable
    • Can succeed with an A
  • IO[+E, +A] = ZIO[Any, E, A]: This means an IO[E, A] is a ZIO effect that:
    • Doesn’t require an environment to run.
    • Can fail with an E
    • Can succeed with an A
  • URIO[-R, +A] = ZIO[R, Nothing, A]: This means a URIO[R, A] is a ZIO effect that:
    • Requires an environment R to run
    • Can’t fail
    • Can succeed with an A
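
For instance (two hedged examples, not from the article’s code):

import zio._

// Wrapping a side effect that may throw gives us a Task (error type Throwable):
val readLine: Task[String] = ZIO.effect(scala.io.StdIn.readLine())

// Lifting a pure value gives us a UIO (it cannot fail):
val answer: UIO[Int] = ZIO.succeed(42)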

Implementing the LRU Cache with ZIO Ref

First, we need to add some ZIO dependencies to our build.sbt:

val zioVersion = "1.0.0-RC18-2"

lazy val compileDependencies = Seq(
  "dev.zio" %% "zio" % zioVersion
) map (_ % Compile)

lazy val testDependencies = Seq(
  "dev.zio" %% "zio-test"     % zioVersion,
  "dev.zio" %% "zio-test-sbt" % zioVersion
) map (_ % Test)

Now, we can begin with the implementation of the LRU Cache. An initial model could be:

final class LRUCache[K, V](
  private val capacity: Int,
  private var items: Map[K, CacheItem[K, V]],
  private var start: Option[K],
  private var end: Option[K]
)

So, the LRUCache should have:

  • A capacity (which should be a positive integer set on creation and shouldn’t change anymore, which is why it’s modeled as a val).
  • A Map containing the items; this will change all the time, which is why we are modeling it as a var. By the way, the model of a CacheItem would look like this:

final case class CacheItem[K, V](value: V, left: Option[K], right: Option[K])

This means that each CacheItem should not just contain a value to be stored, but also references to the left and right keys in the history of referenced keys (remember we’ll use a doubly linked list for keeping a history of referenced keys). These are modeled as Options because, if an item is at the start of the history (meaning it’s the Most Recently Used item), there won’t be any item on its left. Something similar happens when an item is at the end of the history (meaning it’s the Least Recently Used item), there won’t be any item on its right.

  • References to the start and end keys; these will also change all the time, and that’s why they are vars.

There’s a problem with this implementation though: the fact we are resorting to vars. In functional programming, we should model everything as immutable values, and also using vars will make it harder to use the LRUCache in concurrent scenarios (using mutability in our applications instantly makes them prone to race conditions!).

So, what can we do? Well, ZIO has the answer! We can use its Ref[A] data type, which is a purely functional description of a mutable reference. The fundamental operations of a Ref are get and set, and both of them return ZIO effects which describe the operations of reading from and writing to the Ref.
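
As a quick illustration of the Ref API (a toy program, not from the cache code):

import zio._

// Create a Ref, write to it, atomically modify it, then read it back.
val program: UIO[Int] =
  for {
    ref   <- Ref.make(0)       // UIO[Ref[Int]], initial value 0
    _     <- ref.set(41)       // describe writing 41 to the Ref
    _     <- ref.update(_ + 1) // atomically apply a function to the value
    value <- ref.get           // describe reading the current value
  } yield value                // evaluates to 42 when run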

Then, a better (and purely functional) version of our LRUCache would be:

final class LRUCache[K, V](
  private val capacity: Int,
  private val itemsRef: Ref[Map[K, CacheItem[K, V]]],
  private val startRef: Ref[Option[K]],
  private val endRef: Ref[Option[K]]
)

Now, we can make the constructor private, and create a smart constructor in the companion object:

final class LRUCache[K, V] private (
  private val capacity: Int,
  private val itemsRef: Ref[Map[K, CacheItem[K, V]]],
  private val startRef: Ref[Option[K]],
  private val endRef: Ref[Option[K]]
)

object LRUCache {
  def make[K, V](capacity: Int): IO[IllegalArgumentException, LRUCache[K, V]] =
    if (capacity > 0) {
      for {
        itemsRef <- Ref.make(Map.empty[K, CacheItem[K, V]])
        startRef <- Ref.make(Option.empty[K])
        endRef   <- Ref.make(Option.empty[K])
      } yield new LRUCache[K, V](capacity, itemsRef, startRef, endRef)
    } else {
      ZIO.fail(new IllegalArgumentException("Capacity must be a positive number!"))
    }
}

The make function is our smart constructor, and we can see it expects to receive a capacity, and it returns an effect which can fail with an IllegalArgumentException (when a non-positive capacity is provided) or can succeed with an LRUCache[K, V]. We also know that the LRUCache constructor expects to receive not just the capacity, but also the initial values for itemsRef, startRef and endRef. For creating these Refs, we can use the Ref.make function, which receives the initial value for the Ref and returns a UIO[Ref[A]]. And because ZIO effects are monads (meaning they have map and flatMap methods), we can combine the results of calling Ref.make using for-comprehension syntax, for yielding a new LRUCache.

Now, we can implement the get and put methods for the LRUCache. Let’s start with the get method first:

def get(key: K): IO[NoSuchElementException, V] =
  (for {
    items <- self.itemsRef.get
    item  <- ZIO.fromOption(items.get(key)).mapError(_ => new NoSuchElementException(s"Key does not exist: $key"))
    _     <- removeKeyFromList(key) *> addKeyToStartOfList(key)
  } yield item.value).refineToOrDie[NoSuchElementException]

As you can see, the method implementation looks really nice and simple: it’s practically just a description of what to do when getting an element from the cache:

  • Firstly, we need to get the items Map from the itemsRef
  • Next, we need to obtain the requested key from the items map. This key may or may not exist: if it does, the flow just continues; if it doesn’t, the method fails with a NoSuchElementException and the flow execution stops.
  • After the item is obtained from the Map, we need to update the history of referenced keys, because the requested key becomes the Most Recently Used one. That’s why we need to call the auxiliary functions removeKeyFromList and addKeyToStartOfList.
  • Finally, the item value is returned, and the error type is refined to be just NoSuchElementException (this is the only error we are expecting to happen and that should be handled when calling get). Any other errors should make the fiber execution die because they are bugs that need to be exposed at execution time and fixed.

Now, let’s see the put method implementation. Again it’s really simple:

def put(key: K, value: V): UIO[Unit] =
  (for {
    optionStart <- self.startRef.get
    optionEnd   <- self.endRef.get
    _ <- ZIO.ifM(self.itemsRef.get.map(_.contains(key)))(
          updateItem(key, value),
          addNewItem(key, value, optionStart, optionEnd)
        )
  } yield ()).orDie

We can see that:

  • The method checks whether the provided key is already present in the items map (again, we are accessing the Map calling the get method on itemsRef):
    • If the key is already present, the updateItem auxiliary function is called.
    • Otherwise, a new item is added, by calling the addNewItem auxiliary function.
  • Finally, the method just yields a unit value and dies in the case of an error. This is because this method should never fail, otherwise there’s a bug that needs to be exposed at runtime and fixed.

Now we can take a look at some auxiliary functions. (We won’t go into every auxiliary function here; for more detail, you can take a look at the complete source code in the jorge-vasquez-2301/zio-lru-cache repository.) First up, we have the removeKeyFromList function:

private def removeKeyFromList(key: K): IO[Error, Unit] =
  for {
    cacheItem      <- getExistingCacheItem(key)
    optionLeftKey  = cacheItem.left
    optionRightKey = cacheItem.right
    _ <- (optionLeftKey, optionRightKey) match {
          case (Some(l), Some(r)) => updateLeftAndRightCacheItems(l, r)
          case (Some(l), None)    => setNewEnd(l)
          case (None, Some(r))    => setNewStart(r)
          case (None, None)       => clearStartAndEnd
        }
  } yield ()

As you can see, the implementation is pretty straightforward, and it considers all the possible cases when removing a key from the history of referenced keys:

  • When the key to be removed has other keys to its left and right, the corresponding cache items have to be updated so they point to each other.
  • When the key to be removed has another key to its left, but not to its right, it means the key to be removed is at the end of the list, so the end has to be updated.
  • When the key to be removed has another key to its right, but not to its left, it means the key to be removed is at the start of the list, so the start has to be updated.
  • When the key to be removed has no keys to left nor right, that means the key to be removed is the only one, so the start and end references have to be cleared.

And here is the getExistingCacheItem function implementation:

private def getExistingCacheItem(key: K): IO[Error, CacheItem[K, V]] =
  ZIO.require(new Error(s"Key does not exist: $key"))(self.itemsRef.get.map(_.get(key)))

This function is named this way because the idea is that when we use it, we expect that the cache item we want to get exists. If the item does not exist, it means there’s some kind of problem, and we are signaling that with an Error. (By the way, if you look again at the get and put methods of LRUCache, you can see the application will die if an Error is produced).

Another interesting function to look at is updateLeftAndRightCacheItems, because it shows a use case of ZIO Ref.update, which atomically modifies a Ref with the specified function.

private def updateLeftAndRightCacheItems(l: K, r: K): IO[Error, Unit] =
  for {
    leftCacheItem  <- getExistingCacheItem(l)
    rightCacheItem <- getExistingCacheItem(r)
    _              <- self.itemsRef.update(_.updated(l, leftCacheItem.copy(right = Some(r))))
    _              <- self.itemsRef.update(_.updated(r, rightCacheItem.copy(left = Some(l))))
  } yield ()

Finally, let’s take a look at the addKeyToStartOfList function, which is also pretty straightforward. Something to notice is that we are using Ref.updateSome for updating the endRef value, when it’s empty.

private def addKeyToStartOfList(key: K): IO[Error, Unit] =
  for {
    oldOptionStart <- self.startRef.get
    _ <- getExistingCacheItem(key).flatMap { cacheItem =>
          self.itemsRef.update(_.updated(key, cacheItem.copy(left = None, right = oldOptionStart)))
        }
    _ <- oldOptionStart match {
          case Some(oldStart) =>
            getExistingCacheItem(oldStart).flatMap { oldStartCacheItem =>
              self.itemsRef.update(_.updated(oldStart, oldStartCacheItem.copy(left = Some(key))))
            }
          case None => ZIO.unit
        }
    _ <- self.startRef.set(Some(key))
    _ <- self.endRef.updateSome { case None => Some(key) }
  } yield ()

There is one more thing to do before testing: create an IntLRUCacheEnv module in the com.example.cache package object (this module will be used for testing, and for simplicity, it considers integer keys and values):

package object cache {
  type IntLRUCacheEnv = Has[IntLRUCacheEnv.Service]

  object IntLRUCacheEnv {
    trait Service {
      def getInt(key: Int): IO[NoSuchElementException, Int]
      def putInt(key: Int, value: Int): UIO[Unit]
      val getCacheStatus: UIO[(Map[Int, CacheItem[Int, Int]], Option[Int], Option[Int])]
    }

    object Service {
      val zioRefImpl: ZLayer[Has[Int], IllegalArgumentException, IntLRUCacheEnv] =
        ZLayer.fromFunctionM { hasInt: Has[Int] =>
          LRUCache.make[Int, Int](hasInt.get).map { lruCache =>
            new Service {
              override def getInt(key: Int): IO[NoSuchElementException, Int] = lruCache.get(key)
              override def putInt(key: Int, value: Int): UIO[Unit] = lruCache.put(key, value)
              override val getCacheStatus: UIO[(Map[Int, CacheItem[Int, Int]], Option[Int], Option[Int])] =
                lruCache.getStatus
            }
          }
        }
    }
  }

  def getInt(key: Int): ZIO[IntLRUCacheEnv, NoSuchElementException, Int] = ZIO.accessM(_.get.getInt(key))

  def putInt(key: Int, value: Int): ZIO[IntLRUCacheEnv, Nothing, Unit] = ZIO.accessM(_.get.putInt(key, value))

  val getCacheStatus: ZIO[IntLRUCacheEnv, Nothing, (Map[Int, CacheItem[Int, Int]], Option[Int], Option[Int])] =
    ZIO.accessM(_.get.getCacheStatus)
}

By the way, this module is defined based on the new ZLayer data type that comes with ZIO since 1.0.0-RC18, and you can see there’s a zioRefImpl that makes use of our LRUCache. I won’t go into the details of using ZLayer here, but you can read the ZIO documentation page to get more information.

Testing implementation with a single fiber

It’s time to put our LRUCache to the test! Firstly, we are going to test it under a single-fiber scenario. The testing code is the following:

object UseLRUCacheWithOneFiber extends App {
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    (for {
      _ <- put(1, 1)
      _ <- put(2, 2)
      _ <- get(1)
      _ <- put(3, 3)
      _ <- get(2)
      _ <- put(4, 4)
      _ <- get(1)
      _ <- get(3)
      _ <- get(4)
    } yield 0)
      .provideCustomLayer(ZLayer.succeed(2) >>> IntLRUCacheEnv.Service.zioRefImpl)
      .catchAll(ex => putStrLn(ex.getMessage) *> ZIO.succeed(1))

  private def get(key: Int): URIO[Console with IntLRUCacheEnv, Unit] =
    (for {
      _ <- putStrLn(s"Getting key: $key")
      v <- getInt(key)
      _ <- putStrLn(s"Obtained value: $v")
    } yield ()).catchAll(ex => putStrLn(ex.getMessage))

  private def put(key: Int, value: Int): URIO[Console with IntLRUCacheEnv, Unit] =
    putStrLn(s"Putting ($key, $value)") *> putInt(key, value)
}

So, we are running the application with an IntLRUCacheEnv.Service.zioRefImpl, with a capacity of 2. After executing the above program, the following result is obtained:

Putting (1, 1)
Putting (2, 2)
Getting key: 1
Obtained value: 1
Putting (3, 3)
Getting key: 2
Key does not exist: 2
Putting (4, 4)
Getting key: 1
Key does not exist: 1
Getting key: 3
Obtained value: 3
Getting key: 4
Obtained value: 4

As we can see, the behavior is correct! So, our implementation looks good so far.

Testing implementation with multiple concurrent fibers

Now, let’s test the LRUCache again, but against multiple concurrent fibers this time. The testing code is the following:

object UseLRUCacheWithMultipleFibers extends App {
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    (for {
      fiberReporter  <- reporter.forever.fork
      fiberProducers <- ZIO.forkAll(ZIO.replicate(100)(producer.forever))
      fiberConsumers <- ZIO.forkAll(ZIO.replicate(100)(consumer.forever))
      _              <- getStrLn.orDie *> (fiberReporter <*> fiberProducers <*> fiberConsumers).interrupt
    } yield 0)
      .provideCustomLayer(ZLayer.succeed(3) >>> IntLRUCacheEnv.Service.zioRefImpl)
      .catchAll(ex => putStrLn(ex.getMessage) *> ZIO.succeed(1))

  val producer: URIO[Console with Random with IntLRUCacheEnv, Unit] =
    for {
      number <- nextInt(100)
      _      <- putStrLn(s"Producing ($number, $number)")
      _      <- putInt(number, number)
    } yield ()

  val consumer: URIO[Console with Random with IntLRUCacheEnv, Unit] =
    (for {
      key   <- nextInt(100)
      _     <- putStrLn(s"Consuming key: $key")
      value <- getInt(key)
      _     <- putStrLn(s"Consumed value: $value")
    } yield ()).catchAll(ex => putStrLn(ex.getMessage))

  val reporter: ZIO[Console with IntLRUCacheEnv, NoSuchElementException, Unit] =
    for {
      (items, optionStart, optionEnd) <- getCacheStatus
      _                               <- putStrLn(s"Items: $items, Start: $optionStart, End: $optionEnd")
    } yield ()
}

We can see that an IntLRUCacheEnv.Service.zioRefImpl with a capacity of 3 is provided. Also, 100 producers and 100 consumers of random integers are started in different fibers, and we have a reporter that just prints the cache’s current status to the console (the stored items, plus the start and end keys of the recently used keys history). When we execute this, some ugly stuff happens:

  • First, more items than the defined capacity (a lot more) are being stored! The stored items also have a lot of inconsistencies: for example, you can see below that, at a given moment, the end key (the Least Recently Used key) is 97, but looking at the corresponding CacheItem we see it has other keys to its left and right (58 and 9 respectively). If 97 is at the end of the list, it shouldn’t have an item to its right! Besides this, there are a lot more discrepancies among the CacheItems:

Items: HashMap(5 -> CacheItem(5,Some(45),Some(6)), 84 -> CacheItem(84,Some(51),Some(91)), 69 -> CacheItem(69,Some(83),Some(36)), 0 -> CacheItem(0,None,Some(37)), 88 -> CacheItem(88,Some(82),Some(94)), 10 -> CacheItem(10,Some(37),Some(45)), 56 -> CacheItem(56,Some(54),Some(42)), 42 -> CacheItem(42,Some(6),Some(60)), 24 -> CacheItem(24,Some(30),Some(18)), 37 -> CacheItem(37,Some(0),Some(10)), 52 -> CacheItem(52,Some(70),Some(91)), 14 -> CacheItem(14,Some(72),Some(1)), 20 -> CacheItem(20,None,Some(46)), 46 -> CacheItem(46,Some(28),Some(70)), 93 -> CacheItem(93,Some(40),Some(6)), 57 -> CacheItem(57,Some(12),Some(45)), 78 -> CacheItem(78,None,Some(41)), 61 -> CacheItem(61,None,Some(26)), 1 -> CacheItem(1,Some(14),Some(2)), 74 -> CacheItem(74,None,Some(33)), 6 -> CacheItem(6,Some(5),Some(42)), 60 -> CacheItem(60,Some(42),Some(80)), 85 -> CacheItem(85,None,Some(99)), 70 -> CacheItem(70,Some(46),Some(52)), 21 -> CacheItem(21,None,Some(65)), 33 -> CacheItem(33,Some(77),Some(32)), 28 -> CacheItem(28,None,Some(46)), 38 -> CacheItem(38,Some(98),Some(68)), 92 -> CacheItem(92,Some(63),Some(0)), 65 -> CacheItem(65,Some(21),Some(51)), 97 -> CacheItem(97,Some(58),Some(9)), 9 -> CacheItem(9,Some(97),Some(99)), 53 -> CacheItem(53,None,Some(91)), 77 -> CacheItem(77,Some(27),Some(33)), 96 -> CacheItem(96,Some(3),Some(58)), 13 -> CacheItem(13,Some(14),Some(28)), 41 -> CacheItem(41,Some(78),Some(90)), 73 -> CacheItem(73,None,Some(41)), 2 -> CacheItem(2,Some(1),Some(92)), 32 -> CacheItem(32,Some(33),Some(98)), 45 -> CacheItem(45,Some(10),Some(5)), 64 -> CacheItem(64,None,Some(34)), 17 -> CacheItem(17,None,Some(35)), 22 -> CacheItem(22,None,Some(7)), 44 -> CacheItem(44,Some(79),Some(92)), 59 -> CacheItem(59,Some(15),Some(68)), 27 -> CacheItem(27,Some(4),Some(77)), 71 -> CacheItem(71,Some(46),Some(19)), 12 -> CacheItem(12,Some(75),Some(57)), 54 -> CacheItem(54,None,Some(56)), 49 -> CacheItem(49,None,Some(63)), 86 -> CacheItem(86,None,Some(43)), 81 -> CacheItem(81,Some(98),Some(1)), 76 -> CacheItem(76,None,Some(35)), 7 -> CacheItem(7,Some(22),Some(33)), 39 -> CacheItem(39,None,Some(4)), 98 -> CacheItem(98,Some(32),Some(81)), 91 -> CacheItem(91,Some(52),Some(75)), 66 -> CacheItem(66,None,Some(27)), 3 -> CacheItem(3,Some(94),Some(96)), 80 -> CacheItem(80,Some(60),Some(84)), 48 -> CacheItem(48,None,Some(9)), 63 -> CacheItem(63,Some(49),Some(3)), 18 -> CacheItem(18,Some(24),Some(26)), 95 -> CacheItem(95,None,Some(65)), 50 -> CacheItem(50,Some(68),Some(58)), 67 -> CacheItem(67,None,Some(21)), 16 -> CacheItem(16,None,Some(82)), 11 -> CacheItem(11,Some(5),Some(73)), 72 -> CacheItem(72,Some(99),Some(14)), 43 -> CacheItem(43,Some(86),Some(3)), 99 -> CacheItem(99,Some(9),Some(72)), 87 -> CacheItem(87,Some(36),Some(46)), 40 -> CacheItem(40,Some(11),Some(93)), 26 -> CacheItem(26,Some(18),Some(16)), 8 -> CacheItem(8,Some(3),Some(0)), 75 -> CacheItem(75,Some(91),Some(12)), 58 -> CacheItem(58,Some(96),Some(97)), 82 -> CacheItem(82,Some(16),Some(88)), 36 -> CacheItem(36,Some(69),Some(87)), 30 -> CacheItem(30,Some(11),Some(24)), 51 -> CacheItem(51,Some(65),Some(84)), 19 -> CacheItem(19,None,Some(83)), 4 -> CacheItem(4,Some(62),Some(27)), 79 -> CacheItem(79,None,Some(44)), 94 -> CacheItem(94,Some(88),Some(3)), 47 -> CacheItem(47,Some(35),Some(37)), 15 -> CacheItem(15,Some(68),Some(59)), 68 -> CacheItem(68,Some(38),Some(50)), 62 -> CacheItem(62,None,Some(4)), 90 -> CacheItem(90,Some(41),Some(33)), 83 -> CacheItem(83,Some(19),Some(69))), Start: Some(16), End: Some(97)

  • And, because of the issues mentioned above, we see fibers dying because of unexpected errors like this:
Fiber failed.
An unchecked error was produced.
java.lang.Error: Key does not exist: 35
    at io.scalac.LRUCache.$anonfun$getExistingCacheItem$1(LRUCache.scala:113)

Well, it seems our current LRUCache implementation works correctly in a single-fiber scenario, but not in a concurrent one, and that is really bad. So now, let’s reflect on what’s happening.

Why doesn’t our implementation work in a concurrent scenario?

It may seem weird that our current implementation does not work as expected when multiple fibers use it concurrently: we have used immutable values everywhere, pure functions, purely functional mutable references (Ref[A]) that provide atomic operations on them… But wait a moment: Ref[A] provides atomic operations on SINGLE VALUES, and what happens if we need to keep consistency across MULTIPLE VALUES? Remember that in our LRUCache implementation we have three Refs: itemsRef, startRef and endRef. So, it seems using Ref[A] is not a powerful enough solution for our use case (the sketch after the following list illustrates the problem):

  • Refs allow atomic operations on single values only.
  • Refs don’t compose! So you can’t compose two Refs to get a resulting Ref.
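
To make the problem concrete, here is a contrived sketch (a hypothetical helper, not part of the cache code) of the kind of interleaving that bit us:

import zio._

// Each set is atomic on its own, but nothing makes the PAIR of updates atomic:
// another fiber can run between the two lines and observe (or modify) a state
// where startRef is already updated but endRef is not.
def setStartAndEnd(startRef: Ref[Option[Int]], endRef: Ref[Option[Int]], key: Int): UIO[Unit] =
  for {
    _ <- startRef.set(Some(key)) // atomic by itself
    _ <- endRef.set(Some(key))   // atomic by itself, but not together with the line above
  } yield ()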

So, what can we do now?

Enter ZIO STM!

The solution to our problem is using ZIO STM (Software Transactional Memory). For that, ZIO provides two basic data types:

  • ZSTM[-R, +E, +A]: Represents an effect that can be performed transactionally, that requires an environment R to run and that may fail with an error E or succeed with a value A. Also, ZIO provides a type alias when no environment R is required: STM[+E, +A], which is equivalent to ZSTM[Any, E, A].
  • TRef[A]: Represents a Transactional Ref, meaning a purely functional mutable reference that can be used in the context of a transaction.

So, basically, an STM describes a bunch of operations across several TRefs.

Important things to notice:

  • STMs are composable (we can use them in for-comprehensions!)
  • All methods in TRef are very similar to the ones in Ref, but they return STM effects instead of ZIO effects.
  • To convert an STM effect to a ZIO effect, you need to commit the transaction: when you commit a transaction, all of its operations are performed in an atomic, consistent and isolated fashion, very similar to how relational databases work.
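
As a quick illustration (a toy example, not from the cache code), here is how two TRefs can be updated in a single atomic transaction:

import zio._
import zio.stm._

// Both updates happen in one transaction: when it commits, no other fiber can
// ever observe the amount "in flight" between the two TRefs.
def transfer(from: TRef[Int], to: TRef[Int], amount: Int): UIO[Unit] =
  (for {
    _ <- from.update(_ - amount)
    _ <- to.update(_ + amount)
  } yield ()).commit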

It’s also worth mentioning that we could use other classic concurrency structures from java.util.concurrent like Locks and Semaphores for solving concurrency issues, but that’s really complicated, low-level and error-prone, and race conditions or deadlocks are very likely to happen. Instead, ZIO STM replaces all of this low-level machinery with a high-level concept: transactions in memory, and we have no race conditions and no deadlocks!

Finally, ZIO STM provides other nice data structures that can participate in transactions (all of them are based on TRef):

  • TArray
  • TQueue
  • TSet
  • TMap
  • TPromise
  • TReentrantLock
  • TSemaphore

Our LRU Cache goes concurrent! Moving from ZIO Ref to ZIO STM

The concurrent version of our LRUCache will be very similar to what we had before, but we are going to make some changes to use ZIO STM:

final class ConcurrentLRUCache[K, V] private (
  private val capacity: Int,
  private val items: TMap[K, CacheItem[K, V]],
  private val startRef: TRef[Option[K]],
  private val endRef: TRef[Option[K]]
)

As you can see, we are changing Refs to TRefs, and instead of having items: TRef[Map[K, CacheItem[K, V]]], we are using the more convenient and efficient TMap data type that ZIO STM provides.

The smart constructor will also be very similar to the one we had before:

object ConcurrentLRUCache {
  def make[K, V](capacity: Int): IO[IllegalArgumentException, ConcurrentLRUCache[K, V]] =
    if (capacity > 0) {
      (for {
        itemsRef <- TMap.empty[K, CacheItem[K, V]]
        startRef <- TRef.make(Option.empty[K])
        endRef   <- TRef.make(Option.empty[K])
      } yield new ConcurrentLRUCache[K, V](capacity, itemsRef, startRef, endRef)).commit
    } else {
      ZIO.fail(new IllegalArgumentException("Capacity must be a positive number!"))
    }
}

The biggest difference in this smart constructor is that the for-comprehension returns a STM[Nothing, ConcurrentLRUCache[K, V]], and we need to commit it for getting a ZIO effect, which is what we want to return.

Next, we have the get and put methods for the ConcurrentLRUCache:

def get(key: K): IO[NoSuchElementException, V] =
  (for {
    optionItem <- self.items.get(key)
    item       <- STM.fromOption(optionItem).mapError(_ => new NoSuchElementException(s"Key does not exist: $key"))
    _          <- removeKeyFromList(key) *> addKeyToStartOfList(key)
  } yield item.value).commitEither.refineToOrDie[NoSuchElementException]

def put(key: K, value: V): UIO[Unit] =
  (for {
    optionStart <- self.startRef.get
    optionEnd   <- self.endRef.get
    _           <- STM.ifM(self.items.contains(key))(updateItem(key, value), addNewItem(key, value, optionStart, optionEnd))
  } yield ()).commitEither.orDie

Again, you can see these methods are very similar to the ones we had before! The only difference is that the for-comprehensions in both methods return values of type STM, so we need to commit the transactions (we are using commitEither in this case, so transactions are always committed despite errors, and failures are handled at the ZIO level).

Now, we can take a look at the same auxiliary functions we’ve seen before, but this time with ZIO STM. First, we have the removeKeyFromList function:

private def removeKeyFromList(key: K): STM[Error, Unit] =
  for {
    cacheItem      <- getExistingCacheItem(key)
    optionLeftKey  = cacheItem.left
    optionRightKey = cacheItem.right
    _ <- (optionLeftKey, optionRightKey) match {
          case (Some(l), Some(r)) => updateLeftAndRightCacheItems(l, r)
          case (Some(l), None)    => setNewEnd(l)
          case (None, Some(r))    => setNewStart(r)
          case (None, None)       => clearStartAndEnd
        }
  } yield ()

As you may have realized, the implementation is practically the same! The difference is that the function returns a STM effect instead of a ZIO effect. In this case (and the same happens for all private methods) we are not committing the transaction yet, that’s because we want to use these private functions in combination with others, to form bigger transactions that are committed in the get and put methods.

And, here is the getExistingCacheItem function implementation, again it’s very similar to the one we had before, but now an STM effect is returned, and also getting an element from the items Map is a lot easier now, thanks to TMap:

private def getExistingCacheItem(key: K): STM[Error, CacheItem[K, V]] =
  STM.require(new Error(s"Key does not exist: $key"))(self.items.get(key))

And for updateLeftAndRightCacheItems, putting elements into the items Map is a lot easier now too:

private def updateLeftAndRightCacheItems(l: K, r: K): STM[Error, Unit] =
    for {
      leftCacheItem  <- getExistingCacheItem(l)
      rightCacheItem <- getExistingCacheItem(r)
      _              <- self.items.put(l, leftCacheItem.copy(right = Some(r)))
      _              <- self.items.put(r, rightCacheItem.copy(left = Some(l)))
    } yield ()

And, we have addKeyToStartOfList, which again is very similar to the previous version:

private def addKeyToStartOfList(key: K): STM[Error, Unit] =
  for {
    oldOptionStart <- self.startRef.get
    _ <- getExistingCacheItem(key).flatMap { cacheItem =>
          self.items.put(key, cacheItem.copy(left = None, right = oldOptionStart))
        }
    _ <- oldOptionStart match {
          case Some(oldStart) =>
            getExistingCacheItem(oldStart).flatMap { oldStartCacheItem =>
              self.items.put(oldStart, oldStartCacheItem.copy(left = Some(key)))
            }
          case None => STM.unit
        }
    _ <- self.startRef.set(Some(key))
    _ <- self.endRef.updateSome { case None => Some(key) }
  } yield ()

Finally, before testing, let’s add a new zioStmImpl to the IntLRUCacheEnv module. This implementation should make use of the ConcurrentLRUCache we’ve just created:

object IntLRUCacheEnv {
  ...
  object Service {
    ...
    val zioStmImpl: ZLayer[Has[Int], IllegalArgumentException, IntLRUCacheEnv] =
      ZLayer.fromFunctionM { hasInt: Has[Int] =>
        ConcurrentLRUCache.make[Int, Int](hasInt.get).map { concurrentLruCache =>
          new Service {
            override def getInt(key: Int): IO[NoSuchElementException, Int] = concurrentLruCache.get(key)
            override def putInt(key: Int, value: Int): UIO[Unit] = concurrentLruCache.put(key, value)
            override val getCacheStatus: UIO[(Map[Int, CacheItem[Int, Int]], Option[Int], Option[Int])] =
              concurrentLruCache.getStatus
          }
        }
      }
  }
}

Testing implementation with multiple fibers

Now that we have our ConcurrentLRUCache, let’s put it to the test with the following code, which is practically the same as the one we had before (the only difference is that we are providing an IntLRUCacheEnv.Service.zioStmImpl now):

object UseConcurrentLRUCacheWithMultipleFibers extends App {
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    (for {
      fiberReporter  <- reporter.forever.fork
      fiberProducers <- ZIO.forkAll(ZIO.replicate(100)(producer.forever))
      fiberConsumers <- ZIO.forkAll(ZIO.replicate(100)(consumer.forever))
      _              <- getStrLn.orDie *> (fiberReporter <*> fiberProducers <*> fiberConsumers).interrupt
    } yield 0)
      .provideCustomLayer(ZLayer.succeed(3) >>> IntLRUCacheEnv.Service.zioStmImpl)
      .catchAll(ex => putStrLn(ex.getMessage) *> ZIO.succeed(1))

  val producer: URIO[Console with Random with IntLRUCacheEnv, Unit] =
    for {
      number <- nextInt(100)
      _      <- putStrLn(s"Producing ($number, $number)")
      _      <- putInt(number, number)
    } yield ()

  val consumer: URIO[Console with Random with IntLRUCacheEnv, Unit] =
    (for {
      key   <- nextInt(100)
      _     <- putStrLn(s"Consuming key: $key")
      value <- getInt(key)
      _     <- putStrLn(s"Consumed value: $value")
    } yield ()).catchAll(ex => putStrLn(ex.getMessage))

  val reporter: ZIO[Console with IntLRUCacheEnv, NoSuchElementException, Unit] =
    for {
      (items, optionStart, optionEnd) <- getCacheStatus
      _                               <- putStrLn(s"Items: $items, Start: $optionStart, End: $optionEnd")
    } yield ()
}

When we run this, everything works as it should, and the best part is, we didn’t need to use Locks at all! There are no more unexpected errors, and the reporter shows that our cache keeps its internal consistency. Here is an example of what is printed to the console for two executions of the reporter:

Items: Map(43 -> CacheItem(43,Some(16),None), 16 -> CacheItem(16,Some(32),Some(43)), 32 -> CacheItem(32,None,Some(16))), Start: Some(32), End: Some(43)

Items: Map(30 -> CacheItem(30,None,Some(69)), 53 -> CacheItem(53,Some(69),None), 69 -> CacheItem(69,Some(30),Some(53))), Start: Some(30), End: Some(53)

Writing unit tests for the Concurrent LRU Cache using ZIO Test

Finally, we can write some unit tests for our ConcurrentLRUCache using zio-test:

object ConcurrentLRUCacheTest extends DefaultRunnableSpec {
  def spec = suite("ConcurrentLRUCache")(
    testM("can't be created with non-positive capacity") {
      assertM(ConcurrentLRUCache.make[Int, Int](0).run)(
        fails(hasMessage(equalTo("Capacity must be a positive number!")))
      )
    },
    testM("works as expected") {
      val expectedOutput = Vector(
        "Putting (1, 1)\n",
        "Putting (2, 2)\n",
        "Getting key: 1\n",
        "Obtained value: 1\n",
        "Putting (3, 3)\n",
        "Getting key: 2\n",
        "Key does not exist: 2\n",
        "Putting (4, 4)\n",
        "Getting key: 1\n",
        "Key does not exist: 1\n",
        "Getting key: 3\n",
        "Obtained value: 3\n",
        "Getting key: 4\n",
        "Obtained value: 4\n"
      )

      for {
        lruCache <- ConcurrentLRUCache.make[Int, Int](2)
        _        <- put(lruCache, 1, 1)
        _        <- put(lruCache, 2, 2)
        _        <- get(lruCache, 1)
        _        <- put(lruCache, 3, 3)
        _        <- get(lruCache, 2)
        _        <- put(lruCache, 4, 4)
        _        <- get(lruCache, 1)
        _        <- get(lruCache, 3)
        _        <- get(lruCache, 4)
        output   <- TestConsole.output
      } yield assert(output)(equalTo(expectedOutput))
    }
  )

  private def get[K, V](concurrentLruCache: ConcurrentLRUCache[K, V], key: K): ZIO[Console, Nothing, Unit] =
    (for {
      _ <- putStrLn(s"Getting key: $key")
      v <- concurrentLruCache.get(key)
      _ <- putStrLn(s"Obtained value: $v")
    } yield ()).catchAll(ex => putStrLn(ex.getMessage))

  private def put[K, V](concurrentLruCache: ConcurrentLRUCache[K, V], key: K, value: V): ZIO[Console, Nothing, Unit] =
    putStrLn(s"Putting ($key, $value)") *> concurrentLruCache.put(key, value)
}

The first test is for asserting that trying to create a ConcurrentLRUCache with a non-positive capacity would result in a failure.

The second test asserts that the cache works as expected; for that, we use the TestConsole module provided by zio-test to check that the expected messages are printed to the console.

I won’t go into more details about how zio-test works, but you can read about it in the ZIO documentation page.

Summary

In this article, we’ve seen a concrete example of writing concurrent data structures with ZIO: a concurrent LRU cache. Because ZIO is based on functional programming principles – such as using pure functions and immutable values – it was really easy and painless to evolve our initial implementation, which didn’t support concurrency and was based on ZIO Ref, into a fully concurrent version based on ZIO STM, without all the complicated stuff that comes with lower-level concurrency structures such as Locks, and with no deadlocks or race conditions at all.

In addition, this was just a very specific example of what you can do with ZIO STM for writing concurrent data structures, so there’s a lot more you can do with it in your own projects, in a totally async, non-blocking and thread-safe fashion. So make sure to give it a try!
