Skip to content

Commit 458ceed

Browse files
He-Pinpjfanning
andauthored
feat: Add virtualize support for thread-pool-executor (#2169)
* feat: Add virtualize support for thread-pool-executor * Update actor/src/main/resources/reference.conf Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com> * Update actor/src/main/resources/reference.conf Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com> * Update actor/src/main/resources/reference.conf Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com> --------- Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com>
1 parent 7b96bb8 commit 458ceed

File tree

8 files changed

+167
-50
lines changed

8 files changed

+167
-50
lines changed

actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ import com.typesafe.config.ConfigFactory
2222
import org.apache.pekko
2323
import pekko.actor.{ Actor, Props }
2424
import pekko.testkit.{ ImplicitSender, PekkoSpec }
25-
import pekko.util.JavaVersion
2625

2726
object ForkJoinPoolVirtualThreadSpec {
2827
val config = ConfigFactory.parseString("""
2928
|custom {
3029
| task-dispatcher {
3130
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
3231
| throughput = 5
32+
| executor = "fork-join-executor"
3333
| fork-join-executor {
3434
| parallelism-factor = 2
3535
| parallelism-max = 2
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch
19+
20+
import com.typesafe.config.ConfigFactory
21+
import org.apache.pekko
22+
import pekko.actor.{ Actor, Props }
23+
import pekko.testkit.{ ImplicitSender, PekkoSpec }
24+
25+
object ThreadPoolVirtualThreadSpec {
26+
val config = ConfigFactory.parseString("""
27+
|custom {
28+
| task-dispatcher {
29+
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
30+
| throughput = 5
31+
| executor = "thread-pool-executor"
32+
| thread-pool-executor {
33+
| fixed-pool-size = 4
34+
| virtualize = on
35+
| }
36+
| }
37+
|}
38+
""".stripMargin)
39+
40+
class ThreadNameActor extends Actor {
41+
42+
override def receive = {
43+
case "ping" =>
44+
sender() ! Thread.currentThread().getName
45+
}
46+
}
47+
48+
}
49+
50+
class ThreadPoolVirtualThreadSpec extends PekkoSpec(ThreadPoolVirtualThreadSpec.config) with ImplicitSender {
51+
import ThreadPoolVirtualThreadSpec._
52+
53+
"ThreadPool" must {
54+
55+
"support virtualization with Virtual Thread" in {
56+
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("custom.task-dispatcher"))
57+
for (_ <- 1 to 1000) {
58+
actor ! "ping"
59+
expectMsgPF() { case name: String =>
60+
name should include("ThreadPoolVirtualThreadSpec-custom.task-dispatcher-virtual-thread-")
61+
}
62+
}
63+
}
64+
65+
}
66+
}

actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,8 @@ ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.japi.JAPI")
148148
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.dispatch.ThreadPoolConfig")
149149
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.createExecutorServiceFactory")
150150
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory")
151-
151+
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.unapply")
152+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.apply")
153+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.copy")
154+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.this")
155+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig#ThreadPoolExecutorServiceFactory.this")

actor/src/main/resources/reference.conf

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ pekko {
482482
maximum-pool-size = 32767
483483

484484
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
485-
# When set to `on` but underlying runtime does not support virtual threads, an Exception will throw.
485+
# When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown.
486486
# Virtualize this dispatcher as a virtual-thread-executor
487487
# Valid values are: `on`, `off`
488488
#
@@ -543,6 +543,18 @@ pekko {
543543

544544
# Allow core threads to time out
545545
allow-core-timeout = on
546+
547+
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
548+
# When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown.
549+
# Virtualize this dispatcher as a virtual-thread-executor
550+
# Valid values are: `on`, `off`
551+
#
552+
# Requirements:
553+
# 1. JDK 21+
554+
# 2. add options to the JVM:
555+
# --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
556+
# --add-opens=java.base/java.lang=ALL-UNNAMED
557+
virtualize = off
546558
}
547559

548560
# This will be used if you have set "executor = "virtual-thread-executor"
@@ -600,6 +612,17 @@ pekko {
600612

601613
thread-pool-executor {
602614
fixed-pool-size = 16
615+
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
616+
# When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown.
617+
# Virtualize this dispatcher as a virtual-thread-executor
618+
# Valid values are: `on`, `off`
619+
#
620+
# Requirements:
621+
# 1. JDK 21+
622+
# 2. add options to the JVM:
623+
# --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
624+
# --add-opens=java.base/java.lang=ALL-UNNAMED
625+
virtualize = off
603626
}
604627
}
605628

actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import pekko.dispatch.affinity.AffinityPoolConfigurator
2828
import pekko.dispatch.sysmsg._
2929
import pekko.event.EventStream
3030
import pekko.event.Logging.{ emptyMDC, Debug, Error, LogEventException, Warning }
31-
import pekko.util.{ unused, Index }
31+
import pekko.util.{ unused, Index, JavaVersion }
3232

3333
import com.typesafe.config.Config
3434

@@ -471,42 +471,49 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis
471471
trait ThreadPoolExecutorServiceFactoryProvider extends ExecutorServiceFactoryProvider {
472472
def threadPoolConfig: ThreadPoolConfig
473473
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
474-
class ThreadPoolExecutorServiceFactory(threadFactory: ThreadFactory) extends ExecutorServiceFactory {
474+
475+
object ThreadPoolExecutorServiceFactory extends ExecutorServiceFactory {
475476
def createExecutorService: ExecutorService = {
477+
val tf = threadFactory match {
478+
case m: MonitorableThreadFactory => m.withName(m.name + "-" + id)
479+
case _ => threadFactory
480+
}
481+
val poolThreadFactory = tf match {
482+
case m: MonitorableThreadFactory if isVirtualized => m.withName(m.name + "-" + "CarrierThread")
483+
case _ => tf
484+
}
485+
476486
val config = threadPoolConfig
477-
val service: ThreadPoolExecutor = new ThreadPoolExecutor(
487+
val pool = new ThreadPoolExecutor(
478488
config.corePoolSize,
479489
config.maxPoolSize,
480490
config.threadTimeout.length,
481491
config.threadTimeout.unit,
482492
config.queueFactory(),
483-
threadFactory,
493+
poolThreadFactory,
484494
config.rejectionPolicy) with LoadMetrics {
485495
def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize
486496
}
487-
service.allowCoreThreadTimeOut(config.allowCorePoolTimeout)
488-
service
489-
}
490-
}
497+
pool.allowCoreThreadTimeOut(config.allowCorePoolTimeout)
491498

492-
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
493-
val tf = threadFactory match {
494-
case m: MonitorableThreadFactory =>
495-
// add the dispatcher id to the thread names
496-
m.withName(m.name + "-" + id)
497-
case other => other
499+
if (isVirtualized) {
500+
val prefixName = threadFactory match {
501+
case m: MonitorableThreadFactory => m.name + "-" + id
502+
case _ => id
503+
}
504+
createVirtualized(tf, pool, prefixName)
505+
} else pool
498506
}
499-
new ThreadPoolExecutorServiceFactory(tf)
500507
}
501-
createExecutorServiceFactory(id, threadFactory)
508+
ThreadPoolExecutorServiceFactory
502509
}
503510
}
504511

505512
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
506513
extends ExecutorServiceConfigurator(config, prerequisites)
507514
with ThreadPoolExecutorServiceFactoryProvider {
508-
509515
override val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config
516+
override val isVirtualized: Boolean = threadPoolConfig.isVirtualized && JavaVersion.majorVersion >= 21
510517

511518
protected def createThreadPoolConfigBuilder(
512519
config: Config,
@@ -516,6 +523,7 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
516523
ThreadPoolConfigBuilder(ThreadPoolConfig())
517524
.setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
518525
.setAllowCoreThreadTimeout(config.getBoolean("allow-core-timeout"))
526+
.isVirtualized(config.getBoolean("virtualize"))
519527
.configure(Some(config.getInt("task-queue-size")).flatMap {
520528
case size if size > 0 =>
521529
Some(config.getString("task-queue-type"))

actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ package org.apache.pekko.dispatch
1515

1616
import com.typesafe.config.Config
1717
import org.apache.pekko
18-
import pekko.dispatch.VirtualThreadSupport.newVirtualThreadFactory
1918
import pekko.util.JavaVersion
2019

21-
import java.util.concurrent.{ Executor, ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory, TimeUnit }
20+
import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory, TimeUnit }
2221

2322
object ForkJoinExecutorConfigurator {
2423

@@ -114,28 +113,8 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
114113
val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode)
115114

116115
if (isVirtualized) {
117-
// when virtualized, we need enhanced thread factory
118-
val factory: ThreadFactory = threadFactory match {
119-
case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) =>
120-
new ThreadFactory {
121-
private val vtFactory = newVirtualThreadFactory(name, pool) // use the pool as the scheduler
122-
123-
override def newThread(r: Runnable): Thread = {
124-
val vt = vtFactory.newThread(r)
125-
vt.setUncaughtExceptionHandler(exceptionHandler)
126-
contextClassLoader.foreach(vt.setContextClassLoader)
127-
vt
128-
}
129-
}
130-
case _ => newVirtualThreadFactory(prerequisites.settings.name, pool); // use the pool as the scheduler
131-
}
132-
// wrap the pool with virtualized executor service
133-
new VirtualizedExecutorService(
134-
factory, // the virtual thread factory
135-
pool, // the underlying pool
136-
(_: Executor) => pool.atFullThrottle(), // the load metrics provider, we use the pool itself
137-
cascadeShutdown = true // cascade shutdown
138-
)
116+
// we need to cast here,
117+
createVirtualized(threadFactory.asInstanceOf[ThreadFactory], pool, id)
139118
} else {
140119
pool
141120
}

actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@
1313

1414
package org.apache.pekko.dispatch
1515

16-
import org.apache.pekko.annotation.InternalApi
16+
import org.apache.pekko
17+
import pekko.annotation.InternalApi
18+
import pekko.dispatch.VirtualThreadSupport.newVirtualThreadFactory
1719

1820
import java.util.Collection
1921
import java.util.concurrent.{
2022
ArrayBlockingQueue,
2123
BlockingQueue,
2224
Callable,
25+
Executor,
2326
ExecutorService,
2427
ForkJoinPool,
2528
ForkJoinWorkerThread,
@@ -74,6 +77,34 @@ trait ExecutorServiceFactory {
7477
trait ExecutorServiceFactoryProvider {
7578
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory
7679
def isVirtualized: Boolean = false // can be overridden by implementations
80+
81+
protected def createVirtualized(
82+
threadFactory: ThreadFactory,
83+
pool: ExecutorService with LoadMetrics,
84+
prefixName: String): ExecutorService = {
85+
// when virtualized, we need enhanced thread factory
86+
val factory: ThreadFactory = threadFactory match {
87+
case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) =>
88+
new ThreadFactory {
89+
private val vtFactory = newVirtualThreadFactory(name, pool) // use the pool as the scheduler
90+
91+
override def newThread(r: Runnable): Thread = {
92+
val vt = vtFactory.newThread(r)
93+
vt.setUncaughtExceptionHandler(exceptionHandler)
94+
contextClassLoader.foreach(vt.setContextClassLoader)
95+
vt
96+
}
97+
}
98+
case _ => newVirtualThreadFactory(prefixName, pool); // use the pool as the scheduler
99+
}
100+
// wrap the pool with virtualized executor service
101+
new VirtualizedExecutorService(
102+
factory, // the virtual thread factory
103+
pool, // the underlying pool
104+
(_: Executor) => pool.atFullThrottle(), // the load metrics provider, we use the pool itself
105+
cascadeShutdown = true // cascade shutdown
106+
)
107+
}
77108
}
78109

79110
/**
@@ -88,7 +119,8 @@ final case class ThreadPoolConfig(
88119
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
89120
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
90121
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
91-
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy) {
122+
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy,
123+
isVirtualized: Boolean = false) {
92124
// Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra
93125
// context information on the config
94126
@noinline
@@ -98,9 +130,11 @@ final case class ThreadPoolConfig(
98130
maxPoolSize: Int = maxPoolSize,
99131
threadTimeout: Duration = threadTimeout,
100132
queueFactory: ThreadPoolConfig.QueueFactory = queueFactory,
101-
rejectionPolicy: RejectedExecutionHandler = rejectionPolicy
133+
rejectionPolicy: RejectedExecutionHandler = rejectionPolicy,
134+
isVirtualized: Boolean = isVirtualized
102135
): ThreadPoolConfig =
103-
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy)
136+
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
137+
isVirtualized)
104138
}
105139

106140
/**
@@ -156,6 +190,9 @@ final case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
156190
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder =
157191
this.copy(config = config.copy(queueFactory = newQueueFactory))
158192

193+
def isVirtualized(isVirtualized: Boolean): ThreadPoolConfigBuilder =
194+
this.copy(config = config.copy(isVirtualized = isVirtualized))
195+
159196
def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder =
160197
fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c))
161198
}

actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
2727

2828
@InternalApi
2929
private[dispatch] object VirtualThreadSupport {
30+
val zero = java.lang.Long.valueOf(0L)
3031
private val lookup = MethodHandles.publicLookup()
3132

3233
/**
@@ -67,7 +68,7 @@ private[dispatch] object VirtualThreadSupport {
6768
MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long]))
6869
// TODO support replace scheduler when we drop Java 8 support
6970
val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory]))
70-
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L)
71+
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
7172
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
7273
} catch {
7374
case NonFatal(e) =>
@@ -93,7 +94,6 @@ private[dispatch] object VirtualThreadSupport {
9394
}
9495
val nameMethod = ofVirtualClass.getDeclaredMethod("name", classOf[String], classOf[Long])
9596
val factoryMethod = builderClass.getDeclaredMethod("factory")
96-
val zero = java.lang.Long.valueOf(0L)
9797
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
9898
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
9999
} catch {

0 commit comments

Comments
 (0)