Skip to content

Add scheduled ingestion service#529

Open
niedch wants to merge 17 commits into
labsai:mainfrom
niedch:scheduled-ingest-service
Open

Add scheduled ingestion service#529
niedch wants to merge 17 commits into
labsai:mainfrom
niedch:scheduled-ingest-service

Conversation

@niedch

@niedch niedch commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Summary

Adds a full integration test suite for the RAG Ingestion pipeline (RagIngestionService) using an nginx Testcontainer serving static HTML test data. Also introduces a UrlValidationUtils.setValidationEnabled() toggle to allow Testcontainer-based tests to bypass SSRF private-IP validation on loopback addresses.

Type of Change

  • ✨ New feature (non-breaking change that adds functionality)
  • 🐛 Bug fix
  • 💥 Breaking change
  • 📝 Documentation update
  • ♻️ Refactoring
  • 🔧 Chore

Related Issue

Changes Made

Production code

  • UrlValidationUtils.java — Added setValidationEnabled(boolean) static flag. When disabled, validateUrl() only checks URL syntax, scheme (http/https), and hostname presence — DNS resolution and private-IP blocking are skipped. Default remains true (no behavioral change). This enables Testcontainer-based tests where the container binds to localhost (normally blocked by SSRF validation).

Test infrastructure

  • RagIngestionIT.java — New @QuarkusTest with nginx Testcontainer. 7 test scenarios covering the full ingestion pipeline: single-page ingest, multi-page BFS crawl, dedup of unchanged content, content change detection via hash, stale document marking, 404 error handling, and path-prefix scope filtering.
  • TestEmbeddingModelFactory.java@Mock @Singleton CDI bean that returns fixed 384-dim zero-vectors via embedAll(). Replaces EmbeddingModelFactory during @QuarkusTest runs so ingestion works without external LLM API keys.
  • TestEmbeddingStoreFactory.java@Mock @Singleton CDI bean that returns a per-kbId InMemoryEmbeddingStore. Replaces EmbeddingStoreFactory so no external vector DB is needed.
  • src/test/resources/tests/ingestion/ — 4 HTML test files (index.html, about.html, docs/guide.html, index_updated.html) mounted into the nginx container.

Test scenarios

Test What it verifies
shouldIngestSinglePage 1 doc fetched, converted, chunked, embedded, stored (maxPages=1 caps crawl)
shouldCrawlMultiplePages BFS follows same-domain links to ≥3 pages
shouldDedupUnchangedContent Re-ingest of identical content: 0 new, 1 unchanged, 0 chunks stored
shouldDetectContentChanges Replace file mid-test → content hash differs → re-ingested
shouldDetectStaleDocuments Delete file mid-test → markStaleDocuments() marks it stale
shouldHandle404StartUrl Non-existent start URL → 0 docs, errors > 0
shouldRespectPathPrefixScope /docs/ path prefix limits crawl to subdirectory only

How to Test

  1. docker info — ensure Docker is running (Testcontainers requires it)
  2. ./mvnw test -pl . -Dtest=RagIngestionIT — runs all 7 ingestion integration tests
  3. ./mvnw test -pl . -Dtest="SafeHttpClientTest,EmbeddingModelFactoryTest,EmbeddingStoreFactoryTest" — verify no regressions in existing tests
  4. ./mvnw test -pl . -Dtest=RagCrudIT — verify @Mock beans don't break other @QuarkusTest classes

Checklist

  • My code follows the project's code style
  • I have added tests that prove my fix/feature works
  • Existing tests pass locally (./mvnw clean verify -DskipITs) — there is a pre-existing compilation error in StructuralMatcherTest.java (unrelated AgentSourceData symbol) that prevents full verify
  • I have updated documentation if needed — changelog updated
  • My commit messages follow conventional commits
  • I have not committed any secrets, API keys, or tokens
  • This PR has a clear, focused scope (one concern per PR)

Summary by CodeRabbit

  • New Features

    • RAG ingestion: web crawling, HTML→Markdown conversion, content deduplication, stale-document detection, chunking and embedding storage.
    • Scheduling & triggers: cron schedules, direct-execution schedules, and on-demand trigger.
    • Pluggable fetcher/converter model and content-hash backends (Mongo/Postgres) for reliable dedup and lifecycle handling.
    • Workflow tasks and REST management for ingestion sources.
  • Tests

    • Integration and unit tests covering crawling, conversion, deduplication, scheduling, and content-hash stores.

@niedch niedch requested review from ginccc and rolandpickl as code owners June 9, 2026 10:22
@coderabbitai

coderabbitai Bot commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a RAG ingestion subsystem: source models and REST/store APIs, web crawler and HTML→Markdown converter, content-hash deduplication backends (Mongo/Postgres), ingestion orchestration and embedding, schedule/direct execution support, lifecycle tasks, and tests with HTML fixtures.

Changes

RAG Ingestion Implementation

