diff --git a/Directory.Packages.props b/Directory.Packages.props index 6d2fd07..dcdda82 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -43,6 +43,7 @@ + diff --git a/appsettings.Development.json b/appsettings.Development.json index d6f0345..5beaf19 100644 --- a/appsettings.Development.json +++ b/appsettings.Development.json @@ -43,10 +43,18 @@ }, "EdgeCpuWhisper": { "Endpoint": "http://localhost:11434" + }, + "EdgeEmbedding": { + "Endpoint": "http://localhost:11434" } }, "Agents": {} }, + "RagConfig": { + "EmbeddingProvider": "EdgeEmbedding", + "DocumentsPath": "./rag-documents", + "AutoIngestOnStartup": false + }, "ApiAuthConfig": { "Username": "demo", "Password": "demo" diff --git a/appsettings.json b/appsettings.json index 84a9bac..6237a5c 100644 --- a/appsettings.json +++ b/appsettings.json @@ -73,6 +73,11 @@ "Type": "Ollama", "Endpoint": "http://ollama-gpu.utilities.svc:11434", "ModelName": "karanchopda333/whisper" + }, + "EdgeEmbedding": { + "Type": "Ollama", + "Endpoint": "http://ollama-gpu.utilities.svc:11434", + "ModelName": "nomic-embed-text" } }, "PollTtlMs": 3600000, @@ -285,6 +290,31 @@ } } }, + "RagConfig": { + "EmbeddingProvider": "EdgeEmbedding", + "IndexName": "rag-documents", + "Dimension": 768, + "DistanceMetric": "COSINE", + "ChunkSizeTokens": 512, + "ChunkOverlapTokens": 50, + "TopK": 5, + "DocumentsPath": "/data/rag-documents", + "AutoIngestOnStartup": true, + "AgentSources": { + "HeatingAgent": [ + { + "CollectionName": "rag-documents", + "TopK": 5 + } + ], + "AppliancesAgent": [ + { + "CollectionName": "rag-documents", + "TopK": 3 + } + ] + } + }, "SignalRHubConfig": { "SignalRHub": "http://signalrhub.prd.svc:8080", "ConsoleLogIntervalMs": 30000, diff --git a/docker-compose.yml b/docker-compose.yml index bc34717..befae55 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,8 +44,8 @@ services: # ---------- Infrastructure (always started) ---------- redis: - image: redis - command: [ "redis-server", "/usr/local/etc/redis/redis.conf" ] + image: redis/redis-stack-server + command: [ "redis-server", "/usr/local/etc/redis/redis.conf", "--loadmodule", "/opt/redis-stack/lib/redisearch.so" ] ports: - 6379:6379 volumes: @@ -121,7 +121,7 @@ services: condition: service_healthy environment: OLLAMA_HOST: http://ollama:11434 - entrypoint: [ "sh", "-c", "ollama pull qwen3.5:4b" ] + entrypoint: [ "sh", "-c", "ollama pull qwen3.5:4b && ollama pull nomic-embed-text" ] restart: "no" # ---------- Demo profile: signal-cli ---------- @@ -188,6 +188,11 @@ services: CasCap__AIConfig__Providers__EdgeGpuVL__Endpoint: http://ollama:11434 CasCap__AIConfig__Providers__EdgeOllamaCpuVLC__Endpoint: http://ollama:11434 CasCap__AIConfig__Providers__EdgeCpuWhisper__Endpoint: http://ollama:11434 + + # RAG — embedding model and document path + CasCap__AIConfig__Providers__EdgeEmbedding__Endpoint: http://ollama:11434 + CasCap__RagConfig__DocumentsPath: /data/rag-documents + CasCap__RagConfig__EmbeddingProvider: EdgeEmbedding depends_on: redis: condition: service_healthy diff --git a/src/CasCap.App.Server/Program.cs b/src/CasCap.App.Server/Program.cs index 3253f4c..f2a081b 100644 --- a/src/CasCap.App.Server/Program.cs +++ b/src/CasCap.App.Server/Program.cs @@ -149,6 +149,13 @@ if (enabledFeatures.Contains(FeatureNames.Comms)) builder.AddComms(); + if (enabledFeatures.Contains(FeatureNames.Rag)) + { + builder.AddRag(); + mcpBuilder.WithToolsFromAssembly(typeof(RagMcpQueryService).Assembly); + mcpBuilder.WithPromptsFromAssembly(typeof(RagMcpQueryService).Assembly); + } + // Register all AI agent profiles with deferred tool resolution var otelSourceName = AgentExtensions.GetAISourceName(appConfig.MetricNamePrefix); foreach (var (agentName, agentConfig) in aiConfig.Agents.Where(a => a.Value.Enabled)) diff --git a/src/CasCap.SmartHaus/Abstractions/IDocumentIngestionService.cs b/src/CasCap.SmartHaus/Abstractions/IDocumentIngestionService.cs new file mode 100644 index 0000000..ad6d861 --- /dev/null +++ b/src/CasCap.SmartHaus/Abstractions/IDocumentIngestionService.cs @@ -0,0 +1,34 @@ +namespace CasCap.Abstractions; + +/// +/// Abstraction for ingesting documents (PDFs) into the vector store. +/// +public interface IDocumentIngestionService +{ + /// + /// Ingests a PDF document: extracts text, chunks, generates embeddings, and stores in the vector index. + /// + /// Stream containing the PDF content. + /// Human-readable document name. + /// Target vector collection name. + /// Cancellation token. + /// Metadata about the ingested document. + Task IngestDocumentAsync(Stream pdfStream, string documentName, string collectionName, CancellationToken cancellationToken = default); + + /// + /// Ingests all PDF files from a directory into the vector store. + /// + /// Absolute path to the directory containing PDF files. + /// Target vector collection name. + /// Cancellation token. + /// Metadata for each ingested document. + Task> IngestDirectoryAsync(string directoryPath, string collectionName, CancellationToken cancellationToken = default); + + /// + /// Removes a document and all its chunks from the vector store. + /// + /// The document identifier to remove. + /// Target vector collection name. + /// Cancellation token. + Task RemoveDocumentAsync(string documentId, string collectionName, CancellationToken cancellationToken = default); +} diff --git a/src/CasCap.SmartHaus/Abstractions/IDocumentVectorStore.cs b/src/CasCap.SmartHaus/Abstractions/IDocumentVectorStore.cs new file mode 100644 index 0000000..f2a8796 --- /dev/null +++ b/src/CasCap.SmartHaus/Abstractions/IDocumentVectorStore.cs @@ -0,0 +1,42 @@ +namespace CasCap.Abstractions; + +/// +/// Abstraction for storing and searching document chunks in a vector database. +/// +public interface IDocumentVectorStore +{ + /// Ensures the vector collection and its index exist in Redis. + /// Logical name of the collection. + /// Cancellation token. + Task EnsureCollectionAsync(string collectionName, CancellationToken cancellationToken = default); + + /// Upserts pre-embedded document chunks into the vector store. + /// Logical name of the collection. + /// Chunks with embeddings already populated. + /// Cancellation token. + Task UpsertChunksAsync(string collectionName, IReadOnlyList chunks, CancellationToken cancellationToken = default); + + /// Performs a vector similarity search using the provided query embedding. + /// Logical name of the collection. + /// The query vector. + /// Number of results to return. + /// Cancellation token. + /// Ranked search results. + Task> SearchAsync(string collectionName, ReadOnlyMemory queryEmbedding, int topK, CancellationToken cancellationToken = default); + + /// Removes all chunks belonging to a specific document. + /// Logical name of the collection. + /// The document identifier. + /// Cancellation token. + Task RemoveDocumentAsync(string collectionName, string documentId, CancellationToken cancellationToken = default); + + /// Deletes the entire collection and its index. + /// Logical name of the collection. + /// Cancellation token. + Task DeleteCollectionAsync(string collectionName, CancellationToken cancellationToken = default); + + /// Returns metadata for all ingested documents in a collection. + /// Logical name of the collection. + /// Cancellation token. + Task> ListDocumentsAsync(string collectionName, CancellationToken cancellationToken = default); +} diff --git a/src/CasCap.SmartHaus/CasCap.SmartHaus.csproj b/src/CasCap.SmartHaus/CasCap.SmartHaus.csproj index 5ff78ba..769e53f 100644 --- a/src/CasCap.SmartHaus/CasCap.SmartHaus.csproj +++ b/src/CasCap.SmartHaus/CasCap.SmartHaus.csproj @@ -71,6 +71,7 @@ + diff --git a/src/CasCap.SmartHaus/Extensions/EmbeddingGeneratorFactory.cs b/src/CasCap.SmartHaus/Extensions/EmbeddingGeneratorFactory.cs new file mode 100644 index 0000000..46b4170 --- /dev/null +++ b/src/CasCap.SmartHaus/Extensions/EmbeddingGeneratorFactory.cs @@ -0,0 +1,58 @@ +using Microsoft.Extensions.AI; +using OllamaSharp; +using System.ClientModel; + +namespace CasCap.Extensions; + +/// +/// Factory methods for creating instances +/// from . +/// +/// +/// This is a SmartHaus-local helper that mirrors the AgentExtensions.CreateAgent pattern +/// from CasCap.Common.AI. When the shared library adds a CreateEmbeddingGenerator method, +/// this class can be removed in favour of the shared implementation. +/// +public static class EmbeddingGeneratorFactory +{ + /// + /// Creates an from the specified provider configuration. + /// + /// The provider configuration containing endpoint, model, and type. + /// Optional pre-configured HTTP client (e.g. with basic auth for dev Ollama). + /// An embedding generator for the configured provider. + public static IEmbeddingGenerator> CreateEmbeddingGenerator(ProviderConfig provider, HttpClient? httpClient = null) => + provider.Type switch + { + AgentType.Ollama => CreateOllamaEmbeddingGenerator(provider, httpClient), + AgentType.OpenAI => CreateOpenAIEmbeddingGenerator(provider), + AgentType.AzureOpenAI => CreateAzureOpenAIEmbeddingGenerator(provider), + _ => throw new NotSupportedException($"Embedding generation is not supported for provider type '{provider.Type}'."), + }; + + private static IEmbeddingGenerator> CreateOllamaEmbeddingGenerator(ProviderConfig provider, HttpClient? httpClient) + { + var uri = provider.Endpoint ?? new Uri("http://localhost:11434"); + + // OllamaApiClient directly implements IEmbeddingGenerator>. + if (httpClient is not null) + return new OllamaApiClient(httpClient, provider.ModelName); + + return new OllamaApiClient(uri, provider.ModelName); + } + + private static IEmbeddingGenerator> CreateOpenAIEmbeddingGenerator(ProviderConfig provider) + { + var apiKey = provider.ApiKey ?? throw new InvalidOperationException("OpenAI embedding provider requires an ApiKey."); + var client = new OpenAI.OpenAIClient(new ApiKeyCredential(apiKey)); + return client.GetEmbeddingClient(provider.ModelName).AsIEmbeddingGenerator(); + } + + private static IEmbeddingGenerator> CreateAzureOpenAIEmbeddingGenerator(ProviderConfig provider) + { + var endpoint = provider.Endpoint ?? throw new InvalidOperationException("AzureOpenAI embedding provider requires an Endpoint."); + var apiKey = provider.ApiKey ?? throw new InvalidOperationException("AzureOpenAI embedding provider requires an ApiKey."); + var client = new Azure.AI.OpenAI.AzureOpenAIClient(endpoint, new AzureKeyCredential(apiKey)); + return client.GetEmbeddingClient(provider.ModelName).AsIEmbeddingGenerator(); + } +} diff --git a/src/CasCap.SmartHaus/Extensions/McpServiceCollectionExtensions.cs b/src/CasCap.SmartHaus/Extensions/McpServiceCollectionExtensions.cs index ae71367..afb940c 100644 --- a/src/CasCap.SmartHaus/Extensions/McpServiceCollectionExtensions.cs +++ b/src/CasCap.SmartHaus/Extensions/McpServiceCollectionExtensions.cs @@ -1,3 +1,6 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.AI; + namespace CasCap.Extensions; /// @@ -104,4 +107,45 @@ public static void AddMessagingMcpStub(this IServiceCollection services) string.Empty, string.Empty)); } + + /// + /// Registers and its dependencies + /// (, , + /// embedding generator, and configuration) for RAG document management tools. + /// + /// The web application builder. + public static void AddRag(this WebApplicationBuilder builder) + { + builder.Services.AddCasCapConfiguration(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + + // Register IEmbeddingGenerator from the configured provider. + builder.Services.AddSingleton(sp => + { + var ragCfg = sp.GetRequiredService>().Value; + var aiCfg = sp.GetRequiredService>().Value; + + if (!aiCfg.Providers.TryGetValue(ragCfg.EmbeddingProvider, out var providerConfig)) + throw new InvalidOperationException( + $"Embedding provider '{ragCfg.EmbeddingProvider}' not found in AIConfig.Providers."); + + HttpClient? httpClient = null; + if (providerConfig.Type is AgentType.Ollama && builder.Environment.IsDevelopment()) + { + httpClient = new HttpClient + { + BaseAddress = providerConfig.Endpoint, + Timeout = Timeout.InfiniteTimeSpan, + }; + var authOpts = sp.GetRequiredService>().Value; + httpClient.SetBasicAuth(authOpts.Username, authOpts.Password); + } + + return EmbeddingGeneratorFactory.CreateEmbeddingGenerator(providerConfig, httpClient); + }); + + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + } } diff --git a/src/CasCap.SmartHaus/Models/DocumentChunk.cs b/src/CasCap.SmartHaus/Models/DocumentChunk.cs new file mode 100644 index 0000000..49cb4c6 --- /dev/null +++ b/src/CasCap.SmartHaus/Models/DocumentChunk.cs @@ -0,0 +1,29 @@ +namespace CasCap.Models; + +/// +/// A chunked section of an ingested document with its embedding vector, stored in Redis +/// as a hash with a vector field for similarity search. +/// +public class DocumentChunk +{ + /// Unique chunk identifier ({documentId}:{chunkIndex}). + public required string Id { get; init; } + + /// Stable identifier for the source document (derived from the file name). + public required string DocumentId { get; init; } + + /// Human-readable document name (e.g. the original PDF filename). + public required string DocumentName { get; init; } + + /// Plain-text content of this chunk. + public required string Content { get; init; } + + /// One-based page number in the source PDF where this chunk starts. + public int PageNumber { get; init; } + + /// Zero-based index of this chunk within its parent document. + public int ChunkIndex { get; init; } + + /// Embedding vector generated from . + public ReadOnlyMemory Embedding { get; set; } +} diff --git a/src/CasCap.SmartHaus/Models/DocumentInfo.cs b/src/CasCap.SmartHaus/Models/DocumentInfo.cs new file mode 100644 index 0000000..87f1cf4 --- /dev/null +++ b/src/CasCap.SmartHaus/Models/DocumentInfo.cs @@ -0,0 +1,27 @@ +namespace CasCap.Models; + +/// +/// Metadata about an ingested document. +/// +public record DocumentInfo +{ + /// Stable identifier for the document (derived from the file name). + [Description("Unique document identifier.")] + public required string DocumentId { get; init; } + + /// Original file name of the document. + [Description("Original PDF file name.")] + public required string DocumentName { get; init; } + + /// Number of pages extracted from the PDF. + [Description("Total pages in the source PDF.")] + public int PageCount { get; init; } + + /// Number of chunks generated from the document. + [Description("Number of text chunks stored in the vector index.")] + public int ChunkCount { get; init; } + + /// UTC timestamp when the document was ingested. + [Description("When the document was last ingested (UTC).")] + public DateTime IngestedAtUtc { get; init; } +} diff --git a/src/CasCap.SmartHaus/Models/DocumentSearchResult.cs b/src/CasCap.SmartHaus/Models/DocumentSearchResult.cs new file mode 100644 index 0000000..6be3bbe --- /dev/null +++ b/src/CasCap.SmartHaus/Models/DocumentSearchResult.cs @@ -0,0 +1,23 @@ +namespace CasCap.Models; + +/// +/// A single result from a vector similarity search against ingested documents. +/// +public record DocumentSearchResult +{ + /// Human-readable document name. + [Description("Source document name.")] + public required string DocumentName { get; init; } + + /// One-based page number in the source PDF. + [Description("Page number in the source PDF.")] + public int PageNumber { get; init; } + + /// Similarity score (lower is more similar for cosine distance). + [Description("Similarity score — lower values indicate higher relevance.")] + public double Score { get; init; } + + /// The text content of the matched chunk. + [Description("Matched text content from the document.")] + public required string Content { get; init; } +} diff --git a/src/CasCap.SmartHaus/Models/FeatureNames.cs b/src/CasCap.SmartHaus/Models/FeatureNames.cs index 3969b59..2187010 100644 --- a/src/CasCap.SmartHaus/Models/FeatureNames.cs +++ b/src/CasCap.SmartHaus/Models/FeatureNames.cs @@ -64,6 +64,9 @@ public static class FeatureNames /// Single-instance communications and media analysis service — consumes key events and binary media from Redis Streams, routes media to domain agents, and relays notifications via Signal messenger. public const string Comms = nameof(Comms); + /// RAG (Retrieval-Augmented Generation) document ingestion and vector search. + public const string Rag = nameof(Rag); + /// Lightweight feature name used by integration tests to boot the application without activating any hardware features. public const string Test = nameof(Test); } diff --git a/src/CasCap.SmartHaus/Models/_RagConfig.cs b/src/CasCap.SmartHaus/Models/_RagConfig.cs new file mode 100644 index 0000000..01a1b3c --- /dev/null +++ b/src/CasCap.SmartHaus/Models/_RagConfig.cs @@ -0,0 +1,87 @@ +namespace CasCap.Models; + +/// +/// Configuration for the RAG (Retrieval-Augmented Generation) vector storage pipeline. +/// +/// +/// Bound from CasCap:RagConfig. Controls embedding generation, PDF chunking, +/// Redis vector index parameters, and the document source directory. +/// +public record RagConfig : IAppConfig +{ + /// + public static string ConfigurationSectionName => $"{nameof(CasCap)}:{nameof(RagConfig)}"; + + /// + /// Key into identifying the embedding model provider. + /// + /// Defaults to "EdgeEmbedding". + [Required, MinLength(1)] + public string EmbeddingProvider { get; init; } = "EdgeEmbedding"; + + /// Redis Search index name for the document vector collection. + /// Defaults to "rag-documents". + [Required, MinLength(1)] + public string IndexName { get; init; } = "rag-documents"; + + /// Embedding vector dimension. Must match the embedding model output. + /// Defaults to 768 (nomic-embed-text). + [Range(1, 8192)] + public int Dimension { get; init; } = 768; + + /// Distance metric for vector similarity search. + /// Defaults to "COSINE". + [Required, MinLength(1)] + public string DistanceMetric { get; init; } = "COSINE"; + + /// Maximum chunk size in tokens when splitting PDF text. + /// Defaults to 512. + [Range(50, 8192)] + public int ChunkSizeTokens { get; init; } = 512; + + /// Number of overlapping tokens between consecutive chunks. + /// Defaults to 50. + [Range(0, 1024)] + public int ChunkOverlapTokens { get; init; } = 50; + + /// Number of top results returned by vector similarity search. + /// Defaults to 5. + [Range(1, 100)] + public int TopK { get; init; } = 5; + + /// + /// Local filesystem directory containing PDF documents to ingest on startup. + /// + /// Defaults to "/data/rag-documents". + [Required, MinLength(1)] + public string DocumentsPath { get; init; } = "/data/rag-documents"; + + /// + /// Whether to automatically ingest documents from on startup. + /// + /// Defaults to true. + public bool AutoIngestOnStartup { get; init; } = true; + + /// Per-agent RAG source overrides keyed by agent name. + /// + /// When an agent's key appears here, only the specified collections and TopK + /// values apply to that agent's RAG context injection. When absent, the agent + /// uses the default and . + /// + public Dictionary AgentSources { get; init; } = []; +} + +/// +/// Identifies a vector collection and retrieval depth for agent-specific RAG context injection. +/// +public record RagSource +{ + /// Name of the vector collection to search. + [Required, MinLength(1)] + public required string CollectionName { get; init; } + + /// Number of top results to retrieve from this collection. + /// Defaults to 5. + [Range(1, 100)] + public int TopK { get; init; } = 5; +} diff --git a/src/CasCap.SmartHaus/Resources/AppliancesAgent.instructions.md b/src/CasCap.SmartHaus/Resources/AppliancesAgent.instructions.md index a8d121d..e75eee1 100644 --- a/src/CasCap.SmartHaus/Resources/AppliancesAgent.instructions.md +++ b/src/CasCap.SmartHaus/Resources/AppliancesAgent.instructions.md @@ -2,6 +2,8 @@ You are an AI focussed on smart kitchen and laundry appliances connected via Hom You can check appliance status, start programs, and execute actions. +You may also have access to ingested reference documents (e.g. Miele appliance operation manuals) via the search_documents tool. When a user asks about appliance features, programs, troubleshooting, or maintenance, search the documents first to provide accurate manufacturer guidance. + ## Poll rules 1. When presenting choices, ONLY use the create_poll tool — NEVER list options in text. diff --git a/src/CasCap.SmartHaus/Resources/HeatingAgent.instructions.md b/src/CasCap.SmartHaus/Resources/HeatingAgent.instructions.md index 0259112..e51d3ac 100644 --- a/src/CasCap.SmartHaus/Resources/HeatingAgent.instructions.md +++ b/src/CasCap.SmartHaus/Resources/HeatingAgent.instructions.md @@ -2,6 +2,8 @@ You are an AI focussed on the smart home heat pump and KNX heating zones. You have access to tooling to monitor and control the heat pump and floor heating zone temperatures and setpoints. +You may also have access to ingested reference documents (e.g. heat pump setup and configuration manuals) via the search_documents tool. When a user asks about installation, configuration, error codes, or operating procedures, search the documents first to provide accurate manufacturer guidance. + ## Poll rules 1. When presenting choices, ONLY use the create_poll tool — NEVER list options in text. diff --git a/src/CasCap.SmartHaus/Services/Mcp/RagMcpQueryService.cs b/src/CasCap.SmartHaus/Services/Mcp/RagMcpQueryService.cs new file mode 100644 index 0000000..17354ec --- /dev/null +++ b/src/CasCap.SmartHaus/Services/Mcp/RagMcpQueryService.cs @@ -0,0 +1,129 @@ +using Microsoft.Extensions.AI; + +namespace CasCap.Services; + +/// +/// MCP tools for managing RAG document collections — searching, listing, and ingesting documents. +/// +[McpServerToolType] +public class RagMcpQueryService( + ILogger logger, + IDocumentVectorStore vectorStore, + IDocumentIngestionService ingestionSvc, + IEmbeddingGenerator> embeddingGenerator, + IOptions ragConfig) +{ + /// + /// Searches ingested documents for content relevant to the given query using vector similarity. + /// + [McpServerTool] + [Description("Semantic search across ingested PDF documents. Returns the most relevant text passages.")] + public async Task> SearchDocuments( + [Description("Natural-language search query.")] + string query, + [Description("Collection name to search. Omit for the default collection.")] + string? collectionName = null, + [Description("Number of results to return (1–100). Defaults to the configured TopK.")] + int? topK = null, + CancellationToken cancellationToken = default) + { + logger.LogDebug("{ClassName} {MethodName} query={Query}, collection={CollectionName}", + nameof(RagMcpQueryService), nameof(SearchDocuments), query, collectionName); + + var config = ragConfig.Value; + collectionName ??= config.IndexName; + topK ??= config.TopK; + + var embeddingResult = await embeddingGenerator.GenerateAsync(query, cancellationToken: cancellationToken); + return await vectorStore.SearchAsync(collectionName, embeddingResult.Vector, topK.Value, cancellationToken); + } + + /// + /// Lists all available document collections with their document counts. + /// + [McpServerTool] + [Description("Lists all RAG document collections available for search.")] + public async Task> ListDocuments( + [Description("Collection name. Omit for the default collection.")] + string? collectionName = null, + CancellationToken cancellationToken = default) + { + logger.LogDebug("{ClassName} {MethodName} collection={CollectionName}", + nameof(RagMcpQueryService), nameof(ListDocuments), collectionName); + + var config = ragConfig.Value; + collectionName ??= config.IndexName; + + return await vectorStore.ListDocumentsAsync(collectionName, cancellationToken); + } + + /// + /// Triggers ingestion of a PDF document from the configured documents directory. + /// + [McpServerTool] + [Description("Ingests a PDF file from the documents directory into the vector store for RAG search.")] + public async Task IngestDocument( + [Description("PDF file name (relative to the documents directory).")] + string fileName, + [Description("Collection name. Omit for the default collection.")] + string? collectionName = null, + CancellationToken cancellationToken = default) + { + logger.LogDebug("{ClassName} {MethodName} file={FileName}, collection={CollectionName}", + nameof(RagMcpQueryService), nameof(IngestDocument), fileName, collectionName); + + var config = ragConfig.Value; + collectionName ??= config.IndexName; + + var filePath = Path.Combine(config.DocumentsPath, fileName); + if (!File.Exists(filePath)) + throw new FileNotFoundException($"PDF file not found: {fileName}", filePath); + + await using var stream = File.OpenRead(filePath); + return await ingestionSvc.IngestDocumentAsync(stream, fileName, collectionName, cancellationToken); + } + + /// + /// Returns metadata about a specific ingested document. + /// + [McpServerTool] + [Description("Gets metadata about a specific ingested document — page count, chunk count, ingestion time.")] + public async Task GetDocumentInfo( + [Description("Document identifier (derived from file name, e.g. 'my-manual-pdf').")] + string documentId, + [Description("Collection name. Omit for the default collection.")] + string? collectionName = null, + CancellationToken cancellationToken = default) + { + logger.LogDebug("{ClassName} {MethodName} documentId={DocumentId}, collection={CollectionName}", + nameof(RagMcpQueryService), nameof(GetDocumentInfo), documentId, collectionName); + + var config = ragConfig.Value; + collectionName ??= config.IndexName; + + var documents = await vectorStore.ListDocumentsAsync(collectionName, cancellationToken); + return documents.Find(d => string.Equals(d.DocumentId, documentId, StringComparison.OrdinalIgnoreCase)); + } + + /// + /// Removes a document and all its chunks from the vector store. + /// + [McpServerTool] + [Description("Removes a document and all its chunks from the vector store.")] + public async Task RemoveDocument( + [Description("Document identifier to remove.")] + string documentId, + [Description("Collection name. Omit for the default collection.")] + string? collectionName = null, + CancellationToken cancellationToken = default) + { + logger.LogDebug("{ClassName} {MethodName} documentId={DocumentId}, collection={CollectionName}", + nameof(RagMcpQueryService), nameof(RemoveDocument), documentId, collectionName); + + var config = ragConfig.Value; + collectionName ??= config.IndexName; + + await ingestionSvc.RemoveDocumentAsync(documentId, collectionName, cancellationToken); + return true; + } +} diff --git a/src/CasCap.SmartHaus/Services/Rag/PdfDocumentIngestionService.cs b/src/CasCap.SmartHaus/Services/Rag/PdfDocumentIngestionService.cs new file mode 100644 index 0000000..4604e3d --- /dev/null +++ b/src/CasCap.SmartHaus/Services/Rag/PdfDocumentIngestionService.cs @@ -0,0 +1,201 @@ +using Microsoft.Extensions.AI; +using UglyToad.PdfPig; + +namespace CasCap.Services; + +/// +/// PDF ingestion service that extracts text from PDFs, splits into token-aware chunks, +/// generates embeddings via , +/// and stores in the vector store. +/// +public class PdfDocumentIngestionService( + ILogger logger, + IDocumentVectorStore vectorStore, + IEmbeddingGenerator> embeddingGenerator, + IOptions ragConfig + ) : IDocumentIngestionService +{ + /// + public async Task IngestDocumentAsync(Stream pdfStream, string documentName, string collectionName, CancellationToken cancellationToken = default) + { + logger.LogInformation("{ClassName} ingesting {DocumentName} into {CollectionName}", + nameof(PdfDocumentIngestionService), documentName, collectionName); + + var config = ragConfig.Value; + var documentId = GenerateDocumentId(documentName); + + // Remove existing chunks for this document before re-ingesting. + await vectorStore.RemoveDocumentAsync(collectionName, documentId, cancellationToken); + + // Extract text from PDF pages. + var pages = ExtractPages(pdfStream); + logger.LogInformation("{ClassName} extracted {PageCount} pages from {DocumentName}", + nameof(PdfDocumentIngestionService), pages.Count, documentName); + + // Chunk the extracted text. + var chunks = ChunkPages(pages, documentId, documentName, config.ChunkSizeTokens, config.ChunkOverlapTokens); + logger.LogInformation("{ClassName} created {ChunkCount} chunks from {DocumentName}", + nameof(PdfDocumentIngestionService), chunks.Count, documentName); + + if (chunks.Count == 0) + { + logger.LogWarning("{ClassName} no text content extracted from {DocumentName}", + nameof(PdfDocumentIngestionService), documentName); + return new DocumentInfo + { + DocumentId = documentId, + DocumentName = documentName, + PageCount = pages.Count, + ChunkCount = 0, + IngestedAtUtc = DateTime.UtcNow, + }; + } + + // Generate embeddings in batches. + var texts = chunks.Select(c => c.Content).ToList(); + var embeddings = await embeddingGenerator.GenerateAsync(texts, cancellationToken: cancellationToken); + + for (var i = 0; i < chunks.Count; i++) + chunks[i].Embedding = embeddings[i].Vector; + + // Store in vector store. + await vectorStore.EnsureCollectionAsync(collectionName, cancellationToken); + await vectorStore.UpsertChunksAsync(collectionName, chunks, cancellationToken); + + var info = new DocumentInfo + { + DocumentId = documentId, + DocumentName = documentName, + PageCount = pages.Count, + ChunkCount = chunks.Count, + IngestedAtUtc = DateTime.UtcNow, + }; + + // Store metadata. + if (vectorStore is RedisDocumentVectorStore redisStore) + await redisStore.StoreDocumentMetadataAsync(collectionName, info); + + logger.LogInformation("{ClassName} ingested {DocumentName} — {PageCount} pages, {ChunkCount} chunks", + nameof(PdfDocumentIngestionService), documentName, info.PageCount, info.ChunkCount); + + return info; + } + + /// + public async Task> IngestDirectoryAsync(string directoryPath, string collectionName, CancellationToken cancellationToken = default) + { + if (!Directory.Exists(directoryPath)) + { + logger.LogWarning("{ClassName} directory {Path} does not exist, skipping ingestion", + nameof(PdfDocumentIngestionService), directoryPath); + return []; + } + + var pdfFiles = Directory.GetFiles(directoryPath, "*.pdf", SearchOption.AllDirectories); + logger.LogInformation("{ClassName} found {Count} PDF files in {Path}", + nameof(PdfDocumentIngestionService), pdfFiles.Length, directoryPath); + + var results = new List(); + foreach (var pdfFile in pdfFiles) + { + cancellationToken.ThrowIfCancellationRequested(); + try + { + await using var stream = File.OpenRead(pdfFile); + var info = await IngestDocumentAsync(stream, Path.GetFileName(pdfFile), collectionName, cancellationToken); + results.Add(info); + } + catch (Exception ex) + { + logger.LogError(ex, "{ClassName} failed to ingest {File}", nameof(PdfDocumentIngestionService), pdfFile); + } + } + + return results; + } + + /// + public Task RemoveDocumentAsync(string documentId, string collectionName, CancellationToken cancellationToken = default) => + vectorStore.RemoveDocumentAsync(collectionName, documentId, cancellationToken); + + /// Extracts text from each page of a PDF. + private static List<(int PageNumber, string Text)> ExtractPages(Stream pdfStream) + { + var pages = new List<(int, string)>(); + + using var document = PdfDocument.Open(pdfStream); + foreach (var page in document.GetPages()) + { + var text = page.Text; + if (!string.IsNullOrWhiteSpace(text)) + pages.Add((page.Number, text)); + } + + return pages; + } + + /// + /// Approximate number of characters per token for chunk size estimation. + /// Standard English text averages roughly 4 characters per token with most tokenizers. + /// + private const int CharsPerTokenApprox = 4; + + /// + /// Splits extracted pages into chunks using a character-based approximation + /// ( characters ≈ 1 token). Chunks overlap by + /// tokens. + /// + private static List ChunkPages( + List<(int PageNumber, string Text)> pages, + string documentId, + string documentName, + int chunkSizeTokens, + int overlapTokens) + { + var chunks = new List(); + var chunkIndex = 0; + + var chunkSizeChars = chunkSizeTokens * CharsPerTokenApprox; + var overlapChars = overlapTokens * CharsPerTokenApprox; + var stride = chunkSizeChars - overlapChars; + + if (stride <= 0) + throw new ArgumentException( + $"ChunkOverlapTokens ({overlapTokens}) must be less than ChunkSizeTokens ({chunkSizeTokens})."); + + foreach (var (pageNumber, text) in pages) + { + var position = 0; + while (position < text.Length) + { + var length = Math.Min(chunkSizeChars, text.Length - position); + var chunkText = text.Substring(position, length).Trim(); + + if (!string.IsNullOrWhiteSpace(chunkText)) + { + chunks.Add(new DocumentChunk + { + Id = $"{documentId}:{chunkIndex}", + DocumentId = documentId, + DocumentName = documentName, + Content = chunkText, + PageNumber = pageNumber, + ChunkIndex = chunkIndex, + }); + chunkIndex++; + } + + position += stride; + } + } + + return chunks; + } + + /// Generates a stable document ID from the file name. + private static string GenerateDocumentId(string documentName) => + documentName + .Replace(' ', '-') + .Replace('.', '-') + .ToLowerInvariant(); +} diff --git a/src/CasCap.SmartHaus/Services/Rag/RagIngestionBgService.cs b/src/CasCap.SmartHaus/Services/Rag/RagIngestionBgService.cs new file mode 100644 index 0000000..f01180e --- /dev/null +++ b/src/CasCap.SmartHaus/Services/Rag/RagIngestionBgService.cs @@ -0,0 +1,62 @@ +namespace CasCap.Services; + +/// +/// Background service that automatically ingests PDF documents from the configured +/// directory into the Redis vector store on startup. +/// +public class RagIngestionBgService( + ILogger logger, + IDocumentIngestionService ingestionSvc, + IDocumentVectorStore vectorStore, + IOptions ragConfig + ) : IBgFeature +{ + /// + public string FeatureName => FeatureNames.Rag; + + /// + public async Task ExecuteAsync(CancellationToken cancellationToken) + { + var config = ragConfig.Value; + + if (!config.AutoIngestOnStartup) + { + logger.LogInformation("{ClassName} auto-ingestion disabled, idling", nameof(RagIngestionBgService)); + try { await Task.Delay(Timeout.Infinite, cancellationToken); } + catch (OperationCanceledException) { } + return; + } + + if (!Directory.Exists(config.DocumentsPath)) + { + logger.LogWarning("{ClassName} documents directory {Path} does not exist, skipping auto-ingestion", + nameof(RagIngestionBgService), config.DocumentsPath); + try { await Task.Delay(Timeout.Infinite, cancellationToken); } + catch (OperationCanceledException) { } + return; + } + + try + { + logger.LogInformation("{ClassName} starting auto-ingestion from {Path} into {Collection}", + nameof(RagIngestionBgService), config.DocumentsPath, config.IndexName); + + await vectorStore.EnsureCollectionAsync(config.IndexName, cancellationToken); + + var results = await ingestionSvc.IngestDirectoryAsync(config.DocumentsPath, config.IndexName, cancellationToken); + + logger.LogInformation("{ClassName} auto-ingestion complete — {Count} documents ingested", + nameof(RagIngestionBgService), results.Count); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + logger.LogError(ex, "{ClassName} auto-ingestion failed", nameof(RagIngestionBgService)); + } + + // Idle after ingestion. + try { await Task.Delay(Timeout.Infinite, cancellationToken); } + catch (OperationCanceledException) { } + + logger.LogInformation("{ClassName} exiting", nameof(RagIngestionBgService)); + } +} diff --git a/src/CasCap.SmartHaus/Services/Rag/RedisDocumentVectorStore.cs b/src/CasCap.SmartHaus/Services/Rag/RedisDocumentVectorStore.cs new file mode 100644 index 0000000..a8680fe --- /dev/null +++ b/src/CasCap.SmartHaus/Services/Rag/RedisDocumentVectorStore.cs @@ -0,0 +1,244 @@ +using System.Runtime.InteropServices; +using StackExchange.Redis; + +namespace CasCap.Services; + +/// +/// Redis-backed implementation of using Redis Search +/// (FT.CREATE, FT.SEARCH) commands via . +/// +/// +/// Each collection maps to a Redis Search index. Document chunks are stored as hashes +/// with keys following the pattern rag:{collectionName}:{chunkId}. +/// Document metadata is stored as a separate hash at rag:meta:{collectionName}:{documentId}. +/// +public class RedisDocumentVectorStore( + ILogger logger, + IConnectionMultiplexer connectionMultiplexer, + IOptions ragConfig + ) : IDocumentVectorStore +{ + private const string KeyPrefix = "rag"; + private const string MetaPrefix = "rag:meta"; + + private IDatabase Db => connectionMultiplexer.GetDatabase(); + + private IServer Server => + connectionMultiplexer.GetServers().FirstOrDefault() + ?? throw new InvalidOperationException("No Redis server available in the connection multiplexer."); + + private static string ChunkKey(string collectionName, string chunkId) => + $"{KeyPrefix}:{collectionName}:{chunkId}"; + + private static string IndexName(string collectionName) => + $"{KeyPrefix}:{collectionName}:idx"; + + /// + public async Task EnsureCollectionAsync(string collectionName, CancellationToken cancellationToken = default) + { + var db = Db; + var idxName = IndexName(collectionName); + var config = ragConfig.Value; + + try + { + // Check if index already exists. + await db.ExecuteAsync("FT.INFO", idxName); + logger.LogDebug("{ClassName} index {IndexName} already exists", nameof(RedisDocumentVectorStore), idxName); + return; + } + catch (RedisServerException ex) when (ex.Message.Contains("Unknown index name", StringComparison.OrdinalIgnoreCase)) + { + // Index doesn't exist — create it. + } + + var prefix = $"{KeyPrefix}:{collectionName}:"; + await db.ExecuteAsync("FT.CREATE", idxName, + "ON", "HASH", + "PREFIX", "1", prefix, + "SCHEMA", + "DocumentId", "TAG", + "DocumentName", "TEXT", + "Content", "TEXT", + "PageNumber", "NUMERIC", + "ChunkIndex", "NUMERIC", + "Embedding", "VECTOR", "HNSW", "6", + "TYPE", "FLOAT32", + "DIM", config.Dimension.ToString(), + "DISTANCE_METRIC", config.DistanceMetric); + + logger.LogInformation("{ClassName} created index {IndexName} with dimension {Dimension}", + nameof(RedisDocumentVectorStore), idxName, config.Dimension); + } + + /// + public async Task UpsertChunksAsync(string collectionName, IReadOnlyList chunks, CancellationToken cancellationToken = default) + { + var db = Db; + + foreach (var chunk in chunks) + { + var key = ChunkKey(collectionName, chunk.Id); + var embeddingBytes = EmbeddingToBytes(chunk.Embedding); + + await db.HashSetAsync(key, + [ + new HashEntry("DocumentId", chunk.DocumentId), + new HashEntry("DocumentName", chunk.DocumentName), + new HashEntry("Content", chunk.Content), + new HashEntry("PageNumber", chunk.PageNumber), + new HashEntry("ChunkIndex", chunk.ChunkIndex), + new HashEntry("Embedding", embeddingBytes), + ]); + } + + logger.LogInformation("{ClassName} upserted {Count} chunks into {CollectionName}", + nameof(RedisDocumentVectorStore), chunks.Count, collectionName); + } + + /// + public async Task> SearchAsync(string collectionName, ReadOnlyMemory queryEmbedding, int topK, CancellationToken cancellationToken = default) + { + var db = Db; + var idxName = IndexName(collectionName); + var queryBytes = EmbeddingToBytes(queryEmbedding); + + // KNN vector search query. + var query = $"*=>[KNN {topK} @Embedding $vector AS score]"; + + var result = await db.ExecuteAsync("FT.SEARCH", idxName, query, + "PARAMS", "2", "vector", queryBytes, + "SORTBY", "score", + "RETURN", "5", "DocumentName", "Content", "PageNumber", "score", "DocumentId", + "DIALECT", "2"); + + var results = new List(); + var array = (RedisResult[])result!; + + // First element is total count, then alternating key/value pairs. + for (var i = 1; i < array.Length; i += 2) + { + if (i + 1 >= array.Length) + break; + + var fields = (RedisResult[])array[i + 1]!; + var fieldDict = new Dictionary(StringComparer.OrdinalIgnoreCase); + + for (var j = 0; j < fields.Length; j += 2) + fieldDict[fields[j].ToString()!] = fields[j + 1].ToString()!; + + results.Add(new DocumentSearchResult + { + DocumentName = fieldDict.GetValueOrDefault("DocumentName") ?? string.Empty, + Content = fieldDict.GetValueOrDefault("Content") ?? string.Empty, + PageNumber = int.TryParse(fieldDict.GetValueOrDefault("PageNumber"), out var pn) ? pn : 0, + Score = double.TryParse(fieldDict.GetValueOrDefault("score"), out var s) ? s : 0, + }); + } + + logger.LogDebug("{ClassName} search in {CollectionName} returned {Count} results", + nameof(RedisDocumentVectorStore), collectionName, results.Count); + + return results; + } + + /// + public async Task RemoveDocumentAsync(string collectionName, string documentId, CancellationToken cancellationToken = default) + { + var db = Db; + var server = Server; + + // Find and delete all chunk keys belonging to this document. + var keyPattern = $"{KeyPrefix}:{collectionName}:{documentId}:*"; + var deletedCount = 0; + + await foreach (var key in server.KeysAsync(pattern: keyPattern)) + { + await db.KeyDeleteAsync(key); + deletedCount++; + } + + // Remove document metadata. + var metaKey = $"{MetaPrefix}:{collectionName}:{documentId}"; + await db.KeyDeleteAsync(metaKey); + + logger.LogInformation("{ClassName} removed {Count} chunks for document {DocumentId} from {CollectionName}", + nameof(RedisDocumentVectorStore), deletedCount, documentId, collectionName); + } + + /// + public async Task DeleteCollectionAsync(string collectionName, CancellationToken cancellationToken = default) + { + var db = Db; + var server = Server; + var idxName = IndexName(collectionName); + + // Drop the index (DD = delete associated docs). + try + { + await db.ExecuteAsync("FT.DROPINDEX", idxName, "DD"); + } + catch (RedisServerException ex) when (ex.Message.Contains("Unknown index name", StringComparison.OrdinalIgnoreCase)) + { + // Already gone. + } + + // Also clean up metadata keys. + await foreach (var key in server.KeysAsync(pattern: $"{MetaPrefix}:{collectionName}:*")) + await db.KeyDeleteAsync(key); + + logger.LogInformation("{ClassName} deleted collection {CollectionName}", nameof(RedisDocumentVectorStore), collectionName); + } + + /// + public async Task> ListDocumentsAsync(string collectionName, CancellationToken cancellationToken = default) + { + var db = Db; + var server = Server; + var documents = new List(); + + await foreach (var key in server.KeysAsync(pattern: $"{MetaPrefix}:{collectionName}:*")) + { + var hash = await db.HashGetAllAsync(key); + if (hash.Length == 0) + continue; + + var fields = hash.ToDictionary(h => h.Name.ToString(), h => h.Value.ToString()); + documents.Add(new DocumentInfo + { + DocumentId = fields.GetValueOrDefault("DocumentId") ?? string.Empty, + DocumentName = fields.GetValueOrDefault("DocumentName") ?? string.Empty, + PageCount = int.TryParse(fields.GetValueOrDefault("PageCount"), out var pc) ? pc : 0, + ChunkCount = int.TryParse(fields.GetValueOrDefault("ChunkCount"), out var cc) ? cc : 0, + IngestedAtUtc = DateTime.TryParse(fields.GetValueOrDefault("IngestedAtUtc"), out var dt) ? dt : DateTime.MinValue, + }); + } + + return documents; + } + + /// Stores document metadata as a Redis hash. + internal async Task StoreDocumentMetadataAsync(string collectionName, DocumentInfo info) + { + var db = Db; + var metaKey = $"{MetaPrefix}:{collectionName}:{info.DocumentId}"; + + await db.HashSetAsync(metaKey, + [ + new HashEntry("DocumentId", info.DocumentId), + new HashEntry("DocumentName", info.DocumentName), + new HashEntry("PageCount", info.PageCount), + new HashEntry("ChunkCount", info.ChunkCount), + new HashEntry("IngestedAtUtc", info.IngestedAtUtc.ToString("O")), + ]); + } + + /// Converts a float embedding to its binary representation for Redis vector storage. + private static byte[] EmbeddingToBytes(ReadOnlyMemory embedding) + { + var span = embedding.Span; + var bytes = new byte[span.Length * sizeof(float)]; + MemoryMarshal.AsBytes(span).CopyTo(bytes); + return bytes; + } +}