
Concurrency and Parallelism in Scala

This entry is part 4 of 9 in the series Scala Series

Concurrency and Parallelism in Scala: Mastering Futures and Promises

Originally posted December 5, 2018 by Kinshuk Dutta


Welcome back to our Scala series! Now that we’ve covered type classes and implicits, we’re ready to dive into concurrency and parallelism using Futures and Promises in Scala. These tools allow us to handle asynchronous tasks gracefully and are foundational to building scalable, real-time applications in Scala.

In this post, we’ll cover the basics of concurrency, then build a sample project: a Real-Time Stock Market Notifier. The project demonstrates how to fetch and process stock prices asynchronously and alert users to sudden price changes.


Table of Contents

  1. Concurrency and Parallelism Basics
  2. Understanding Futures and Promises
  3. Real-Time Stock Market Notifier Project
  4. Conclusion and Next Steps

Concurrency and Parallelism Basics

Concurrency and parallelism are core concepts in modern programming:

  • Concurrency is the ability to make progress on multiple tasks over overlapping time periods, letting one task proceed while others wait for resources such as I/O. The tasks need not run at the same instant.
  • Parallelism is the simultaneous execution of multiple tasks on multiple processors or cores.

In Scala, Futures and Promises provide a powerful framework for asynchronous computation, allowing us to write non-blocking code that performs tasks concurrently.
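
To make the distinction concrete, here is a minimal sketch, assuming a hypothetical slowTask helper that is not part of the project. It shows that futures created before they are combined can run concurrently, while futures created inside a for-comprehension run one after another. Whether the concurrent version also runs in parallel depends on how many threads and cores the ExecutionContext has available.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConcurrencySketch {
  // Hypothetical stand-in for a slow task: sleeps, then returns its input.
  def slowTask(n: Int): Future[Int] = Future {
    Thread.sleep(300)
    n
  }

  // Both futures are created before they are combined, so they can run concurrently.
  def concurrentSum(): Future[Int] = {
    val a = slowTask(1)
    val b = slowTask(2)
    for (x <- a; y <- b) yield x + y
  }

  // The second future is created only after the first completes, so the tasks run in sequence.
  def sequentialSum(): Future[Int] =
    for (x <- slowTask(1); y <- slowTask(2)) yield x + y

  def main(args: Array[String]): Unit = {
    println(Await.result(concurrentSum(), 2.seconds)) // 3
    println(Await.result(sequentialSum(), 2.seconds)) // 3
  }
}
```

Both versions produce the same result. The difference is only in when the second task starts.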


Understanding Futures and Promises

In Scala:

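  • A Future represents a result that may not be available yet. It eventually completes with either a value or an exception, which makes Futures useful for running long tasks without blocking the calling thread.
  • A Promise is a writable placeholder for a Future’s result. It can be completed with a value or failed with an exception exactly once, and its future member exposes the read-only Future side.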

Here’s a quick example to illustrate their basic usage:

scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

val promise = Promise[Int]()
val future = promise.future

// Simulate async computation
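Future {
  Thread.sleep(1000)
  promise.success(42) // completes the promise, and with it `future`
}

// foreach runs the callback only if the future completes successfully
future.foreach(value => println(s"Promise completed with value: $value"))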
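
A common reason to reach for a Promise directly is to adapt a callback-style API to a Future. The sketch below assumes a hypothetical fetchQuoteWithCallback function standing in for a legacy client; the name, the fixed price, and the error case are illustrative only.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object CallbackBridge {
  // Hypothetical callback-style API, standing in for a legacy client library.
  def fetchQuoteWithCallback(symbol: String)(callback: Try[Double] => Unit): Unit = {
    val worker = new Thread(() => {
      if (symbol.isEmpty) callback(Failure(new IllegalArgumentException("empty symbol")))
      else callback(Success(123.45)) // fixed price, for illustration only
    })
    worker.start()
  }

  // Bridge the callback into a Future by completing a Promise with the callback's result.
  def fetchQuote(symbol: String): Future[Double] = {
    val promise = Promise[Double]()
    fetchQuoteWithCallback(symbol)(result => promise.complete(result))
    promise.future
  }

  def main(args: Array[String]): Unit = {
    println(Await.result(fetchQuote("AAPL"), 1.second))            // 123.45
    println(Try(Await.result(fetchQuote(""), 1.second)).isFailure) // true
  }
}
```

Callers only ever see the Future, so the callback plumbing stays hidden behind fetchQuote.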


Real-Time Stock Market Notifier Project

For this project, we’ll create a Real-Time Stock Market Notifier. The system asynchronously fetches stock prices, monitors changes, and notifies users of sudden price movements. The example shows how to model these steps as asynchronous tasks using Futures.

Project Structure

plaintext
real-time-stock-market-notifier
├── src
│   ├── main
│   │   └── scala
│   │       └── notifier
│   │           ├── models
│   │           │   ├── Stock.scala
│   │           │   └── User.scala
│   │           ├── services
│   │           │   ├── StockService.scala
│   │           │   └── NotificationService.scala
│   │           └── Main.scala
│   └── test
│       └── scala
│           └── notifier
│               └── services
│                   ├── StockServiceTest.scala
│                   └── NotificationServiceTest.scala
└── build.sbt

Implementation Guide

Step 1: Define Models

Create the Stock and User models in the models directory.

Stock.scala

scala
package notifier.models

case class Stock(symbol: String, price: Double)

User.scala

scala
package notifier.models

case class User(id: String, name: String, preferredStock: String)

Step 2: Create Services

Our project includes two primary services:

  1. StockService – Fetches and monitors stock prices.
  2. NotificationService – Sends notifications based on price changes.

StockService.scala

scala
package notifier.services

import notifier.models.Stock
import scala.concurrent.{Future, ExecutionContext}
import scala.util.Random

object StockService {
  // Simulates a network call that returns a random price for the given symbol
  def fetchStockPrice(symbol: String)(implicit ec: ExecutionContext): Future[Stock] = Future {
    Thread.sleep(500)
    Stock(symbol, Random.nextDouble() * 1000)
  }

  // Fetches the price once and prints an alert if it is at or above the threshold
  def monitorStockPrice(symbol: String, threshold: Double)(implicit ec: ExecutionContext): Future[Unit] =
    fetchStockPrice(symbol).map { stock =>
      if (stock.price >= threshold) {
        println(s"Alert! Stock ${stock.symbol} has reached the threshold: ${stock.price}")
      }
    }
}
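
The service above fetches one symbol at a time. As a sketch of how several symbols could be fetched concurrently, the example below uses Future.traverse from the standard library. The Stock case class and fetchStockPrice are repeated so the block is self-contained, and the fetchAll helper is an assumption for illustration, not part of the project.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Random

object MultiStockSketch {
  final case class Stock(symbol: String, price: Double)

  // Same simulated fetch as in StockService, repeated here so the sketch is self-contained.
  def fetchStockPrice(symbol: String)(implicit ec: ExecutionContext): Future[Stock] = Future {
    Thread.sleep(200)
    Stock(symbol, Random.nextDouble() * 1000)
  }

  // Hypothetical helper: Future.traverse starts one fetch per symbol
  // and collects the results in the same order as the input list.
  def fetchAll(symbols: List[String])(implicit ec: ExecutionContext): Future[List[Stock]] =
    Future.traverse(symbols)(fetchStockPrice(_))

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val stocks = Await.result(fetchAll(List("AAPL", "GOOG", "MSFT")), 5.seconds)
    stocks.foreach(s => println(f"${s.symbol}: ${s.price}%.2f"))
  }
}
```

If any single fetch fails, the combined Future fails as well, so a real implementation would likely recover per symbol before combining.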

NotificationService.scala

scala
package notifier.services

import notifier.models.{User, Stock}
import scala.concurrent.{Future, ExecutionContext}

object NotificationService {
  def sendNotification(user: User, stock: Stock)(implicit ec: ExecutionContext): Future[Unit] = Future {
    println(s"Sending notification to ${user.name}: Stock ${stock.symbol} has changed to ${stock.price}")
  }
}

Step 3: Create the Main Application

Main.scala

scala
package notifier

import notifier.models.User
import notifier.services.StockService
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Main extends App {
  val user = User("U001", "John Doe", "AAPL")

  // Check the user's preferred stock once and print an alert if it crosses the threshold
  val monitoringFuture = StockService
    .monitorStockPrice(user.preferredStock, threshold = 500)
    .map(_ => println("Monitoring complete"))

  // The global ExecutionContext uses daemon threads, so wait here;
  // otherwise the JVM can exit before the check finishes
  Await.result(monitoringFuture, 5.seconds)
}
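
Main only prints a threshold alert and never calls NotificationService. One way the two pieces could be wired together to flag a sudden price change is sketched below. The isSuddenChange rule, the maxChangePct parameter, and checkAndNotify are assumptions made for illustration, and the models are repeated so the block is self-contained.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object NotifyOnChangeSketch {
  final case class Stock(symbol: String, price: Double)
  final case class User(id: String, name: String, preferredStock: String)

  // Hypothetical rule: a move is "sudden" when it exceeds maxChangePct percent of the previous price.
  def isSuddenChange(previous: Stock, current: Stock, maxChangePct: Double): Boolean =
    previous.price > 0 &&
      math.abs(current.price - previous.price) / previous.price * 100 > maxChangePct

  // Stand-in for NotificationService.sendNotification; returns the message as well as printing it.
  def sendNotification(user: User, stock: Stock)(implicit ec: ExecutionContext): Future[String] = Future {
    val message = s"Sending notification to ${user.name}: Stock ${stock.symbol} has changed to ${stock.price}"
    println(message)
    message
  }

  // Notify only when the change is sudden; otherwise complete immediately with None.
  def checkAndNotify(user: User, previous: Stock, current: Stock, maxChangePct: Double)(
      implicit ec: ExecutionContext): Future[Option[String]] =
    if (isSuddenChange(previous, current, maxChangePct)) sendNotification(user, current).map(Some(_))
    else Future.successful(None)

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val user = User("U001", "John Doe", "AAPL")
    val result = checkAndNotify(user, Stock("AAPL", 100.0), Stock("AAPL", 112.0), maxChangePct = 5.0)
    println(Await.result(result, 2.seconds).isDefined) // true: a 12% move exceeds the 5% limit
  }
}
```

Keeping the decision in a pure function such as isSuddenChange makes it testable without any Futures involved.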


Testing the Project

Step 1: Add ScalaTest Dependencies

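Add ScalaTest to build.sbt for unit testing. The tests below import org.scalatest.flatspec.AnyFlatSpec, which is available from ScalaTest 3.1.0 onward:

scala
libraryDependencies += "org.scalatest" %% "scalatest" % "3.1.0" % Test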

Step 2: Write Tests

Create test classes for StockService and NotificationService.

StockServiceTest.scala

scala
package notifier.services

import org.scalatest.flatspec.AnyFlatSpec
import notifier.models.Stock
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

class StockServiceTest extends AnyFlatSpec {
  "StockService" should "fetch stock prices asynchronously" in {
    val stockFuture = StockService.fetchStockPrice("AAPL")
    // The simulated fetch sleeps for 500 ms, so leave some headroom in the timeout
    val stock = Await.result(stockFuture, 2.seconds)
    assert(stock.symbol == "AAPL")
  }
}

NotificationServiceTest.scala

scala
package notifier.services

import org.scalatest.flatspec.AnyFlatSpec
import notifier.models.{User, Stock}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.Await

class NotificationServiceTest extends AnyFlatSpec {
  "NotificationService" should "send notification to user asynchronously" in {
    val user = User("U001", "John Doe", "AAPL")
    val stock = Stock("AAPL", 700)
    val notificationFuture = NotificationService.sendNotification(user, stock)
    // Await.result rethrows any failure, so reaching the assertion means the Future succeeded
    Await.result(notificationFuture, 1.second)
    assert(notificationFuture.isCompleted)
  }
}

Step 3: Run Tests

Execute the tests with:

bash
sbt test

Conclusion and Next Steps

In this post, we explored Futures and Promises in Scala and saw how they support concurrent programming for scalable applications. With the Real-Time Stock Market Notifier, we illustrated how to manage asynchronous tasks like monitoring stock prices and notifying users of significant price changes.

In the next post, we’ll dive into error handling and fault tolerance in Scala using Try, Either, and Option to write more resilient applications. Stay tuned as we continue our journey through advanced Scala!