Layer / File(s) Summary
Source models and REST/store contracts
src/main/java/ai/labs/eddi/configs/ingestion/model/SourceConfig.java, WebSourceConfig.java, RagIngestionSource.java, src/main/java/ai/labs/eddi/configs/ingestion/IRagIngestionSourceStore.java, IRestRagIngestionSourceStore.java
Introduces sealed SourceConfig, WebSourceConfig with Scope and CrawlSettings, RagIngestionSource with ingestion settings and schedule, store interface with paginated lookup, and REST API contract for schema and versioned CRUD/trigger endpoints.
Mongo store and REST lifecycle
src/main/java/ai/labs/eddi/configs/ingestion/mongo/MongoRagIngestionSourceStore.java, src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java, src/main/java/ai/labs/eddi/datastore/DataStoreProducers.java
Mongo-backed store with ragConfigUri lookup and pagination; REST-backed lifecycle wiring with schedule create/update/delete helpers, async trigger, and CDI producer wiring for content-hash store selection.
Content fetching and web crawling
src/main/java/ai/labs/eddi/modules/ingestion/ContentFetcher.java, WebContentFetcher.java, src/main/java/ai/labs/eddi/modules/llm/tools/UrlValidationUtils.java, FetchResult.java
Adds ContentFetcher contract and FetchResult model; implements BFS WebContentFetcher with scope rules, exclude-patterns, request delay/timeout, HTML fetch/parse, link extraction, and a URL-validation toggle for tests.
HTML-to-Markdown conversion and unit tests
src/main/java/ai/labs/eddi/modules/ingestion/ContentConverter.java, HtmlToMarkdownConverter.java, src/test/java/ai/labs/eddi/modules/ingestion/HtmlToMarkdownConverterTest.java
Adds ContentConverter and HtmlToMarkdownConverter: JSoup-based DOM-to-Markdown rendering, noise removal, title handling, link/image resolution, lists/tables/code/blockquote support, escaping, truncation; extensive converter unit tests.
Content-hash tracking with MongoDB and PostgreSQL backends
src/main/java/ai/labs/eddi/modules/ingestion/IContentHashStore.java, MongoContentHashStore.java, src/main/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStore.java, src/test/java/ai/labs/eddi/datastore/mongo/MongoContentHashStoreTest.java, PostgresContentHashStoreTest.java, DataStoreProducers.java
Adds IContentHashStore interface and Mongo/Postgres implementations with SHA-256 hashing, atomic upsert semantics, stale-document marking, clearing, and integration tests validating dedup, stale lifecycle, and hashing.
RAG ingestion service and tests
src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionService.java, FetchResult.java, src/test/java/ai/labs/eddi/modules/llm/impl/TestEmbeddingModelFactory.java, TestEmbeddingStoreFactory.java, src/test/java/ai/labs/eddi/integration/RagIngestionIT.java, src/test/resources/tests/ingestion/*.html
Implements RagIngestionService orchestration: fetch → convert → optional dedup → mark stale → embed & store; introduces IngestionResult; provides deterministic test embedding factories and integration tests exercising crawl, dedup, change detection, stale detection, 404 handling, and scoping.
Lifecycle tasks and bootstrap
src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionTask.java, src/main/java/ai/labs/eddi/modules/rag/RagTask.java, bootstrap/RagModule.java
Adds RagIngestionTask and RagTask lifecycle implementations with configuration parsing, execute wiring, extension descriptors, and RagModule bootstrap that registers the task provider.
Direct schedule execution for ingestion
src/main/java/ai/labs/eddi/engine/schedule/DirectScheduleExecutor.java, src/main/java/ai/labs/eddi/engine/schedule/model/ScheduleConfiguration.java, src/main/java/ai/labs/eddi/engine/runtime/internal/ScheduleFireExecutor.java, src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionDirectExecutor.java
Adds DirectScheduleExecutor contract and ScheduleConfiguration.directExecutionType, updates ScheduleFireExecutor to dispatch to direct executors when configured (bypassing conversations), and implements RagIngestionDirectExecutor to validate metadata and invoke ingestion directly.
Tests and fixtures
src/test/resources/tests/ingestion/*.html, src/test/java/.../*Test.java, src/test/java/.../RagIngestionIT.java
Adds HTML fixtures and new unit/integration tests covering converter behavior, content-hash stores (Mongo/Postgres), ingestion end-to-end flows, and test factories for deterministic embedding behavior.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 I hopped through HTML and trimmed each tag,

fetched pages, hashed, and stored each doc in a bag.
Schedules call — direct or by chat,
chunks and vectors put neatly in a vat.
A rabbit cheers: ingestion on the map!

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (5)
src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java (1)

228-228: ⚖️ Poor tradeoff

O(n) schedule lookup degrades with scale.

Both updateScheduleForSource and deleteScheduleForSource call readAllSchedules(1000) and scan linearly for a matching sourceId. This is slow when there are many schedules, and the 1000-record limit means schedules beyond that won't be found.

If IScheduleStore supports indexed queries (e.g., by metadata fields), use that instead of scanning. Otherwise, document the 1000-schedule limit and consider adding an index in a future refactor.

Also applies to: 265-265

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java`
at line 228, The current linear scan using scheduleStore.readAllSchedules(1000)
in updateScheduleForSource and deleteScheduleForSource is O(n) and misses
schedules beyond 1000; replace the scan with an indexed lookup on sourceId (use
an existing IScheduleStore method like findByMetadata/findBySourceId or add one)
so you query schedules filtered by metadata/sourceId instead of reading all; if
the store truly lacks indexed queries, implement pagination or iterate
readAllSchedules with offset/limit until exhausted (or explicitly document the
1000-record limitation) and add a TODO to add an indexed query in IScheduleStore
for future refactoring.
src/main/java/ai/labs/eddi/modules/llm/tools/UrlValidationUtils.java (1)

27-45: ⚡ Quick win

Documentation should acknowledge both @BeforeAll and @BeforeEach reset patterns.

The javadoc at lines 34–35 recommends resetting the flag in @BeforeEach / @AfterEach, but RagIngestionIT uses @BeforeAll / @AfterAll because the nginx container is expensive to start and all tests in that class require validation disabled. Both patterns are valid depending on whether individual tests or an entire test class needs the setting.

📝 Suggested javadoc clarification
  * Set via {`@link` `#setValidationEnabled`(boolean)} before test execution. This is
  * intentionally a static flag (not a config property) because
  * {`@code` UrlValidationUtils} is a stateless utility class. Test frameworks
- * should reset this in {`@code` `@BeforeEach`} / {`@code` `@AfterEach`}.
+ * should reset this in {`@code` `@BeforeEach`} / {`@code` `@AfterEach`} for per-test
+ * isolation, or {`@code` `@BeforeAll`} / {`@code` `@AfterAll`} when all tests in a
+ * class require the same setting.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/ai/labs/eddi/modules/llm/tools/UrlValidationUtils.java` around
lines 27 - 45, Update the Javadoc for the static flag to note both reset
patterns: explain that UrlValidationUtils.validationEnabled can be toggled via
UrlValidationUtils.setValidationEnabled(boolean) and tests should reset it using
either `@BeforeEach/`@AfterEach for per-test resets or `@BeforeAll/`@AfterAll for
class-scoped resets (e.g., expensive fixtures like an nginx container), so both
approaches are supported and acceptable depending on test fixture scope.
src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionService.java (2)

318-321: ⚖️ Poor tradeoff

Chunk count estimation is approximate and may mislead metrics.

The chunk count is estimated by dividing markdown length by chunk size, but actual chunk counts can differ significantly due to:

  • Chunk overlap (chunkOverlap)
  • Boundary-respecting splits (word/sentence boundaries)
  • Variable text density

Consider either:

  • Accepting the approximation and documenting it clearly in metrics/logs
  • Tracking actual chunk count by counting segments returned from the splitter
  • Using a separate estimated vs. actual counter
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionService.java` around
lines 318 - 321, Current code uses an approximate estimate (int estimatedChunks
= Math.max(1, doc.markdown().length() / ragConfig.getChunkSize())) which can
misrepresent real metrics; instead call the actual splitter used for ingestion
(e.g., splitter.split(doc.markdown()) or the method that returns segments) and
increment totalChunks by the real segments.size(); if you need to keep the fast
approximation, maintain two counters (estimatedChunks and actualChunks) and log
both via processLogger (or metrics emitter) so you record estimated vs actual
for comparison; update references to estimatedChunks, totalChunks,
ragConfig.getChunkSize(), and the splitter.split(...) call accordingly.

211-211: ⚡ Quick win

Counter name chunksSkippedCounter is misleading for unchanged documents.

When a document is unchanged (dedup), no chunks are created or skipped. Incrementing chunksSkippedCounter here suggests chunks were evaluated and skipped, but in reality the document was never converted to chunks. Consider renaming to documentsSkippedCounter or similar, or increment a different counter like documentsUnchangedCounter.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionService.java` at
line 211, The counter chunksSkippedCounter is misleading because it is
incremented for unchanged/deduplicated documents; change this to a clearer
counter name (e.g., documentsSkippedCounter or documentsUnchangedCounter) and
update all references where chunksSkippedCounter is declared and incremented
(rename the field/metric and its usages) so that the metric accurately reflects
"documents unchanged" instead of "chunks skipped"; ensure any metric
registration code (where chunksSkippedCounter is created) and any
telemetry/monitoring labels are updated to match the new name.
src/main/java/ai/labs/eddi/modules/ingestion/WebContentFetcher.java (1)

120-127: ⚖️ Poor tradeoff

Thread.sleep blocks the calling thread.

The Thread.sleep(settings.requestDelayMs()) call blocks the executing thread. If requestDelayMs is large or the crawl has many pages, this can block for extended periods. As per coding guidelines, code must be non-blocking.

Consider using non-blocking delay mechanisms (e.g., CompletableFuture.delayedExecutor() or Quarkus scheduler primitives) or accept that this task should only be invoked on virtual threads.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/ai/labs/eddi/modules/ingestion/WebContentFetcher.java` around
lines 120 - 127, The blocking Thread.sleep call in WebContentFetcher (inside the
loop that checks settings.requestDelayMs()) must be replaced with a non-blocking
delay; locate the sleep call in class WebContentFetcher and change the logic to
schedule the next fetch asynchronously (e.g., use
CompletableFuture.delayedExecutor(settings.requestDelayMs(),
TimeUnit.MILLISECONDS) or a shared ScheduledExecutorService or Quarkus
scheduling primitive) so the current thread is not blocked, ensure interruption
handling/concurrency semantics are preserved, and update the surrounding method
(the fetch loop) to await or chain the delayed completion instead of sleeping.

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@src/main/java/ai/labs/eddi/configs/ingestion/mongo/MongoRagIngestionSourceStore.java`:
- Around line 65-66: Replace the hard-coded resource URI in
MongoRagIngestionSourceStore (the map.put("resource", ... ) call) with the
interface constant from IRestRagIngestionSourceStore (use
IRestRagIngestionSourceStore.resourceURI as the base) and then append the
resourceId.getId() and "?version=" + resourceId.getVersion(); ensure the
concatenation preserves any required slashes so the final URI matches the
interface constant format exactly.
- Around line 47-49: The current escaping in MongoRagIngestionSourceStore
(variable escaped used to build the regex for new
IResourceFilter.QueryFilter("ragConfigUri", ...)) is incomplete and allows regex
injection; replace the manual replace logic with a proper literal-escaping
mechanism (e.g., use java.util.regex.Pattern.quote on ragConfigUri) and build
the filter pattern as "^" + Pattern.quote(ragConfigUri) + "$" so the QueryFilter
matches the exact literal string safely.

In
`@src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java`:
- Around line 280-284: computeNextFire currently ignores the cronExpression and
returns now+60s; replace this placeholder in computeNextFire(String
cronExpression) with real cron parsing (e.g., using com.cronutils:cron-utils or
the project's scheduler utilities): parse the cronExpression into a Cron object,
create an ExecutionTime for it, compute the next execution after Instant.now(),
and return that Instant (or handle the absent next execution by throwing an
IllegalArgumentException or returning null per project conventions). Ensure you
reference the cronExpression parameter in computeNextFire and surface parsing
errors (invalid expressions) with clear exceptions or logs.

In `@src/main/java/ai/labs/eddi/modules/ingestion/ContentHashTracker.java`:
- Around line 163-165: In ContentHashTracker (the block that builds the update
Document setting "stale" and "staleAt"), replace the string timestamp usage
Instant.now().toString() with an Instant instance so "staleAt" is stored as an
Instant like "ingestedAt"/"updatedAt"; update the Document.append("staleAt",
...) to use Instant.now() (or the same Instant variable used for other
timestamps) to ensure consistent BSON type and querying across the class.

In `@src/main/java/ai/labs/eddi/modules/ingestion/WebContentFetcher.java`:
- Around line 258-262: The current fragment-only detection using pathAndQuery
(computed from uri.getPath() and uri.getQuery()) is too naive and rejects valid
URLs; update the logic in WebContentFetcher to detect fragment-only links by
comparing the normalized URI without fragment to the current page URI (e.g.,
compute a copy of uri with fragment removed or build normalizedWithoutFragment
from uri.getScheme(), getHost(), getPort(), getPath(), getQuery() and compare it
to the base/current page URL), and replace/remove the existing
pathAndQuery.isEmpty() || pathAndQuery.equals("/") check so only links that
differ from the current page only by fragment are treated as fragment-only and
skipped.

---

Nitpick comments:
In
`@src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java`:
- Line 228: The current linear scan using scheduleStore.readAllSchedules(1000)
in updateScheduleForSource and deleteScheduleForSource is O(n) and misses
schedules beyond 1000; replace the scan with an indexed lookup on sourceId (use
an existing IScheduleStore method like findByMetadata/findBySourceId or add one)
so you query schedules filtered by metadata/sourceId instead of reading all; if
the store truly lacks indexed queries, implement pagination or iterate
readAllSchedules with offset/limit until exhausted (or explicitly document the
1000-record limitation) and add a TODO to add an indexed query in IScheduleStore
for future refactoring.

In `@src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionService.java`:
- Around line 318-321: Current code uses an approximate estimate (int
estimatedChunks = Math.max(1, doc.markdown().length() /
ragConfig.getChunkSize())) which can misrepresent real metrics; instead call the
actual splitter used for ingestion (e.g., splitter.split(doc.markdown()) or the
method that returns segments) and increment totalChunks by the real
segments.size(); if you need to keep the fast approximation, maintain two
counters (estimatedChunks and actualChunks) and log both via processLogger (or
metrics emitter) so you record estimated vs actual for comparison; update
references to estimatedChunks, totalChunks, ragConfig.getChunkSize(), and the
splitter.split(...) call accordingly.
- Line 211: The counter chunksSkippedCounter is misleading because it is
incremented for unchanged/deduplicated documents; change this to a clearer
counter name (e.g., documentsSkippedCounter or documentsUnchangedCounter) and
update all references where chunksSkippedCounter is declared and incremented
(rename the field/metric and its usages) so that the metric accurately reflects
"documents unchanged" instead of "chunks skipped"; ensure any metric
registration code (where chunksSkippedCounter is created) and any
telemetry/monitoring labels are updated to match the new name.

In `@src/main/java/ai/labs/eddi/modules/ingestion/WebContentFetcher.java`:
- Around line 120-127: The blocking Thread.sleep call in WebContentFetcher
(inside the loop that checks settings.requestDelayMs()) must be replaced with a
non-blocking delay; locate the sleep call in class WebContentFetcher and change
the logic to schedule the next fetch asynchronously (e.g., use
CompletableFuture.delayedExecutor(settings.requestDelayMs(),
TimeUnit.MILLISECONDS) or a shared ScheduledExecutorService or Quarkus
scheduling primitive) so the current thread is not blocked, ensure interruption
handling/concurrency semantics are preserved, and update the surrounding method
(the fetch loop) to await or chain the delayed completion instead of sleeping.

In `@src/main/java/ai/labs/eddi/modules/llm/tools/UrlValidationUtils.java`:
- Around line 27-45: Update the Javadoc for the static flag to note both reset
patterns: explain that UrlValidationUtils.validationEnabled can be toggled via
UrlValidationUtils.setValidationEnabled(boolean) and tests should reset it using
either `@BeforeEach/`@AfterEach for per-test resets or `@BeforeAll/`@AfterAll for
class-scoped resets (e.g., expensive fixtures like an nginx container), so both
approaches are supported and acceptable depending on test fixture scope.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f7bb4752-3a9b-4f78-b5ee-ce753d6b6e09

📥 Commits

Reviewing files that changed from the base of the PR and between e35c57e and 519a838.

📒 Files selected for processing (31)
  • src/main/java/ai/labs/eddi/configs/ingestion/IRagIngestionSourceStore.java
  • src/main/java/ai/labs/eddi/configs/ingestion/IRestRagIngestionSourceStore.java
  • src/main/java/ai/labs/eddi/configs/ingestion/model/RagIngestionSource.java
  • src/main/java/ai/labs/eddi/configs/ingestion/model/SourceConfig.java
  • src/main/java/ai/labs/eddi/configs/ingestion/model/WebSourceConfig.java
  • src/main/java/ai/labs/eddi/configs/ingestion/mongo/MongoRagIngestionSourceStore.java
  • src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java
  • src/main/java/ai/labs/eddi/engine/runtime/internal/ScheduleFireExecutor.java
  • src/main/java/ai/labs/eddi/engine/schedule/DirectScheduleExecutor.java
  • src/main/java/ai/labs/eddi/engine/schedule/model/ScheduleConfiguration.java
  • src/main/java/ai/labs/eddi/modules/ingestion/ContentConverter.java
  • src/main/java/ai/labs/eddi/modules/ingestion/ContentFetcher.java
  • src/main/java/ai/labs/eddi/modules/ingestion/ContentHashTracker.java
  • src/main/java/ai/labs/eddi/modules/ingestion/FetchResult.java
  • src/main/java/ai/labs/eddi/modules/ingestion/HtmlToMarkdownConverter.java
  • src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionDirectExecutor.java
  • src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionService.java
  • src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionTask.java
  • src/main/java/ai/labs/eddi/modules/ingestion/WebContentFetcher.java
  • src/main/java/ai/labs/eddi/modules/llm/tools/UrlValidationUtils.java
  • src/main/java/ai/labs/eddi/modules/rag/RagTask.java
  • src/main/java/ai/labs/eddi/modules/rag/bootstrap/RagModule.java
  • src/test/java/ai/labs/eddi/datastore/mongo/ContentHashTrackerTest.java
  • src/test/java/ai/labs/eddi/integration/RagIngestionIT.java
  • src/test/java/ai/labs/eddi/modules/ingestion/HtmlToMarkdownConverterTest.java
  • src/test/java/ai/labs/eddi/modules/llm/impl/TestEmbeddingModelFactory.java
  • src/test/java/ai/labs/eddi/modules/llm/impl/TestEmbeddingStoreFactory.java
  • src/test/resources/tests/ingestion/about.html
  • src/test/resources/tests/ingestion/docs/guide.html
  • src/test/resources/tests/ingestion/index.html
  • src/test/resources/tests/ingestion/index_updated.html

Comment on lines +163 to +165
new Document("$set", new Document()
.append("stale", true)
.append("staleAt", Instant.now().toString())));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Inconsistent timestamp storage format.

staleAt is stored as a String (Instant.now().toString()) on line 165, but ingestedAt and updatedAt are stored as Instant objects (lines 104, 118, 130). This inconsistency complicates querying and type safety.

🔧 Proposed fix

Store staleAt as an Instant for consistency:

                 new Document("$set", new Document()
                         .append("stale", true)
-                        .append("staleAt", Instant.now().toString())));
+                        .append("staleAt", Instant.now())));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
new Document("$set", new Document()
.append("stale", true)
.append("staleAt", Instant.now().toString())));
new Document("$set", new Document()
.append("stale", true)
.append("staleAt", Instant.now())));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/ai/labs/eddi/modules/ingestion/ContentHashTracker.java` around
lines 163 - 165, In ContentHashTracker (the block that builds the update
Document setting "stale" and "staleAt"), replace the string timestamp usage
Instant.now().toString() with an Instant instance so "staleAt" is stored as an
Instant like "ingestedAt"/"updatedAt"; update the Document.append("staleAt",
...) to use Instant.now() (or the same Instant variable used for other
timestamps) to ensure consistent BSON type and querying across the class.

Comment on lines +258 to +262
String pathAndQuery = uri.getPath() + (uri.getQuery() != null ? "?" + uri.getQuery() : "");
if (pathAndQuery.isEmpty() || pathAndQuery.equals("/")) {
// This is likely just a fragment link to the same page
return false;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fragment-only link detection is incomplete.

The check pathAndQuery.isEmpty() || pathAndQuery.equals("/") will reject valid links like https://example.com/ or links with only a query string (e.g., ?page=2). A fragment-only link should be detected by checking if the URL differs from the base URL only by the fragment, not by the presence of a path.

For example, if the current page is https://example.com/docs, a link to https://example.com/ is not a fragment-only link—it's a different page.

🔧 Proposed fix

Remove the fragment-only check or refine it to compare the normalized URL (without fragment) against the current page URL:

-            // Check fragment-only links (same page anchors)
-            String pathAndQuery = uri.getPath() + (uri.getQuery() != null ? "?" + uri.getQuery() : "");
-            if (pathAndQuery.isEmpty() || pathAndQuery.equals("/")) {
-                // This is likely just a fragment link to the same page
-                return false;
-            }
-
             return true;

Or, if you want to keep it, compare the stripped URL against the base page:

             // Check fragment-only links (same page anchors)
-            String pathAndQuery = uri.getPath() + (uri.getQuery() != null ? "?" + uri.getQuery() : "");
-            if (pathAndQuery.isEmpty() || pathAndQuery.equals("/")) {
-                // This is likely just a fragment link to the same page
-                return false;
-            }
+            // Fragment-only links are already stripped by stripFragment(), so this is not needed
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/ai/labs/eddi/modules/ingestion/WebContentFetcher.java` around
lines 258 - 262, The current fragment-only detection using pathAndQuery
(computed from uri.getPath() and uri.getQuery()) is too naive and rejects valid
URLs; update the logic in WebContentFetcher to detect fragment-only links by
comparing the normalized URI without fragment to the current page URI (e.g.,
compute a copy of uri with fragment removed or build normalizedWithoutFragment
from uri.getScheme(), getHost(), getPort(), getPath(), getQuery() and compare it
to the base/current page URL), and replace/remove the existing
pathAndQuery.isEmpty() || pathAndQuery.equals("/") check so only links that
differ from the current page only by fragment are treated as fragment-only and
skipped.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/main/java/ai/labs/eddi/modules/ingestion/ContentHashTracker.java (1)

83-105: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Normalize documentId before the upsert key is built.

markStaleDocuments() compares normalized IDs, but shouldIngest() stores and matches the raw documentId. If a URL flips between /page and /page/ or drops a fragment across runs, this will create a second hash row and falsely stale the first one even though it's the same document.

Suggested fix
 public boolean shouldIngest(String sourceId, String documentId, String content) {
-    if (documentId == null || documentId.isEmpty()) {
+    String normalizedDocumentId = normalizeId(documentId);
+    if (normalizedDocumentId.isEmpty()) {
         LOGGER.warn("Skipping document with null/empty ID");
         return false;
     }
     if (content == null || content.isEmpty()) {
-        LOGGER.warnf("Skipping document with null/empty content: %s", sanitize(documentId));
+        LOGGER.warnf("Skipping document with null/empty content: %s", sanitize(normalizedDocumentId));
         return false;
     }
     String hash = computeHash(content);
     Instant now = Instant.now();

     Bson filter = Filters.and(
             Filters.eq("sourceId", sourceId),
-            Filters.eq("documentId", documentId));
+            Filters.eq("documentId", normalizedDocumentId));

     Bson update = Updates.combine(
             Updates.set("hash", hash),
             Updates.set("ingestedAt", now),
             Updates.set("updatedAt", now),
             Updates.set("stale", false),
             Updates.setOnInsert("sourceId", sourceId),
-            Updates.setOnInsert("documentId", documentId));
+            Updates.setOnInsert("documentId", normalizedDocumentId));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/ai/labs/eddi/modules/ingestion/ContentHashTracker.java` around
lines 83 - 105, Normalize the documentId in shouldIngest before building the
upsert filter and update (so it matches how markStaleDocuments compares IDs);
call the same normalization routine used elsewhere (e.g., the method used by
markStaleDocuments or a shared normalizeDocumentId helper) to produce a
normalizedId, use that normalizedId in Filters.eq("documentId", ...) and
Updates.setOnInsert("documentId", ...), and update any log calls
(sanitize(documentId)) to use the normalized value where appropriate to ensure
consistent storage and comparisons between shouldIngest and markStaleDocuments.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java`:
- Around line 119-127: restVersionInfo.createDocument(source) currently commits
the new source before calling createScheduleForSource(...), and any exception
from CronParser.validate or scheduleStore.createSchedule is logged as a warning
so the API still returns 201; change this to make creation atomic by either (a)
propagating the exception so the caller returns an error (rethrow the caught
exception after logging) or (b) rolling back the committed source on failure
(call restVersionInfo.deleteDocument(resourceId.getId()) or equivalent) and then
rethrow; update the block around restVersionInfo.createDocument(source) /
createScheduleForSource(...) to ensure failures in createScheduleForSource
(which may throw from CronParser.validate or scheduleStore.createSchedule) do
not result in a successful response.
- Around line 236-239: Validate the cron expression before persisting the new
source version: move the CronParser.validate(source.schedule().cronExpression())
check (guarded by source.schedule().enabled()) to execute before calling
restVersionInfo.update(...), so the request fails early on invalid cron instead
of persisting and later swallowing the error in updateScheduleForSource(...);
ensure the validation throws/returns an error response and prevents calling
restVersionInfo.update(...) when invalid.

---

Outside diff comments:
In `@src/main/java/ai/labs/eddi/modules/ingestion/ContentHashTracker.java`:
- Around line 83-105: Normalize the documentId in shouldIngest before building
the upsert filter and update (so it matches how markStaleDocuments compares
IDs); call the same normalization routine used elsewhere (e.g., the method used
by markStaleDocuments or a shared normalizeDocumentId helper) to produce a
normalizedId, use that normalizedId in Filters.eq("documentId", ...) and
Updates.setOnInsert("documentId", ...), and update any log calls
(sanitize(documentId)) to use the normalized value where appropriate to ensure
consistent storage and comparisons between shouldIngest and markStaleDocuments.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 861a769e-5e6b-4b88-9f6f-78f1717853f3

📥 Commits

Reviewing files that changed from the base of the PR and between 519a838 and ccc5b2f.

📒 Files selected for processing (5)
  • src/main/java/ai/labs/eddi/configs/ingestion/IRestRagIngestionSourceStore.java
  • src/main/java/ai/labs/eddi/configs/ingestion/mongo/MongoRagIngestionSourceStore.java
  • src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java
  • src/main/java/ai/labs/eddi/modules/ingestion/ContentHashTracker.java
  • src/main/java/ai/labs/eddi/utils/StringUtilities.java
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/main/java/ai/labs/eddi/configs/ingestion/mongo/MongoRagIngestionSourceStore.java
  • src/main/java/ai/labs/eddi/configs/ingestion/IRestRagIngestionSourceStore.java

Comment on lines +236 to +239
// Validate cron expression if schedule is enabled
if (source.schedule().enabled()) {
CronParser.validate(source.schedule().cronExpression());
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate the cron before writing the new source version.

The validation is too late here. restVersionInfo.update(...) has already persisted the new version on Line 103, and updateScheduleForSource(...) swallows the validation failure at Lines 267-268. That means an invalid enabled cron can still return 200 OK, while the stored schedule remains stale. Move the validation ahead of restVersionInfo.update(...) and fail the request instead.

Suggested minimal fix
 public Response updateIngestionSource(String id, Integer version, RagIngestionSource source) {
+    if (source.schedule().enabled()) {
+        CronParser.validate(source.schedule().cronExpression());
+    }
     Response response = restVersionInfo.update(id, version, source);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java`
around lines 236 - 239, Validate the cron expression before persisting the new
source version: move the CronParser.validate(source.schedule().cronExpression())
check (guarded by source.schedule().enabled()) to execute before calling
restVersionInfo.update(...), so the request fails early on invalid cron instead
of persisting and later swallowing the error in updateScheduleForSource(...);
ensure the validation throws/returns an error response and prevents calling
restVersionInfo.update(...) when invalid.

.toList();

if (normalizedIds.isEmpty()) {
LOGGER.debugf("All document IDs were empty after normalization for source '%s' — skipping stale marking", sourceId);
}
int markedStale = ps.executeUpdate();
if (markedStale > 0) {
LOGGER.infof("Marked %d documents as stale for source '%s'", markedStale, sourceId);

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/main/java/ai/labs/eddi/modules/ingestion/MongoContentHashStore.java (1)

93-103: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Normalize documentId in shouldIngest to match stale-marking logic.

markStaleDocuments compares against normalized IDs, but shouldIngest persists raw IDs. This can mark active documents as stale when IDs contain fragments or trailing slashes.

Proposed fix
     public boolean shouldIngest(String sourceId, String documentId, String content) {
-        if (documentId == null || documentId.isEmpty()) {
+        String normalizedDocumentId = normalizeId(documentId);
+        if (normalizedDocumentId.isEmpty()) {
             LOGGER.warn("Skipping document with null/empty ID");
             return false;
         }
@@
         Bson filter = Filters.and(
                 Filters.eq("sourceId", sourceId),
-                Filters.eq("documentId", documentId));
+                Filters.eq("documentId", normalizedDocumentId));
@@
                 Updates.set("stale", false),
                 Updates.setOnInsert("sourceId", sourceId),
-                Updates.setOnInsert("documentId", documentId));
+                Updates.setOnInsert("documentId", normalizedDocumentId));

Also applies to: 125-138

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/ai/labs/eddi/modules/ingestion/MongoContentHashStore.java`
around lines 93 - 103, Normalize the documentId in shouldIngest so it matches
the normalization used by markStaleDocuments: call the same normalization helper
(e.g., normalizeDocumentId or the method used in markStaleDocuments) on the
incoming documentId before building the Bson filter and the
Updates.setOnInsert("documentId", ...) values; apply the same change to the
other occurrence (the second block at lines 125-138) so both persistence and
stale-marking use identical normalized IDs.
🧹 Nitpick comments (1)
src/test/java/ai/labs/eddi/datastore/mongo/MongoContentHashStoreTest.java (1)

30-32: ⚡ Quick win

Avoid dropping the collection in @BeforeEach; it removes indexes under test.

Current cleanup wipes the unique/index setup created at init, so most tests no longer exercise indexed behavior.

Proposed fix
     `@BeforeEach`
     void clean() {
-        dropCollections("rag_ingestion_hashes");
+        getDatabase()
+                .getCollection("rag_ingestion_hashes")
+                .deleteMany(new org.bson.Document());
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/test/java/ai/labs/eddi/datastore/mongo/MongoContentHashStoreTest.java`
around lines 30 - 32, The clean() helper in MongoContentHashStoreTest currently
calls dropCollections("rag_ingestion_hashes") which removes indexes under test;
change it to only remove documents while preserving indexes (e.g. call the
collection's deleteMany/clear method or use the test's MongoTemplate to remove
all documents from "rag_ingestion_hashes") so tests keep the unique/index setup;
update the clean() implementation (replace dropCollections(...) with a
deleteAllDocuments approach) and keep the collection name "rag_ingestion_hashes"
referenced.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/main/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStore.java`:
- Around line 109-113: The code inconsistently normalizes document IDs:
markStaleDocuments normalizes fetched IDs but shouldIngest saves the raw
documentId, causing mismatches; update shouldIngest (the method that prepares
and executes the INSERT/SELECT with ps.setString(2, documentId)) to apply the
same normalization function used in markStaleDocuments to documentId before any
DB operations, and likewise ensure any other places that read/write document IDs
(including the code paths around lines 173-191) use the same normalizer so
storage and stale-mark checks use identical IDs.
- Around line 108-153: The SELECT...FOR UPDATE/read-then-INSERT flow in
PostgresContentHashStore leads to race conditions causing unique-key violations;
replace this logic (the block that queries rag_ingestion_hashes and then INSERTs
or UPDATEs) with a single atomic upsert using "INSERT ... ON CONFLICT
(source_id, document_id) DO UPDATE" that sets hash, stale, ingested_at,
updated_at appropriately and determine the boolean changed from the upsert
result (e.g., via RETURNING or by comparing excluded.hash to existing row), or
alternatively catch SQLState 23505 around the INSERT and retry the read/update
path; update the method in PostgresContentHashStore that manipulates changed so
concurrent first-ingest no longer throws unique-constraint errors.

In
`@src/test/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStoreTest.java`:
- Around line 213-227: The concurrency test currently spawns raw Threads, uses
join(5000) (which can silently timeout) and only asserts results.contains(true),
so uncaught exceptions or timed-out threads can make the test falsely pass;
modify the test in PostgresContentHashStoreTest to use an ExecutorService (or
wrap each Runnable with try/catch) and collect futures/exceptions so failures
surface, wait for task completion deterministically (e.g., executor.invokeAll or
join without silent timeouts and then assert all tasks completed), assert no
exceptions were recorded (fail the test if any were), and finally assert that
results.size() matches the expected thread count and results.contains(true) to
ensure at least one successful insertion.

---

Outside diff comments:
In `@src/main/java/ai/labs/eddi/modules/ingestion/MongoContentHashStore.java`:
- Around line 93-103: Normalize the documentId in shouldIngest so it matches the
normalization used by markStaleDocuments: call the same normalization helper
(e.g., normalizeDocumentId or the method used in markStaleDocuments) on the
incoming documentId before building the Bson filter and the
Updates.setOnInsert("documentId", ...) values; apply the same change to the
other occurrence (the second block at lines 125-138) so both persistence and
stale-marking use identical normalized IDs.

---

Nitpick comments:
In `@src/test/java/ai/labs/eddi/datastore/mongo/MongoContentHashStoreTest.java`:
- Around line 30-32: The clean() helper in MongoContentHashStoreTest currently
calls dropCollections("rag_ingestion_hashes") which removes indexes under test;
change it to only remove documents while preserving indexes (e.g. call the
collection's deleteMany/clear method or use the test's MongoTemplate to remove
all documents from "rag_ingestion_hashes") so tests keep the unique/index setup;
update the clean() implementation (replace dropCollections(...) with a
deleteAllDocuments approach) and keep the collection name "rag_ingestion_hashes"
referenced.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 041a0546-1fee-4a6e-a247-f383f3f8ce07

📥 Commits

Reviewing files that changed from the base of the PR and between ccc5b2f and 9546168.

📒 Files selected for processing (8)
  • src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java
  • src/main/java/ai/labs/eddi/datastore/DataStoreProducers.java
  • src/main/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStore.java
  • src/main/java/ai/labs/eddi/modules/ingestion/IContentHashStore.java
  • src/main/java/ai/labs/eddi/modules/ingestion/MongoContentHashStore.java
  • src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionService.java
  • src/test/java/ai/labs/eddi/datastore/mongo/MongoContentHashStoreTest.java
  • src/test/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStoreTest.java
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/main/java/ai/labs/eddi/modules/ingestion/RagIngestionService.java
  • src/main/java/ai/labs/eddi/configs/ingestion/rest/RestRagIngestionSourceStore.java

Comment thread src/main/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStore.java Outdated
Comment thread src/main/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStore.java Outdated
Comment on lines +213 to +227
Thread t = new Thread(() -> {
boolean result = store.shouldIngest("src-1", "same-doc", "same-content");
results.add(result);
});
threads.add(t);
t.start();
}

for (Thread t : threads) {
t.join(5000);
}

// At least one thread should have returned true (first insertion wins)
assertTrue(results.contains(true), "At least one concurrent caller should detect new content");
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Concurrency test misses worker failures and can false-pass.

The test only checks that one true exists; uncaught thread exceptions and timed-out joins don’t fail the test.

Proposed fix
         void concurrentShouldIngest() throws Exception {
             int threadCount = 10;
             var results = java.util.concurrent.ConcurrentHashMap.<Boolean>newKeySet();
+            var failures = new java.util.concurrent.ConcurrentLinkedQueue<Throwable>();
             var threads = new java.util.ArrayList<Thread>();

             for (int i = 0; i < threadCount; i++) {
                 Thread t = new Thread(() -> {
-                    boolean result = store.shouldIngest("src-1", "same-doc", "same-content");
-                    results.add(result);
+                    try {
+                        boolean result = store.shouldIngest("src-1", "same-doc", "same-content");
+                        results.add(result);
+                    } catch (Throwable t1) {
+                        failures.add(t1);
+                    }
                 });
                 threads.add(t);
                 t.start();
             }

             for (Thread t : threads) {
                 t.join(5000);
+                assertFalse(t.isAlive(), "A worker thread did not complete in time");
             }

+            assertTrue(failures.isEmpty(), "No worker thread should fail");
             // At least one thread should have returned true (first insertion wins)
             assertTrue(results.contains(true), "At least one concurrent caller should detect new content");
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Thread t = new Thread(() -> {
boolean result = store.shouldIngest("src-1", "same-doc", "same-content");
results.add(result);
});
threads.add(t);
t.start();
}
for (Thread t : threads) {
t.join(5000);
}
// At least one thread should have returned true (first insertion wins)
assertTrue(results.contains(true), "At least one concurrent caller should detect new content");
}
void concurrentShouldIngest() throws Exception {
int threadCount = 10;
var results = java.util.concurrent.ConcurrentHashMap.<Boolean>newKeySet();
var failures = new java.util.concurrent.ConcurrentLinkedQueue<Throwable>();
var threads = new java.util.ArrayList<Thread>();
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(() -> {
try {
boolean result = store.shouldIngest("src-1", "same-doc", "same-content");
results.add(result);
} catch (Throwable t1) {
failures.add(t1);
}
});
threads.add(t);
t.start();
}
for (Thread t : threads) {
t.join(5000);
assertFalse(t.isAlive(), "A worker thread did not complete in time");
}
assertTrue(failures.isEmpty(), "No worker thread should fail");
// At least one thread should have returned true (first insertion wins)
assertTrue(results.contains(true), "At least one concurrent caller should detect new content");
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/test/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStoreTest.java`
around lines 213 - 227, The concurrency test currently spawns raw Threads, uses
join(5000) (which can silently timeout) and only asserts results.contains(true),
so uncaught exceptions or timed-out threads can make the test falsely pass;
modify the test in PostgresContentHashStoreTest to use an ExecutorService (or
wrap each Runnable with try/catch) and collect futures/exceptions so failures
surface, wait for task completion deterministically (e.g., executor.invokeAll or
join without silent timeouts and then assert all tasks completed), assert no
exceptions were recorded (fail the test if any were), and finally assert that
results.size() matches the expected thread count and results.contains(true) to
ensure at least one successful insertion.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/main/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStore.java`:
- Around line 110-115: The ON CONFLICT DO UPDATE branch in
PostgresContentHashStore leaves stale_at intact when reactivating a document;
update the SQL in the conflict UPDATE (the string built in
PostgresContentHashStore where the clause starts with "ON CONFLICT (source_id,
document_id) DO UPDATE SET") to include "stale_at = NULL" alongside "stale =
false" so stale timestamps are cleared when a row is reactivated; ensure you add
that assignment in the same SET list (no new placeholders needed) and keep the
existing ingested_at/updated_at logic unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 153352c4-3515-46ad-bea7-490457c23922

📥 Commits

Reviewing files that changed from the base of the PR and between 5bb4eb1 and 357cf21.

📒 Files selected for processing (4)
  • src/main/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStore.java
  • src/main/java/ai/labs/eddi/modules/ingestion/MongoContentHashStore.java
  • src/test/java/ai/labs/eddi/datastore/mongo/MongoContentHashStoreTest.java
  • src/test/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStoreTest.java
💤 Files with no reviewable changes (1)
  • src/test/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStoreTest.java
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/main/java/ai/labs/eddi/modules/ingestion/MongoContentHashStore.java
  • src/test/java/ai/labs/eddi/datastore/mongo/MongoContentHashStoreTest.java

Comment on lines +110 to +115
"ON CONFLICT (source_id, document_id) DO UPDATE SET " +
" hash = EXCLUDED.hash, " +
" stale = false, " +
" ingested_at = CASE WHEN rag_ingestion_hashes.hash = EXCLUDED.hash THEN rag_ingestion_hashes.ingested_at ELSE ? END, "
+
" updated_at = ? " +

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Clear stale_at when reactivating a document.

The conflict-update path sets stale = false but leaves stale_at untouched. That can persist stale timestamps on active rows and create inconsistent state for downstream consumers.

Proposed fix
 "ON CONFLICT (source_id, document_id) DO UPDATE SET " +
 "  hash = EXCLUDED.hash, " +
 "  stale = false, " +
+"  stale_at = NULL, " +
 "  ingested_at = CASE WHEN rag_ingestion_hashes.hash = EXCLUDED.hash THEN rag_ingestion_hashes.ingested_at ELSE ? END, "
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"ON CONFLICT (source_id, document_id) DO UPDATE SET " +
" hash = EXCLUDED.hash, " +
" stale = false, " +
" ingested_at = CASE WHEN rag_ingestion_hashes.hash = EXCLUDED.hash THEN rag_ingestion_hashes.ingested_at ELSE ? END, "
+
" updated_at = ? " +
"ON CONFLICT (source_id, document_id) DO UPDATE SET " +
" hash = EXCLUDED.hash, " +
" stale = false, " +
" stale_at = NULL, " +
" ingested_at = CASE WHEN rag_ingestion_hashes.hash = EXCLUDED.hash THEN rag_ingestion_hashes.ingested_at ELSE ? END, "
" updated_at = ? " +
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/ai/labs/eddi/datastore/postgres/PostgresContentHashStore.java`
around lines 110 - 115, The ON CONFLICT DO UPDATE branch in
PostgresContentHashStore leaves stale_at intact when reactivating a document;
update the SQL in the conflict UPDATE (the string built in
PostgresContentHashStore where the clause starts with "ON CONFLICT (source_id,
document_id) DO UPDATE SET") to include "stale_at = NULL" alongside "stale =
false" so stale timestamps are cleared when a row is reactivated; ensure you add
that assignment in the same SET list (no new placeholders needed) and keep the
existing ingested_at/updated_at logic unchanged.

* partial JSON (missing fields get Java's primitive defaults: 0, false, null).
*/
public Scope {
if (maxPages <= 0) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the system should not allow wrong (pointless) configurations that will never surface, instead better is to throw illegal argument exception or some other error. Defaults should only kick in when no value was set via configs (null value). (In addition we should also apply frontend validation on the manager for this)


private static final Logger LOGGER = Logger.getLogger(RagIngestionDirectExecutor.class);

private static final String OBJECTID_PATTERN = "^[0-9a-fA-F]{24}$";

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make sense to pre-compile this regex for better performance

}
}

