Skip to content

Commit 7b96bb8

Browse files
authored
chore: Remove ExecutorServiceFactoryProvider from ThreadPoolConfig (#2175)
1 parent 8d9450b commit 7b96bb8

File tree

4 files changed

+56
-37
lines changed

4 files changed

+56
-37
lines changed

actor/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,7 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.Un
145145
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.UnitPFBuilder.matchEquals")
146146
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.japi.pf.UnitPFBuilder.matchAny")
147147
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.japi.JAPI")
148+
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.dispatch.ThreadPoolConfig")
149+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.createExecutorServiceFactory")
150+
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.ThreadPoolConfig$ThreadPoolExecutorServiceFactory")
151+

actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.util.control.NonFatal
2323

2424
import org.apache.pekko
2525
import pekko.actor._
26-
import pekko.annotation.InternalStableApi
26+
import pekko.annotation.{ InternalApi, InternalStableApi }
2727
import pekko.dispatch.affinity.AffinityPoolConfigurator
2828
import pekko.dispatch.sysmsg._
2929
import pekko.event.EventStream
@@ -464,10 +464,49 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis
464464
}
465465
}
466466

467+
/**
468+
* INTERNAL API
469+
*/
470+
@InternalApi
471+
trait ThreadPoolExecutorServiceFactoryProvider extends ExecutorServiceFactoryProvider {
472+
def threadPoolConfig: ThreadPoolConfig
473+
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
474+
class ThreadPoolExecutorServiceFactory(threadFactory: ThreadFactory) extends ExecutorServiceFactory {
475+
def createExecutorService: ExecutorService = {
476+
val config = threadPoolConfig
477+
val service: ThreadPoolExecutor = new ThreadPoolExecutor(
478+
config.corePoolSize,
479+
config.maxPoolSize,
480+
config.threadTimeout.length,
481+
config.threadTimeout.unit,
482+
config.queueFactory(),
483+
threadFactory,
484+
config.rejectionPolicy) with LoadMetrics {
485+
def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize
486+
}
487+
service.allowCoreThreadTimeOut(config.allowCorePoolTimeout)
488+
service
489+
}
490+
}
491+
492+
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
493+
val tf = threadFactory match {
494+
case m: MonitorableThreadFactory =>
495+
// add the dispatcher id to the thread names
496+
m.withName(m.name + "-" + id)
497+
case other => other
498+
}
499+
new ThreadPoolExecutorServiceFactory(tf)
500+
}
501+
createExecutorServiceFactory(id, threadFactory)
502+
}
503+
}
504+
467505
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
468-
extends ExecutorServiceConfigurator(config, prerequisites) {
506+
extends ExecutorServiceConfigurator(config, prerequisites)
507+
with ThreadPoolExecutorServiceFactoryProvider {
469508

470-
val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config
509+
override val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config
471510

472511
protected def createThreadPoolConfigBuilder(
473512
config: Config,
@@ -505,9 +544,6 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
505544
else
506545
builder.setFixedPoolSize(config.getInt("fixed-pool-size"))
507546
}
508-
509-
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
510-
threadPoolConfig.createExecutorServiceFactory(id, threadFactory)
511547
}
512548

513549
class DefaultExecutorServiceConfigurator(

actor/src/main/scala/org/apache/pekko/dispatch/PinnedDispatcher.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ class PinnedDispatcher(
3636
_id,
3737
Int.MaxValue,
3838
Duration.Zero,
39-
_threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1),
39+
new ThreadPoolExecutorServiceFactoryProvider() {
40+
override def threadPoolConfig: ThreadPoolConfig = _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1)
41+
},
4042
_shutdownTimeout) {
4143

4244
@volatile

actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package org.apache.pekko.dispatch
1515

16+
import org.apache.pekko.annotation.InternalApi
17+
1618
import java.util.Collection
1719
import java.util.concurrent.{
1820
ArrayBlockingQueue,
@@ -30,7 +32,6 @@ import java.util.concurrent.{
3032
TimeUnit
3133
}
3234
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
33-
3435
import scala.concurrent.{ BlockContext, CanAwait }
3536
import scala.concurrent.duration.Duration
3637

@@ -76,16 +77,18 @@ trait ExecutorServiceFactoryProvider {
7677
}
7778

7879
/**
79-
* A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher
80+
* INTERNAL API
81+
*
82+
* Configuration object for ThreadPoolExecutor
8083
*/
84+
@InternalApi
8185
final case class ThreadPoolConfig(
8286
allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
8387
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
8488
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
8589
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
8690
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
87-
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy)
88-
extends ExecutorServiceFactoryProvider {
91+
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy) {
8992
// Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra
9093
// context information on the config
9194
@noinline
@@ -98,32 +101,6 @@ final case class ThreadPoolConfig(
98101
rejectionPolicy: RejectedExecutionHandler = rejectionPolicy
99102
): ThreadPoolConfig =
100103
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy)
101-
102-
class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory {
103-
def createExecutorService: ExecutorService = {
104-
val service: ThreadPoolExecutor = new ThreadPoolExecutor(
105-
corePoolSize,
106-
maxPoolSize,
107-
threadTimeout.length,
108-
threadTimeout.unit,
109-
queueFactory(),
110-
threadFactory,
111-
rejectionPolicy) with LoadMetrics {
112-
def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize
113-
}
114-
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
115-
service
116-
}
117-
}
118-
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
119-
val tf = threadFactory match {
120-
case m: MonitorableThreadFactory =>
121-
// add the dispatcher id to the thread names
122-
m.withName(m.name + "-" + id)
123-
case other => other
124-
}
125-
new ThreadPoolExecutorServiceFactory(tf)
126-
}
127104
}
128105

129106
/**

0 commit comments

Comments
 (0)