16 changes: 16 additions & 0 deletions fastdeploy/engine/common_engine.py
@@ -996,6 +996,8 @@ def _fetch_request():
else:
raise

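# Record the scheduler-side backlog so console scheduler metrics can report the full queue depth.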
if hasattr(self.resource_manager, "scheduler_unhandled_request_num"):
self.resource_manager.scheduler_unhandled_request_num = self._get_scheduler_unhandled_request_num()
# 2. Schedule requests
tasks, error_tasks = self.resource_manager.schedule()

@@ -1063,6 +1065,20 @@ def _fetch_request():
err_msg = "Error happend while insert task to engine: {}, {}.".format(e, str(traceback.format_exc()))
self.llm_logger.error(err_msg)

    def _get_scheduler_unhandled_request_num(self) -> int:
        """
        Return the number of requests still pending on the scheduler side,
        or 0 if the scheduler does not expose get_unhandled_request_num.
        """
        get_unhandled = getattr(self.scheduler, "get_unhandled_request_num", None)
        if not callable(get_unhandled):
            return 0
        try:
            unhandled = int(get_unhandled())
        except Exception as e:
            self.llm_logger.debug(f"Failed to get scheduler unhandled request num: {e}")
            return 0
        return max(unhandled, 0)

    def start_zmq_service(self, api_server_pid=None):
        if api_server_pid is None:
            return
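For reference, here is a minimal, hypothetical sketch (not part of this PR) of a scheduler that satisfies the duck-typed contract consumed by _get_scheduler_unhandled_request_num above: any object exposing a callable get_unhandled_request_num() that returns its pending request count will work.

class InMemoryScheduler:
    """Hypothetical scheduler, shown only to illustrate the expected interface."""

    def __init__(self):
        # Requests received from the API layer but not yet handed to the resource manager.
        self._pending = []

    def put_requests(self, reqs):
        self._pending.extend(reqs)

    def get_unhandled_request_num(self) -> int:
        # Backlog still queued on the scheduler side; never negative.
        return len(self._pending)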
106 changes: 56 additions & 50 deletions fastdeploy/engine/sched/resource_manager_v1.py
@@ -214,6 +214,8 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
self.current_reserve_output_block_num = self.init_reserve_output_block_num
self.current_reserve_output_block_num_float = self.init_reserve_output_block_num
self.can_relax_prefill_strategy = True
# Scheduler-side requests that have not yet been moved into the resource manager's waiting queue.
self.scheduler_unhandled_request_num = 0

def allocated_slots(self, request: Request):
return len(request.block_tables) * self.config.cache_config.block_size
Expand Down Expand Up @@ -957,56 +959,7 @@ def _allocate_decode_and_extend():
if self.current_reserve_output_block_num == 0:
self.can_relax_prefill_strategy = True

if (
hasattr(self, "scheduler_metrics_logger")
and self.scheduler_metrics_logger is not None
and envs.FD_CONSOLE_SCHEDULER_METRICS
):
total_blocks = self.total_block_number()
free_blocks = self.available_block_num()
used_blocks = max(total_blocks - free_blocks, 0)
tokens_used = used_blocks * self.config.cache_config.block_size
token_usage = used_blocks / total_blocks if total_blocks > 0 else 0.0
running_cnt = len(self.running)
queue_cnt = len(self.waiting)

prefill_reqs = [
r for r in scheduled_reqs if isinstance(r, Request) and r.task_type == RequestType.PREFILL
]
has_decode = any(getattr(r, "task_type", None) == RequestType.DECODE for r in scheduled_reqs)

self.scheduler_metrics_logger.log_prefill_batch(
prefill_reqs=prefill_reqs,
running_cnt=running_cnt,
queue_cnt=queue_cnt,
tokens_used=tokens_used,
token_usage=token_usage,
)
if has_decode:
has_prefill = len(prefill_reqs) > 0
graph_opt_cfg = self.config.graph_opt_config
use_cudagraph_cfg = bool(getattr(graph_opt_cfg, "use_cudagraph", False))
graph_opt_level = int(getattr(graph_opt_cfg, "graph_opt_level", 0) or 0)
full_cuda_graph = bool(getattr(graph_opt_cfg, "full_cuda_graph", True))
cudagraph_only_prefill = bool(getattr(graph_opt_cfg, "cudagraph_only_prefill", False))
use_decode_cudagraph = (
has_decode
and use_cudagraph_cfg
and (
# Reference PR https://github.com/PaddlePaddle/FastDeploy/pull/6196
# Static split graph mode: Prefill+Mixed and Decode can use CUDAGraph.
(graph_opt_level > 0 and not full_cuda_graph)
# Dynamic / static-full modes: decode-only can use CUDAGraph.
or (not has_prefill and not cudagraph_only_prefill)
)
)
self.scheduler_metrics_logger.log_decode_batch(
running_cnt=running_cnt,
queue_cnt=queue_cnt,
tokens_used=tokens_used,
token_usage=token_usage,
use_cudagraph=use_decode_cudagraph,
)
self._log_console_scheduler_metrics(scheduled_reqs)

self.update_metrics()

@@ -1451,3 +1404,56 @@ def log_status(self):
f"requests={self.requests}, "
f")"
)

    def _log_console_scheduler_metrics(self, scheduled_reqs: list[Request | ScheduledDecodeTask]) -> None:
        if not (
            hasattr(self, "scheduler_metrics_logger")
            and self.scheduler_metrics_logger is not None
            and envs.FD_CONSOLE_SCHEDULER_METRICS
        ):
            return

        total_blocks = self.total_block_number()
        free_blocks = self.available_block_num()
        used_blocks = max(total_blocks - free_blocks, 0)
        tokens_used = used_blocks * self.config.cache_config.block_size
        token_usage = used_blocks / total_blocks if total_blocks > 0 else 0.0
        running_cnt = len(self.running)
        scheduler_queue_cnt = max(int(getattr(self, "scheduler_unhandled_request_num", 0) or 0), 0)
        queue_cnt = len(self.waiting) + scheduler_queue_cnt

        prefill_reqs = [r for r in scheduled_reqs if isinstance(r, Request) and r.task_type == RequestType.PREFILL]
        has_decode = any(getattr(r, "task_type", None) == RequestType.DECODE for r in scheduled_reqs)

        self.scheduler_metrics_logger.log_prefill_batch(
            prefill_reqs=prefill_reqs,
            running_cnt=running_cnt,
            queue_cnt=queue_cnt,
            tokens_used=tokens_used,
            token_usage=token_usage,
        )
        if has_decode:
            has_prefill = len(prefill_reqs) > 0
            graph_opt_cfg = self.config.graph_opt_config
            use_cudagraph_cfg = bool(getattr(graph_opt_cfg, "use_cudagraph", False))
            graph_opt_level = int(getattr(graph_opt_cfg, "graph_opt_level", 0) or 0)
            full_cuda_graph = bool(getattr(graph_opt_cfg, "full_cuda_graph", True))
            cudagraph_only_prefill = bool(getattr(graph_opt_cfg, "cudagraph_only_prefill", False))
            use_decode_cudagraph = (
                has_decode
                and use_cudagraph_cfg
                and (
                    # Reference PR https://github.com/PaddlePaddle/FastDeploy/pull/6196
                    # Static split graph mode: Prefill+Mixed and Decode can use CUDAGraph.
                    (graph_opt_level > 0 and not full_cuda_graph)
                    # Dynamic / static-full modes: decode-only can use CUDAGraph.
                    or (not has_prefill and not cudagraph_only_prefill)
                )
            )
            self.scheduler_metrics_logger.log_decode_batch(
                running_cnt=running_cnt,
                queue_cnt=queue_cnt,
                tokens_used=tokens_used,
                token_usage=token_usage,
                use_cudagraph=use_decode_cudagraph,
            )
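The decode-batch CUDAGraph eligibility check above packs several cases into a single boolean expression. The following standalone sketch (illustrative only; the parameter names mirror the graph_opt_config fields read above) spells out the same decision:

def decode_batch_uses_cudagraph(
    has_decode: bool,
    has_prefill: bool,
    use_cudagraph: bool,
    graph_opt_level: int,
    full_cuda_graph: bool,
    cudagraph_only_prefill: bool,
) -> bool:
    # No CUDAGraph unless there is a decode batch and CUDAGraph is enabled at all.
    if not (has_decode and use_cudagraph):
        return False
    # Static split graph mode: Prefill/Mixed and Decode batches can both use CUDAGraph.
    if graph_opt_level > 0 and not full_cuda_graph:
        return True
    # Dynamic / static-full modes: only decode-only batches use CUDAGraph.
    return not has_prefill and not cudagraph_only_prefill

For example, with use_cudagraph=True, graph_opt_level=0, and a mixed batch (has_prefill=True), the helper returns False, matching the expression in _log_console_scheduler_metrics.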
24 changes: 24 additions & 0 deletions tests/engine/test_common_engine.py
@@ -848,3 +848,27 @@ def __init__(self, *a, **k):
eng._finalizer.detach()
except Exception:
pass

    def test_get_scheduler_unhandled_request_num(self):
        """Cover _get_scheduler_unhandled_request_num normal/fallback paths."""
        eng = EngineService.__new__(EngineService)
        eng.llm_logger = Mock()

        # Scheduler does not provide the API -> fall back to 0
        eng.scheduler = object()
        self.assertEqual(eng._get_scheduler_unhandled_request_num(), 0)

        # Positive value -> return the int value
        eng.scheduler = type("SchedOK", (), {"get_unhandled_request_num": lambda self: "3"})()
        self.assertEqual(eng._get_scheduler_unhandled_request_num(), 3)

        # Negative value -> clamp to 0
        eng.scheduler = type("SchedNeg", (), {"get_unhandled_request_num": lambda self: -5})()
        self.assertEqual(eng._get_scheduler_unhandled_request_num(), 0)

        # Exception -> debug log + fall back to 0
        eng.scheduler = type(
            "SchedErr", (), {"get_unhandled_request_num": lambda self: (_ for _ in ()).throw(RuntimeError("boom"))}
        )()
        self.assertEqual(eng._get_scheduler_unhandled_request_num(), 0)
        eng.llm_logger.debug.assert_called()
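Assuming the suite runs under pytest, the new case can be exercised in isolation with something like:

python -m pytest tests/engine/test_common_engine.py -k get_scheduler_unhandled_request_num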