Skip to content

Commit 096a5e2

Browse files
committed
fix: address PR review for ZIP upload (#12253)
- Add BadZipFile handling in extract_flows_from_zip for defense-in-depth
- Wrap blocking ZIP I/O in asyncio.to_thread() to avoid event loop blocking
- Add post-read size check to guard against dishonest ZIP headers
- Add 7 adversarial tests: invalid JSON, max entries, oversized entries, mixed valid/invalid, filename fallback, corrupt ZIP, batch name dedup
1 parent 2170e1c commit 096a5e2

File tree

2 files changed

+243
-13
lines changed

2 files changed

+243
-13
lines changed

src/backend/base/langflow/api/utils/zip_utils.py

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
from __future__ import annotations
44

5+
import asyncio
56
import io
67
import zipfile
8+
from dataclasses import dataclass, field
79

810
import orjson
911
from lfx.log.logger import logger
@@ -13,36 +15,71 @@
1315
MAX_ENTRY_UNCOMPRESSED_BYTES = 50 * 1024 * 1024 # 50 MB per file
1416

1517

16-
async def extract_flows_from_zip(contents: bytes) -> list[dict]:
17-
"""Extract flow JSON data from a ZIP file.
18+
@dataclass
19+
class _ZipExtractionResult:
20+
"""Result of synchronous ZIP extraction, including warnings to log after."""
21+
22+
flows: list[dict] = field(default_factory=list)
23+
warnings: list[str] = field(default_factory=list)
1824

19-
Reads all .json files from the ZIP archive and returns their parsed contents.
20-
Enforces limits on entry count and individual file size to mitigate zip bomb attacks.
25+
26+
def _extract_flows_sync(contents: bytes) -> _ZipExtractionResult:
27+
"""Synchronous helper that performs all blocking ZIP I/O.
2128
2229
Raises:
23-
ValueError: If the ZIP contains more than MAX_ZIP_ENTRIES JSON files.
30+
ValueError: If the ZIP is corrupt or contains more than MAX_ZIP_ENTRIES JSON files.
2431
"""
25-
flows: list[dict] = []
32+
result = _ZipExtractionResult()
33+
34+
try:
35+
zf = zipfile.ZipFile(io.BytesIO(contents), "r")
36+
except zipfile.BadZipFile as exc:
37+
msg = f"Uploaded file is not a valid ZIP archive: {exc}"
38+
raise ValueError(msg) from exc
2639

27-
with zipfile.ZipFile(io.BytesIO(contents), "r") as zip_file:
28-
json_entries = [info for info in zip_file.infolist() if info.filename.lower().endswith(".json")]
40+
with zf:
41+
json_entries = [info for info in zf.infolist() if info.filename.lower().endswith(".json")]
2942

3043
if len(json_entries) > MAX_ZIP_ENTRIES:
3144
msg = f"ZIP contains {len(json_entries)} JSON entries, exceeding the limit of {MAX_ZIP_ENTRIES}"
3245
raise ValueError(msg)
3346

3447
for info in json_entries:
3548
if info.file_size > MAX_ENTRY_UNCOMPRESSED_BYTES:
36-
await logger.awarning(
49+
result.warnings.append(
3750
f"Skipping ZIP entry '{info.filename}': uncompressed size "
3851
f"{info.file_size} exceeds limit of {MAX_ENTRY_UNCOMPRESSED_BYTES} bytes"
3952
)
4053
continue
4154
try:
42-
raw = zip_file.read(info.filename)
43-
flows.append(orjson.loads(raw))
55+
raw = zf.read(info.filename)
56+
if len(raw) > MAX_ENTRY_UNCOMPRESSED_BYTES:
57+
result.warnings.append(
58+
f"Skipping ZIP entry '{info.filename}': actual size "
59+
f"{len(raw)} exceeds limit of {MAX_ENTRY_UNCOMPRESSED_BYTES} bytes"
60+
)
61+
continue
62+
result.flows.append(orjson.loads(raw))
4463
except orjson.JSONDecodeError:
45-
await logger.awarning(f"Skipping ZIP entry '{info.filename}': invalid JSON")
64+
result.warnings.append(f"Skipping ZIP entry '{info.filename}': invalid JSON")
4665
continue
4766

48-
return flows
67+
return result
68+
69+
70+
async def extract_flows_from_zip(contents: bytes) -> list[dict]:
71+
"""Extract flow JSON data from a ZIP file.
72+
73+
Reads all .json files from the ZIP archive and returns their parsed contents.
74+
Enforces limits on entry count and individual file size to mitigate zip bomb attacks.
75+
Blocking I/O is offloaded to a thread to avoid blocking the event loop.
76+
77+
Raises:
78+
ValueError: If the ZIP is corrupt or contains more than MAX_ZIP_ENTRIES JSON files.
79+
"""
80+
result = await asyncio.to_thread(_extract_flows_sync, contents)
81+
82+
for warning in result.warnings:
83+
await logger.awarning(warning)
84+
85+
return result.flows

src/backend/tests/unit/test_database.py

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,199 @@ async def test_download_then_upload_roundtrip(client: AsyncClient, json_flow: st
697697
assert len(uploaded) == 2
698698

699699

700+
@pytest.mark.usefixtures("session")
701+
async def test_upload_zip_with_invalid_json(client: AsyncClient, json_flow: str, logged_in_headers):
702+
"""ZIP entries with invalid JSON are skipped; valid entries are still processed."""
703+
flow = orjson.loads(json_flow)
704+
data = flow["data"]
705+
valid_flow = {"name": "valid_flow", "description": "good", "data": data}
706+
707+
zip_buffer = io.BytesIO()
708+
with zipfile.ZipFile(zip_buffer, "w") as zf:
709+
zf.writestr("valid.json", json.dumps(valid_flow))
710+
zf.writestr("broken.json", "{not valid json!!!}")
711+
zip_buffer.seek(0)
712+
713+
response = await client.post(
714+
"api/v1/flows/upload/",
715+
files={"file": ("mixed.zip", zip_buffer.getvalue(), "application/zip")},
716+
headers=logged_in_headers,
717+
)
718+
assert response.status_code == 201
719+
response_data = response.json()
720+
assert len(response_data) == 1
721+
assert response_data[0]["name"] == "valid_flow"
722+
723+
724+
@pytest.mark.usefixtures("session")
725+
async def test_upload_zip_exceeding_max_entries(client: AsyncClient, json_flow: str, logged_in_headers, monkeypatch):
726+
"""ZIP with more JSON entries than the limit raises 400."""
727+
import langflow.api.utils.zip_utils as zip_utils_mod
728+
729+
monkeypatch.setattr(zip_utils_mod, "MAX_ZIP_ENTRIES", 3)
730+
731+
flow = orjson.loads(json_flow)
732+
data = flow["data"]
733+
734+
zip_buffer = io.BytesIO()
735+
with zipfile.ZipFile(zip_buffer, "w") as zf:
736+
for i in range(5):
737+
zf.writestr(f"flow_{i}.json", json.dumps({"name": f"flow_{i}", "data": data}))
738+
zip_buffer.seek(0)
739+
740+
response = await client.post(
741+
"api/v1/flows/upload/",
742+
files={"file": ("too_many.zip", zip_buffer.getvalue(), "application/zip")},
743+
headers=logged_in_headers,
744+
)
745+
assert response.status_code == 400
746+
assert "exceeding the limit" in response.json()["detail"]
747+
748+
749+
@pytest.mark.usefixtures("session")
750+
async def test_upload_zip_with_oversized_entry(client: AsyncClient, json_flow: str, logged_in_headers, monkeypatch):
751+
"""Entries exceeding size limit are skipped; smaller valid entries are processed."""
752+
import langflow.api.utils.zip_utils as zip_utils_mod
753+
754+
flow = orjson.loads(json_flow)
755+
data = flow["data"]
756+
small_flow = {"name": "small_flow", "data": {"nodes": [], "edges": []}}
757+
big_flow = {"name": "big_flow", "data": data}
758+
759+
# Build the ZIP first, then set the limit between the two entry sizes
760+
zip_buffer = io.BytesIO()
761+
with zipfile.ZipFile(zip_buffer, "w") as zf:
762+
zf.writestr("small.json", json.dumps(small_flow))
763+
zf.writestr("big.json", json.dumps(big_flow))
764+
765+
# Re-read to find the actual sizes and pick a limit between them
766+
with zipfile.ZipFile(io.BytesIO(zip_buffer.getvalue()), "r") as zf:
767+
sizes = {info.filename: info.file_size for info in zf.infolist()}
768+
limit = (sizes["small.json"] + sizes["big.json"]) // 2
769+
monkeypatch.setattr(zip_utils_mod, "MAX_ENTRY_UNCOMPRESSED_BYTES", limit)
770+
771+
zip_buffer.seek(0)
772+
response = await client.post(
773+
"api/v1/flows/upload/",
774+
files={"file": ("oversized.zip", zip_buffer.getvalue(), "application/zip")},
775+
headers=logged_in_headers,
776+
)
777+
assert response.status_code == 201
778+
response_data = response.json()
779+
assert len(response_data) == 1
780+
assert response_data[0]["name"] == "small_flow"
781+
782+
783+
@pytest.mark.usefixtures("session")
784+
async def test_upload_zip_with_mixed_valid_invalid(client: AsyncClient, json_flow: str, logged_in_headers, monkeypatch):
785+
"""Mix of valid flows, invalid JSON, and oversized entries → only valid flows returned."""
786+
import langflow.api.utils.zip_utils as zip_utils_mod
787+
788+
flow = orjson.loads(json_flow)
789+
data = flow["data"]
790+
valid_flow = {"name": "keeper", "data": {"nodes": [], "edges": []}}
791+
oversized_flow = {"name": "too_big", "data": data, "padding": "x" * 500}
792+
793+
zip_buffer = io.BytesIO()
794+
with zipfile.ZipFile(zip_buffer, "w") as zf:
795+
zf.writestr("valid.json", json.dumps(valid_flow))
796+
zf.writestr("broken.json", "NOT JSON {{{")
797+
zf.writestr("huge.json", json.dumps(oversized_flow))
798+
zf.writestr("readme.txt", "ignored non-json")
799+
800+
# Set limit between valid entry size and oversized entry size
801+
with zipfile.ZipFile(io.BytesIO(zip_buffer.getvalue()), "r") as zf:
802+
sizes = {info.filename: info.file_size for info in zf.infolist()}
803+
limit = (sizes["valid.json"] + sizes["huge.json"]) // 2
804+
monkeypatch.setattr(zip_utils_mod, "MAX_ENTRY_UNCOMPRESSED_BYTES", limit)
805+
806+
zip_buffer.seek(0)
807+
response = await client.post(
808+
"api/v1/flows/upload/",
809+
files={"file": ("mixed.zip", zip_buffer.getvalue(), "application/zip")},
810+
headers=logged_in_headers,
811+
)
812+
assert response.status_code == 201
813+
response_data = response.json()
814+
assert len(response_data) == 1
815+
assert response_data[0]["name"] == "keeper"
816+
817+
818+
@pytest.mark.usefixtures("session")
819+
async def test_upload_zip_to_projects_filename_none(client: AsyncClient, json_flow: str, logged_in_headers):
820+
"""When filename has no stem (e.g. '.zip'), the project name defaults to 'Imported Project'."""
821+
flow = orjson.loads(json_flow)
822+
data = flow["data"]
823+
flow_data = {"name": "flow_none", "data": data}
824+
825+
zip_buffer = io.BytesIO()
826+
with zipfile.ZipFile(zip_buffer, "w") as zf:
827+
zf.writestr("flow.json", json.dumps(flow_data))
828+
zip_buffer.seek(0)
829+
830+
# filename=".zip" → rsplit gives ("", "zip") → "" is falsy → "Imported Project"
831+
response = await client.post(
832+
"api/v1/projects/upload/",
833+
files={"file": (".zip", zip_buffer.getvalue(), "application/zip")},
834+
headers=logged_in_headers,
835+
)
836+
assert response.status_code == 201
837+
response_data = response.json()
838+
assert len(response_data) == 1
839+
840+
folder_id = response_data[0]["folder_id"]
841+
project_response = await client.get(f"api/v1/projects/{folder_id}", headers=logged_in_headers)
842+
assert project_response.status_code == 200
843+
assert project_response.json()["name"].startswith("Imported Project")
844+
845+
846+
@pytest.mark.usefixtures("session")
847+
async def test_upload_bad_zip_file_returns_400(client: AsyncClient, logged_in_headers):
848+
"""Uploading a corrupt/invalid ZIP file returns 400 with a descriptive error."""
849+
# Build a payload that passes zipfile.is_zipfile() but fails ZipFile() construction.
850+
# We keep only the end-of-central-directory record (last 22 bytes of a real ZIP)
851+
# prepended with garbage, so the EOCD signature is found but the central directory is invalid.
852+
buf = io.BytesIO()
853+
with zipfile.ZipFile(buf, "w") as zf:
854+
zf.writestr("dummy.json", '{"name":"x"}')
855+
valid_zip = buf.getvalue()
856+
# Minimal EOCD is 22 bytes; keep it and prepend garbage
857+
corrupt_zip = b"garbage" * 10 + valid_zip[-22:]
858+
859+
response = await client.post(
860+
"api/v1/flows/upload/",
861+
files={"file": ("corrupt.zip", corrupt_zip, "application/zip")},
862+
headers=logged_in_headers,
863+
)
864+
assert response.status_code == 400
865+
assert "not a valid ZIP" in response.json()["detail"]
866+
867+
868+
@pytest.mark.usefixtures("session")
869+
async def test_upload_zip_to_projects_batch_name_dedup(client: AsyncClient, json_flow: str, logged_in_headers):
870+
"""Multiple flows with the same name get unique names within the batch."""
871+
flow = orjson.loads(json_flow)
872+
data = flow["data"]
873+
874+
zip_buffer = io.BytesIO()
875+
with zipfile.ZipFile(zip_buffer, "w") as zf:
876+
for i in range(3):
877+
zf.writestr(f"flow_{i}.json", json.dumps({"name": "duplicate_name", "data": data}))
878+
zip_buffer.seek(0)
879+
880+
response = await client.post(
881+
"api/v1/projects/upload/",
882+
files={"file": ("dedup_test.zip", zip_buffer.getvalue(), "application/zip")},
883+
headers=logged_in_headers,
884+
)
885+
assert response.status_code == 201
886+
response_data = response.json()
887+
assert len(response_data) == 3
888+
names = [r["name"] for r in response_data]
889+
# All names must be unique
890+
assert len(set(names)) == 3
891+
892+
700893
@pytest.mark.usefixtures("active_user")
701894
async def test_create_flow_with_invalid_data(client: AsyncClient, logged_in_headers):
702895
flow = {"name": "a" * 256, "data": "Invalid flow data"}

0 commit comments

Comments (0)