
[REF-1086]Feat/tools/file lambda #2008

Closed
alchemistklk wants to merge 4 commits into main from feat/tools/file-lambda

Conversation

alchemistklk (Contributor) commented Jan 7, 2026

This pull request introduces the infrastructure for integrating AWS Lambda-based file processing into the API. It adds support for dispatching and tracking asynchronous document, image, and video processing jobs via SQS and S3, and lays out the configuration, database schema, DTOs, and service wiring this system needs.

The most important changes are:

Infrastructure & Service Integration:

  • Added new dependencies for AWS S3 and SQS SDKs to enable communication with AWS services. (apps/api/package.json)
  • Introduced the LambdaModule with all necessary service providers, BullMQ queue registrations, and bridge services for Lambda job processing. (apps/api/src/modules/lambda/lambda.module.ts)
  • Wired the new LambdaModule into the DriveModule to enable Lambda-based processing from drive-related endpoints. (apps/api/src/modules/drive/drive.module.ts)

Configuration:

  • Extended application configuration to support Lambda job processing, including AWS region, Lambda function ARNs, SQS queue URLs, S3 bucket settings, polling intervals, and timeouts. (apps/api/src/modules/config/app.config.ts)
  • Added a new config parameter to control the maximum number of tokens stored for parsed content. (apps/api/src/modules/config/app.config.ts)

Database Schema:

  • Added a new LambdaJob model to the Prisma schema to track async Lambda job executions, including job status, type, file/result references, error information, and metadata. (apps/api/prisma/schema.prisma)
  • Extended the DriveFileParseCache model to include a metadata field for storing Lambda job IDs and related metadata. (apps/api/prisma/schema.prisma)

Data Transfer Objects (DTOs) & Processing Logic:

  • Defined comprehensive DTOs for Lambda task and result envelopes, job records, and payloads for document ingest, image transform, document render, and video analyze operations. Also included utility types for job status and result handling. (apps/api/src/modules/lambda/lambda.dto.ts)
  • Added a BullMQ processor (LambdaResultProcessor) for handling Lambda job results, delegating result processing to a dedicated service, and supporting reliable retries. (apps/api/src/modules/lambda/lambda.processor.ts)

Summary by CodeRabbit

Release Notes

  • New Features
    • Asynchronous file processing now available for documents, images, and videos
    • Added document ingestion, image transformation, document rendering (PDF/DOCX), and video analysis capabilities
    • File operations execute in the background with real-time job tracking and status monitoring for improved performance

…integration

- Added LambdaModule to encapsulate Lambda processing logic.
- Created LambdaService for dispatching tasks to SQS.
- Implemented LambdaResultProcessor for processing results from SQS.
- Developed LambdaResultHandlerService to handle results and update job statuses.
- Introduced SqsBridgeService for polling SQS and enqueuing messages to BullMQ.
- Defined constants for Lambda job types, statuses, and queues.
- Added error handling and logging throughout the services.

coderabbitai bot commented Jan 7, 2026

Walkthrough

This PR introduces a comprehensive Lambda-based asynchronous processing infrastructure for document ingestion, image transformation, document rendering, and video analysis. It adds new Prisma models, AWS SDK integrations, SQS-to-BullMQ bridging, result handling, and extends the drive service to dispatch tasks with caching and status tracking.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Configuration & Dependencies (.gitignore, apps/api/package.json, apps/api/src/modules/config/app.config.ts) | Added .ai/ ignore rule; imported @aws-sdk/client-s3 and @aws-sdk/client-sqs; extended app.config.ts with comprehensive Lambda configuration (region, function ARNs, SQS queue URLs, S3 buckets, polling intervals, timeouts, and video options). |
| Database Schema (apps/api/prisma/schema.prisma) | Added optional metadata field to DriveFileParseCache for JSON storage; introduced new LambdaJob model with fields for job tracking (status, storage type, user, file/result linkage, timing, metadata) and multiple indices. |
| Lambda Module & Core Services (apps/api/src/modules/lambda/lambda.dto.ts, apps/api/src/modules/lambda/lambda.module.ts, apps/api/src/modules/lambda/lambda.service.ts, apps/api/src/modules/lambda/lambda.processor.ts) | Introduced comprehensive Lambda DTO definitions (task/result envelopes, payloads for each task type, dispatch params, status helpers); created NestJS module with conditional BullMQ queue registration; implemented core LambdaService for job dispatch, SQS messaging, and S3 output retrieval; added BullMQ result processor for async job handling. |
| Result Processing & SQS Integration (apps/api/src/modules/lambda/result-handler.service.ts, apps/api/src/modules/lambda/sqs-bridge.service.ts) | Implemented LambdaResultHandlerService to process Lambda results per task type, persist content to cache/storage, and handle failures; created SqsBridgeService to poll SQS, enqueue results to BullMQ with deduplication and lifecycle management. |
| Drive Service Integration (apps/api/src/modules/drive/drive.module.ts, apps/api/src/modules/drive/drive.service.ts) | Added LambdaModule to DriveModule imports; extended DriveService with Lambda dispatch methods (preCreateDriveFileForLambda, dispatchLambdaImageTransform, dispatchLambdaDocumentRender, getLambdaJobStatus), Lambda-eligible content type detection, Lambda-based parsing with polling and fallback logic, and async parsing triggers integrated into file creation. |
| Constants (apps/api/src/utils/const.ts) | Added Lambda queue identifiers (DOC_INGEST, IMAGE_TRANSFORM, DOC_RENDER, VIDEO_ANALYZE, RESULT), job type constants, job status constants (PENDING, PROCESSING, SUCCESS, FAILED), and storage type constants (TEMPORARY, PERMANENT). |

Sequence Diagram

sequenceDiagram
    actor Client
    participant DriveService
    participant LambdaService
    participant SQS
    participant Lambda
    participant S3
    participant BullMQ
    participant ResultHandler

    rect rgb(220, 240, 255)
    note over Client,S3: Task Dispatch Flow
    Client->>DriveService: dispatchLambdaImageTransform()
    DriveService->>DriveService: preCreateDriveFile()
    DriveService->>LambdaService: dispatchImageTransform(params)
    LambdaService->>LambdaService: generateJobId()
    LambdaService->>LambdaService: buildTaskEnvelope()
    LambdaService->>SQS: sendMessage(taskEnvelope)
    SQS-->>LambdaService: ✓ sent
    LambdaService-->>DriveService: LambdaJobRecord
    DriveService-->>Client: {driveFile, lambdaJob}
    end

    rect rgb(255, 240, 220)
    note over Lambda,S3: Lambda Processing (async)
    SQS->>Lambda: invoked with taskEnvelope
    Lambda->>S3: read source
    S3-->>Lambda: file content
    Lambda->>Lambda: process (transform/render/analyze)
    Lambda->>S3: write resultEnvelope
    S3-->>Lambda: ✓ stored
    Lambda->>SQS: publish resultEnvelope to result queue
    SQS-->>Lambda: ✓ published
    end

    rect rgb(240, 255, 240)
    note over SQS,ResultHandler: Result Polling & Processing
    SqsBridgeService->>SQS: pollMessages()
    SQS-->>SqsBridgeService: resultEnvelopes
    SqsBridgeService->>BullMQ: enqueue job (deduplicated by jobId)
    BullMQ-->>SqsBridgeService: ✓ queued
    SqsBridgeService->>SQS: deleteMessage()
    
    BullMQ->>ResultHandler: processResult(envelope)
    ResultHandler->>ResultHandler: routeByTaskType()
    alt Success Path
        ResultHandler->>S3: getJobOutput()
        S3-->>ResultHandler: outputContent
        ResultHandler->>ResultHandler: persistContentToCache()
        ResultHandler->>ResultHandler: updateDriveFile()
    else Failure Path
        ResultHandler->>ResultHandler: handleFailure()
        ResultHandler->>ResultHandler: markJobFailed()
    end
    BullMQ-->>ResultHandler: ✓ processed
    end

    rect rgb(255, 240, 240)
    note over Client,DriveService: Status Polling (Optional)
    Client->>DriveService: getLambdaJobStatus(jobId)
    DriveService->>LambdaService: getJobStatus(jobId)
    LambdaService->>LambdaService: computeEffectiveStatus()
    LambdaService-->>DriveService: {job, effectiveStatus}
    DriveService-->>Client: status
    end
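The result-polling leg of the diagram can be sketched without any AWS or BullMQ dependency. The names below (ResultMessage, ResultQueue, InMemoryResultQueue, bridgeBatch) are hypothetical stand-ins, not the PR's actual SqsBridgeService API. The sketch only illustrates the ordering the diagram implies: enqueue first, deduplicate by jobId, and delete the SQS message only once the result is safely queued.

```typescript
// Hypothetical shapes; the real envelope and queue types live in the PR's lambda module.
interface ResultMessage {
  receiptHandle: string;
  jobId: string;
  body: string;
}

interface ResultQueue {
  // Resolves to false when a job with the same jobId was already queued.
  add(jobId: string, body: string): Promise<boolean>;
}

// In-memory queue used only for illustration; it deduplicates by jobId.
class InMemoryResultQueue implements ResultQueue {
  readonly jobs = new Map<string, string>();

  async add(jobId: string, body: string): Promise<boolean> {
    if (this.jobs.has(jobId)) {
      return false;
    }
    this.jobs.set(jobId, body);
    return true;
  }
}

// Enqueue each polled message, then delete it from the source queue.
// If enqueueing or deleting throws, the message is counted as failed and is
// not deleted here, so the source queue can deliver it again later.
async function bridgeBatch(
  messages: ResultMessage[],
  queue: ResultQueue,
  deleteMessage: (receiptHandle: string) => Promise<void>,
): Promise<{ enqueued: number; duplicates: number; failed: number }> {
  const stats = { enqueued: 0, duplicates: 0, failed: 0 };
  for (const message of messages) {
    try {
      const added = await queue.add(message.jobId, message.body);
      if (added) {
        stats.enqueued += 1;
      } else {
        stats.duplicates += 1;
      }
      // Safe to delete in both cases: the result is already in the queue.
      await deleteMessage(message.receiptHandle);
    } catch {
      stats.failed += 1;
    }
  }
  return stats;
}
```

Deleting after a duplicate is deliberate in this sketch: the earlier delivery already placed the result in the queue, so keeping the SQS copy would only cause further redeliveries.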

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • nettee

🐰 Hop along to Lambda land, where rabbits don't toil—
Dispatch your tasks to the cloud, let async jobs uncoil,
With SQS bridges and S3 stores so grand,
Results come hopping back to your hand,
No more parsing by paw, just workflows that soar! ✨🚀

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
| Title check | ❓ Inconclusive | The title '[REF-1171]Feat/tools/file lambda' is vague and uses non-standard formatting with a ticket prefix and unclear terms ('tools/file lambda'). It does not clearly convey the main change: adding Lambda-based async file processing infrastructure. | Revise the title to clearly describe the main change, such as: 'Add Lambda-based async file processing with SQS and BullMQ integration' or 'Integrate AWS Lambda for asynchronous file operations'. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

alchemistklk changed the title from "Feat/tools/file lambda" to "[REF-1171]Feat/tools/file lambda" on Jan 7, 2026

linear bot commented Jan 7, 2026

alchemistklk changed the title from "[REF-1171]Feat/tools/file lambda" to "[REF-1086]Feat/tools/file lambda" on Jan 7, 2026

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🤖 Fix all issues with AI agents
In @apps/api/package.json:
- Around line 35-36: The package.json declares mismatched, outdated AWS SDK
clients; update the dependencies "@aws-sdk/client-s3" and "@aws-sdk/client-sqs"
to the unified latest caret version ^3.964.0 so both are aligned and include the
newest patches; modify the versions for those two entries in
apps/api/package.json, run npm/yarn install and then run tests/build to ensure
no breaking changes.

In @apps/api/src/modules/drive/drive.service.ts:
- Around line 2260-2309: dispatchLambdaDocumentRender uses this.lambdaService
without guarding for null/undefined; add a null-check before calling
this.lambdaService.dispatchDocumentRender (e.g., if (!this.lambdaService) throw
or return an appropriate error/result) so the method fails fast with a clear
message when lambdaService is not configured, and keep the rest of the flow
(preCreateDriveFileForLambda, logger.info, and returned { driveFile, lambdaJob
}) intact; reference the dispatchLambdaDocumentRender method and the
lambdaService property when making the change.
- Around line 2204-2251: dispatchLambdaImageTransform calls
this.lambdaService.dispatchImageTransform without guarding that
this.lambdaService is present (it’s injected @Optional()), so add a null check
before invoking dispatchImageTransform in dispatchLambdaImageTransform: if
this.lambdaService is missing, log a clear warning and either throw a
descriptive error (e.g., "Lambda service not configured") or return a handled
failure, so you avoid a runtime TypeError; reference the method names
dispatchLambdaImageTransform and dispatchImageTransform and ensure the
preCreateDriveFileForLambda call and created driveFile are handled appropriately
if you choose to abort early.
- Around line 1036-1055: The code calls JSON.parse(cache.metadata) in the block
that checks cache.parseStatus and will throw if metadata is malformed; wrap the
parse in a safe try/catch (e.g., parse into metadata = {} on error), log a
warning including fileId and the raw cache.metadata, and proceed using the empty
metadata so processing continues; update the block that references
cache.metadata, toDTO(driveFile), and
lambdaService.getJobStatus(metadata.lambdaJobId) to use the safely parsed
metadata.

In @apps/api/src/modules/lambda/lambda.service.ts:
- Around line 476-487: The S3 response.Body is an SdkStream (platform-union) not
a Node Readable, so update the S3 client creation or response handling in the
method that returns the stream: either narrow the client by casting the created
S3Client to NodeJsClient<S3Client> (so response.Body is typed as a Node.js
stream) before calling getObject, or consume the body through the helpers that
SdkStream already provides (transformToString, transformToByteArray,
transformToWebStream) instead of directly asserting response.Body as Readable;
reference the S3Client instance creation, the response.Body usage, and the
NodeJsClient type import from @smithy/types to implement the change.
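
For the JSON.parse(cache.metadata) item above, a tolerant parser could look roughly like the following. This is a minimal sketch: the helper name parseCacheMetadata and the ParseCacheMetadata shape are assumptions for illustration, and only the lambdaJobId key comes from the review text. The warn callback stands in for whatever logger the service uses.

```typescript
// Hypothetical shape; only lambdaJobId is mentioned in the review.
interface ParseCacheMetadata {
  lambdaJobId?: string;
}

// Parse the metadata column defensively: malformed or non-object JSON
// yields an empty object and a warning instead of throwing.
function parseCacheMetadata(
  raw: string | null | undefined,
  warn: (message: string) => void = () => {},
): ParseCacheMetadata {
  if (!raw) {
    return {};
  }
  try {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      warn('Parse cache metadata is not a JSON object');
      return {};
    }
    // Keep lambdaJobId only when it is actually a string.
    const { lambdaJobId } = parsed as Record<string, unknown>;
    return typeof lambdaJobId === 'string' ? { lambdaJobId } : {};
  } catch {
    warn('Parse cache metadata is not valid JSON');
    return {};
  }
}
```

With a helper like this, the caller can check metadata.lambdaJobId before calling lambdaService.getJobStatus and continue with an empty object when the stored value is corrupt.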
🧹 Nitpick comments (13)
apps/api/prisma/schema.prisma (1)

1983-1984: Consider extracting frequently-queried fields from metadata.

The metadata field stores JSON including lambdaJobId. If the codebase frequently queries parse cache entries by Lambda job ID, consider adding a dedicated lambdaJobId column with an index for better query performance. This would avoid JSON parsing overhead and enable efficient lookups.

apps/api/src/modules/lambda/lambda.processor.ts (1)

12-14: Consider adding JSDoc comment for the interface.

While the interface is straightforward, adding a brief JSDoc comment would align with the documentation pattern used for the processor class below.

📝 Suggested documentation
+/**
+ * Job data structure for Lambda result processing jobs
+ */
 export interface LambdaResultJobData {
   envelope: LambdaResultEnvelope<ResultPayload>;
 }
apps/api/src/modules/config/app.config.ts (1)

279-313: Lambda configuration block is well-structured.

The configuration covers all necessary aspects of Lambda integration (region, function ARNs, SQS queues, S3 settings, polling, and timeouts). A few minor observations:

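  1. Lines 294-297 use Number.parseInt(process.env.X || 'default') while lines 301-302, 309-312 use Number.parseInt(process.env.X) || default. The two are not strictly equivalent: for a non-numeric value the first yields NaN while the second falls back to the default, and the second also replaces an explicit 0 with the default. Either is acceptable for these settings, but the latter is more consistent with the rest of the file.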

  2. Consider adding validation or documentation for the expected ranges of these values (e.g., maxMessages should be 1-10 for SQS).

♻️ Optional: Consistent parseInt pattern
       bridge: {
-        pollIntervalMs: Number.parseInt(process.env.SQS_BRIDGE_POLL_INTERVAL_MS || '1000'),
-        maxMessages: Number.parseInt(process.env.SQS_BRIDGE_MAX_MESSAGES || '10'),
-        waitTimeSeconds: Number.parseInt(process.env.SQS_BRIDGE_WAIT_TIME_SECONDS || '20'),
-        visibilityTimeout: Number.parseInt(process.env.SQS_BRIDGE_VISIBILITY_TIMEOUT || '60'),
+        pollIntervalMs: Number.parseInt(process.env.SQS_BRIDGE_POLL_INTERVAL_MS) || 1000,
+        maxMessages: Number.parseInt(process.env.SQS_BRIDGE_MAX_MESSAGES) || 10,
+        waitTimeSeconds: Number.parseInt(process.env.SQS_BRIDGE_WAIT_TIME_SECONDS) || 20,
+        visibilityTimeout: Number.parseInt(process.env.SQS_BRIDGE_VISIBILITY_TIMEOUT) || 60,
       },
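
Both observations above (one parsing pattern, documented ranges) could be folded into a single helper. A sketch, assuming a hypothetical readIntEnv function that is not part of the PR: it treats missing or non-numeric values as "use the default" and clamps to an allowed range, such as 1-10 for maxMessages.

```typescript
// Hypothetical helper; the name and signature are not from the PR.
function readIntEnv(
  env: Record<string, string | undefined>,
  key: string,
  fallback: number,
  range: { min?: number; max?: number } = {},
): number {
  const parsed = Number.parseInt(env[key] ?? '', 10);
  // Unlike `parseInt(x) || fallback`, an explicit 0 is kept; only NaN falls back.
  const value = Number.isNaN(parsed) ? fallback : parsed;
  const min = range.min ?? Number.NEGATIVE_INFINITY;
  const max = range.max ?? Number.POSITIVE_INFINITY;
  return Math.min(Math.max(value, min), max);
}

// Sample values chosen to exercise clamping and the non-numeric fallback.
const sampleEnv: Record<string, string | undefined> = {
  SQS_BRIDGE_MAX_MESSAGES: '25',
  SQS_BRIDGE_POLL_INTERVAL_MS: 'abc',
};
// 25 is clamped to the upper bound of 10.
const maxMessages = readIntEnv(sampleEnv, 'SQS_BRIDGE_MAX_MESSAGES', 10, { min: 1, max: 10 });
// 'abc' is not a number, so the default of 1000 is used.
const pollIntervalMs = readIntEnv(sampleEnv, 'SQS_BRIDGE_POLL_INTERVAL_MS', 1000);
```

In app.config.ts the first argument would be process.env; it is passed explicitly here so the sketch stays testable without touching the real environment.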
apps/api/src/modules/lambda/sqs-bridge.service.ts (3)

180-207: Potential inFlightMessages counter inconsistency on early return.

If message.Body or receiptHandle is missing (lines 184-186), the function returns early before incrementing inFlightMessages. This is correct. However, if the resultQueue is unavailable (lines 204-207), the message is not deleted and the counter is decremented in finally. This could cause messages to accumulate in SQS if the queue remains unavailable.

Consider logging at a higher severity or adding a mechanism to handle persistent queue unavailability.


130-147: Backoff is constant, not exponential.

The comment says "exponential backoff" but pollIntervalMs * 2 is the same fixed delay after every error, so it never grows. For true exponential backoff, track consecutive errors and increase the delay exponentially (e.g., min(baseDelay * 2^retries, maxDelay)).

♻️ True exponential backoff
+  private consecutiveErrors = 0;
+
   private async pollLoop(): Promise<void> {
     while (!this.shouldStop) {
       try {
         await this.pollMessages();
+        this.consecutiveErrors = 0;
       } catch (error) {
         this.logger.error({ error }, 'Error in poll loop');
-        // Exponential backoff on error
-        await this.sleep(Math.min(this.pollIntervalMs * 2, 30000));
+        this.consecutiveErrors++;
+        const backoffMs = Math.min(this.pollIntervalMs * Math.pow(2, this.consecutiveErrors), 30000);
+        await this.sleep(backoffMs);
       }
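
To make the difference concrete, the delay schedule from the suggestion above can be computed in isolation. The function name backoffDelayMs is an assumption; the doubling per consecutive error and the 30000 ms cap come from the diff, and 1000 ms is the default poll interval from the configuration block.

```typescript
// Delay before the next poll after `consecutiveErrors` failures in a row.
// Mirrors min(base * 2^errors, cap) from the suggested diff.
function backoffDelayMs(baseMs: number, consecutiveErrors: number, capMs = 30000): number {
  return Math.min(baseMs * Math.pow(2, consecutiveErrors), capMs);
}

// With a 1000 ms poll interval, the first five consecutive failures wait:
const schedule = [1, 2, 3, 4, 5].map((errors) => backoffDelayMs(1000, errors));
// schedule is [2000, 4000, 8000, 16000, 30000]
```

The original pollIntervalMs * 2 corresponds to the first entry only, repeated after every failure.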

285-289: pollTimeout reference overwrite may prevent proper cleanup.

The sleep method assigns a new timeout to this.pollTimeout on each call. If sleep is called concurrently (e.g., multiple messages processing), earlier timeouts cannot be cleared via onModuleDestroy. Consider using a local variable or a Set to track active timeouts if concurrent sleeps are expected.

♻️ Safer sleep implementation
-  private pollTimeout: NodeJS.Timeout | null = null;
+  private activeTimeouts = new Set<NodeJS.Timeout>();
 
   private sleep(ms: number): Promise<void> {
     return new Promise((resolve) => {
-      this.pollTimeout = setTimeout(resolve, ms);
+      const timeout = setTimeout(() => {
+        this.activeTimeouts.delete(timeout);
+        resolve();
+      }, ms);
+      this.activeTimeouts.add(timeout);
     });
   }
 
   async onModuleDestroy() {
     // ...
-    if (this.pollTimeout) {
-      clearTimeout(this.pollTimeout);
-      this.pollTimeout = null;
-    }
+    for (const timeout of this.activeTimeouts) {
+      clearTimeout(timeout);
+    }
+    this.activeTimeouts.clear();
     // ...
   }
apps/api/src/modules/lambda/result-handler.service.ts (3)

111-113: Unknown task type should mark job as failed.

Currently, unknown task types are logged but the job remains in a non-terminal state. Consider marking the job as failed to prevent indefinite pending status.

♻️ Mark unknown task type as failed
         default:
           this.logger.warn({ type }, 'Unknown task type');
+          await this.handleFailure(context, {
+            code: 'UNKNOWN_TASK_TYPE',
+            message: `Unknown task type: ${type}`,
+            retryable: false,
+          });
       }
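
If the task type is modelled as a string union, the compiler can also flag a missing case before the runtime fallback is ever reached. A sketch under that assumption; the TaskType literals, RouteOutcome, and routeResult are illustrative and are not the PR's actual identifiers.

```typescript
// Assumed literals; the PR's real job type constants live in utils/const.ts.
type TaskType = 'document-ingest' | 'image-transform' | 'document-render' | 'video-analyze';

interface RouteOutcome {
  handled: boolean;
  errorCode?: string;
  message?: string;
}

function routeResult(type: TaskType): RouteOutcome {
  switch (type) {
    case 'document-ingest':
    case 'image-transform':
    case 'document-render':
    case 'video-analyze':
      return { handled: true };
    default: {
      // Compile-time check: this assignment fails if a TaskType case is missing.
      const unreachable: never = type;
      // Runtime fallback for values that bypassed the type system (e.g. raw JSON).
      return {
        handled: false,
        errorCode: 'UNKNOWN_TASK_TYPE',
        message: `Unknown task type: ${String(unreachable)}`,
      };
    }
  }
}
```

The runtime branch still matters because result envelopes arrive as JSON from SQS, where the type annotation offers no guarantee about the actual value.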

381-388: Remove or implement markDriveFileFailed.

This method is a no-op with an eslint-disable comment. If the functionality isn't needed, remove the method and its call site (line 356). If it's planned for future implementation, add a TODO comment or tracking issue.

♻️ Option 1: Remove the no-op method
-  // eslint-disable-next-line @typescript-eslint/no-unused-vars
-  private async markDriveFileFailed(_fileId: string, _errorMessage: string): Promise<void> {
-    // DriveFile doesn't have an uploadStatus field
-    // The failure is tracked in LambdaJob table
-    // We could optionally delete the pre-created DriveFile here, but leaving it
-    // allows users to see that there was an attempted upload
-  }

And in handleFailure:

-    // If there's a fileId, mark the file as failed
-    if (context.fileId) {
-      await this.markDriveFileFailed(context.fileId, error?.message || 'Lambda processing failed');
-    }

459-466: Missing error handling in markJobProcessing.

Unlike other methods, markJobProcessing doesn't handle the case where the job doesn't exist (Prisma will throw). Consider adding a try-catch or existence check.

♻️ Add error handling
   async markJobProcessing(jobId: string): Promise<void> {
+    try {
       await this.prisma.lambdaJob.update({
         where: { jobId },
         data: {
           status: LAMBDA_JOB_STATUS_PROCESSING,
         },
       });
+    } catch (error) {
+      this.logger.warn({ jobId, error }, 'Failed to mark job as processing');
+    }
   }
apps/api/src/modules/lambda/lambda.service.ts (2)

47-58: Unused BullMQ queue injections.

These four queues are injected but never used in the service. If they're planned for future use, add a TODO comment. Otherwise, remove them to reduce confusion.

♻️ Remove unused injections
   constructor(
     @InjectPinoLogger(LambdaService.name)
     private readonly logger: PinoLogger,
     private readonly config: ConfigService,
     private readonly prisma: PrismaService,
-    @Optional()
-    @InjectQueue(QUEUE_LAMBDA_DOC_INGEST)
-    private readonly docIngestQueue?: Queue,
-    @Optional()
-    @InjectQueue(QUEUE_LAMBDA_IMAGE_TRANSFORM)
-    private readonly imageTransformQueue?: Queue,
-    @Optional()
-    @InjectQueue(QUEUE_LAMBDA_DOC_RENDER)
-    private readonly docRenderQueue?: Queue,
-    @Optional()
-    @InjectQueue(QUEUE_LAMBDA_VIDEO_ANALYZE)
-    private readonly videoAnalyzeQueue?: Queue,
   ) {

410-431: Duplicated effective status logic.

This logic duplicates getEffectiveStatus from lambda.dto.ts. Consider reusing the utility function.

♻️ Reuse getEffectiveStatus from DTOs
+import { getEffectiveStatus } from './lambda.dto';
+
   async getJobStatus(jobId: string): Promise<{
     job: LambdaJobRecord | null;
     effectiveStatus: string | null;
   }> {
     const job = await this.getJob(jobId);
     if (!job) {
       return { job: null, effectiveStatus: null };
     }
 
-    let effectiveStatus: string;
-    if (job.status === 'failed') {
-      effectiveStatus = 'FAILED';
-    } else if (job.status === 'success') {
-      effectiveStatus = job.storageType === 'temporary' ? 'PENDING_PERSIST' : 'COMPLETED';
-    } else if (job.status === 'processing') {
-      effectiveStatus = 'PROCESSING';
-    } else {
-      effectiveStatus = 'PENDING';
-    }
+    const effectiveStatus = getEffectiveStatus(job.status, job.storageType);
 
     return { job, effectiveStatus };
   }
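
For reference, the branches removed in the diff above amount to a small pure mapping. The sketch below restates them as a standalone function. The real getEffectiveStatus in lambda.dto.ts is not shown in this review, so its signature may differ; the name effectiveStatusOf and the 'pending' and 'permanent' literals are assumptions.

```typescript
// 'failed', 'success', 'processing' and 'temporary' appear in the diff;
// 'pending' and 'permanent' are assumed values for the remaining constants.
type JobStatus = 'pending' | 'processing' | 'success' | 'failed';
type StorageType = 'temporary' | 'permanent';
type EffectiveStatus = 'PENDING' | 'PROCESSING' | 'PENDING_PERSIST' | 'COMPLETED' | 'FAILED';

// Same branches as the inline logic removed in the diff.
function effectiveStatusOf(status: JobStatus, storageType: StorageType): EffectiveStatus {
  switch (status) {
    case 'failed':
      return 'FAILED';
    case 'success':
      // A successful job whose output is still in temporary storage is not done yet.
      return storageType === 'temporary' ? 'PENDING_PERSIST' : 'COMPLETED';
    case 'processing':
      return 'PROCESSING';
    default:
      return 'PENDING';
  }
}
```

Keeping the mapping in one place means a new status, or a change to the PENDING_PERSIST rule, only has to be made once.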
apps/api/src/modules/drive/drive.service.ts (1)

1222-1236: Fire-and-forget pattern with swallowed errors.

The .catch(() => {}) pattern (lines 1232, 1286) silently swallows all errors from async parsing. Consider at minimum logging failures even if they shouldn't block the response.

♻️ Log errors instead of swallowing
     this.triggerAsyncParse(user, {
       fileId: file.fileId,
       type: file.type,
       storageKey: req.driveStorageKey,
       name: file.name,
-    }).catch(() => {});
+    }).catch((err) => {
+      this.logger.debug({ fileId: file.fileId, error: err.message }, 'Async parse trigger failed');
+    });
apps/api/src/modules/lambda/lambda.dto.ts (1)

86-92: Note: TODO comment for OCR and extractImages.

Line 86 has a TODO for features not yet available. Consider tracking this in an issue.

Would you like me to open an issue to track the OCR and extractImages feature implementation?

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 08ff73c and e56d1f2.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (13)
  • .gitignore
  • apps/api/package.json
  • apps/api/prisma/schema.prisma
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/utils/const.ts
🧰 Additional context used
📓 Path-based instructions (16)
**/*.{js,ts,jsx,tsx}

📄 CodeRabbit inference engine (.cursorrules)

**/*.{js,ts,jsx,tsx}: Always use optional chaining (?.) when accessing object properties
Always use nullish coalescing (??) or default values for potentially undefined values
Always check array existence before using array methods
Always validate object properties before destructuring
Always use single quotes for string literals in JavaScript/TypeScript code

**/*.{js,ts,jsx,tsx}: Use semicolons at the end of statements
Include spaces around operators (e.g., a + b instead of a+b)
Always use curly braces for control statements
Place opening braces on the same line as their statement

**/*.{js,ts,jsx,tsx}: Group import statements in order: React/framework libraries, third-party libraries, internal modules, relative path imports, type imports, style imports
Sort imports alphabetically within each import group
Leave a blank line between import groups
Extract complex logic into custom hooks
Use functional updates for state (e.g., setCount(prev => prev + 1))
Split complex state into multiple state variables rather than single large objects
Use useReducer for complex state logic instead of multiple useState calls

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{js,ts,tsx,jsx,py,java,cpp,c,cs,rb,go,rs,php,swift,kt,scala,r,m,mm,sql}

📄 CodeRabbit inference engine (.cursor/rules/00-language-priority.mdc)

**/*.{js,ts,tsx,jsx,py,java,cpp,c,cs,rb,go,rs,php,swift,kt,scala,r,m,mm,sql}: All code comments MUST be written in English
All variable names, function names, class names, and other identifiers MUST use English words
Comments should be concise and explain 'why' rather than 'what'
Use proper grammar and punctuation in comments
Keep comments up-to-date when code changes
Document complex logic, edge cases, and important implementation details
Use clear, descriptive names that indicate purpose
Avoid abbreviations unless they are universally understood

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{js,ts,tsx,jsx}

📄 CodeRabbit inference engine (.cursor/rules/00-language-priority.mdc)

Use JSDoc style comments for functions and classes in JavaScript/TypeScript

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{js,jsx,ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/01-code-style.mdc)

**/*.{js,jsx,ts,tsx}: Use single quotes for string literals in TypeScript/JavaScript
Always use optional chaining (?.) when accessing object properties in TypeScript/JavaScript
Always use nullish coalescing (??) or default values for potentially undefined values in TypeScript/JavaScript
Always check array existence before using array methods in TypeScript/JavaScript
Validate object properties before destructuring in TypeScript/JavaScript
Use ES6+ features like arrow functions, destructuring, and spread operators in TypeScript/JavaScript
Avoid magic numbers and strings - use named constants in TypeScript/JavaScript
Use async/await instead of raw promises for asynchronous code in TypeScript/JavaScript

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/03-typescript-guidelines.mdc)

**/*.{ts,tsx}: Avoid using any type whenever possible - use unknown type instead with proper type guards
Always define explicit return types for functions, especially for public APIs
Prefer extending existing types over creating entirely new types
Use TypeScript utility types (Partial<T>, Pick<T, K>, Omit<T, K>, Readonly<T>, Record<K, T>) to derive new types
Use union types and intersection types to combine existing types
Always import types explicitly using the import type syntax
Group type imports separately from value imports
Minimize creating local type aliases for imported types

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{js,ts,jsx,tsx,css,json}

📄 CodeRabbit inference engine (.cursor/rules/04-code-formatting.mdc)

Maximum line length of 100 characters

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/package.json
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{js,ts,jsx,tsx,css,json,yml,yaml}

📄 CodeRabbit inference engine (.cursor/rules/04-code-formatting.mdc)

Use 2 spaces for indentation, no tabs

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/package.json
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{js,ts,jsx,tsx,css,json,yml,yaml,md}

📄 CodeRabbit inference engine (.cursor/rules/04-code-formatting.mdc)

No trailing whitespace at the end of lines

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/package.json
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{css,scss,sass,less,js,jsx,ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/09-design-system.mdc)

**/*.{css,scss,sass,less,js,jsx,ts,tsx}: Primary color (#155EEF) should be used for main brand color in buttons, links, and accents
Error color (#F04438) should be used for error states and destructive actions
Success color (#12B76A) should be used for success states and confirmations
Warning color (#F79009) should be used for warnings and important notifications
Info color (#0BA5EC) should be used for informational elements

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{tsx,ts}

📄 CodeRabbit inference engine (.cursor/rules/09-i18n-guidelines.mdc)

**/*.{tsx,ts}: Use the translation wrapper component and useTranslation hook in components
Ensure all user-facing text is translatable

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{tsx,ts,json}

📄 CodeRabbit inference engine (.cursor/rules/09-i18n-guidelines.mdc)

Support dynamic content with placeholders in translations

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/package.json
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{tsx,ts,jsx,js,vue,css,scss,less}

📄 CodeRabbit inference engine (.cursor/rules/11-ui-design-patterns.mdc)

**/*.{tsx,ts,jsx,js,vue,css,scss,less}: Use the primary blue (#155EEF) for main UI elements, CTAs, and active states
Use red (#F04438) only for errors, warnings, and destructive actions
Use green (#12B76A) for success states and confirmations
Use orange (#F79009) for warning states and important notifications
Use blue (#0BA5EC) for informational elements
Primary buttons should be solid with the primary color
Secondary buttons should have a border with transparent or light background
Danger buttons should use the error color
Use consistent padding, border radius, and hover states for all buttons
Follow fixed button sizes based on their importance and context
Use consistent border radius (rounded-lg) for all cards
Apply light shadows (shadow-sm) for card elevation
Maintain consistent padding inside cards (p-4 or p-6)
Use subtle borders for card separation
Ensure proper spacing between card elements
Apply consistent styling to all form inputs
Use clear visual indicators for focus, hover, and error states in form elements
Apply proper spacing between elements using 8px, 16px, 24px increments
Ensure proper alignment of elements (left, center, or right)
Use responsive layouts that work across different device sizes
Maintain a minimum contrast ratio of 4.5:1 for text

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{tsx,ts,jsx,js,vue}

📄 CodeRabbit inference engine (.cursor/rules/11-ui-design-patterns.mdc)

**/*.{tsx,ts,jsx,js,vue}: Include appropriate loading states for async actions in buttons
Group related form elements with appropriate spacing
Provide clear validation feedback for forms
Ensure proper labeling and accessibility for form elements
Ensure all interactive elements are keyboard accessible
Include appropriate ARIA attributes for complex components
Provide alternative text for images and icons
Support screen readers with semantic HTML elements

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.cursor/rules/08-contributing-guidelines.mdc)

**/*.{ts,tsx,js,jsx}: Follow the TypeScript/JavaScript style guidelines
Ensure code is well-tested and documented

Files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/lambda/lambda.processor.ts
  • apps/api/src/utils/const.ts
  • apps/api/src/modules/config/app.config.ts
  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
apps/api/src/{utils,common}/**

📄 CodeRabbit inference engine (.cursor/rules/06-api-structure.mdc)

Organize shared utilities in dedicated utility and common directories

Files:

  • apps/api/src/utils/const.ts
apps/api/src/**/*.{controller,service}.ts

📄 CodeRabbit inference engine (.cursor/rules/06-api-structure.mdc)

Implement proper error handling in API modules

Files:

  • apps/api/src/modules/lambda/result-handler.service.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
🧠 Learnings (2)
📚 Learning: 2025-11-25T03:04:12.836Z
Learnt from: CR
Repo: refly-ai/refly PR: 0
File: .cursor/rules/06-api-structure.mdc:0-0
Timestamp: 2025-11-25T03:04:12.836Z
Learning: Applies to apps/api/src/{auth,user,project,canvas,rag,knowledge,search,skill,share,code-artifact}/**/*.{ts,tsx} : Follow NestJS module structure with controllers, services, and DTOs in dedicated module directories

Applied to files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/lambda.dto.ts
📚 Learning: 2025-11-25T03:04:12.836Z
Learnt from: CR
Repo: refly-ai/refly PR: 0
File: .cursor/rules/06-api-structure.mdc:0-0
Timestamp: 2025-11-25T03:04:12.836Z
Learning: Applies to apps/api/src/{auth,user,project,canvas,rag,knowledge,search,skill,share,code-artifact}/**/*.{controller,service}.ts : Use dependency injection for module dependencies in NestJS

Applied to files:

  • apps/api/src/modules/lambda/lambda.module.ts
  • apps/api/src/modules/drive/drive.module.ts
  • apps/api/src/modules/lambda/sqs-bridge.service.ts
  • apps/api/src/modules/lambda/lambda.service.ts
  • apps/api/src/modules/drive/drive.service.ts
🧬 Code graph analysis (6)
apps/api/src/modules/lambda/lambda.processor.ts (2)
apps/api/src/modules/lambda/lambda.dto.ts (2)
  • LambdaResultEnvelope (42-57)
  • ResultPayload (323-327)
apps/api/src/utils/const.ts (1)
  • QUEUE_LAMBDA_RESULT (43-43)
apps/api/src/modules/config/app.config.ts (2)
apps/api/src/modules/lambda/lambda.processor.ts (1)
  • process (35-46)
apps/api/src/modules/schedule/schedule.processor.ts (1)
  • process (79-653)
apps/api/src/modules/drive/drive.module.ts (1)
apps/api/src/modules/lambda/lambda.module.ts (1)
  • Module (31-36)
apps/api/src/modules/lambda/sqs-bridge.service.ts (2)
apps/api/src/utils/const.ts (1)
  • QUEUE_LAMBDA_RESULT (43-43)
apps/api/src/modules/lambda/lambda.dto.ts (2)
  • LambdaResultEnvelope (42-57)
  • ResultPayload (323-327)
apps/api/src/modules/lambda/lambda.service.ts (2)
apps/api/src/utils/const.ts (7)
  • QUEUE_LAMBDA_DOC_INGEST (39-39)
  • QUEUE_LAMBDA_IMAGE_TRANSFORM (40-40)
  • QUEUE_LAMBDA_DOC_RENDER (41-41)
  • QUEUE_LAMBDA_VIDEO_ANALYZE (42-42)
  • LAMBDA_JOB_STATUS_PENDING (52-52)
  • LAMBDA_STORAGE_TYPE_TEMPORARY (58-58)
  • LAMBDA_JOB_STATUS_FAILED (55-55)
apps/api/src/modules/lambda/lambda.dto.ts (11)
  • LambdaTaskType (12-16)
  • LambdaJobRecord (291-308)
  • LambdaTaskEnvelope (26-36)
  • DispatchDocumentIngestParams (212-230)
  • DocumentIngestTaskPayload (72-93)
  • DispatchImageTransformParams (232-248)
  • ImageTransformTaskPayload (109-128)
  • DispatchDocumentRenderParams (250-267)
  • DocumentRenderTaskPayload (143-161)
  • DispatchVideoAnalyzeParams (269-285)
  • VideoAnalyzeTaskPayload (173-192)
apps/api/src/modules/drive/drive.service.ts (6)
packages/observability/src/index.ts (3)
  • Trace (60-60)
  • getTracer (60-60)
  • getCurrentSpan (60-60)
packages/observability/src/trace-decorator.ts (3)
  • Trace (10-67)
  • getTracer (113-115)
  • getCurrentSpan (108-110)
packages/openapi-schema/src/types.gen.ts (2)
  • DriveFile (8282-8351)
  • DriveFileSource (8278-8278)
apps/api/src/modules/tool/utils/token.ts (1)
  • truncateContent (314-317)
packages/utils/src/id.ts (1)
  • genDriveFileID (85-87)
apps/api/src/modules/lambda/lambda.dto.ts (1)
  • LambdaJobRecord (291-308)
🔇 Additional comments (18)
.gitignore (1)

59-59: LGTM!

The addition of .ai/* follows the established pattern (consistent with .codex/* on line 58) and appropriately excludes AI-related configuration/state files from version control.

apps/api/src/modules/drive/drive.module.ts (1)

7-7: LGTM! Clean module integration.

The LambdaModule import follows NestJS dependency injection patterns correctly, enabling Lambda-based processing capabilities in DriveModule.

Also applies to: 10-10

apps/api/prisma/schema.prisma (1)

2418-2460: Well-structured Lambda job tracking model.

The LambdaJob model provides comprehensive tracking with appropriate indexes for common access patterns (by user, status, fileId, and resultId). The field types, defaults, and optional markers are correctly defined.

apps/api/src/modules/lambda/lambda.processor.ts (1)

35-46: Solid error handling with BullMQ retry integration.

The processor correctly delegates to the result handler service and re-throws errors to engage BullMQ's retry mechanism. Error logging includes the jobId and stack trace for debugging.

apps/api/src/utils/const.ts (1)

38-59: Well-organized Lambda constants following existing patterns.

The new constants maintain consistency with existing queue naming conventions and provide clear, descriptive identifiers for Lambda job types, statuses, and storage types. The naming aligns well with the Prisma schema definitions.

apps/api/src/modules/lambda/lambda.module.ts (1)

18-36: No action needed—the exported services work correctly in desktop mode.

The concern about LambdaService and LambdaResultHandlerService failing in desktop mode due to missing queue dependencies is unfounded:

  1. LambdaService injects queues with @Optional(), but never uses them. The service dispatches tasks via AWS SQS (sendToSQS()), not BullMQ queues.
  2. LambdaResultHandlerService has no queue dependencies whatsoever—only ConfigService, PrismaService, and ObjectStorageService.
  3. SqsBridgeService (which genuinely uses @InjectQueue()) is correctly placed in the conditionally registered processorProviders, so it only instantiates in non-desktop mode.

The conditional registration pattern is already correct and requires no changes.

Likely an incorrect or invalid review comment.

apps/api/src/modules/config/app.config.ts (1)

40-40: LGTM!

The new maxStorageTokens configuration is well-placed and follows the existing pattern for drive-related settings.

apps/api/src/modules/lambda/sqs-bridge.service.ts (3)

1-14: LGTM!

Clean import organization with proper separation of external packages, AWS SDK, and internal modules.


16-31: Excellent JSDoc documentation.

The architecture diagram and feature list provide clear context for maintainers.


237-258: Envelope parsing and validation is solid.

Good validation of required fields and version check. The truncated body in error logging (line 255) prevents log flooding with large payloads.
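
The validation described here can be sketched as follows. This is a minimal illustration, not the code under review: the envelope fields (version, jobId, status), the SUPPORTED_VERSION value, and the 200-character log limit are all assumptions, and the real LambdaResultEnvelope in lambda.dto.ts may differ.

```typescript
// Hypothetical envelope shape; the real LambdaResultEnvelope may carry other fields.
interface ResultEnvelope {
  version: string;
  jobId: string;
  status: string;
  payload?: unknown;
}

const SUPPORTED_VERSION = '1'; // assumed value
const MAX_LOGGED_BODY = 200; // assumed truncation limit for log output

function parseEnvelope(
  body: string,
  warn: (message: string) => void = console.warn,
): ResultEnvelope | null {
  try {
    const parsed: unknown = JSON.parse(body);
    if (typeof parsed !== 'object' || parsed === null) {
      throw new Error('envelope is not an object');
    }
    const candidate = parsed as Partial<ResultEnvelope>;
    if (typeof candidate.jobId !== 'string' || typeof candidate.status !== 'string') {
      throw new Error('missing required fields');
    }
    if (candidate.version !== SUPPORTED_VERSION) {
      throw new Error(`unsupported version: ${String(candidate.version)}`);
    }
    return candidate as ResultEnvelope;
  } catch (error) {
    // Log only a prefix of the body so a large payload cannot flood the logs.
    warn(`Invalid envelope (${(error as Error).message}): ${body.slice(0, MAX_LOGGED_BODY)}`);
    return null;
  }
}
```

Returning null instead of throwing lets the caller decide whether a malformed message should be dropped or retried.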

apps/api/src/modules/lambda/result-handler.service.ts (2)

52-122: Core result processing logic is well-structured.

Good idempotency check (lines 65-68) and proper routing to type-specific handlers. The error handling wraps handler exceptions and delegates to handleFailure.
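
To illustrate why the idempotency check matters: a result message can arrive more than once, for example from a queue with at-least-once delivery or from a BullMQ retry. The sketch below is a simplified in-memory model, assuming hypothetical JobRow and applyResult names; the actual service reads and writes job rows through Prisma.

```typescript
type JobStatus = 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'FAILED';

// Statuses after which a result must not be applied again.
const TERMINAL_STATUSES: ReadonlySet<JobStatus> = new Set<JobStatus>(['COMPLETED', 'FAILED']);

interface JobRow {
  jobId: string;
  status: JobStatus;
}

type ApplyOutcome = 'applied' | 'skipped' | 'unknown';

function applyResult(store: Map<string, JobRow>, jobId: string, next: JobStatus): ApplyOutcome {
  const row = store.get(jobId);
  if (!row) {
    return 'unknown'; // no such job; the caller may log and drop the message
  }
  if (TERMINAL_STATUSES.has(row.status)) {
    return 'skipped'; // duplicate delivery: the job was already finalized
  }
  row.status = next;
  return 'applied';
}
```

With a real database the read and the write should happen atomically (for example, a conditional update on the current status), otherwise two concurrent deliveries could both pass the check.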


160-230: Robust cache persistence with proper error handling.

Good pattern: try to persist, on failure update cache with error status. The content cleaning (line 174) and token truncation are appropriate.

apps/api/src/modules/lambda/lambda.service.ts (1)

177-228: Dispatch pattern is well-implemented.

Good separation: create DB record first, then dispatch to SQS. Error handling marks the job as failed if dispatch fails. The envelope structure is consistent across all dispatch methods.
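
The ordering described here (persist the job, then send, then mark it failed if sending throws) might look roughly like the sketch below. JobStore, SendMessage, and dispatchJob are hypothetical stand-ins for the Prisma model, the SQS client call, and the service's dispatch methods; the envelope fields are assumed as well.

```typescript
// Hypothetical persistence and transport interfaces, injected for testability.
interface JobStore {
  create(jobId: string): Promise<void>;
  markFailed(jobId: string, error: string): Promise<void>;
}

type SendMessage = (body: string) => Promise<void>;

async function dispatchJob(
  store: JobStore,
  send: SendMessage,
  jobId: string,
  payload: unknown,
): Promise<void> {
  // 1. Record the job first so a result can never arrive for an unknown job.
  await store.create(jobId);
  try {
    // 2. Only then hand the task envelope to the queue.
    await send(JSON.stringify({ version: '1', jobId, payload }));
  } catch (error) {
    // 3. A failed send leaves no orphaned pending job behind.
    await store.markFailed(jobId, (error as Error).message);
    throw error;
  }
}
```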

apps/api/src/modules/drive/drive.service.ts (2)

78-78: Good use of @Optional() for backward compatibility.

The optional LambdaService injection allows the drive service to function without Lambda infrastructure.


645-653: Good supported content type check.

Clear list of Lambda-parsable types. Consider extracting to a constant for reusability.

apps/api/src/modules/lambda/lambda.dto.ts (3)

1-36: Well-structured type definitions.

Good use of generic envelope types (LambdaTaskEnvelope<TPayload>) for type-safe payload handling. The version field enables future schema evolution.


71-106: Clear task and result payload definitions.

Consistent structure across all payload types with proper JSDoc comments. The separation of task payloads (input) and result payloads (output) is clean.


340-360: getEffectiveStatus utility is well-implemented.

Clear, explicit handling of all status/storageType combinations. This centralizes status logic for reuse across services.

Comment on lines +35 to +36
"@aws-sdk/client-s3": "^3.958.0",
"@aws-sdk/client-sqs": "^3.960.0",

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Check latest versions available on npm
npm view @aws-sdk/client-s3@latest version
npm view @aws-sdk/client-sqs@latest version

Repository: refly-ai/refly

Length of output: 514


🏁 Script executed:

# Check if there are any known security vulnerabilities using npm audit
# First, let's see the package.json context
head -40 apps/api/package.json

Repository: refly-ai/refly

Length of output: 1921


🏁 Script executed:

# Search for security advisories via npm security database
npm audit --json 2>/dev/null | jq -r '.vulnerabilities | keys[] | select(. | contains("aws-sdk"))'

Repository: refly-ai/refly

Length of output: 107


Update AWS SDK dependencies to the latest version and align package versions.

Both @aws-sdk/client-s3 and @aws-sdk/client-sqs are behind the latest available version (3.964.0). Update both packages to ^3.964.0 for consistency and to ensure the latest security patches are available.

  • @aws-sdk/client-s3: ^3.958.0 → ^3.964.0
  • @aws-sdk/client-sqs: ^3.960.0 → ^3.964.0
🤖 Prompt for AI Agents
In @apps/api/package.json around lines 35 - 36, The package.json declares
mismatched, outdated AWS SDK clients; update the dependencies
"@aws-sdk/client-s3" and "@aws-sdk/client-sqs" to the unified latest caret
version ^3.964.0 so both are aligned and include the newest patches; modify the
versions for those two entries in apps/api/package.json, run npm/yarn install
and then run tests/build to ensure no breaking changes.

Comment on lines +712 to +814
// Poll for result with timeout
const pollIntervalMs = this.config.get<number>('lambda.resultPolling.intervalMs') || 1000;
const maxRetries = this.config.get<number>('lambda.resultPolling.maxRetries') || 300;

const maxTokens = this.config.get<number>('drive.maxContentTokens') || 25000;
const originalLen = content.length;
const truncateStart = performance.now();
content = truncateContent(content, maxTokens);
this.logger.info(
`[loadOrParseDriveFile] truncateContent from cache: fileId=${fileId}, len=${originalLen}, time=${(performance.now() - truncateStart).toFixed(2)}ms`,
);
let retries = 0;
while (retries < maxRetries) {
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
retries++;

span.setAttributes({ 'cache.hit': true, 'content.length': content.length });
span.end();
return { ...this.toDTO(driveFile), content };
} catch (error) {
this.logger.warn(`Cache read failed for ${fileId}, will re-parse:`, error);
span.setAttributes({ 'cache.hit': false, 'cache.error': true });
span.end();
throw error; // Rethrow to continue to parse below
}
})
.catch(() => null); // Return null to continue parsing if cache fails
}
const { job, effectiveStatus } = await this.lambdaService!.getJobStatus(lambdaJob.jobId);

// Check file size limit before downloading - large files should use execute_code tool
const storageKey = driveFile.storageKey ?? this.generateStorageKey(user, driveFile);
const maxFileSizeKB = this.config.get<number>('drive.maxParseFileSizeKB') || 512;
const maxFileSizeBytes = maxFileSizeKB * 1024;
if (effectiveStatus === 'FAILED') {
const errorMsg = job?.error || 'Lambda processing failed';
this.logger.error(`Lambda job ${lambdaJob.jobId} failed: ${errorMsg}`);

let fileStat: ObjectInfo | undefined;
try {
fileStat = await this.internalOss.statObject(storageKey);
} catch (error) {
this.logger.error(`Failed to stat drive file ${fileId}: ${(error as Error)?.message}`);
throw new DriveFileNotFoundError(`Drive file not found: ${fileId}`);
}
await this.prisma.driveFileParseCache.update({
where: { fileId },
data: {
parseStatus: 'failed',
parseError: JSON.stringify({ message: errorMsg, lambdaJobId: lambdaJob.jobId }),
updatedAt: new Date(),
},
});

if (fileStat && fileStat.size > maxFileSizeBytes) {
const fileSizeKB = Math.round(fileStat.size / 1024);
this.logger.info(
`Drive file ${fileId} exceeds size limit: ${fileSizeKB}KB > ${maxFileSizeKB}KB`,
);
throw new FileTooLargeError(
'File exceeds size limit. Use execute_code tool to process this file.',
fileSizeKB,
);
// Fallback to local parsing
return this.parseLocally(user, driveFile, storageKey);
}

if (effectiveStatus === 'COMPLETED' || effectiveStatus === 'PENDING_PERSIST') {
// Job completed, fetch the result
if (job?.storageKey) {
return tracer.startActiveSpan('drive.loadLambdaResult', async (span) => {
try {
// Use LambdaService to read from AWS S3 where Lambda writes output
// (internalOss points to local MinIO, not AWS S3)
let content = await this.lambdaService!.getJobOutputAsString(job.storageKey!);
if (!content) {
throw new Error(`Failed to read Lambda output from S3: ${job.storageKey}`);
}

// Process content - remove null characters
content = content.replace(/\0/g, '');
content = this.normalizeWhitespace(content);

const maxTokens = this.config.get<number>('drive.maxContentTokens') || 25000;
const truncatedContent = truncateContent(content, maxTokens);

// Save to cache
const contentStorageKey = `drive-parsed/${user.uid}/${fileId}.txt`;
await this.internalOss.putObject(contentStorageKey, truncatedContent);

const metadata = job.metadata ? JSON.parse(job.metadata) : {};
const wordCount = metadata.wordCount || readingTime(truncatedContent).words;

await this.prisma.driveFileParseCache.update({
where: { fileId },
data: {
contentStorageKey,
parser: 'lambda',
numPages: metadata.pageCount || null,
wordCount,
parseStatus: 'success',
parseError: null,
updatedAt: new Date(),
},
});

span.setAttributes({
'lambda.jobId': lambdaJob.jobId,
'content.length': truncatedContent.length,
});
span.end();

return { ...this.toDTO(driveFile), content: truncatedContent };
} catch (error) {
span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
span.end();
throw error;
}
});
}
}

// Still processing, continue polling
}

// Step 2: No cache found, perform parsing
getCurrentSpan()?.setAttribute('cache.hit', false);
// Timeout reached
this.logger.error(`Lambda job ${lambdaJob.jobId} timed out after ${maxRetries} retries`);
await this.prisma.driveFileParseCache.update({
where: { fileId },
data: {
parseStatus: 'failed',
parseError: JSON.stringify({
message: 'Lambda processing timeout',
lambdaJobId: lambdaJob.jobId,
}),
updatedAt: new Date(),
},
});

// Fallback to local parsing
return this.parseLocally(user, driveFile, storageKey);
}

⚠️ Potential issue | 🟠 Major

Blocking polling loop may cause request timeouts.

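The parseViaLambda method keeps the HTTP request pending while it polls for up to 5 minutes (300 retries × 1 s with the default settings). The awaited timer does not block the Node.js event loop, but the request can easily outlast client, proxy, or load balancer timeouts, which makes for a poor user experience.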

Consider returning early with a "processing" status and letting the client poll for completion, or using WebSocket/SSE for real-time updates.
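
One way to picture the "return early" suggestion is a pure function that maps the current job state to an immediate response, so each request answers at once and the client decides when to ask again. This is a sketch under assumptions: the status names are taken from the code above, while JobSnapshot, ParseResponse, toParseResponse, and the default timings are hypothetical.

```typescript
type EffectiveStatus = 'PENDING' | 'PROCESSING' | 'PENDING_PERSIST' | 'COMPLETED' | 'FAILED';

// Hypothetical snapshot of what the job status lookup would return.
interface JobSnapshot {
  status: EffectiveStatus;
  startedAtMs: number;
  error?: string;
}

type ParseResponse =
  | { state: 'ready' }
  | { state: 'processing'; retryAfterMs: number }
  | { state: 'failed'; message: string };

function toParseResponse(
  job: JobSnapshot,
  nowMs: number,
  timeoutMs: number = 5 * 60 * 1000, // assumed default, mirrors the 5 minute budget
  retryAfterMs: number = 1000, // assumed hint for the client's next poll
): ParseResponse {
  if (job.status === 'FAILED') {
    return { state: 'failed', message: job.error ?? 'Lambda processing failed' };
  }
  if (job.status === 'COMPLETED' || job.status === 'PENDING_PERSIST') {
    return { state: 'ready' };
  }
  if (nowMs - job.startedAtMs >= timeoutMs) {
    return { state: 'failed', message: 'Lambda processing timeout' };
  }
  // Still running: answer immediately instead of sleeping inside the request.
  return { state: 'processing', retryAfterMs };
}
```

The endpoint would call this once per request; on 'ready' it loads the parsed content, and on 'failed' it can fall back to local parsing as the current code does.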

Comment on lines +1036 to +1055
// Check if cache is still processing (Lambda job in progress)
if (cache?.parseStatus === 'processing') {
// Check for timeout (default: 5 minutes)
const timeoutMs = this.config.get<number>('lambda.parseTimeoutMs') || 5 * 60 * 1000;
const elapsed = Date.now() - new Date(cache.updatedAt).getTime();

if (elapsed < timeoutMs) {
const metadata = cache.metadata ? JSON.parse(cache.metadata) : {};
if (metadata.lambdaJobId && this.lambdaService) {
const { effectiveStatus } = await this.lambdaService.getJobStatus(metadata.lambdaJobId);
if (effectiveStatus === 'PROCESSING' || effectiveStatus === 'PENDING') {
// Return summary while processing
return { ...this.toDTO(driveFile), content: driveFile.summary || 'Processing...' };
}
}
} else {
// Timeout exceeded, mark as stale and allow re-parsing
this.logger.warn({ fileId, elapsed, timeoutMs }, 'Lambda parsing timeout, will retry');
}
}

⚠️ Potential issue | 🟡 Minor

Potential JSON.parse error on cache.metadata.

Line 1043 calls JSON.parse(cache.metadata) without try-catch. If metadata is malformed, this will throw an unhandled exception.

🐛 Add safe JSON parsing
       if (elapsed < timeoutMs) {
-        const metadata = cache.metadata ? JSON.parse(cache.metadata) : {};
+        let metadata: { lambdaJobId?: string } = {};
+        try {
+          metadata = cache.metadata ? JSON.parse(cache.metadata) : {};
+        } catch {
+          this.logger.warn({ fileId }, 'Failed to parse cache metadata');
+        }
         if (metadata.lambdaJobId && this.lambdaService) {
🤖 Prompt for AI Agents
In @apps/api/src/modules/drive/drive.service.ts around lines 1036 - 1055, The
code calls JSON.parse(cache.metadata) in the block that checks cache.parseStatus
and will throw if metadata is malformed; wrap the parse in a safe try/catch
(e.g., parse into metadata = {} on error), log a warning including fileId and
the raw cache.metadata, and proceed using the empty metadata so processing
continues; update the block that references cache.metadata, toDTO(driveFile),
and lambdaService.getJobStatus(metadata.lambdaJobId) to use the safely parsed
metadata.
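
The safe-parsing fix could also be factored into a small helper, sketched below. The name parseCacheMetadata and the onError callback are illustrative; only the lambdaJobId field comes from the code under review.

```typescript
interface CacheMetadata {
  lambdaJobId?: string;
}

// Returns an empty object for missing, malformed, or non-object metadata.
function parseCacheMetadata(
  raw: string | null | undefined,
  onError: (raw: string) => void = () => {},
): CacheMetadata {
  if (!raw) {
    return {};
  }
  try {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      onError(raw); // valid JSON, but not the object shape we expect
      return {};
    }
    const { lambdaJobId } = parsed as { lambdaJobId?: unknown };
    return typeof lambdaJobId === 'string' ? { lambdaJobId } : {};
  } catch {
    onError(raw); // malformed JSON: report it and continue with empty metadata
    return {};
  }
}
```

A caller would pass a logger-backed onError, for example one that warns with the fileId and the raw value, and then proceed exactly as with successfully parsed metadata.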

Comment on lines +2204 to +2251
async dispatchLambdaImageTransform(
user: User,
params: {
canvasId: string;
name: string;
sourceKey: string;
options?: {
format?: 'webp' | 'jpeg' | 'png';
quality?: number;
maxWidth?: number;
maxHeight?: number;
};
},
): Promise<{ driveFile: DriveFile; lambdaJob: LambdaJobRecord }> {
// Determine output content type based on format
const format = params.options?.format ?? 'webp';
const contentType = `image/${format}`;

// 1. Pre-create DriveFile record
const driveFile = await this.preCreateDriveFileForLambda(user, {
canvasId: params.canvasId,
name: params.name,
contentType,
});

// 2. Dispatch to Lambda
const inputBucket =
this.config.get<string>('objectStorage.minio.internal.bucket') || 'refly-weblink';
const outputBucket = this.config.get<string>('lambda.s3.bucket') || inputBucket;

const lambdaJob = await this.lambdaService.dispatchImageTransform({
uid: user.uid,
fileId: driveFile.fileId,
s3Input: {
bucket: inputBucket,
key: params.sourceKey,
},
outputBucket,
name: params.name,
options: params.options,
});

this.logger.info(
`Dispatched Lambda image transform: fileId=${driveFile.fileId}, jobId=${lambdaJob.jobId}`,
);

return { driveFile, lambdaJob };
}

⚠️ Potential issue | 🔴 Critical

Missing null check for lambdaService before dispatch.

Line 2234 uses this.lambdaService.dispatchImageTransform without checking if lambdaService is defined. Since it's injected with @Optional(), this will throw a runtime error if Lambda is not configured.

🐛 Add null check
   async dispatchLambdaImageTransform(
     user: User,
     params: { ... },
   ): Promise<{ driveFile: DriveFile; lambdaJob: LambdaJobRecord }> {
+    if (!this.lambdaService) {
+      throw new Error('Lambda service is not available');
+    }
+
     // Determine output content type based on format
     const format = params.options?.format ?? 'webp';
🤖 Prompt for AI Agents
In @apps/api/src/modules/drive/drive.service.ts around lines 2204 - 2251,
dispatchLambdaImageTransform calls this.lambdaService.dispatchImageTransform
without guarding that this.lambdaService is present (it’s injected @Optional()),
so add a null check before invoking dispatchImageTransform in
dispatchLambdaImageTransform: if this.lambdaService is missing, log a clear
warning and either throw a descriptive error (e.g., "Lambda service not
configured") or return a handled failure, so you avoid a runtime TypeError;
reference the method names dispatchLambdaImageTransform and
dispatchImageTransform and ensure the preCreateDriveFileForLambda call and
created driveFile are handled appropriately if you choose to abort early.
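
A small guard helper is one possible shape for this fix. The sketch below uses synchronous, hypothetical stand-ins (ImageDispatcher, DriveServiceSketch, requireService) rather than the real NestJS services, and it places the guard before the record is pre-created so an early abort leaves nothing to clean up.

```typescript
// Generic guard for dependencies injected with @Optional(); the name is illustrative.
function requireService<T>(service: T | null | undefined, name: string): T {
  if (service == null) {
    throw new Error(`${name} is not available`);
  }
  return service;
}

// Hypothetical stand-in for the part of LambdaService used here.
interface ImageDispatcher {
  dispatchImageTransform(fileId: string): string;
}

class DriveServiceSketch {
  readonly createdFileIds: string[] = [];
  private readonly lambdaService?: ImageDispatcher;

  constructor(lambdaService?: ImageDispatcher) {
    this.lambdaService = lambdaService;
  }

  dispatchLambdaImageTransform(name: string): string {
    // Guard first, so no file record is pre-created when Lambda is absent.
    const lambda = requireService(this.lambdaService, 'Lambda service');
    const fileId = `df-${name}`; // stands in for preCreateDriveFileForLambda
    this.createdFileIds.push(fileId);
    return lambda.dispatchImageTransform(fileId);
  }
}
```

Because requireService returns the narrowed value, the rest of the method needs no non-null assertions.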

Comment on lines +2260 to +2309
async dispatchLambdaDocumentRender(
user: User,
params: {
canvasId: string;
name: string;
sourceKey: string;
format: 'pdf' | 'docx';
options?: {
template?: string;
pageSize?: 'A4' | 'Letter';
};
},
): Promise<{ driveFile: DriveFile; lambdaJob: LambdaJobRecord }> {
// Determine content type based on format
const contentType =
params.format === 'pdf'
? 'application/pdf'
: 'application/vnd.openxmlformats-officedocument.wordprocessingml.document';

// 1. Pre-create DriveFile record
const driveFile = await this.preCreateDriveFileForLambda(user, {
canvasId: params.canvasId,
name: params.name,
contentType,
});

// 2. Dispatch to Lambda
const inputBucket =
this.config.get<string>('objectStorage.minio.internal.bucket') || 'refly-weblink';
const outputBucket = this.config.get<string>('lambda.s3.bucket') || inputBucket;

const lambdaJob = await this.lambdaService.dispatchDocumentRender({
uid: user.uid,
fileId: driveFile.fileId,
s3Input: {
bucket: inputBucket,
key: params.sourceKey,
},
outputBucket,
name: params.name,
format: params.format,
options: params.options,
});

this.logger.info(
`Dispatched Lambda document render: fileId=${driveFile.fileId}, jobId=${lambdaJob.jobId}`,
);

return { driveFile, lambdaJob };
}

⚠️ Potential issue | 🔴 Critical

Same null check issue for dispatchLambdaDocumentRender.

Line 2291 also uses this.lambdaService without a null check.

🐛 Add null check
   async dispatchLambdaDocumentRender(
     user: User,
     params: { ... },
   ): Promise<{ driveFile: DriveFile; lambdaJob: LambdaJobRecord }> {
+    if (!this.lambdaService) {
+      throw new Error('Lambda service is not available');
+    }
+
     // Determine content type based on format
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async dispatchLambdaDocumentRender(
user: User,
params: {
canvasId: string;
name: string;
sourceKey: string;
format: 'pdf' | 'docx';
options?: {
template?: string;
pageSize?: 'A4' | 'Letter';
};
},
): Promise<{ driveFile: DriveFile; lambdaJob: LambdaJobRecord }> {
if (!this.lambdaService) {
throw new Error('Lambda service is not available');
}
// Determine content type based on format
const contentType =
params.format === 'pdf'
? 'application/pdf'
: 'application/vnd.openxmlformats-officedocument.wordprocessingml.document';
// 1. Pre-create DriveFile record
const driveFile = await this.preCreateDriveFileForLambda(user, {
canvasId: params.canvasId,
name: params.name,
contentType,
});
// 2. Dispatch to Lambda
const inputBucket =
this.config.get<string>('objectStorage.minio.internal.bucket') || 'refly-weblink';
const outputBucket = this.config.get<string>('lambda.s3.bucket') || inputBucket;
const lambdaJob = await this.lambdaService.dispatchDocumentRender({
uid: user.uid,
fileId: driveFile.fileId,
s3Input: {
bucket: inputBucket,
key: params.sourceKey,
},
outputBucket,
name: params.name,
format: params.format,
options: params.options,
});
this.logger.info(
`Dispatched Lambda document render: fileId=${driveFile.fileId}, jobId=${lambdaJob.jobId}`,
);
return { driveFile, lambdaJob };
}
🤖 Prompt for AI Agents
In @apps/api/src/modules/drive/drive.service.ts around lines 2260 - 2309,
dispatchLambdaDocumentRender uses this.lambdaService without guarding for
null/undefined; add a null-check before calling
this.lambdaService.dispatchDocumentRender (e.g., if (!this.lambdaService) throw
or return an appropriate error/result) so the method fails fast with a clear
message when lambdaService is not configured, and keep the rest of the flow
(preCreateDriveFileForLambda, logger.info, and returned { driveFile, lambdaJob
}) intact; reference the dispatchLambdaDocumentRender method and the
lambdaService property when making the change.

Comment on lines +476 to +487
    if (!response.Body) {
      this.logger.warn({ storageKey }, 'No body in S3 response');
      return null;
    }

    // Convert SDK stream to Node.js Readable
    return response.Body as Readable;
  } catch (error) {
    this.logger.error({ storageKey, error }, 'Failed to get job output from S3');
    throw error;
  }
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat -n apps/api/src/modules/lambda/lambda.service.ts | sed -n '460,500p'

Repository: refly-ai/refly

Length of output: 1491


🏁 Script executed:

# Check if there are any imports related to AWS SDK or stream handling in the file
head -50 apps/api/src/modules/lambda/lambda.service.ts

Repository: refly-ai/refly

Length of output: 1660


🌐 Web query:

AWS SDK v3 GetObjectCommand response Body type SdkStream Readable Node.js stream

💡 Result:

Short answer

  • GetObjectCommand.response.Body is a platform‑union stream type (StreamingBlobPayloadOutputTypes / SdkStream). In Node.js it can be a Node Readable (IncomingMessage) but the types are a union with browser Web Streams so TypeScript often won’t let you call Node stream methods directly. [1][2]

How to handle it (concise options)

  1. Narrow the client type so Body is typed as a Node stream:

    • Cast your client to NodeJsClient<S3Client>, then Body is typed as SdkStream<IncomingMessage> and you can use Node stream APIs (pipe, for-await-of, destroy, etc.). Example: const s3 = new S3Client({}) as NodeJsClient<S3Client>; const res = await s3.send(new GetObjectCommand(...)); const stream = res.Body;. [1][3]
  2. Use the SDK stream utilities to convert the SdkStream:

    • The Body returned by the SDK already carries the stream helpers transformToString(), transformToByteArray(), and transformToWebStream(), which are useful when you want the whole payload. sdkStreamMixin (from @smithy/util-stream / historically @aws-sdk/util-stream-node) is mainly needed to wrap a plain Readable in tests/mocks so that it looks like an SDK stream. Example: const text = await res.Body!.transformToString();. [2][6]
  3. Treat it as a Node Readable at runtime (practical runtime patterns):

    • If you know you’re on Node, you can cast to stream.Readable and consume it (for-await-of, pipe to file/response, or collect buffers). Example: const body = res.Body as Readable; for await (const chunk of body) { /* chunk is Buffer */ } or accumulate chunks then Buffer.concat(...). [1][4]
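
As a rough illustration of option 3, the sketch below drains a Node.js Readable into a single Buffer with for-await-of. It uses only node:stream; collectStream is a hypothetical helper name that does not appear in the PR, and in real code the input would be the Body of a GetObjectCommand response rather than the in-memory stream used here.

```typescript
import { Readable } from "node:stream";

// Hypothetical helper (not part of the PR): read every chunk of a
// Node.js Readable and join the chunks into one Buffer.
async function collectStream(stream: Readable): Promise<Buffer> {
  const chunks: Buffer[] = [];
  for await (const chunk of stream) {
    // Chunks can be Buffers, Uint8Arrays, or strings depending on the source.
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  }
  return Buffer.concat(chunks);
}

// Usage with an in-memory stream standing in for an S3 response body.
collectStream(Readable.from([Buffer.from("hello, "), "world"])).then((buf) => {
  console.log(buf.toString("utf8"));
});
```

Buffering the whole object this way is only reasonable for small payloads; for large job outputs, piping the stream onward avoids holding the full body in memory.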

Notes / gotchas

  • transformToWebStream requires Node >= 17 (or a polyfill) when converting Readable to WebStream. Tests/mocks often require wrapping a Readable with sdkStreamMixin so the mock looks like the SDK stream. [6][7]
  • SDK docs show the recommended type-narrowing example and mention the SdkStream/StreamingBlobPayload types. [1]

Sources
[1] AWS SDK docs — Create and call service objects (example with NodeJsClient and SdkStream).
[2] Discussions / examples showing sdkStreamMixin and transformToString/transformToByteArray usage (StackOverflow / GitHub history).
[3] Blog post describing casting to NodeJsClient to narrow stream types (Ben Limmer).
[4] StackOverflow examples showing consuming response.Body as Readable (for-await-of / buffer concat).
[6] StackOverflow / issues about sdkStreamMixin, transformToWebStream and Node version note.

If you want, I can paste the minimal TypeScript code for each option (narrowing, sdkStreamMixin, and manual consumption).


Narrow S3Client type or use SDK stream utilities for proper type safety.

response.Body is typed as StreamingBlobPayloadOutputTypes, a union of the Node.js and browser SdkStream variants, not as Readable. The direct cast works at runtime in Node.js, but it bypasses TypeScript's type checking. Either cast the client to NodeJsClient<S3Client> to narrow the Body type:

const s3 = new S3Client({...}) as NodeJsClient<S3Client>;

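Or, if consuming the whole payload is acceptable, use the stream helpers the SDK attaches to Body (transformToString, transformToByteArray, transformToWebStream) instead of casting; sdkStreamMixin from @smithy/util-stream provides the same helpers for plain Readable streams, for example in tests.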
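
A third possibility, sketched here under the assumption that the service only ever runs on Node.js, is to replace the blind cast with a runtime check. asNodeReadable is a hypothetical helper, not code from this PR; it accepts the body as unknown and returns it only when it really is a Node.js Readable.

```typescript
import { Readable } from "node:stream";

// Hypothetical helper (not in the PR): narrow an S3 response body to a
// Node.js Readable with a runtime check instead of `as Readable`.
function asNodeReadable(body: unknown): Readable {
  if (body instanceof Readable) {
    return body;
  }
  // Reached for browser-style web streams, Blobs, or a missing body.
  throw new TypeError("Expected a Node.js Readable as the S3 response body");
}

// Usage: an in-memory stream stands in for response.Body here.
const stream = asNodeReadable(Readable.from(["chunk"]));
console.log(stream.readable);
```

Compared with casting the client to NodeJsClient<S3Client>, this keeps the client type untouched and fails loudly if the body ever arrives in an unexpected form, at the cost of one extra runtime check.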

🤖 Prompt for AI Agents
In @apps/api/src/modules/lambda/lambda.service.ts around lines 476 - 487, the S3
response.Body is typed as a platform-union SdkStream, not a Node Readable, so
update the S3 client creation or the response handling in the method that
returns the stream: either narrow the client by casting the created S3Client to
NodeJsClient<S3Client> (so response.Body is typed as a Node.js stream) before
sending the GetObjectCommand, or consume the body through the SDK stream helpers
(transformToByteArray, transformToString, transformToWebStream; sdkStreamMixin
from @smithy/util-stream adds them to a plain Readable) instead of directly
asserting response.Body as Readable; reference the S3Client instance creation,
the response.Body usage, and the import/use of NodeJsClient<S3Client> or
sdkStreamMixin to implement the change.

@mrcfps mrcfps closed this Jan 11, 2026