Concurrency in Scala
Concurrency in Scala: Mastering Futures, Promises, and Asynchronous Programming
Originally posted October 24, 2018 by Kinshuk Dutta
In this installment, we’re diving into concurrency in Scala, exploring how Futures and Promises simplify asynchronous programming. These features make it easier to handle complex workflows without blocking threads, an essential skill for modern applications. This blog builds on the principles from previous posts in our Scala series and introduces a sample project: Real-Time Order Processing System.
Table of Contents
- Introduction to Concurrency in Scala
- Futures and Promises in Scala
- Combining Futures for Complex Workflows
- Sample Project: Real-Time Order Processing System
- Conclusion and Next Steps
Introduction to Concurrency in Scala
In concurrent programming, the goal is to execute multiple tasks simultaneously or handle long-running processes in the background. In Scala, the primary tool for this is Future. A Future represents a value that may not yet be available, allowing you to start a computation and retrieve its result once it completes. Promises complement Futures by allowing us to set the result of a Future manually.
Futures and Promises in Scala
Futures and Promises help us manage asynchronous computations, handling processes like API requests, database queries, and batch jobs.
Working with Futures
You can create a Future using Future { ... }, which runs asynchronously and can be transformed with map, flatMap, or a for-comprehension:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val futureValue = Future {
  // Simulate a long-running computation
  Thread.sleep(2000)
  42 // The result
}

// The function passed to map runs once the Future completes successfully
futureValue.map(result => println(s"Result: $result"))
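The snippet above only prints the result, so here is a minimal sketch of the three transformations side by side. The names fetchPriceCents and fetchTaxPercent, and the values they return, are hypothetical stand-ins for slow remote calls and are not part of the project in this post.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureTransformSketch {
  // Hypothetical lookups (assumed for illustration), standing in for remote calls.
  def fetchPriceCents(item: String): Future[Int] =
    Future { if (item == "Laptop") 120000 else 2500 }

  def fetchTaxPercent(region: String): Future[Int] =
    Future { if (region == "EU") 20 else 0 }

  // map transforms the eventual value without blocking the caller.
  def describePrice(item: String): Future[String] =
    fetchPriceCents(item).map(cents => s"$item costs $cents cents")

  // flatMap chains a second asynchronous step onto the first.
  def grossCents(item: String, region: String): Future[Int] =
    fetchPriceCents(item).flatMap { net =>
      fetchTaxPercent(region).map(percent => net + net * percent / 100)
    }

  // The for-comprehension is syntactic sugar for the same flatMap/map chain.
  def grossCentsFor(item: String, region: String): Future[Int] =
    for {
      net     <- fetchPriceCents(item)
      percent <- fetchTaxPercent(region)
    } yield net + net * percent / 100

  def main(args: Array[String]): Unit = {
    // Await is used here only so the demo prints before the JVM exits.
    println(Await.result(describePrice("Laptop"), 2.seconds))
    println(Await.result(grossCentsFor("Laptop", "EU"), 2.seconds))
  }
}
```

Both grossCents and grossCentsFor describe the same computation; which one reads better is mostly a matter of how many steps are chained.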
Introducing Promises
Promises are used when you want to create a Future and control its completion. With a Promise, you can create a Future and fulfill it later:
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global

val promise = Promise[Int]()
val futureFromPromise = promise.future

// Completing the promise also completes its Future
promise.success(42)

futureFromPromise.map(result => println(s"Promise result: $result"))
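A common reason to reach for a Promise is to wrap a callback-style API so that callers receive a Future. The sketch below assumes a hypothetical lookupStock function that reports its result through a callback; it is invented for illustration and does not belong to any real library.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object PromiseBridgeSketch {
  // Hypothetical callback-style API (an assumption), standing in for a legacy client.
  def lookupStock(item: String)(callback: Try[Int] => Unit): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        val result: Try[Int] =
          if (item.isEmpty) Failure(new IllegalArgumentException("item must not be empty"))
          else Success(7)
        callback(result)
      }
    })
    worker.start()
  }

  // The Promise is completed from inside the callback, with either a value or a failure.
  def lookupStockAsync(item: String): Future[Int] = {
    val promise = Promise[Int]()
    lookupStock(item)(result => promise.complete(result))
    promise.future
  }

  def main(args: Array[String]): Unit = {
    println(Await.result(lookupStockAsync("Laptop"), 2.seconds))
    println(Try(Await.result(lookupStockAsync(""), 2.seconds)))
  }
}
```

Passing the whole Try to promise.complete means the failure path is propagated as well, so the returned Future does not stay pending when the callback reports an error.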
Combining Futures for Complex Workflows
Scala’s Futures can be chained and combined for complex workflows. For example, you can run several Futures in parallel and collect their results with Future.sequence, or use recover to handle failures gracefully. A for-comprehension combines the results of Futures that have already been started:
val future1 = Future { 10 }
val future2 = Future { 20 }

// Both Futures are already running, so the for-comprehension only combines their results
val combinedFuture = for {
  result1 <- future1
  result2 <- future2
} yield result1 + result2

combinedFuture.map(result => println(s"Combined result: $result"))
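Future.sequence and recover are mentioned above but not shown, so here is a small sketch of both. The stockFor function and its item names are assumptions made up for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceRecoverSketch {
  // Hypothetical stock lookup (assumed for illustration); fails for unknown items.
  def stockFor(item: String): Future[Int] = Future {
    item match {
      case "Laptop" => 3
      case "Mouse"  => 12
      case other    => throw new NoSuchElementException(s"unknown item: $other")
    }
  }

  // Future.sequence turns List[Future[Int]] into Future[List[Int]].
  // The combined Future fails if any element fails.
  def totalStock(items: List[String]): Future[Int] =
    Future.sequence(items.map(stockFor)).map(_.sum)

  // recover replaces a matching failure with a fallback value.
  def totalStockOrZero(items: List[String]): Future[Int] =
    totalStock(items).recover { case _: NoSuchElementException => 0 }

  def main(args: Array[String]): Unit = {
    println(Await.result(totalStock(List("Laptop", "Mouse")), 2.seconds))
    println(Await.result(totalStockOrZero(List("Laptop", "Tablet")), 2.seconds))
  }
}
```

Note that recover here discards the partial results as well; whether a fallback of 0 is acceptable depends on the application.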
Sample Project: Real-Time Order Processing System
The Real-Time Order Processing System simulates a live e-commerce environment where order details are fetched, processed, and updated asynchronously. This project demonstrates Futures, Promises, and error handling in a scalable way, handling multiple requests simultaneously.
Project Structure
real-time-order-processing
├── src
│   ├── main
│   │   └── scala
│   │       └── ecommerce
│   │           ├── models
│   │           │   ├── Order.scala
│   │           │   └── OrderStatus.scala
│   │           ├── services
│   │           │   ├── OrderService.scala
│   │           │   ├── AsyncOrderProcessor.scala
│   │           │   └── OrderUpdater.scala
│   │           └── Main.scala
│   └── test
│       └── scala
│           └── ecommerce
│               ├── OrderServiceTest.scala
│               └── AsyncOrderProcessorTest.scala
└── build.sbt
Implementation Guide
Step 1: Define Models
In the models directory, define an order structure and status.
OrderStatus.scala
package ecommerce.models
sealed trait OrderStatus
case object Pending extends OrderStatus
case object Processing extends OrderStatus
case object Shipped extends OrderStatus
case object Delivered extends OrderStatus
Order.scala
package ecommerce.models
case class Order(id: String, item: String, quantity: Int, status: OrderStatus)
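Because OrderStatus is a sealed trait, the compiler can check that a pattern match covers every status. The sketch below repeats the two models so it compiles on its own and adds two helpers, nextStatus and advance, which are hypothetical and not part of the project.

```scala
object OrderModelSketch {
  // Same shapes as the models above, repeated so the sketch is self-contained.
  sealed trait OrderStatus
  case object Pending extends OrderStatus
  case object Processing extends OrderStatus
  case object Shipped extends OrderStatus
  case object Delivered extends OrderStatus

  case class Order(id: String, item: String, quantity: Int, status: OrderStatus)

  // Hypothetical helper: the status that follows the given one, if any.
  // Leaving out a case would trigger a non-exhaustive match warning.
  def nextStatus(status: OrderStatus): Option[OrderStatus] = status match {
    case Pending    => Some(Processing)
    case Processing => Some(Shipped)
    case Shipped    => Some(Delivered)
    case Delivered  => None
  }

  // Hypothetical helper: copy the order with its next status, or return it unchanged.
  def advance(order: Order): Order =
    nextStatus(order.status).fold(order)(next => order.copy(status = next))

  def main(args: Array[String]): Unit = {
    println(advance(Order("001", "Laptop", 1, Pending)))
  }
}
```

Since case classes are immutable, copy returns a new Order each time, which makes the values safe to share between Futures.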
Step 2: Create AsyncOrderProcessor
In the services directory, AsyncOrderProcessor simulates processing an order asynchronously and uses a Promise for result handling.
AsyncOrderProcessor.scala
package ecommerce.services

import ecommerce.models._
import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global

object AsyncOrderProcessor {

  // Completes a Promise from a background computation
  def processOrder(order: Order): Future[Order] = {
    val promise = Promise[Order]()
    Future {
      Thread.sleep(1000) // Simulate processing delay
      order.copy(status = Processing)
    }.onComplete {
      // Propagate failures too, so the returned Future never stays pending
      case Success(processed) => promise.success(processed)
      case Failure(error)     => promise.failure(error)
    }
    promise.future
  }

  def shipOrder(order: Order): Future[Order] = Future {
    Thread.sleep(500) // Simulate shipping delay
    order.copy(status = Shipped)
  }
}
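Thread.sleep occupies a thread of the global execution context for the whole delay. One way to soften that, sketched below under the assumption that the real work is a blocking call, is to wrap it in scala.concurrent.blocking so the global pool can compensate with extra threads. The simulateSlowCall and runAll names are hypothetical.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingHintSketch {
  // Hypothetical slow call (assumed for illustration).
  // blocking { ... } tells the execution context that this section may block.
  def simulateSlowCall(id: Int): Future[Int] = Future {
    blocking {
      Thread.sleep(100) // Stand-in for a blocking I/O call
      id * 2
    }
  }

  // Start all calls first, then gather the results in their original order.
  def runAll(n: Int): Future[List[Int]] =
    Future.sequence((1 to n).toList.map(simulateSlowCall))

  def main(args: Array[String]): Unit = {
    println(Await.result(runAll(5), 5.seconds))
  }
}
```

For heavy blocking workloads, a dedicated thread pool is another option; the blocking hint is simply the smallest change to the code shown in this post.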
Step 3: Implement OrderService to Combine Futures
OrderService.scala
package ecommerce.services

import ecommerce.models._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object OrderService {

  // Ship the order only after processing has completed
  def processAndShipOrder(order: Order): Future[Order] = {
    AsyncOrderProcessor.processOrder(order).flatMap { processedOrder =>
      AsyncOrderProcessor.shipOrder(processedOrder)
    }
  }
}
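To handle several orders at once, the same chaining idea can be applied to a whole batch. The following is a sketch only: it uses a simplified Order stand-in and a hypothetical processAndShip function rather than the project's own classes, and it collects failures as Left values so one bad order does not fail the entire batch.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BatchOrderSketch {
  // Simplified stand-in for the project's Order model (an assumption).
  case class Order(id: String, quantity: Int)

  // Hypothetical pipeline step: fails when the quantity is not positive.
  def processAndShip(order: Order): Future[String] = Future {
    if (order.quantity <= 0)
      throw new IllegalArgumentException(s"order ${order.id}: quantity must be positive")
    s"${order.id} shipped"
  }

  // Future.traverse starts one Future per order and gathers the results in order.
  // Each failure is turned into a Left, so the batch as a whole still succeeds.
  def processBatch(orders: List[Order]): Future[List[Either[String, String]]] =
    Future.traverse(orders) { order =>
      processAndShip(order)
        .map[Either[String, String]](message => Right(message))
        .recover { case e: IllegalArgumentException => Left(e.getMessage) }
    }

  def main(args: Array[String]): Unit = {
    val batch = List(Order("001", 1), Order("002", 0))
    println(Await.result(processBatch(batch), 2.seconds))
  }
}
```

Recovering per order, rather than once on the combined Future, is a design choice: it keeps the successful results available even when some orders fail.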
Step 4: Main Application Logic
Main.scala
package ecommerce

import ecommerce.models._
import ecommerce.services._
import scala.concurrent.Await
import scala.concurrent.duration._

object Main extends App {
  val initialOrder = Order("001", "Laptop", 1, Pending)
  println(s"Initial order: $initialOrder")

  // Block the main thread until the pipeline finishes, or fail after 5 seconds
  val processedOrder = Await.result(OrderService.processAndShipOrder(initialOrder), 5.seconds)
  println(s"Processed order: $processedOrder")
}
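Await.result throws if the Future fails or the timeout elapses. If the application should report that instead of crashing, the call can be wrapped in Try, as in this sketch. The awaitOrMessage helper and its messages are hypothetical.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object AwaitTimeoutSketch {
  // Hypothetical helper: wait for the result and turn each outcome into a message.
  def awaitOrMessage(future: Future[String], limit: FiniteDuration): String =
    Try(Await.result(future, limit)) match {
      case Success(value)               => value
      case Failure(_: TimeoutException) => s"timed out after $limit"
      case Failure(other)               => s"failed: ${other.getMessage}"
    }

  def main(args: Array[String]): Unit = {
    println(awaitOrMessage(Future.successful("done"), 1.second))
    // Future.never is a Future that never completes, so this call times out.
    println(awaitOrMessage(Future.never, 50.millis))
  }
}
```

Blocking like this is reasonable at the edge of a program, such as a main method or a test; inside the services themselves, composing Futures keeps threads free.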
Testing the Project
Step 1: Add ScalaTest Dependency
Update build.sbt. The AnyFlatSpec import used in the tests requires ScalaTest 3.1.0 or later:
libraryDependencies += "org.scalatest" %% "scalatest" % "3.1.0" % Test
Step 2: Write Tests
AsyncOrderProcessorTest.scala
package ecommerce.services

import org.scalatest.flatspec.AnyFlatSpec
import ecommerce.models._
import scala.concurrent.Await
import scala.concurrent.duration._

class AsyncOrderProcessorTest extends AnyFlatSpec {

  "AsyncOrderProcessor" should "process an order and set it to Processing" in {
    val order = Order("001", "Laptop", 1, Pending)
    val processedOrder = Await.result(AsyncOrderProcessor.processOrder(order), 2.seconds)
    assert(processedOrder.status == Processing)
  }

  it should "ship an order and set it to Shipped" in {
    val order = Order("001", "Laptop", 1, Processing)
    val shippedOrder = Await.result(AsyncOrderProcessor.shipOrder(order), 1.second)
    assert(shippedOrder.status == Shipped)
  }
}
Step 3: Run Tests
Run the tests with:
sbt test
Conclusion and Next Steps
With Futures, Promises, and asynchronous programming in Scala, we’ve built a small order processing system that runs its processing and shipping steps asynchronously. In the next blog, we’ll dive into Scala’s type classes and implicits for more powerful and flexible code. Stay tuned!