Skip to content

Commit b118aba

Browse files
committed
Limit default concurrency to prevent allocation overshoot. Fixes #23.
1 parent 234f900 commit b118aba

File tree

2 files changed

+45
-2
lines changed

2 files changed

+45
-2
lines changed

lib/async/pool/controller.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ def self.wrap(**options, &block)
2727
#
2828
# @parameter constructor [Proc] A block which creates a new resource.
2929
# @parameter limit [Integer | Nil] The maximum number of resources that this pool can have at any given time. If nil, the pool can have an unlimited number of resources.
30-
# @parameter concurrency [Integer] The maximum number of concurrent tasks that can be creating a new resource.
30+
# @parameter concurrency [Integer] The maximum number of concurrent tasks that can be creating a new resource. Defaults to 1 to ensure the pool limit is enforced. Higher values may result in more resources being created than the limit under high load.
3131
# @parameter policy [Policy] The pool policy.
32-
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
32+
def initialize(constructor, limit: nil, concurrency: 1, policy: nil, tags: nil)
3333
@constructor = constructor
3434
@limit = limit
3535

test/async/pool/controller.rb

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,4 +443,47 @@ def failures(repeats: 500, time_scale: 0.001, &block)
443443
expect(pool.resources).to be(:empty?)
444444
end
445445
end
446+
447+
with "high concurrency with limit" do
448+
it "does not exceed the configured limit" do
449+
# Track the maximum pool size we observe
450+
max_size = 0
451+
mutex = Thread::Mutex.new
452+
453+
constructor = proc do
454+
# Introduce a small delay during resource creation to widen the race window
455+
sleep(0.001)
456+
Async::Pool::Resource.new
457+
end
458+
459+
# Default concurrency is limit (10), which is the issue
460+
pool = subject.new(constructor, limit: 10)
461+
462+
tasks = []
463+
464+
# Create many concurrent tasks that all try to acquire at once
465+
100.times do
466+
tasks << Async do
467+
pool.acquire do |resource|
468+
# Track the maximum size of the pool thread-safely
469+
mutex.synchronize do
470+
current_size = pool.size
471+
max_size = current_size if current_size > max_size
472+
end
473+
474+
# Simulate brief work
475+
sleep(0.001)
476+
end
477+
end
478+
end
479+
480+
# Wait for all tasks to complete
481+
tasks.each(&:wait)
482+
483+
# The pool should never have exceeded the limit
484+
expect(max_size).to be <= 10
485+
486+
pool.close
487+
end
488+
end
446489
end

0 commit comments

Comments
 (0)