Big Data, iPaaS, SCALA

Concurrency in Scala

This entry is part 6 of 9 in the series Scala Series

Concurrency in Scala: Mastering Futures, Promises, and Asynchronous Programming
Originally posted October 24, 2018 by Kinshuk Dutta


In this installment, we’re diving into concurrency in Scala, exploring how Futures and Promises simplify asynchronous programming. These features make it easier to handle complex workflows without blocking threads, an essential skill for modern applications. This blog builds on the principles from previous posts in our Scala series and introduces a sample project: Real-Time Order Processing System.


Table of Contents

  1. Introduction to Concurrency in Scala
  2. Futures and Promises in Scala
  3. Combining Futures for Complex Workflows
  4. Sample Project: Real-Time Order Processing System
  5. Conclusion and Next Steps

Introduction to Concurrency in Scala

In concurrent programming, the goal is to execute multiple tasks simultaneously or handle long-running processes in the background. In Scala, the primary tool for this is Future. A Future represents a value that may not yet be available, allowing you to start a computation and retrieve its result once it completes. Promises complement Futures by allowing us to set the result of a Future manually.


Futures and Promises in Scala

Futures and Promises help us manage asynchronous computations, handling processes like API requests, database queries, and batch jobs.

Working with Futures

You can create a Future using Future { ... }, which runs asynchronously and can be transformed with map, flatMap, or for-comprehension:

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val futureValue = Future {
// Simulate long-running computation
Thread.sleep(2000)
42 // The result
}futureValue.map(result => println(s”Result: $result”))

Introducing Promises

Promises are used when you want to create a Future and control its completion. With a Promise, you can create a Future and fulfill it later:

scala

import scala.concurrent.Promise

val promise = Promise[Int]()
val futureFromPromise = promise.future

// Completing the promise
promise.success(42)
futureFromPromise.map(result => println(s”Promise result: $result”))


Combining Futures for Complex Workflows

Scala’s Futures can be chained and combined for complex workflows. For example, you can run several Futures in parallel and wait for their results with Future.sequence, or you can use recover to handle potential failures gracefully.

scala
val future1 = Future { 10 }
val future2 = Future { 20 }
val combinedFuture = for {
result1 <- future1
result2 <- future2
} yield result1 + result2combinedFuture.map(result => println(s”Combined result: $result”))


Sample Project: Real-Time Order Processing System

The Real-Time Order Processing System simulates a live e-commerce environment where order details are fetched, processed, and updated asynchronously. This project demonstrates Futures, Promises, and error handling in a scalable way, handling multiple requests simultaneously.

Project Structure

plaintext
real-time-order-processing

├── src
│ ├── main
│ │ ├── scala
│ │ │ ├── ecommerce
│ │ │ │ ├── models
│ │ │ │ │ ├── Order.scala
│ │ │ │ │ ├── OrderStatus.scala
│ │ │ │ ├── services
│ │ │ │ │ ├── OrderService.scala
│ │ │ │ │ ├── AsyncOrderProcessor.scala
│ │ │ │ │ ├── OrderUpdater.scala
│ │ │ │ ├── Main.scala

├── test
│ ├── scala
│ │ ├── ecommerce
│ │ │ ├── OrderServiceTest.scala
│ │ │ ├── AsyncOrderProcessorTest.scala
└── build.sbt

Implementation Guide

Step 1: Define Models

In the models directory, define an order structure and status.

OrderStatus.scala

scala

package ecommerce.models

sealed trait OrderStatus
case object Pending extends OrderStatus
case object Processing extends OrderStatus
case object Shipped extends OrderStatus
case object Delivered extends OrderStatus

Order.scala

scala

package ecommerce.models

case class Order(id: String, item: String, quantity: Int, status: OrderStatus)

Step 2: Create AsyncOrderProcessor

In the services directory, AsyncOrderProcessor simulates processing an order asynchronously and uses Promises for result handling.

AsyncOrderProcessor.scala

scala

package ecommerce.services

import ecommerce.models._
import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global

object AsyncOrderProcessor {

def processOrder(order: Order): Future[Order] = {
val promise = Promise[Order]()
Future {
Thread.sleep(1000) // Simulate processing delay
promise.success(order.copy(status = Processing))
}
promise.future
}

def shipOrder(order: Order): Future[Order] = Future {
Thread.sleep(500) // Simulate shipping delay
order.copy(status = Shipped)
}
}

Step 3: Implement OrderService to Combine Futures

OrderService.scala

scala

package ecommerce.services

import ecommerce.models._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object OrderService {

def processAndShipOrder(order: Order): Future[Order] = {
AsyncOrderProcessor.processOrder(order).flatMap { processedOrder =>
AsyncOrderProcessor.shipOrder(processedOrder)
}
}
}

Step 4: Main Application Logic

Main.scala

scala

package ecommerce

import ecommerce.models._
import ecommerce.services._

import scala.concurrent.Await
import scala.concurrent.duration._

object Main extends App {
val initialOrder = Order(“001”, “Laptop”, 1, Pending)
println(s”Initial order: $initialOrder”)

val processedOrder = Await.result(OrderService.processAndShipOrder(initialOrder), 5.seconds)
println(s”Processed order: $processedOrder”)
}

Testing the Project

Step 1: Add ScalaTest Dependency

Update build.sbt:

scala
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % Test
Step 2: Write Tests

AsyncOrderProcessorTest.scala

scala

package ecommerce.services

import org.scalatest.flatspec.AnyFlatSpec
import ecommerce.models._
import scala.concurrent.Await
import scala.concurrent.duration._

class AsyncOrderProcessorTest extends AnyFlatSpec {

“AsyncOrderProcessor” should “process an order and set it to Processing” in {
val order = Order(“001”, “Laptop”, 1, Pending)
val processedOrder = Await.result(AsyncOrderProcessor.processOrder(order), 2.seconds)
assert(processedOrder.status == Processing)
}

it should “ship an order and set it to Shipped” in {
val order = Order(“001”, “Laptop”, 1, Processing)
val shippedOrder = Await.result(AsyncOrderProcessor.shipOrder(order), 1.second)
assert(shippedOrder.status == Shipped)
}
}

Step 3: Run Tests

Run the tests with:

bash
sbt test

Conclusion and Next Steps

With Futures, Promises, and asynchronous programming in Scala, we’ve developed a robust, scalable order processing system capable of handling concurrent tasks with ease. In the next blog, we’ll dive into Scala’s type classes and implicitsfor more powerful and flexible code. Stay tuned!

 

Series Navigation<< Advanced Type Classes and Implicits in ScalaAdvanced Functional Programming in Scala >>