NCBI FTP Integration#133
Conversation
There was a problem hiding this comment.
Pull request overview
Introduces an NCBI FTP-based ingestion pipeline to support periodic Lakehouse updates using NCBI assembly summary diffs, with supporting utilities, notebooks, and tests (unit + optional MinIO-backed integration).
Changes:
- Added Phase 1 (manifest/diff), Phase 2 (parallel FTP download CLI), and Phase 3 (S3 promote + archive + manifest trimming) components for NCBI FTP sync.
- Introduced shared utilities for FTP resiliency (keepalive/retry/thread-local) and local checksum helpers (MD5 + CRC64/NVME).
- Expanded test coverage (unit + smoke tests for notebooks + integration tests against MinIO/real NCBI FTP) and wired a new
ncbi_ftp_syncentrypoint.
Reviewed changes
Copilot reviewed 29 out of 32 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
tests/utils/test_s3.py |
Extends S3 utility tests for new metadata/copy/head helpers and minor refactors. |
tests/utils/test_ftp_client.py |
Adds tests for new FTP client utilities (keepalive/retry/thread-local). |
tests/utils/test_checksums.py |
Adds tests for new checksum utilities (MD5 + optional CRC64/NVME). |
tests/pipelines/test_ncbi_ftp_download.py |
Adds tests for Phase 2 settings + batch orchestration. |
tests/ncbi_ftp/test_promote.py |
Adds unit tests for Phase 3 promote/archive/trim behavior using moto. |
tests/ncbi_ftp/test_notebooks.py |
Adds smoke tests ensuring notebooks parse + imports resolve. |
tests/ncbi_ftp/test_manifest.py |
Adds unit tests for Phase 1 parsing/diffing/filtering/writing + candidate verification. |
tests/ncbi_ftp/test_assembly.py |
Adds unit tests for assembly path helpers, filters, and md5checksums parsing. |
tests/ncbi_ftp/conftest.py |
Adds shared moto S3 fixtures + sample assembly summary content for ncbi_ftp tests. |
tests/ncbi_ftp/__init__.py |
Marks the ncbi_ftp test package. |
tests/integration/test_promote_e2e.py |
Adds MinIO-backed end-to-end promote/archive/trim tests. |
tests/integration/test_manifest_e2e.py |
Adds MinIO/FTP-backed end-to-end manifest/diff/verify tests (skippable). |
tests/integration/test_full_pipeline.py |
Adds end-to-end Phase 1→2→3 pipeline test on a small batch. |
tests/integration/test_download_e2e.py |
Adds end-to-end Phase 2 download tests against real NCBI FTP (skippable). |
tests/integration/conftest.py |
Adds MinIO client + per-test bucket management + auto-skip integration tests when unavailable. |
tests/integration/__init__.py |
Marks the integration test package. |
src/cdm_data_loaders/utils/s3.py |
Adds metadata-aware upload, head-object helper, and metadata-replacing copy helper; tightens some exception handling. |
src/cdm_data_loaders/utils/ftp_client.py |
Adds resilient FTP connection helpers (TCP keepalive, NOOP keepalive, retries, thread-local pool). |
src/cdm_data_loaders/utils/checksums.py |
Adds checksum primitives (MD5 + CRC64/NVME). |
src/cdm_data_loaders/pipelines/ncbi_ftp_download.py |
Implements Phase 2 CLI/settings + threaded batch download orchestration + report writing. |
src/cdm_data_loaders/ncbi_ftp/promote.py |
Implements Phase 3 promotion from staging to Lakehouse, archival, and manifest trimming. |
src/cdm_data_loaders/ncbi_ftp/manifest.py |
Implements Phase 1 summary download/parsing, diffing, candidate verification, and manifest outputs. |
src/cdm_data_loaders/ncbi_ftp/assembly.py |
Implements assembly-domain helpers + single-assembly download with MD5 verification + sidecars. |
src/cdm_data_loaders/ncbi_ftp/__init__.py |
Marks the ncbi_ftp package. |
scripts/s3_local.py |
Adds a minimal local S3 CLI helper for MinIO-based workflows. |
scripts/entrypoint.sh |
Adds ncbi_ftp_sync container command routing to the new CLI. |
pyproject.toml |
Registers ncbi_ftp_sync console entrypoint; adds pytest marker + ruff per-file ignores. |
notebooks/ncbi_ftp_promote.ipynb |
Adds Phase 3 notebook for promote/archive/trim operations. |
notebooks/ncbi_ftp_manifest.ipynb |
Adds Phase 1 notebook for summary diffing + manifest generation + optional verification. |
docs/ncbi_ftp_e2e_walkthrough.md |
Adds detailed local end-to-end walkthrough (MinIO + notebooks + container download). |
README.md |
Documents integration tests and local MinIO setup at a high level. |
Dockerfile |
Adjusts entrypoint handling for podman compatibility and wires entrypoint path. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #133 +/- ##
===========================================
+ Coverage 60.80% 66.43% +5.62%
===========================================
Files 64 71 +7
Lines 3368 4308 +940
===========================================
+ Hits 2048 2862 +814
- Misses 1320 1446 +126
Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Introduces an NCBI FTP-based ingestion pipeline to support periodic Lakehouse updates driven by NCBI “assembly summary” releases, including manifest generation (Phase 1), parallel FTP download to staging (Phase 2), and promotion/archival into final Lakehouse paths (Phase 3).
Changes:
- Add new
ncbi_ftp_*modules +ncbi_ftp_syncCLI for downloading assemblies listed in a transfer manifest. - Add S3 helpers for metadata-bearing uploads, metadata-replacing copies, and lightweight object introspection; add resilient FTP + checksum utilities.
- Add extensive unit + MinIO-backed integration tests, notebooks, and walkthrough documentation; update container entrypoint and Trivy workflow.
Reviewed changes
Copilot reviewed 30 out of 33 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
src/cdm_data_loaders/utils/s3.py |
Adds upload_file_with_metadata, head_object, copy_object_with_metadata; tightens error handling in existing helpers. |
src/cdm_data_loaders/utils/ftp_client.py |
New resilient FTP utilities (keepalive, NOOP ping, retries, thread-local pool). |
src/cdm_data_loaders/utils/checksums.py |
New local MD5 + CRC64/NVME checksum helpers. |
src/cdm_data_loaders/ncbi_ftp/assembly.py |
Assembly-specific parsing, filtering, and single-assembly download with MD5 verification + sidecars. |
src/cdm_data_loaders/ncbi_ftp/manifest.py |
Phase 1: download/parse summary, compute diff, optional S3 checksum pruning, write manifests + summary. |
src/cdm_data_loaders/ncbi_ftp/promote.py |
Phase 3: promote staged files to final paths with MD5 metadata; archive removed/updated; trim transfer manifest. |
src/cdm_data_loaders/pipelines/ncbi_ftp_download.py |
Phase 2 orchestration: settings, parallel batch downloads, report JSON, CTS/CLI entrypoint. |
pyproject.toml |
Registers ncbi_ftp_sync console script; adds pytest marker and ruff per-file ignores for new tests. |
scripts/entrypoint.sh |
Adds ncbi_ftp_sync dispatch option and improves usage messaging. |
Dockerfile |
Adjusts entrypoint executable handling for podman; points ENTRYPOINT at ./scripts/entrypoint.sh. |
scripts/s3_local.py |
Adds a minimal local S3/MinIO helper CLI (mb/cp/ls/head) for walkthrough/testing. |
docs/ncbi_ftp_e2e_walkthrough.md |
Step-by-step local E2E walkthrough (MinIO + container + notebooks). |
README.md |
Documents integration tests (MinIO + real NCBI FTP) and how they’re skipped when MinIO isn’t available. |
.github/workflows/trivy.yaml |
Adds a timeout to Trivy scan invocation. |
tests/utils/test_s3.py |
Expands S3 utility test coverage for new metadata/copy/head helpers and minor fixture cleanup. |
tests/utils/test_ftp_client.py |
New unit tests for FTP keepalive/retry/thread-local connection handling. |
tests/utils/test_checksums.py |
New unit tests for checksum utilities (MD5 + optional CRC64/NVME). |
tests/pipelines/test_ncbi_ftp_download.py |
New unit tests for download settings validation and download_batch orchestration/reporting. |
tests/ncbi_ftp/conftest.py |
Shared moto-S3 fixtures + sample assembly summary content for unit tests. |
tests/ncbi_ftp/test_manifest.py |
Unit tests for summary parsing, diffing, manifest writing, URL helpers, and S3 verification pruning. |
tests/ncbi_ftp/test_assembly.py |
Unit tests for assembly path helpers, filters, and md5checksums parsing. |
tests/ncbi_ftp/test_promote.py |
Unit tests for promotion, archiving, and manifest trimming behavior. |
tests/ncbi_ftp/test_notebooks.py |
Smoke tests to ensure notebook code cells parse and imports resolve. |
tests/integration/conftest.py |
MinIO-backed integration test harness with auto-skip when MinIO is unreachable. |
tests/integration/test_manifest_e2e.py |
Phase 1 end-to-end tests against real NCBI FTP (and optional MinIO). |
tests/integration/test_download_e2e.py |
Phase 2 end-to-end tests downloading real assemblies. |
tests/integration/test_promote_e2e.py |
Phase 3 end-to-end tests promoting/archiving in MinIO. |
tests/integration/test_full_pipeline.py |
Full pipeline end-to-end coverage (Phase 1→2→3) for a small batch. |
tests/ncbi_ftp/__init__.py |
Package marker for new unit test namespace. |
tests/integration/__init__.py |
Package marker for new integration test namespace. |
src/cdm_data_loaders/ncbi_ftp/__init__.py |
Package marker for new NCBI FTP modules. |
notebooks/ncbi_ftp_manifest.ipynb |
Phase 1 notebook for manifest generation/diffing and optional S3 verification/staging. |
notebooks/ncbi_ftp_promote.ipynb |
Phase 3 notebook for promotion/archival/manifest trimming. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
| bucket: str, | ||
| key_prefix: str, | ||
| release_tag: str, | ||
| *, |
There was a problem hiding this comment.
Are you asking about the *? As I understand it, it's to restrict arguments that follow it to be keyword args only (not positional) (https://docs.python.org/3/tutorial/controlflow.html#special-parameters). Seems useful to me (although I don't write that much Python code). I've seen bugs pop up in other languages that don't have this ability. But, I don't mind removing it, it's not strictly necessary.
| _ = copy_object( | ||
| f"{bucket}/{source_key}", | ||
| f"{bucket}/{archive_key}", | ||
| ) |
There was a problem hiding this comment.
is it worth capturing the return value if you're not going to use it?
There was a problem hiding this comment.
do you mean by using the _ as a throw-away variable? I like this because it makes clear that there is a return value and we're intentionally ignoring it. But, no strong preferences on my part; I'm happy to remove it if you prefer. (Not sure if the linter will care or not though)
| ncbi_release: str | None = None, | ||
| manifest_s3_key: str | None = None, | ||
| lakehouse_key_prefix: str = DEFAULT_LAKEHOUSE_KEY_PREFIX, | ||
| *, |
There was a problem hiding this comment.
(see response to similar comment above)
| try: | ||
| resp = s3.head_object(Bucket=bucket, Key=key, ChecksumMode="ENABLED") | ||
| except Exception as e: # noqa: BLE001 | ||
| if e.response["Error"]["Code"] == "404": # type: ignore[union-attr] | ||
| return None | ||
| raise | ||
| return { | ||
| "size": resp["ContentLength"], | ||
| "metadata": resp.get("Metadata", {}), | ||
| "checksum_crc64nvme": resp.get("ChecksumCRC64NVME"), | ||
| } |
There was a problem hiding this comment.
Could you return the default response from the s3 api and do the reshaping of the output in a wrapper or in the function that calls this one?
There was a problem hiding this comment.
You commented on this same block above, and just in case you didn't see it yet, this was my response:
We can if you would like. I preferred this approach because it doesn't create downstream dependencies on the S3 response structure. We essentially extract specifically what we need here and return it. If the response schema changes in the future, there is only one part of the code that needs to be updated. But, no strong opinion. Happy to change it if you prefer.
I can definitely change it back if you prefer. Just not sure if you saw this or not. Let me know!
| s3 = get_s3_client() | ||
|
|
||
| (bucket, key) = split_s3_path(s3_path) | ||
| try: | ||
| head_object(s3_path) | ||
| except Exception as e: | ||
| s3.head_object(Bucket=bucket, Key=key) |
There was a problem hiding this comment.
if the head_object function is reverted to its original form, this bit doesn't need to be changed
There was a problem hiding this comment.
(see comment above)
| CopySource={"Bucket": current_s3_bucket, "Key": current_s3_key}, | ||
| Bucket=new_s3_bucket, | ||
| Key=new_s3_key, | ||
| **DEFAULT_EXTRA_ARGS, |
There was a problem hiding this comment.
can you add these back in or add a command line param to allow them to be specified? I use these functions to transfer data to the CTS s3 bucket, which requires everything to have CRC64NVME checksums, which is why I added the extra args as default.
| logger.exception("Error initialising boto3 client") | ||
| raise | ||
| except Exception: | ||
| except (ModuleNotFoundError, ImportError, NameError) as e: |
There was a problem hiding this comment.
don't need e if you're not doing anything with the exception object
| logger = get_cdm_logger() | ||
|
|
||
|
|
||
| # ── Constants ──────────────────────────────────────────────────────────── |
There was a problem hiding this comment.
this is extremely petty and picky, but could you remove these lines (of the form # -- <some word here> --------------------)? I have a strong aversion to them as it makes it obvious that the code was generated by an LLM, rather than being hand-crafted bespoke artisanal home-baked coding goodness!
There was a problem hiding this comment.
haha, sure thing! my hand-crafted comments can usually be identified by the incredible number of typos they contain
There was a problem hiding this comment.
I think they're all removed now
| s3 = get_s3_client() | ||
| paginator = s3.get_paginator("list_objects_v2") | ||
| normalized_staging_key_prefix = staging_key_prefix.rstrip("/") + "/" | ||
|
|
||
| # Collect all objects under the staging prefix | ||
| staged_objects: list[str] = [] | ||
| for page in paginator.paginate(Bucket=staging_bucket, Prefix=normalized_staging_key_prefix): | ||
| staged_objects.extend(obj["Key"] for obj in page.get("Contents", [])) |
There was a problem hiding this comment.
can you use the existing list_matching_objects function, which deals with pagination automatically, here?
| for raw_line in text.strip().splitlines(): | ||
| stripped = raw_line.strip() | ||
| if not stripped: | ||
| continue | ||
| parts = stripped.split(" ", maxsplit=1) | ||
| if len(parts) == 2: # noqa: PLR2004 | ||
| md5_hash, filename = parts |
There was a problem hiding this comment.
| for raw_line in text.strip().splitlines(): | |
| stripped = raw_line.strip() | |
| if not stripped: | |
| continue | |
| parts = stripped.split(" ", maxsplit=1) | |
| if len(parts) == 2: # noqa: PLR2004 | |
| md5_hash, filename = parts | |
| for raw_line in text.strip().splitlines(): | |
| parts = raw_line.strip().split(" ", maxsplit=1) | |
| if len(parts) == 2: # noqa: PLR2004 | |
| md5_hash, filename = parts |
| if previous_assemblies is not None: | ||
| known = set(previous_assemblies.keys()) | ||
| elif previous_accessions is not None: | ||
| known = previous_accessions | ||
| else: | ||
| known = set() |
There was a problem hiding this comment.
| if previous_assemblies is not None: | |
| known = set(previous_assemblies.keys()) | |
| elif previous_accessions is not None: | |
| known = previous_accessions | |
| else: | |
| known = set() | |
| known = set(previous_assemblies) or previous_accessions or set() |
There was a problem hiding this comment.
done! (slightly modified to allow empty previous_assemblies args)
| MINIO_ENDPOINT_URL = os.environ["MINIO_ENDPOINT_URL"] | ||
| MINIO_ACCESS_KEY = os.environ["MINIO_ACCESS_KEY"] |
There was a problem hiding this comment.
since minio is going away, I wonder if it would be better to shift to using the generic AWS_* env var names instead.
There was a problem hiding this comment.
also worth considering how difficult it will be to shift over to using a CEPH image (the proposed replacement) instead of minio, a generic s3 image, or converting these tests to use a moto mock server.
There was a problem hiding this comment.
I'll look into swapping in the CEPH image for testing.
db33640 to
0862f1b
Compare
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Description of PR purpose/changes
This PR introduces a pipeline for ingesting NCBI data via the NCBI FTP site. It is intended to allow periodic updates of Lakehouse data as new NCBI versions are released. The logic relies on the assembly summary text files published alongside the datasets for each version.
I've tested the manifest generation in the Lakehouse and it seemed to work well. I think this PR is ready for review now.
The workflow comprises three steps:
Minor changes:
Dockerfileto work with podman (changed howchmodis used)No additional dependencies are needed.
I used Copilot for much of the implementation under careful supervision. I verified the implementation by reviewing the code, running the test suite, and following the instructions for end-to-end testing locally. I will test in the Lakehouse with limited records before doing a full transfer.
Testing Instructions
Testing strategy:
docs/ncbi_ftp_e2e_walkthrough.mddescribes how to do end-to-end testing using a local MinIO store, or in the Lakehouse with limited recordsDetails for how to test the PR:
Follow README instructions for spinning up a MinIO test container
Run the full test suite
Follow instructions in
docs/ncbi_ftp_e2e_walkthrough.mdfor end-to-end testing locally and in the LakehouseTests pass locally and in GitHub Actions
Dev Checklist:
formatto format my codecheckand fixed any errors that it uncoveredUpdating Version and Release Notes (if applicable)