Scala Futures, Traverse and Side Effects

A month ago I was developing a virtual file system. My code had to read a config file, take a path from it and create all the folders along that path. I had an asynchronous API for that purpose.

My code was like this:


val list = List(
  "/a",
  "/a/b",
  "/a/b/c",
  "/a/b/c/d",
  "/a/b/c/d/e"
)

def isDirExists(str: String): Future[Boolean] = ???
def createDir(str: String): Future[Unit] = ???

Future.traverse(list) { dir =>
  isDirExists(dir).flatMap { isExists =>
    if (isExists) Future.successful(())
    else createDir(dir)
  }
}

The list contains the paths of the folders to be created one after another, starting from the topmost folder. If a folder doesn’t exist, it is created. It all looks clear and easy. But it doesn’t work as expected.

The problem was that suddenly an exception was thrown, saying that a folder can’t be created because its parent folder doesn’t exist. Why? It MUST exist!

It took me some time to locate the problem:

Scala’s Future is eager by nature. This means that a Future starts executing the moment it is created, whether or not anyone waits for its result. That’s not a problem if you have no side effects inside your Futures. For instance, if you have some lengthy calculations wrapped in Futures, everything will work fine.
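To make the eagerness concrete, here is a minimal sketch. The object EagerFutureDemo and the method bodyRunsWithoutAwait are names I made up for this illustration. The Future is created and then simply dropped: nobody maps over it or awaits it, and its body still runs.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object, not part of the original project.
object EagerFutureDemo {
  // Returns true if the Future body ran although nothing awaited or mapped the Future.
  def bodyRunsWithoutAwait(): Boolean = {
    val started = new CountDownLatch(1)
    // Merely creating the Future submits its body to the execution context.
    Future { started.countDown() }
    // Wait (up to 5 seconds) for the body to signal that it has run.
    started.await(5, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit =
    println(s"body ran without being awaited: ${bodyRunsWithoutAwait()}")
}
```

The latch is only there so that the main thread can observe the side effect; the Future itself needs no help to start.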

But imagine that during these calculations some data is written to a database and some data is read from it. Now we have side effects, and in some situations we must care about the order in which they are executed.

Let’s take a very simple example:


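import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

// Assumed to live in an object named examples, since the later snippets import examples._
object examples {
  // Records the order in which the side effects actually happen.
  private val sideEffect: ListBuffer[Int] = ListBuffer.empty[Int]
  val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)

  // Sleeps for a random time, records n as a side effect and returns n.
  def withSideEffect(n: Int): Future[Int] = Future {
    Thread.sleep(Random.nextInt(100))
    // ListBuffer is not thread-safe, so concurrent appends are guarded.
    sideEffect.synchronized { sideEffect += n }
    n
  }

  def getSideEffect: ListBuffer[Int] = sideEffect
}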

We have a function withSideEffect and a List[Int], and we need to apply this function to every element of the list. We also need the side effects to be executed sequentially, in list order.

Future.traverse

Of course, it’s the first thing we should try:


import examples._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val future = Future.traverse(list) {
    withSideEffect
}

val result = Await.result(future, 12.seconds)

// the side effects are not in the right order:
println(s"Side Effect: ${getSideEffect.mkString(", ")}")

// but the result is in the right order:
println(s"result: ${result.mkString(", ")}")

The result:


Side Effect: 1, 2, 5, 3, 7, 4, 6, 9, 8
result: 1, 2, 3, 4, 5, 6, 7, 8, 9

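Unfortunately, the list of side effects is not in the right order. Future.traverse calls the function for every element up front, so all the Futures run concurrently and only their results are collected in list order.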

Cats Traverse

Maybe the Cats implementation of traverse will help us?


import cats.implicits._

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import examples._


val res: List[Int] =  Await.result(list.traverse(withSideEffect), 10.seconds)

println("sideEffect = " + getSideEffect.mkString(", "))
println("result = " + res.mkString(", "))

The result:


sideEffect = 2, 5, 4, 1, 8, 3, 6, 7, 9
result = 1, 2, 3, 4, 5, 6, 7, 8, 9

Unfortunately, not!

Let’s analyze the code

In the book Scala with Cats by Noel Welsh and Dave Gurnell I found this explanation and code (I modified the code a bit):

"If we ignore distractions like CanBuildFrom and ExecutionContext, the implementation of Future.traverse in the standard library looks like this:"


import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import examples._
import scala.concurrent.ExecutionContext.Implicits.global

def traverse[A, B](values: List[A])
                  (func: A => Future[B]): Future[List[B]] =
  values.foldLeft(Future.successful(List.empty[B])) { (accum, host) =>
    val item = func(host)
    for {
      a <- accum
      i <- item
    } yield (a :+ i)
  }

val result = Await.result(traverse(list)(withSideEffect), 12.seconds)

// the side effects are not in the right order:
println(s"Side Effect: ${getSideEffect.mkString(", ")}")

// but the result is in the right order:
println(s"result: ${result.mkString(", ")}")

Does it work?


Side Effect: 2, 3, 6, 7, 4, 1, 9, 5, 8
result: 1, 2, 3, 4, 5, 6, 7, 8, 9

No, it doesn’t!

Why? The approach with the method foldLeft MUST give us what we want! What’s wrong?

Look at this line:


val item = func(host)

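Why is the function that returns a Future called here? foldLeft runs through the whole list immediately, so this line starts a Future for every element before any of the previous ones has completed.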

Custom traverse method

Let’s start this line with the lazy keyword:



def traverse[A, B](values: List[A])
                  (func: A => Future[B]): Future[List[B]] =
  values.foldLeft(Future.successful(List.empty[B])) { (accum, host) =>
    lazy val item = func(host)
    for {
      a <- accum
      i <- item
    } yield (a :+ i)
  }

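And it works! At last! A lazy val is not evaluated where it is declared but where it is first used, and here that is inside the flatMap callback on accum. So func(host) is called only after all the previous Futures have completed.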


Side Effect: 1, 2, 3, 4, 5, 6, 7, 8, 9
result: 1, 2, 3, 4, 5, 6, 7, 8, 9

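So, we have found a suitable solution. You can use our traverse method instead of Future.traverse. Furthermore, you get a sequence method for free (keep in mind that the Futures passed to it are already running, so it only collects their results in order and cannot order their side effects):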


def sequence[B](futures: List[Future[B]]): Future[List[B]] =
  traverse(futures)(identity)
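To close the loop with the folder problem from the beginning, here is a sketch of how the lazy traverse could be applied to it. The real virtual file system API is not shown in this post, so InMemoryFs below is a hypothetical in-memory stand-in that I made up; its createDir fails when the parent folder is missing, which mirrors the exception described above. SequentialDirs and createAll are illustrative names as well.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original project.
object SequentialDirs {
  // Made-up in-memory file system; only the root "/" exists at the start.
  final class InMemoryFs {
    private val dirs = TrieMap[String, Boolean]("/" -> true)

    private def parentOf(dir: String): String = {
      val idx = dir.lastIndexOf('/')
      if (idx <= 0) "/" else dir.substring(0, idx)
    }

    def isDirExists(dir: String): Future[Boolean] = Future(dirs.contains(dir))

    // Fails when the parent folder does not exist yet.
    def createDir(dir: String): Future[Unit] = Future {
      if (!dirs.contains(parentOf(dir)))
        throw new IllegalStateException(s"parent of $dir does not exist")
      dirs.put(dir, true)
      ()
    }
  }

  // The traverse with the lazy val: each step starts after the previous one completes.
  def traverse[A, B](values: List[A])(func: A => Future[B]): Future[List[B]] =
    values.foldLeft(Future.successful(List.empty[B])) { (accum, host) =>
      lazy val item = func(host)
      for { a <- accum; i <- item } yield a :+ i
    }

  // Creates the folders one after another, skipping those that already exist.
  def createAll(fs: InMemoryFs, paths: List[String]): Future[List[Unit]] =
    traverse(paths) { dir =>
      fs.isDirExists(dir).flatMap { exists =>
        if (exists) Future.successful(()) else fs.createDir(dir)
      }
    }

  def main(args: Array[String]): Unit = {
    val fs = new InMemoryFs
    val paths = List("/a", "/a/b", "/a/b/c", "/a/b/c/d", "/a/b/c/d/e")
    Await.result(createAll(fs, paths), 10.seconds)
    println(Await.result(fs.isDirExists("/a/b/c/d/e"), 10.seconds)) // true
  }
}
```

Because every folder is created only after its predecessor in the list is done, the parent always exists by the time createDir is called for a child.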

Monix Task

If you don’t want to reinvent the wheel, you can use Monix Tasks instead of Futures.

Task is a data type for controlling possibly lazy & asynchronous computations, useful for controlling side-effects, avoiding nondeterminism and callback-hell.


import monix.execution.Scheduler.Implicits.global
import monix.eval.Task
import examples._
import scala.concurrent.duration._
import scala.concurrent.Await

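// Task.deferFuture delays the call to withSideEffect until the Task is run,
// so Task.traverse can start the Futures one after another.
val future = Task.traverse(list) { x =>
  Task.deferFuture(withSideEffect(x))
}.runAsync // in Monix 3.x, use runToFuture instead

val result = Await.result(future, 12.seconds)

// the side effects are in the right order:
println(s"Side Effect: ${getSideEffect.mkString(", ")}")

// and the result is in the right order:
println(s"result: ${result.mkString(", ")}")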

The result is correct:


Side Effect: 1, 2, 3, 4, 5, 6, 7, 8, 9
result: 1, 2, 3, 4, 5, 6, 7, 8, 9

See the code on GitHub

About Alexandre Kremlianski

Scala / Scala.js / JavaScript programmer
