Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ repos:
hooks:
- id: flake8
additional_dependencies: [
'setuptools==80.10.2',
'pycodestyle==2.9.1', # E,W
'pyflakes==2.5.0', # F
'mccabe==0.7.0', # C
Expand Down
85 changes: 85 additions & 0 deletions api/api_central_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright (C) 2022-Present Indoc Systems
#
# Licensed under the GNU AFFERO GENERAL PUBLIC LICENSE,
# Version 3.0 (the "License") available at https://www.gnu.org/licenses/agpl-3.0.en.html.
# You may not use this file except in compliance with the License.

from typing import Annotated
from typing import ClassVar
from uuid import UUID
from uuid import uuid4

import httpx
from common import has_file_permission
from fastapi import APIRouter
from fastapi import Depends
from fastapi import Header
from fastapi import Response
from fastapi_utils import cbv
from pydantic import BaseModel

from api.api_dataset_rest_proxy import ProxyPass
from app.auth import jwt_required
from app.components.exceptions import APIException
from app.components.user.models import CurrentUser
from app.logger import logger
from config import Settings
from config import get_settings
from models.api_response import EAPIResponseCode
from services.meta import async_get_entity_by_id

router = APIRouter(tags=['Central Node'])


class InitFileUploadSchema(BaseModel):
    """Request body accepted by the Central Node upload initiation endpoint."""

    # Identifier of the file to upload; validated as a UUID by pydantic.
    file_id: UUID
    # Caller-supplied session identifier, forwarded unchanged to the upstream request.
    session_id: str


@cbv.cbv(router)
class CopyToCentralNode(ProxyPass):
    """Class-based view relaying Central Node upload requests to the DataOps service."""

    # Presumably consumed by ProxyPass to filter forwarded query parameters and
    # returned headers — TODO confirm against ProxyPass, which is not visible here.
    request_allowed_parameters: ClassVar[set[str]] = set()
    response_allowed_headers: ClassVar[set[str]] = {'Content-Type'}

    # Dependencies shared by every route declared on this view.
    current_user: CurrentUser = Depends(jwt_required)
    settings: Settings = Depends(get_settings)

    @router.post('/central-node/upload', summary='Initiate file upload to the Central Node.')
    async def init(self, body: InitFileUploadSchema) -> Response:
        """Start a file upload to the Central Node on behalf of the current user.

        Looks up the file entity by ``body.file_id``, checks that the current user
        holds the ``copy`` permission on it, then posts the upload request to the
        DataOps service and returns whatever ``process_response`` produces.

        Raises:
            APIException: with the forbidden status code when the permission check fails.
        """
        entity = await async_get_entity_by_id(str(body.file_id))
        allowed = await has_file_permission(self.settings.AUTH_SERVICE, entity, 'copy', self.current_user)
        if not allowed:
            raise APIException(error_msg='Permission denied', status_code=EAPIResponseCode.forbidden.value)

        file_id, project_code = entity['id'], entity['container_code']
        # A fresh job id is generated per request.
        job_id = str(uuid4())
        session_id = body.session_id
        logger.info(
            f'Init a file upload to central node for file_id: {file_id}, job_id: {job_id}, session_id: {session_id}'
        )

        payload = {
            'file_id': file_id,
            'project_code': project_code,
            'job_id': job_id,
            'session_id': session_id,
            'operator': self.current_user.username,
        }
        upload_url = f'{self.settings.DATAOPS_SERVICE}central-node/upload'
        timeout = self.settings.CENTRAL_NODE_CLIENT_TIMEOUT_SECONDS
        async with httpx.AsyncClient(timeout=timeout) as client:
            # Sent via ``data=`` (form-encoded), not ``json=``.
            upstream = await client.post(upload_url, data=payload)

        # NOTE(review): process_response is inherited from ProxyPass; a reviewer reports
        # it raises on non-2xx upstream responses — confirm whether DataOps error
        # statuses should instead be passed through to the caller.
        return await self.process_response(upstream)
Comment on lines +68 to +72
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

process_response() calls raise_for_status(), which will turn any non-2xx response from DataOps into an UnhandledException (500) instead of forwarding the real status/body (e.g., 4xx from DataOps). For these proxy-style endpoints, return a FastAPI Response using raw_response.content, raw_response.status_code, and filtered headers without raising on status, or override raise_for_response_status() for this class to allow pass-through errors.

Copilot uses AI. Check for mistakes.

@router.get('/central-node/upload/{upload_key}', summary='Wait file upload authorization from the Central Node.')
async def wait(self, upload_key: str, authorization: Annotated[str | None, Header()] = None) -> Response:
if authorization is None:
raise APIException(error_msg='Missing Authorization header.', status_code=EAPIResponseCode.forbidden.value)

Comment on lines +74 to +78
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 'Missing Authorization header' check will never run when the header is absent, because current_user: Depends(jwt_required) is resolved before the route handler runs. jwt_required/get_current_identity currently decodes the token unconditionally and will raise, resulting in a 500 instead of the intended 403. Either remove jwt_required for this endpoint (if it should accept a non-JWT auth header), or make the auth dependency handle a missing/invalid Authorization header cleanly (e.g., raise APIException/401) and then drop this redundant check.

Copilot uses AI. Check for mistakes.
async with httpx.AsyncClient(timeout=self.settings.CENTRAL_NODE_PULL_CLIENT_TIMEOUT_SECONDS) as client:
raw_response = await client.get(
f'{self.settings.DATAOPS_SERVICE}central-node/upload/{upload_key}',
headers={'Authorization': authorization},
)

return await self.process_response(raw_response)
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as in init: using process_response() will raise on DataOps 4xx/5xx and convert it into a 500 from BFF. For a long-polling/authorization wait endpoint, it’s especially important to pass through DataOps status codes (including non-200) rather than raising.

Suggested change
return await self.process_response(raw_response)
return Response(
content=raw_response.content,
status_code=raw_response.status_code,
)

Copilot uses AI. Check for mistakes.
2 changes: 2 additions & 0 deletions app/api_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from api import api_archive
from api import api_auth
from api import api_bridge
from api import api_central_node
from api import api_contact_us
from api import api_dataset_rest_proxy
from api import api_download
Expand Down Expand Up @@ -68,6 +69,7 @@ def api_registry(app: FastAPI) -> None:
app.include_router(copy_request.router, prefix='/v1')
app.include_router(data_manifest.router, prefix='/v1')
app.include_router(api_dataset_rest_proxy.router, prefix='/v1')
app.include_router(api_central_node.router, prefix='/v1')
app.include_router(api_folder.router, prefix='/v1')
app.include_router(api_invitation.router, prefix='/v1')
app.include_router(api_schema.router, prefix='/v1')
Expand Down
3 changes: 3 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class Settings(BaseSettings):

SERVICE_CLIENT_TIMEOUT: int = 5

CENTRAL_NODE_CLIENT_TIMEOUT_SECONDS: int = 30
CENTRAL_NODE_PULL_CLIENT_TIMEOUT_SECONDS: int = 300

PROJECT_NAME: str = 'Pilot'

STARTING_PROJECT_CODE: str = 'indoctestproject'
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "core"
version = "2.2.78"
version = "2.2.79"
description = ""
authors = ["Indoc Research"]

Expand Down
96 changes: 96 additions & 0 deletions tests/api/test_api_central_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright (C) 2022-Present Indoc Systems
#
# Licensed under the GNU AFFERO GENERAL PUBLIC LICENSE,
# Version 3.0 (the "License") available at https://www.gnu.org/licenses/agpl-3.0.en.html.
# You may not use this file except in compliance with the License.

from app.components.user.models import CurrentUser
from models.user_type import EUserRole


class TestCopyToCentralNode:
    """API tests for the Central Node upload endpoints served under ``/v1/central-node``."""

    @staticmethod
    def _patch_identity(mocker, username, project_code):
        """Patch ``app.auth.get_current_identity`` to return a member with the project admin realm role."""
        roles = [f'{project_code}-{EUserRole.admin.name}']
        identity = CurrentUser({'role': 'member', 'username': username, 'realm_roles': roles})
        mocker.patch('app.auth.get_current_identity', return_value=identity)

    @staticmethod
    def _project_file_node(file_id, project_code):
        """Build the metadata payload describing a file inside a project container."""
        return {
            'id': file_id,
            'container_type': 'project',
            'container_code': project_code,
            'zone': 0,
            'parent_path': 'test',
        }

    async def test_init_makes_upload_request_to_dataops_service(
        self, mocker, settings, test_async_client, httpx_mock, fake, has_permission_true
    ):
        """A permitted user gets a 200 once the metadata lookup and DataOps POST are mocked."""
        username = fake.user_name()
        project_code = fake.project_code()
        session_id = fake.uuid4()
        file_id = fake.uuid4()

        self._patch_identity(mocker, username, project_code)
        httpx_mock.add_response(
            url=f'{settings.METADATA_SERVICE}item/{file_id}/',
            json={'result': self._project_file_node(file_id, project_code)},
        )
        httpx_mock.add_response(method='POST', url=f'{settings.DATAOPS_SERVICE}central-node/upload', json={})

        response = await test_async_client.post(
            '/v1/central-node/upload',
            json={'file_id': file_id, 'session_id': session_id},
            headers={'Authorization': ''},
        )

        assert response.status_code == 200

    async def test_init_returns_forbidden_when_permission_denied(
        self, mocker, settings, test_async_client, httpx_mock, fake, has_permission_false
    ):
        """A user without the copy permission receives 403 with the 'Permission denied' message."""
        username = fake.user_name()
        project_code = fake.project_code()
        file_id = fake.uuid4()

        self._patch_identity(mocker, username, project_code)
        httpx_mock.add_response(
            url=f'{settings.METADATA_SERVICE}item/{file_id}/',
            json={'result': self._project_file_node(file_id, project_code)},
        )

        response = await test_async_client.post(
            '/v1/central-node/upload',
            json={'file_id': file_id, 'session_id': fake.uuid4()},
            headers={'Authorization': ''},
        )

        assert response.status_code == 403
        assert response.json()['error_msg'] == 'Permission denied'

    async def test_wait_makes_upload_request_to_dataops_service(
        self, mocker, settings, test_async_client, httpx_mock, fake
    ):
        """The wait endpoint returns 200 when the mocked DataOps GET for the upload key succeeds."""
        username = fake.user_name()
        project_code = fake.project_code()
        upload_key = fake.sha256()

        self._patch_identity(mocker, username, project_code)
        httpx_mock.add_response(url=f'{settings.DATAOPS_SERVICE}central-node/upload/{upload_key}', json={})

        response = await test_async_client.get(
            f'/v1/central-node/upload/{upload_key}',
            headers={'Authorization': f'Bearer {fake.sha256()}'},
        )

        assert response.status_code == 200
Loading