Skip to content

Commit fee207b

Browse files
authored
feat: support collection export (#1448)
1 parent f4426ec commit fee207b

File tree

27 files changed

+2098
-1192
lines changed

27 files changed

+2098
-1192
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
exportTaskResponse:
2+
type: object
3+
required:
4+
- export_task_id
5+
- status
6+
properties:
7+
export_task_id:
8+
type: string
9+
description: Unique ID of the export task
10+
status:
11+
type: string
12+
enum:
13+
- PENDING
14+
- PROCESSING
15+
- COMPLETED
16+
- FAILED
17+
- EXPIRED
18+
description: Current status of the export task
19+
progress:
20+
type: integer
21+
minimum: 0
22+
maximum: 100
23+
description: Progress percentage (0-100)
24+
message:
25+
type: string
26+
nullable: true
27+
description: Human-readable status message
28+
error_message:
29+
type: string
30+
nullable: true
31+
description: Error detail when status is FAILED
32+
download_url:
33+
type: string
34+
nullable: true
35+
description: URL to download the ZIP file (only set when status is COMPLETED)
36+
file_size:
37+
type: integer
38+
nullable: true
39+
description: Size of the ZIP file in bytes
40+
gmt_created:
41+
type: string
42+
format: date-time
43+
nullable: true
44+
gmt_completed:
45+
type: string
46+
format: date-time
47+
nullable: true
48+
gmt_expires:
49+
type: string
50+
format: date-time
51+
nullable: true
52+
description: Time when the export file will be automatically deleted (7 days after creation)

aperag/api/openapi.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,11 @@ paths:
238238

239239
/agent/message:
240240
$ref: './paths/agent.yaml'
241+
242+
# export
243+
/collections/{collection_id}/export:
244+
$ref: './paths/export.yaml#/createExportTask'
245+
/export-tasks/{task_id}:
246+
$ref: './paths/export.yaml#/getExportTask'
247+
/export-tasks/{task_id}/download:
248+
$ref: './paths/export.yaml#/downloadExport'

aperag/api/paths/export.yaml

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
createExportTask:
2+
post:
3+
summary: Create a knowledge base export task
4+
description: |
5+
Asynchronously package all object-store files under the collection into a ZIP file.
6+
Only the collection owner can initiate an export.
7+
operationId: createExportTask
8+
security:
9+
- BearerAuth: []
10+
parameters:
11+
- name: collection_id
12+
in: path
13+
required: true
14+
schema:
15+
type: string
16+
description: Collection ID
17+
responses:
18+
'202':
19+
description: Export task created and queued
20+
content:
21+
application/json:
22+
schema:
23+
$ref: '../components/schemas/export.yaml#/exportTaskResponse'
24+
'403':
25+
description: Permission denied (not collection owner)
26+
content:
27+
application/json:
28+
schema:
29+
$ref: '../components/schemas/common.yaml#/failResponse'
30+
'404':
31+
description: Collection not found
32+
content:
33+
application/json:
34+
schema:
35+
$ref: '../components/schemas/common.yaml#/failResponse'
36+
'429':
37+
description: Too many concurrent export tasks
38+
content:
39+
application/json:
40+
schema:
41+
$ref: '../components/schemas/common.yaml#/failResponse'
42+
43+
getExportTask:
44+
get:
45+
summary: Get export task status
46+
description: Query the status and progress of an export task.
47+
operationId: getExportTask
48+
security:
49+
- BearerAuth: []
50+
parameters:
51+
- name: task_id
52+
in: path
53+
required: true
54+
schema:
55+
type: string
56+
description: Export task ID
57+
responses:
58+
'200':
59+
description: Export task details
60+
content:
61+
application/json:
62+
schema:
63+
$ref: '../components/schemas/export.yaml#/exportTaskResponse'
64+
'404':
65+
description: Export task not found
66+
content:
67+
application/json:
68+
schema:
69+
$ref: '../components/schemas/common.yaml#/failResponse'
70+
71+
downloadExport:
72+
get:
73+
summary: Download export ZIP
74+
description: Stream the completed export ZIP file to the client.
75+
operationId: downloadExport
76+
security:
77+
- BearerAuth: []
78+
parameters:
79+
- name: task_id
80+
in: path
81+
required: true
82+
schema:
83+
type: string
84+
description: Export task ID
85+
responses:
86+
'200':
87+
description: ZIP file stream
88+
content:
89+
application/zip:
90+
schema:
91+
type: string
92+
format: binary
93+
'400':
94+
description: Export task is not ready for download
95+
content:
96+
application/json:
97+
schema:
98+
$ref: '../components/schemas/common.yaml#/failResponse'
99+
'404':
100+
description: Export task not found
101+
content:
102+
application/json:
103+
schema:
104+
$ref: '../components/schemas/common.yaml#/failResponse'
105+
'410':
106+
description: Export file has expired
107+
content:
108+
application/json:
109+
schema:
110+
$ref: '../components/schemas/common.yaml#/failResponse'

aperag/app.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from aperag.views.collections import router as collections_router
4747
from aperag.views.config import router as config_router
4848
from aperag.views.evaluation import router as evaluation_router
49+
from aperag.views.export import router as export_router
4950
from aperag.views.flow import router as flow_router
5051
from aperag.views.graph import router as graph_router
5152
from aperag.views.llm import router as llm_router
@@ -98,6 +99,7 @@ async def health_check():
9899
app.include_router(auth_router, prefix="/api/v1")
99100
app.include_router(main_router, prefix="/api/v1")
100101
app.include_router(collections_router, prefix="/api/v1") # Add collections router
102+
app.include_router(export_router, prefix="/api/v1") # Add export router
101103
app.include_router(api_key_router, prefix="/api/v1")
102104
app.include_router(audit_router, prefix="/api/v1") # Add audit router
103105
app.include_router(flow_router, prefix="/api/v1")

aperag/db/models.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,6 +1132,39 @@ class Setting(Base):
11321132
gmt_deleted = Column(DateTime(timezone=True), nullable=True)
11331133

11341134

1135+
class ExportTaskStatus(str, Enum):
    """Status values an export task can carry.

    The enum mixes in ``str``, so every member compares equal to its own
    upper-case name.
    """

    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    EXPIRED = "EXPIRED"
1141+
1142+
1143+
def _new_export_task_id() -> str:
    """Return a fresh primary key: the literal "export" plus 16 characters of ``random_id()``."""
    return "export" + random_id()[:16]


class ExportTask(Base):
    """ORM model for one row of the ``export_task`` table."""

    __tablename__ = "export_task"
    __table_args__ = (
        Index("idx_export_task_user_status", "user", "status"),
        Index("idx_export_task_expires", "status", "gmt_expires"),
    )

    # Identity and ownership.
    id = Column(String(24), primary_key=True, default=_new_export_task_id)
    user = Column(String(256), nullable=False, index=True)
    collection_id = Column(String(24), nullable=False, index=True)

    # Task state; new rows start as PENDING with progress 0.
    status = Column(EnumColumn(ExportTaskStatus), nullable=False, default=ExportTaskStatus.PENDING)
    progress = Column(Integer, default=0)
    message = Column(Text, nullable=True)
    error_message = Column(Text, nullable=True)

    # Produced artifact.
    # NOTE(review): presumably the object-store location and byte size of the
    # generated archive -- confirm against the export service.
    object_store_path = Column(Text, nullable=True)
    file_size = Column(BigInteger, nullable=True)

    # Timestamps (timezone-aware). Only the first two are filled by a column default.
    gmt_created = Column(DateTime(timezone=True), default=utc_now, nullable=False)
    gmt_updated = Column(DateTime(timezone=True), default=utc_now, nullable=False)
    gmt_completed = Column(DateTime(timezone=True), nullable=True)
    gmt_expires = Column(DateTime(timezone=True), nullable=True)
1166+
1167+
11351168
class PromptTemplate(Base):
11361169
__tablename__ = "prompt_template"
11371170

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""add export_task table
2+
3+
Revision ID: a1b2c3d4e5f6
4+
Revises: 8abbaf1aa10d
5+
Create Date: 2026-03-04 12:00:00.000000
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
import sqlalchemy as sa
11+
from alembic import op
12+
13+
revision: str = 'a1b2c3d4e5f6'
14+
down_revision: Union[str, None] = '8abbaf1aa10d'
15+
branch_labels: Union[str, Sequence[str], None] = None
16+
depends_on: Union[str, Sequence[str], None] = None
17+
18+
19+
def upgrade() -> None:
    """Create the ``export_task`` table, then its four indexes."""
    table_name = 'export_task'

    op.create_table(
        table_name,
        sa.Column('id', sa.String(length=24), nullable=False),
        sa.Column('user', sa.String(length=256), nullable=False),
        sa.Column('collection_id', sa.String(length=24), nullable=False),
        sa.Column('status', sa.String(length=50), nullable=False),
        sa.Column('progress', sa.Integer(), nullable=True, server_default='0'),
        sa.Column('message', sa.Text(), nullable=True),
        sa.Column('error_message', sa.Text(), nullable=True),
        sa.Column('object_store_path', sa.Text(), nullable=True),
        sa.Column('file_size', sa.BigInteger(), nullable=True),
        sa.Column('gmt_created', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('NOW()')),
        sa.Column('gmt_updated', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('NOW()')),
        sa.Column('gmt_completed', sa.DateTime(timezone=True), nullable=True),
        sa.Column('gmt_expires', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_export_task')),
    )

    # (index name, indexed columns), created in this order.
    index_specs = (
        (op.f('ix_export_task_user'), ['user']),
        (op.f('ix_export_task_collection_id'), ['collection_id']),
        ('idx_export_task_user_status', ['user', 'status']),
        ('idx_export_task_expires', ['status', 'gmt_expires']),
    )
    for index_name, indexed_columns in index_specs:
        op.create_index(index_name, table_name, indexed_columns, unique=False)
43+
44+
45+
def downgrade() -> None:
    """Drop the ``export_task`` indexes, then the table itself."""
    table_name = 'export_task'

    # Dropped in the reverse of the order in which upgrade() creates them.
    index_names = (
        'idx_export_task_expires',
        'idx_export_task_user_status',
        op.f('ix_export_task_collection_id'),
        op.f('ix_export_task_user'),
    )
    for index_name in index_names:
        op.drop_index(index_name, table_name=table_name)

    op.drop_table(table_name)

aperag/objectstore/base.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,19 @@ def delete_objects_by_prefix(self, path_prefix: str):
111111
"""
112112
...
113113

114+
@abstractmethod
def list_objects_by_prefix(self, path_prefix: str) -> list[str]:
    """
    Enumerates the stored objects whose path begins with the given prefix.

    Args:
        path_prefix: Leading portion of the object path to match.

    Returns:
        The paths of every matching object, each expressed relative to the
        store root.
    """
    ...
126+
114127

115128
class AsyncObjectStore(ABC):
116129
"""Abstract base class for asynchronous object storage operations."""
@@ -208,6 +221,19 @@ async def delete_objects_by_prefix(self, path_prefix: str):
208221
"""
209222
...
210223

224+
@abstractmethod
async def list_objects_by_prefix(self, path_prefix: str) -> list[str]:
    """
    Asynchronously enumerates the stored objects whose path begins with the given prefix.

    Args:
        path_prefix: Leading portion of the object path to match.

    Returns:
        The paths of every matching object, each expressed relative to the
        store root.
    """
    ...
236+
211237

212238
def get_object_store() -> ObjectStore:
213239
"""

aperag/objectstore/local.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,23 @@ def delete_objects_by_prefix(self, path_prefix: str):
343343
logger.error(f"Error during deletion of objects with prefix '{path_prefix}': {e}")
344344
raise IOError(f"Error during deletion of objects with prefix '{path_prefix}'") from e
345345

346+
def list_objects_by_prefix(self, path_prefix: str) -> list[str]:
    """
    Lists every stored file whose store-relative path starts with the given prefix.

    Matching is a plain string-prefix test on the "/"-separated relative path,
    so "docs/a" matches both "docs/a.txt" and "docs/ab/c.txt".

    Args:
        path_prefix: The prefix to match. Leading "/" characters are ignored
            and "\\" is treated as "/".

    Returns:
        The matching file paths relative to the store root, "/"-separated and
        sorted so the result is deterministic across filesystems.

    Raises:
        IOError: If the storage directory cannot be traversed.
    """
    normalized_prefix = path_prefix.lstrip("/").replace("\\", "/")
    base_path = self._base_storage_path

    # Everything before the last "/" names a directory that every match must
    # live under, so the walk can start there instead of at the store root.
    directory_part = normalized_prefix.rpartition("/")[0]
    segments = directory_part.split("/") if directory_part else []

    # Relative paths of stored files never contain empty, "." or ".." segments,
    # so such a prefix cannot match anything; bailing out here also keeps the
    # walk from being steered outside the storage root.
    if any(segment in ("", ".", "..") for segment in segments):
        return []

    search_root = base_path.joinpath(*segments)
    # A drive/anchor-like segment can replace the base when joined; refuse it.
    if search_root != base_path and base_path not in search_root.parents:
        return []

    matches = []
    try:
        if not search_root.is_dir():
            return []
        for item_path in search_root.rglob("*"):
            if not item_path.is_file():
                continue
            relative = item_path.relative_to(base_path).as_posix()
            # The last prefix segment may be a partial name, so re-check the full prefix.
            if relative.startswith(normalized_prefix):
                matches.append(relative)
    except Exception as e:
        logger.error(f"Error listing objects with prefix '{path_prefix}': {e}")
        raise IOError(f"Error listing objects with prefix '{path_prefix}'") from e
    return sorted(matches)
362+
346363

347364
class AsyncLocal(AsyncObjectStore):
348365
"""Asynchronous wrapper for the Local object store."""
@@ -412,3 +429,6 @@ async def delete(self, path: str):
412429

413430
async def delete_objects_by_prefix(self, path_prefix: str):
414431
return await sync_to_async(self._sync_store.delete_objects_by_prefix)(path_prefix=path_prefix)
432+
433+
async def list_objects_by_prefix(self, path_prefix: str) -> list[str]:
    """Run the wrapped synchronous store's ``list_objects_by_prefix`` through ``sync_to_async``."""
    async_list = sync_to_async(self._sync_store.list_objects_by_prefix)
    return await async_list(path_prefix=path_prefix)

0 commit comments

Comments
 (0)