Paweł Jurczenko, 2020
Hardware threads, OS threads, JVM threads, green threads, custom threads
Scheduling: cooperative vs. preemptive
Threading model: M:N vs. 1:1
Overview
Goals
Implementations
Reborn
Example
// java.util.concurrent.ArrayBlockingQueue
val queue = new ArrayBlockingQueue[String](10) // capacity = 10 (Java constructors do not accept named arguments)
queue.add("Allegro") // throws "IllegalStateException" when queue is full
queue.offer("Allegro") // returns "false" when queue is full
queue.put("Allegro") // blocks a thread when queue is full
// monix.catnap.ConcurrentQueue
ConcurrentQueue[Future]
.bounded[String](capacity = 10)
.flatMap(queue => queue.offer("Allegro")) // returns "Future[Unit]"
Example
Risks of having too many threads
Common misconception
It's not about better I/O performance.
It's about more efficient resource usage.
Network I/O
* might change with io_uring
File I/O
Non-relational databases
JDBC (Java Database Connectivity):
Relational databases - problem
Low-level:
High-level:
Relational databases - solutions
Goal:
We want to execute some application-level code each time a new offer is inserted into the database.
Relational databases - example
CREATE FUNCTION offer_created() RETURNS trigger AS $$
BEGIN
  -- pg_notify expects a text payload, hence the cast
  PERFORM pg_notify('offers', NEW.offer_id::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER offer_created_trigger
  AFTER INSERT ON offers
  FOR EACH ROW EXECUTE PROCEDURE offer_created();
Relational databases - example
// Skunk
val session: Session[IO] = Session.single(
  host = "localhost",
  port = 5432,
  user = "pjurczenko",
  database = "allegro"
).acquire()

val notifications = session
  .channel(id"offers")
  .listen(maxQueued = 10) // fs2.Stream[IO, Notification]
  .toUnicastPublisher() // org.reactivestreams.Publisher[Notification]
  .toFlux() // reactor.core.publisher.Flux[Notification]

// * acquire() and toFlux() are custom extension methods
Relational databases - example
ADBA (Asynchronous Database Access API)
R2DBC (Reactive Relational Database Connectivity)
Relational databases - generic solutions
Separate CPU-bound tasks from blocking I/O tasks.
Many applications will work fine with Scala's global thread pool. However, when performance requirements are strict,
it's good to have at least three different thread pools:
Overview
CPU-bound tasks
Multiple thread pools
Blocking I/O tasks
Non-blocking I/O tasks
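A minimal sketch of the three-pool split, assuming plain `java.util.concurrent` executors wrapped as Scala execution contexts. The names `Pools` and `PoolsDemo`, the pool sizes and the thread-name prefixes are illustrative assumptions and do not come from the slides.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object Pools {
  // Names threads "<prefix>-<n>" so each pool is easy to spot in a thread dump.
  private def named(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true) // do not keep the JVM alive just for this sketch
      t
    }
  }

  // Assumption: one thread per reported core is a reasonable size for CPU-bound work.
  private val cores = Runtime.getRuntime.availableProcessors()

  // CPU-bound tasks: fixed pool, bounded by the number of cores.
  val cpu: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(cores, named("cpu")))

  // Blocking I/O tasks: unbounded pool, threads are created on demand and reused.
  val blockingIo: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(named("blocking-io")))

  // Non-blocking I/O tasks: a small fixed pool (size 2 is an arbitrary choice here).
  val nonBlockingIo: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2, named("nio")))
}

object PoolsDemo {
  // Runs a trivial task on the given pool and reports which thread executed it.
  def threadNameOn(ec: ExecutionContext): String =
    Await.result(Future(Thread.currentThread().getName)(ec), 5.seconds)
}
```

Passing the execution context explicitly, as in `Future(...)(Pools.blockingIo)`, keeps it visible at the call site which pool a task lands on.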
CachedThreadPool
Blocking network I/O (e.g. OkHttp)
Blocking database I/O (e.g. JDBC)
Blocking file I/O (e.g. java.io)
Limits: e.g. max(50), e.g. max(10), e.g. max(20)
Unbounded pool for blocking I/O
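To make "unbounded" concrete, here is a small sketch that submits simulated blocking calls to a pool configured like `Executors.newCachedThreadPool()` and reports the peak thread count. `CachedPoolDemo` and `peakThreads` are hypothetical names, and the latch stands in for a real blocking I/O call.

```scala
import java.util.concurrent.{CountDownLatch, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object CachedPoolDemo {
  // Same configuration as Executors.newCachedThreadPool(): no core threads,
  // no upper bound, idle threads reclaimed after 60 seconds, no task queueing.
  def newCachedPool(): ThreadPoolExecutor =
    new ThreadPoolExecutor(0, Int.MaxValue, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())

  // Submits `tasks` blocking calls at once and returns the largest pool size observed.
  def peakThreads(tasks: Int): Int = {
    val pool = newCachedPool()
    val allStarted = new CountDownLatch(tasks)
    val release = new CountDownLatch(1)
    (1 to tasks).foreach { _ =>
      pool.execute(() => {
        allStarted.countDown()
        release.await() // stands in for a blocking I/O call
      })
    }
    allStarted.await() // every task is now running on its own thread
    val peak = pool.getLargestPoolSize
    release.countDown()
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    peak
  }
}
```

Because no task is ever queued, every concurrent blocking call gets its own thread. This is why the callers of such a pool usually need their own concurrency limits, like the `max(...)` values in the diagram.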
Avoid concurrency for as long as possible.
#1
Prefer high-level concurrency over low-level concurrency.
The days when you had to use wait() and notify() are long gone.
#2
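As a small illustration of the high-level style, the sketch below runs several computations concurrently and collects their results with `Future.traverse`, with no explicit threads, locks, `wait()` or `notify()`. The `HighLevel` object and its squaring workload are made up for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object HighLevel {
  // Hypothetical workload: square each input as a separate task on the global pool.
  // Future.traverse starts all tasks and gathers the results in input order.
  def squares(inputs: List[Int]): Future[List[Int]] =
    Future.traverse(inputs)(n => Future(n * n))

  // Blocking here is only for the demo; real code would keep composing the Future.
  def run(): List[Int] =
    Await.result(squares(List(1, 2, 3)), 5.seconds)
}
```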
Choose concurrency primitives carefully.
There is a reason for the existence of so many of them:
each addresses a different problem.
#3
Know your thread pools.
Control your own thread pools and identify
thread pools from external libraries.
Otherwise this might hit you at the worst time.
#4
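One rough way to discover which thread pools are alive in a running JVM is to group live threads by name with the trailing counter stripped. This is only a sketch: `ThreadInventory` is a hypothetical helper, the name-normalising regex is a heuristic, and `scala.jdk.CollectionConverters` assumes Scala 2.13 or newer.

```scala
import scala.jdk.CollectionConverters._

object ThreadInventory {
  // Strips a trailing counter so "pool-1-thread-7" and "pool-1-thread-8" share one key.
  def poolName(threadName: String): String =
    threadName.replaceAll("[-#]?\\d+$", "")

  // Counts live threads per normalised name, e.g. Map("pool-1-thread" -> 8, "main" -> 1).
  def countByPool(): Map[String, Int] =
    Thread.getAllStackTraces.keySet.asScala.toList
      .groupBy(t => poolName(t.getName))
      .map { case (name, threads) => name -> threads.size }
}
```

Printing `ThreadInventory.countByPool()` under load gives a quick inventory of pools, including those created by libraries behind your back.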
#4 - example
// OkHttp
import okhttp3.{Dispatcher, OkHttpClient}

val dispatcher = new Dispatcher()
dispatcher.setMaxRequestsPerHost(100)
dispatcher.setMaxRequests(100)

val client = new OkHttpClient()
  .newBuilder()
  .dispatcher(dispatcher)
  .build()
#4 - example
Prefer libraries with pluggable thread pools.
#5
#5 - example
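// OkHttp
import java.util.concurrent.Executors
import okhttp3.{Dispatcher, OkHttpClient}

// Our own pool, passed to the dispatcher instead of its default one
val threadPool = Executors.newFixedThreadPool(100)
val dispatcher = new Dispatcher(threadPool)
dispatcher.setMaxRequestsPerHost(100)
dispatcher.setMaxRequests(100)

val client = new OkHttpClient()
  .newBuilder()
  .dispatcher(dispatcher)
  .build()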
#5 - example
Be careful with Runtime.getRuntime().availableProcessors()
There are two reasons for that:
#6
#6 - solution
// The exact minimum depends on your environment
val numCores = math.max(4, Runtime.getRuntime().availableProcessors())
Problem
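import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object Main {
  def main(args: Array[String]): Unit =
    Await.result(Foo.foo, Duration.Inf)
}

object Foo {
  def foo: Future[Nothing] =
    Future(123).flatMap(_ => Bar.bar)
}

object Bar {
  def bar: Future[Nothing] =
    Future(throw new IllegalArgumentException("Test exception"))
}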
Exception in thread "main" java.lang.IllegalArgumentException: Test exception
at Bar$.$anonfun$bar$1(Main.scala:19)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:654)
at scala.util.Success.$anonfun$map$1(Try.scala:251)
at scala.util.Success.map(Try.scala:209)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:288)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1425)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Problem
Solution attempts
// ZIO
import scalaz.zio.{DefaultRuntime, Task}

object Main {
  def main(args: Array[String]): Unit = {
    val runtime = new DefaultRuntime {}
    runtime.unsafeRun(Foo.foo)
  }
}

object Foo {
  def foo: Task[Nothing] =
    Task(123).flatMap(_ => Bar.bar)
}

object Bar {
  def bar: Task[Nothing] =
    Task(throw new IllegalArgumentException("Test exception"))
}
Example
Fiber failed.
A checked error was not handled.
java.lang.IllegalArgumentException: Test exception
at Bar$.$anonfun$bar$1(Main.scala:19)
at scalaz.zio.internal.FiberContext.evaluateNow(FiberContext.scala:384)
at scalaz.zio.Runtime.unsafeRunAsync(Runtime.scala:92)
at scalaz.zio.Runtime.unsafeRunAsync$(Runtime.scala:78)
at Main$$anon$1.unsafeRunAsync(Main.scala:6)
at scalaz.zio.Runtime.unsafeRunSync(Runtime.scala:67)
at scalaz.zio.Runtime.unsafeRunSync$(Runtime.scala:64)
at Main$$anon$1.unsafeRunSync(Main.scala:6)
at scalaz.zio.Runtime.unsafeRun(Runtime.scala:56)
at scalaz.zio.Runtime.unsafeRun$(Runtime.scala:55)
at Main$$anon$1.unsafeRun(Main.scala:6)
at Main$.main(Main.scala:7)
at Main.main(Main.scala)
Fiber:0 execution trace:
at Bar$.bar(Main.scala:19)
at Foo$.foo(Main.scala:14)
Example
Contact