Skip to content

Commit 28fabdd

Browse files
authored
feat: Add support for rest scan planning (#2864)
related to #2775 # Rationale for this change Adds **synchronous** client-side support for REST server-side scan planning, allowing scans to be planned by the REST catalog when the catalog supports it. This PR cherry-picks and builds on two WIP PRs: - Rest Models #2861 - Endpoints PR #2848 Currently, scan planning is enabled by setting rest-scan-planning-enabled=true in the catalog properties. TODO: spec handling ## Are these changes tested? Integration tests added, along with manual testing ## Are there any user-facing changes? yes
1 parent 2cdfede commit 28fabdd

File tree

8 files changed

+898
-172
lines changed

8 files changed

+898
-172
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,10 @@ def namespace_to_string(identifier: str | Identifier, err: type[ValueError] | ty
722722

723723
return ".".join(segment.strip() for segment in tuple_identifier)
724724

725+
def supports_server_side_planning(self) -> bool:
    """Report whether this catalog can plan table scans on the server.

    The base implementation unconditionally answers ``False``; catalog
    implementations that can delegate scan planning override this method.

    Returns:
        Always False for the base catalog.
    """
    return False
728+
725729
@staticmethod
726730
def identifier_to_database(
727731
identifier: str | Identifier, err: type[ValueError] | type[NoSuchNamespaceError] = ValueError

pyiceberg/catalog/rest/__init__.py

Lines changed: 120 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from collections import deque
1718
from enum import Enum
1819
from typing import (
1920
TYPE_CHECKING,
2021
Any,
2122
Union,
2223
)
2324

24-
from pydantic import ConfigDict, Field, field_validator
25+
from pydantic import ConfigDict, Field, TypeAdapter, field_validator
2526
from requests import HTTPError, Session
2627
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
2728

@@ -36,6 +37,16 @@
3637
)
3738
from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
3839
from pyiceberg.catalog.rest.response import _handle_non_200_response
40+
from pyiceberg.catalog.rest.scan_planning import (
41+
FetchScanTasksRequest,
42+
PlanCancelled,
43+
PlanCompleted,
44+
PlanFailed,
45+
PlanningResponse,
46+
PlanSubmitted,
47+
PlanTableScanRequest,
48+
ScanTasks,
49+
)
3950
from pyiceberg.exceptions import (
4051
AuthorizationExpiredError,
4152
CommitFailedException,
@@ -44,6 +55,7 @@
4455
NamespaceNotEmptyError,
4556
NoSuchIdentifierError,
4657
NoSuchNamespaceError,
58+
NoSuchPlanTaskError,
4759
NoSuchTableError,
4860
NoSuchViewError,
4961
TableAlreadyExistsError,
@@ -56,6 +68,7 @@
5668
CommitTableRequest,
5769
CommitTableResponse,
5870
CreateTableTransaction,
71+
FileScanTask,
5972
StagedTable,
6073
Table,
6174
TableIdentifier,
@@ -316,6 +329,9 @@ class ListViewsResponse(IcebergBaseModel):
316329
identifiers: list[ListViewResponseEntry] = Field()
317330

318331

332+
# Adapter created once at module import; _plan_table_scan uses it to validate
# the JSON body of a scan planning reply as a PlanningResponse.
_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)
333+
334+
319335
class RestCatalog(Catalog):
320336
uri: str
321337
_session: Session
@@ -375,15 +391,113 @@ def _create_session(self) -> Session:
375391

376392
return session
377393

378-
def is_rest_scan_planning_enabled(self) -> bool:
379-
"""Check if rest server-side scan planning is enabled.
394+
def supports_server_side_planning(self) -> bool:
    """Tell whether server-side scan planning can be used with this catalog.

    Two conditions must both hold: the submit-table-scan-plan capability is
    present in ``self._supported_endpoints``, and the scan-planning catalog
    property (falling back to its default) evaluates to true.

    Returns:
        True when both conditions are met, otherwise False.
    """
    # Without the endpoint the property value is irrelevant.
    if Capability.V1_SUBMIT_TABLE_SCAN_PLAN not in self._supported_endpoints:
        return False
    return property_as_bool(self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT)
399+
400+
@retry(**_RETRY_ARGS)
def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:
    """Send a scan planning request for a table to the REST server.

    Args:
        identifier: Table identifier.
        request: The scan plan request parameters.

    Returns:
        The server's reply, validated as a PlanningResponse that carries the
        status of the scan plan request.

    Raises:
        NoSuchTableError: If the server answers 404 for the given identifier.
    """
    self._check_endpoint(Capability.V1_SUBMIT_TABLE_SCAN_PLAN)

    endpoint_url = self.url(Endpoints.plan_table_scan, prefixed=True, **self._split_identifier_for_path(identifier))
    # Fields left as None are omitted from the serialized request body.
    payload = request.model_dump_json(by_alias=True, exclude_none=True).encode(UTF8)
    response = self._session.post(endpoint_url, data=payload)

    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchTableError})

    return _PLANNING_RESPONSE_ADAPTER.validate_json(response.text)
425+
426+
@retry(**_RETRY_ARGS)
def _fetch_scan_tasks(self, identifier: str | Identifier, plan_task: str) -> ScanTasks:
    """Retrieve the scan tasks that a plan-task token refers to.

    Args:
        identifier: Table identifier.
        plan_task: The plan task token from a previous response.

    Returns:
        ScanTasks holding file scan tasks and possibly further plan-task tokens.

    Raises:
        NoSuchPlanTaskError: If the server answers 404 for the given table
            identifier or plan task.
    """
    self._check_endpoint(Capability.V1_TABLE_SCAN_PLAN_TASKS)

    payload = FetchScanTasksRequest(plan_task=plan_task).model_dump_json(by_alias=True).encode(UTF8)
    endpoint_url = self.url(Endpoints.fetch_scan_tasks, prefixed=True, **self._split_identifier_for_path(identifier))
    response = self._session.post(endpoint_url, data=payload)

    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchPlanTaskError})

    return ScanTasks.model_validate_json(response.text)
452+
453+
def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> list[FileScanTask]:
    """Run server-side scan planning for a table and gather every FileScanTask.

    Submits the plan request, then follows all plan-task tokens until no
    further batches remain.

    Args:
        identifier: Table identifier.
        request: The scan plan request parameters.

    Returns:
        List of FileScanTask objects ready for execution.

    Raises:
        RuntimeError: If planning fails, is cancelled, or returns unexpected response.
        NotImplementedError: If async planning is required but not yet supported.
    """
    plan = self._plan_table_scan(identifier, request)

    # Reject every outcome other than a completed plan before collecting tasks.
    if isinstance(plan, PlanFailed):
        reason = plan.error.message if plan.error else "unknown error"
        raise RuntimeError(f"Received status: failed: {reason}")
    if isinstance(plan, PlanCancelled):
        raise RuntimeError("Received status: cancelled")
    if isinstance(plan, PlanSubmitted):
        # TODO: implement polling for async planning
        raise NotImplementedError(f"Async scan planning not yet supported for planId: {plan.plan_id}")
    if not isinstance(plan, PlanCompleted):
        raise RuntimeError(f"Invalid planStatus for response: {type(plan).__name__}")

    # Tasks delivered inline with the completed plan come first.
    collected: list[FileScanTask] = [
        FileScanTask.from_rest_response(rest_task, plan.delete_files) for rest_task in plan.file_scan_tasks
    ]

    # Each fetched batch may reference further plan tasks, so keep draining
    # the queue in FIFO order until it is empty.
    outstanding = deque(plan.plan_tasks)
    while outstanding:
        batch = self._fetch_scan_tasks(identifier, outstanding.popleft())
        collected.extend(
            FileScanTask.from_rest_response(rest_task, batch.delete_files) for rest_task in batch.file_scan_tasks
        )
        outstanding.extend(batch.plan_tasks)

    return collected
387501

388502
def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
389503
"""Create the LegacyOAuth2AuthManager by fetching required properties.

pyiceberg/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ class NoSuchNamespaceError(Exception):
5252
"""Raised when a referenced name-space is not found."""
5353

5454

55+
class NoSuchPlanTaskError(Exception):
    """Raised when a requested scan plan task does not exist."""
57+
58+
5559
class RESTError(Exception):
5660
"""Raises when there is an unknown response from the REST Catalog."""
5761

pyiceberg/manifest.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,28 @@ def __repr__(self) -> str:
6969
"""Return the string representation of the DataFileContent class."""
7070
return f"DataFileContent.{self.name}"
7171

72+
@staticmethod
def from_rest_type(content_type: str) -> DataFileContent:
    """Translate a REST API content type name into a DataFileContent member.

    Args:
        content_type: REST API content type; one of "data",
            "position-deletes" or "equality-deletes".

    Returns:
        The matching DataFileContent member.

    Raises:
        ValueError: If content_type is not one of the recognized names.
    """
    known_types = {
        "data": DataFileContent.DATA,
        "position-deletes": DataFileContent.POSITION_DELETES,
        "equality-deletes": DataFileContent.EQUALITY_DELETES,
    }
    resolved = known_types.get(content_type)
    # Compare against None explicitly rather than by truthiness: a member
    # could be falsy (e.g. if the enum is int-valued with a zero member).
    if resolved is None:
        raise ValueError(f"Invalid file content value: {content_type}")
    return resolved
93+
7294

7395
class ManifestContent(int, Enum):
7496
DATA = 0

0 commit comments

Comments
 (0)