Modern JVM

Multithreading

Paweł Jurczenko, 2020

About me

  • Senior Software Engineer
    at Allegro (~1000 microservices)
  • 5 years of Scala development
  • Distributed systems
  • Concurrent computing
  • Functional programming

Agenda

  1. Overview
  2. Threading models
  3. Concurrency primitives
  4. Non-blocking I/O
  5. Thread pools
  6. Best practices
  7. Async stacktraces

Overview

Layered architecture

[Diagram: layers from bottom to top: hardware threads, OS threads, JVM threads, green threads, custom threads. JVM threads map 1:1 onto preemptively scheduled OS threads; green threads map M:N onto JVM threads and are scheduled cooperatively.]

Green threads

  • Threads managed in user space instead of kernel space
  • Scheduled by runtime library or virtual machine
  • Usually scheduled cooperatively
  • Examples: coroutines, goroutines, fibers
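As a rough illustration of cooperative scheduling in user space, here is a toy sketch in Scala. It is not how Project Loom, ZIO or the Go runtime are implemented. A hypothetical `GreenThread` is modelled as an iterator of steps, and a hypothetical `ToyScheduler` runs one step of each green thread, round-robin, on a single OS thread, so a green thread can only lose control between two of its own steps.

```scala
import scala.collection.mutable

// Toy model: a "green thread" is just an iterator of steps. Each step runs to
// completion and then implicitly yields control back to the scheduler.
final case class GreenThread(name: String, steps: Iterator[() => Unit])

object ToyScheduler {
  // Runs all green threads round-robin on the calling OS thread. Nothing is
  // preempted: a green thread only loses control between two of its steps.
  def run(threads: Seq[GreenThread]): Unit = {
    val runQueue = mutable.Queue.from(threads)
    while (runQueue.nonEmpty) {
      val thread = runQueue.dequeue()
      if (thread.steps.hasNext) {
        thread.steps.next()()    // run exactly one step...
        runQueue.enqueue(thread) // ...then yield to the next green thread
      }                          // finished green threads are simply dropped
    }
  }
}
```

Two green threads with 3 and 2 steps interleave as a-0, b-0, a-1, b-1, a-2, all on the calling thread. A step that blocks (e.g. on `Thread.sleep`) would stall every green thread, which is why real runtimes pair green threads with non-blocking I/O.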

Green threads

Goals

  • Lower management costs
  • More efficient resource usage
  • Ability to block without blocking kernel threads
  • Ability to be spawned in thousands
  • Multithreading without native thread support

Green threads

Implementations

  • Continuations / Coroutines - Kotlin Coroutines, Project Loom
  • Fibers - Quasar, Project Loom, Monix, ZIO, cats-effect
  • Goroutines - Go runtime
  • Haskell threads - GHC runtime
  • Erlang processes - ERTS (Erlang RunTime System)

Concurrency primitives

Concurrency primitives

Reborn

  • Locks
  • Semaphores
  • Channels
  • Queues
  • MVars
  • STM
  • Actors

Concurrency primitives

Example

// java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ArrayBlockingQueue

val queue = new ArrayBlockingQueue[String](10) // capacity = 10
queue.add("Allegro")   // throws "IllegalStateException" when queue is full
queue.offer("Allegro") // returns "false" when queue is full
queue.put("Allegro")   // blocks a thread when queue is full

// monix.catnap.ConcurrentQueue
import monix.catnap.ConcurrentQueue
import monix.eval.Task

// F needs cats-effect type class instances (e.g. Concurrent): Task has them, Future does not
ConcurrentQueue[Task]
  .bounded[String](capacity = 10)
  .flatMap(queue => queue.offer("Allegro")) // returns "Task[Unit]"; waits without blocking a thread when queue is full

Non-blocking I/O

Blocking I/O

Risks of having too many threads

  • Memory consumption (each thread has its own stack)
  • Context switch overhead
  • Decreased throughput
  • Increased cache misses
  • Increased number of GC roots
  • Increased risk of deadlocks

Non-blocking I/O

Common misconception

It's not about better I/O performance.

It's about more efficient resource usage.
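To illustrate resource usage rather than speed, this sketch simulates 1,000 concurrent calls, each with 100 ms of latency, using a timer in place of a real network client (the `NonBlockingWait` object and its `simulatedCall` helper are hypothetical). All calls are in flight at the same time, yet a single scheduler thread serves them all; a blocking client would need one parked thread per call to achieve the same latency.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingWait {
  // Stands in for a non-blocking client call: no thread waits for the
  // "response"; the scheduler completes the promise once the latency elapses.
  def simulatedCall(scheduler: ScheduledExecutorService, latencyMs: Long): Future[String] = {
    val promise = Promise[String]()
    scheduler.schedule(
      new Runnable { def run(): Unit = promise.success(Thread.currentThread().getName) },
      latencyMs,
      TimeUnit.MILLISECONDS)
    promise.future
  }

  // Starts n calls at once and returns the names of the threads that completed them.
  def run(n: Int): Set[String] = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    try {
      val all = Future.sequence((1 to n).map(_ => simulatedCall(scheduler, 100)))
      Await.result(all, 10.seconds).toSet
    } finally scheduler.shutdown()
  }
}
```

`NonBlockingWait.run(1000)` returns a set with a single thread name.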

Non-blocking I/O

Network I/O

  • Well supported by the operating systems
  • Well supported by the JVM
  • Examples: async-http-client, Spring WebClient, Java 11 HTTP Client
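A minimal sketch of the Java 11 HTTP Client listed above, called from Scala (requires JDK 11+; the `AsyncHttp` object and the example URL are illustrative). `sendAsync` returns a `CompletableFuture` immediately instead of parking the calling thread while the response is in flight.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.util.concurrent.CompletableFuture

object AsyncHttp {
  // Created lazily. Without HttpClient.newBuilder().executor(...), the client
  // uses a default executor of its own.
  lazy val client: HttpClient = HttpClient.newHttpClient()

  def request(uri: String): HttpRequest =
    HttpRequest.newBuilder(URI.create(uri)).GET().build()

  // Returns at once; the future completes when the response has arrived.
  def fetch(uri: String): CompletableFuture[HttpResponse[String]] =
    client.sendAsync(request(uri), HttpResponse.BodyHandlers.ofString())
}
```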
Non-blocking I/O

File I/O

  • Well supported only by some operating systems
  • Not fully supported on Linux*
  • JVM on Linux: non-blocking file I/O doesn't exist
  • JVM on Linux: AsynchronousFileChannel is blocking
    (it performs blocking reads and writes on a thread pool)
  • Affects not only the JVM: libuv has the exact same problems

* might change with io_uring
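Because `AsynchronousFileChannel` is blocking on Linux, it helps to choose its thread pool yourself. `AsynchronousFileChannel.open` has an overload that accepts an `ExecutorService`, and this sketch passes one explicitly (the `FileRead` object and the 5-second timeout are illustrative assumptions). On Linux, the threads of that pool are the ones that end up performing the blocking reads.

```scala
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.charset.StandardCharsets
import java.nio.file.{Path, StandardOpenOption}
import java.util.concurrent.{ExecutorService, TimeUnit}
import scala.jdk.CollectionConverters._

object FileRead {
  // Reads up to maxBytes from the beginning of a file. The ExecutorService is
  // the pool the channel uses for its work; passing it explicitly makes that
  // pool visible and bounded instead of relying on a JDK default.
  def readPrefix(path: Path, maxBytes: Int, pool: ExecutorService): String = {
    val channel = AsynchronousFileChannel.open(
      path, Set(StandardOpenOption.READ).asJava, pool)
    try {
      val buffer = ByteBuffer.allocate(maxBytes)
      // read(buffer, position) returns a java.util.concurrent.Future[Integer];
      // the result is -1 when the file is empty
      val bytesRead: Int = channel.read(buffer, 0L).get(5, TimeUnit.SECONDS).intValue()
      new String(buffer.array(), 0, math.max(bytesRead, 0), StandardCharsets.UTF_8)
    } finally channel.close()
  }
}
```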

Non-blocking I/O

Non-relational databases

  • Well supported by non-relational databases
  • MongoDB: Async Driver
  • Cassandra: DataStax Java Driver
  • Redis: Lettuce
  • ...and many more!

Non-blocking I/O

Relational databases - problem

JDBC (Java Database Connectivity):

  • Really old - February 19, 1997
  • Completely blocking API
  • Low-level, leaky abstractions
  • Nulls, exceptions, side-effects

Non-blocking I/O

Relational databases - solutions

Low-level:

  • PostgreSQL: postgresql-async, jasync-sql
  • MySQL: mysql-async, jasync-sql
  • Generic: fiber-based JDBC 🚧

High-level:

  • PostgreSQL: Quill, Skunk 🚧
  • MySQL: Quill
  • Generic: R2DBC 🚧

Non-blocking I/O

Relational databases - example

Goal:

We want to execute some application-level code each time a new offer is inserted into the database.

CREATE FUNCTION offer_created() RETURNS trigger as $$
  BEGIN
    PERFORM pg_notify('offers', NEW.offer_id::text); -- the payload must be text
    RETURN NEW;
  END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER offer_created_trigger
AFTER INSERT ON offers
FOR EACH ROW EXECUTE PROCEDURE offer_created();

// Skunk
import cats.effect.IO
import fs2.interop.reactivestreams._ // toUnicastPublisher()
import skunk.Session
import skunk.implicits._             // id interpolator

val session: Session[IO] = Session.single(
  host = "localhost",
  port = 5432,
  user = "pjurczenko",
  database = "allegro"
).acquire()

val notifications = session
  .channel(id"offers")
  .listen(maxQueued = 10) // fs2.Stream[IO, Notification]
  .toUnicastPublisher()   // org.reactivestreams.Publisher[Notification]
  .toFlux()               // reactor.core.publisher.Flux[Notification]

// * acquire() and toFlux() are custom extension methods

Non-blocking I/O

Relational databases - generic solutions

ADBA (Asynchronous Database Access API)

  • Oracle initiative
  • Announced in 2016
  • Also known as "Asynchronous JDBC"
  • Responses wrapped in CompletableFuture
  • No streaming/backpressure capabilities
  • No longer developed: Oracle plans to rely on
    fiber-based (Project Loom) JDBC instead

R2DBC (Reactive Relational Database Connectivity)

  • Spring (Pivotal) initiative
  • Announced in 2018
  • Responses wrapped in Mono/Flux
  • Compliant with Reactive Streams
  • First released in November of 2019

Thread pools

Thread pools

Overview

Separate CPU-bound tasks from blocking I/O tasks.

Many applications will work fine with Scala's global thread pool. However, when we have strong performance requirements,
it's good to have at least three different thread pools:

  • one for CPU-bound tasks,
  • one for blocking I/O tasks,
  • one for non-blocking I/O tasks.
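A minimal sketch of the three pools as Scala `ExecutionContext`s. The pool sizes and the thread-name prefixes (`blocking-io`, `nio-dispatcher`) are illustrative assumptions, not recommendations; naming the threads mainly makes each pool easy to spot in thread dumps.

```scala
import java.util.concurrent.{Executors, ForkJoinPool, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

object Pools {
  // Names threads "<prefix>-1", "<prefix>-2", ... and marks them as daemons
  // so that they do not keep the JVM alive.
  private def named(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }

  // CPU-bound: many small tasks, work-stealing ForkJoinPool sized to the cores.
  val cpu: ExecutionContext =
    ExecutionContext.fromExecutorService(new ForkJoinPool(Runtime.getRuntime.availableProcessors()))

  // Blocking I/O: unbounded cached pool; limits belong at a higher, semantic level.
  val blockingIo: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(named("blocking-io")))

  // Non-blocking I/O: a tiny fixed pool that only dispatches callbacks.
  val nonBlockingIo: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2, named("nio-dispatcher")))
}
```

Work is then placed explicitly, e.g. `Future(parseOffer(json))(Pools.cpu)` or `Future(jdbcQuery())(Pools.blockingIo)`, where `parseOffer` and `jdbcQuery` stand for your own code.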

Thread pools

Multiple thread pools

CPU-bound tasks

  • many small tasks:
    beware thread contention
    (use e.g. ForkJoinPool)
  • long-running tasks:
    use bounded pool
    (e.g. newFixedThreadPool)
  • when in doubt: benchmark

Blocking I/O tasks

  • use unbounded pool
    (e.g. newCachedThreadPool)
  • provide limits at the higher,
    semantic level

Non-blocking I/O tasks

  • use bounded pool
    (e.g. newFixedThreadPool)
  • one or two threads should be enough
  • should work only as a dispatcher

Thread pools

Unbounded pool for blocking I/O

[Diagram: one CachedThreadPool shared by blocking network I/O (e.g. OkHttp), blocking database I/O (e.g. JDBC) and blocking file I/O (e.g. java.io), with per-kind limits such as max(50), max(10) and max(20).]
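One way to express limits such as max(10) in code is an asynchronous limiter in front of the shared unbounded pool. `BlockingLimiter` is a hypothetical helper, not a library class; in practice the limit often comes from elsewhere, for example the size of a JDBC connection pool or OkHttp's `Dispatcher.setMaxRequests`. Tasks over the limit wait in an in-memory queue rather than on a pool thread.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

// Hypothetical helper: lets at most `maxConcurrent` tasks of one kind run at a
// time on a shared, unbounded pool. Extra tasks wait in a queue, so waiting
// does not occupy any pool thread.
final class BlockingLimiter(maxConcurrent: Int, pool: ExecutionContext) {
  private val lock = new Object
  private var running = 0
  private val waiting = mutable.Queue.empty[Runnable]

  def apply[A](body: => A): Future[A] = {
    val promise = Promise[A]()
    val task: Runnable = () =>
      try { promise.complete(Try(body)); () }
      finally release()
    val startNow = lock.synchronized {
      if (running < maxConcurrent) { running += 1; true }
      else { waiting.enqueue(task); false }
    }
    if (startNow) pool.execute(task)
    promise.future
  }

  // Called when a task finishes: hand its slot to the next waiting task, if any.
  private def release(): Unit = {
    val next = lock.synchronized {
      if (waiting.nonEmpty) Some(waiting.dequeue())
      else { running -= 1; None }
    }
    next.foreach(r => pool.execute(r))
  }
}
```

With one limiter per kind of blocking I/O (e.g. 50 for HTTP, 10 for JDBC, 20 for files), all of them can share a single `newCachedThreadPool`.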

Best practices

Best practices

#1

Avoid concurrency for as long as possible.

Best practices

#2

Prefer high-level concurrency over low-level concurrency.

The days when you had to use wait() and notify() are long gone.
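To show what high-level concurrency buys you, this sketch implements the same one-shot hand-off twice (`Handoff`, `Box`, `lowLevel` and `highLevel` are illustrative names): once with `wait()`/`notifyAll()`, once with a `Future`. The low-level version is only correct because of the `synchronized` blocks and the `while` loop that guards against spurious wake-ups; the high-level one has no locks or condition loops to get wrong.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object Handoff {
  // Low-level: a one-shot hand-off built on wait()/notifyAll().
  final class Box[A] {
    private var value: Option[A] = None
    def put(a: A): Unit = synchronized { value = Some(a); notifyAll() }
    def take(): A = synchronized {
      while (value.isEmpty) wait() // loop: wait() may wake up spuriously
      value.get
    }
  }

  def lowLevel(): Int = {
    val box = new Box[Int]
    new Thread(() => box.put(21 * 2)).start()
    box.take()
  }

  // High-level: the same hand-off as a Future (Await only to get a plain value here).
  def highLevel()(implicit ec: ExecutionContext): Int =
    Await.result(Future(21 * 2), 5.seconds)
}
```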

Best practices

#3

Choose concurrency primitives carefully.

There is a reason so many of them exist: each addresses a different problem.

Best practices

#4

Know your thread pools.

Control your own thread pools and identify
thread pools from external libraries.
Otherwise, they might hit you at the worst possible time.

Best practices

#4 - example

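// OkHttp
import okhttp3.{Dispatcher, OkHttpClient}

// A Dispatcher created without an executor builds its own internal thread pool
val dispatcher = new Dispatcher()
dispatcher.setMaxRequestsPerHost(100)
dispatcher.setMaxRequests(100)
val client = new OkHttpClient()
  .newBuilder()
  .dispatcher(dispatcher)
  .build()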
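One low-tech way to identify thread pools, including those created by libraries, is to group live threads by name. This sketch strips trailing digits, so `pool-1-thread-3` becomes `pool-1-thread-`. `ThreadCensus` is a hypothetical helper, and the prefixes you will see depend entirely on how each library names its threads.

```scala
import scala.jdk.CollectionConverters._

object ThreadCensus {
  // Counts live threads per name prefix (the thread name without trailing digits).
  def liveThreadsByPrefix(): Map[String, Int] =
    Thread.getAllStackTraces().keySet().asScala.toSeq
      .map(_.getName.replaceAll("\\d+$", ""))
      .groupBy(identity)
      .map { case (prefix, names) => prefix -> names.size }
}
```

Logging this map once after startup shows every pool your application runs, whether you created it or not.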

Best practices

#5

Prefer libraries with pluggable thread pools.

  • you, not the developer of some external library, should be in control of your application's runtime
  • if thread pools aren't pluggable, make sure they are at least configurable

Best practices

#5 - example

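// OkHttp
import java.util.concurrent.Executors
import okhttp3.{Dispatcher, OkHttpClient}

// The Dispatcher now runs calls on a thread pool that we create and control
val threadPool = Executors.newFixedThreadPool(100)
val dispatcher = new Dispatcher(threadPool)
dispatcher.setMaxRequestsPerHost(100)
dispatcher.setMaxRequests(100)
val client = new OkHttpClient()
  .newBuilder()
  .dispatcher(dispatcher)
  .build()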

Best practices

#6

Be careful with Runtime.getRuntime().availableProcessors()

There are two reasons for that:

  • it's not fully reliable in virtualized and containerized environments
    (e.g. it might return 1 even though the machine has 4 cores)
  • even if 1 is the correct answer, a pool sized to it can easily deadlock
    (e.g. when a task waits for another task queued on the same pool)
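The deadlock from the second bullet can be reproduced in a few lines. This sketch (`StarvationDemo` is a hypothetical name) runs a task that waits for a subtask submitted to the same fixed pool: with one thread the subtask never starts, with two it completes. The one-second timeout exists only so that the demo terminates; a real deadlock would hang forever.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}

object StarvationDemo {
  // The outer task holds a pool thread while waiting for the inner task.
  // With poolSize = 1 the inner task can never start: thread-starvation deadlock.
  def outerTaskCompletes(poolSize: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(poolSize)
    try {
      val outer = pool.submit(new Callable[Int] {
        def call(): Int = {
          val inner = pool.submit(new Callable[Int] { def call(): Int = 42 })
          inner.get(1, TimeUnit.SECONDS) // blocks the outer task's thread
        }
      })
      outer.get(5, TimeUnit.SECONDS) == 42
    } catch {
      case _: ExecutionException => false // the inner get() timed out
    } finally pool.shutdownNow()
  }
}
```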

Best practices

#6 - solution

// The exact minimum depends on your environment
val numCores = math.max(4, Runtime.getRuntime().availableProcessors())

Async stacktraces

Async stacktraces

Problem

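import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object Main {
  def main(args: Array[String]): Unit =
    Await.result(Foo.foo, Duration.Inf)
}

object Foo {
  def foo: Future[Nothing] =
    Future(123).flatMap(_ => Bar.bar)
}

object Bar {
  def bar: Future[Nothing] =
    Future(throw new IllegalArgumentException("Test exception"))
}
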
Exception in thread "main" java.lang.IllegalArgumentException: Test exception
  at Bar$.$anonfun$bar$1(Main.scala:19)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:654)
  at scala.util.Success.$anonfun$map$1(Try.scala:251)
  at scala.util.Success.map(Try.scala:209)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:288)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
  at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1425)
  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
  at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
  at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

Async stacktraces

Solution attempts

  • Reactor's onOperatorDebug()
  • Reactor Debug Agent
  • Kotlin Coroutines (e.g. Debug Mode)
  • IntelliJ Async Stack Traces
  • ZIO
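None of the tools above is required for a crude, hand-rolled version of the idea: capture a stack trace when an asynchronous computation is created and attach it to the failure later. The `traced` helper is a hypothetical sketch for `scala.concurrent.Future`, not how ZIO or Reactor implement their traces, and capturing a stack trace on every call has a real performance cost.

```scala
import scala.concurrent.{ExecutionContext, Future}

object AsyncTrace {
  // Records where an async computation was started and attaches that stack
  // trace to any failure as a suppressed exception.
  def traced[A](label: String)(fa: => Future[A])(implicit ec: ExecutionContext): Future[A] = {
    val callSite = new Exception(s"async call site: $label") // captured synchronously, here
    fa.recoverWith { case e: Throwable =>
      e.addSuppressed(callSite)
      Future.failed(e)
    }
  }
}
```

Wrapping `Foo.foo` and `Bar.bar` from the problem example in `traced` makes the failure carry two suppressed exceptions, one per call site: first Bar.bar, then Foo.foo.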

Async stacktraces

Example

// ZIO
import scalaz.zio.{DefaultRuntime, Task}

object Main {
  def main(args: Array[String]): Unit = {
    val runtime = new DefaultRuntime {}
    runtime.unsafeRun(Foo.foo)
  }
}

object Foo {
  def foo: Task[Nothing] =
    Task(123).flatMap(_ => Bar.bar)
}

object Bar {
  def bar: Task[Nothing] =
    Task(throw new IllegalArgumentException("Test exception"))
}

Fiber failed.
A checked error was not handled.
java.lang.IllegalArgumentException: Test exception
  at Bar$.$anonfun$bar$1(Main.scala:19)
  at scalaz.zio.internal.FiberContext.evaluateNow(FiberContext.scala:384)
  at scalaz.zio.Runtime.unsafeRunAsync(Runtime.scala:92)
  at scalaz.zio.Runtime.unsafeRunAsync$(Runtime.scala:78)
  at Main$$anon$1.unsafeRunAsync(Main.scala:6)
  at scalaz.zio.Runtime.unsafeRunSync(Runtime.scala:67)
  at scalaz.zio.Runtime.unsafeRunSync$(Runtime.scala:64)
  at Main$$anon$1.unsafeRunSync(Main.scala:6)
  at scalaz.zio.Runtime.unsafeRun(Runtime.scala:56)
  at scalaz.zio.Runtime.unsafeRun$(Runtime.scala:55)
  at Main$$anon$1.unsafeRun(Main.scala:6)
  at Main$.main(Main.scala:7)
  at Main.main(Main.scala)

Fiber:0 execution trace:
  at Bar$.bar(Main.scala:19)
  at Foo$.foo(Main.scala:14)

Summary

Contact

  • E-mail: pawel.jurczenko@gmail.com
  • Twitter: @pawel_jurczenko