private String extractSourceId(String uriStr) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably be replaced with existing utility method RestUtilities.extractResourceId

@ginccc ginccc requested a review from Copilot June 9, 2026 20:09

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error: Your billing is not configured or you have Copilot licenses from multiple standalone organizations or enterprises. To use premium requests, select a billing entity via the GitHub site, under Settings > Copilot > Features.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 36 out of 36 changed files in this pull request and generated 12 comments.

Comment on lines +39 to +45
/**
* Enables or disables SSRF validation. When disabled, only URL syntax and
* scheme (http/https) are checked — no DNS resolution or private-IP blocking.
*/
public static void setValidationEnabled(boolean enabled) {
validationEnabled = enabled;
}
Comment on lines +257 to +262
// Check fragment-only links (same page anchors)
String pathAndQuery = uri.getPath() + (uri.getQuery() != null ? "?" + uri.getQuery() : "");
if (pathAndQuery.isEmpty() || pathAndQuery.equals("/")) {
// This is likely just a fragment link to the same page
return false;
}
Comment on lines +305 to +313
private boolean matchesGlob(String text, String pattern) {
// Simple glob matching: * matches any sequence, ? matches single char
String regex = pattern
.replace(".", "\\.")
.replace("**", ".*")
.replace("*", "[^/]*")
.replace("?", ".");
return text.matches(regex);
}
Comment on lines +15 to +18
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
Comment on lines +7 to +8
import ai.labs.eddi.configs.ingestion.model.SourceConfig;

Comment on lines +66 to +69
String directType = schedule.getDirectExecutionType();
if (directType != null) {
return executeDirectly(schedule, instanceId, attemptNumber, directType);
}
Comment on lines +150 to +152
RagConfiguration ragConfig = resolveRagConfiguration(sourceConfig.ragConfigUri());
String kbId = sourceConfig.name(); // Use source name as knowledge base ID

Comment on lines +304 to +308
// Create splitter
var splitter = DocumentSplitters.recursive(
ragConfig.getChunkSize(),
ragConfig.getChunkOverlap());

Comment on lines +33 to +36
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

Comment on lines +108 to +114
/**
* Compact constructor — provides safe defaults for Jackson partial JSON.
*/
public CrawlSettings {
if (requestDelayMs <= 0) {
requestDelayMs = 500;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants