Skip to content

Commit fcc85cd

Browse files
committed
- Introduced DocEmbedder class for embedding documents and transforming them into the FeatureView schema.
- Added BaseChunker and TextChunker classes for document chunking.
- Updated pyproject.toml to include the sentence-transformers dependency.
- Created a new Jupyter notebook example for using the RAG retriever with document embedding.

Signed-off-by: Chaitany patel <patelchaitany93@gmail.com>
1 parent 9c07b4c commit fcc85cd

22 files changed

+6714
-4850
lines changed

examples/rag-retriever/rag_feast_docembedder.ipynb

Lines changed: 647 additions & 0 deletions
Large diffs are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ qdrant = ["qdrant-client>=1.12.0"]
112112
rag = [
113113
"transformers>=4.36.0",
114114
"datasets>=3.6.0",
115+
"sentence-transformers>=3.0.0"
115116
]
116117
ray = [
117118
'ray>=2.47.0; python_version == "3.10"',

sdk/python/feast/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111

1212
from .aggregation import Aggregation
1313
from .batch_feature_view import BatchFeatureView
14+
from .chunker import BaseChunker, ChunkingConfig, TextChunker
1415
from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource
1516
from .dataframe import DataFrameEngine, FeastDataFrame
17+
from .doc_embedder import DocEmbedder, LogicalLayerFn
18+
from .embedder import BaseEmbedder, EmbeddingConfig, MultiModalEmbedder
1619
from .entity import Entity
1720
from .feature import Feature
1821
from .feature_service import FeatureService
@@ -58,4 +61,12 @@
5861
"AthenaSource",
5962
"Project",
6063
"FeastVectorStore",
64+
"DocEmbedder",
65+
"LogicalLayerFn",
66+
"BaseChunker",
67+
"TextChunker",
68+
"ChunkingConfig",
69+
"BaseEmbedder",
70+
"MultiModalEmbedder",
71+
"EmbeddingConfig",
6172
]

sdk/python/feast/chunker.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
from abc import ABC, abstractmethod
2+
from dataclasses import dataclass
3+
from typing import Any, Optional
4+
5+
import pandas as pd
6+
7+
8+
@dataclass
class ChunkingConfig:
    """Tunable parameters for word-based document chunking."""

    # Target number of words per chunk window.
    chunk_size: int = 100
    # Words shared between consecutive chunks; TextChunker rejects
    # configs where this is not strictly smaller than chunk_size.
    chunk_overlap: int = 20
    # Chunks with fewer words than this are silently dropped.
    min_chunk_size: int = 20
    # When set, each chunk's joined text is truncated to this many
    # characters (may cut mid-word); None disables truncation.
    max_chunk_chars: Optional[int] = 500
14+
15+
16+
class BaseChunker(ABC):
    """
    Abstract base class for document chunking.

    Subclasses implement load_parse_and_chunk() with their own:
    - Loading logic
    - Parsing logic
    - Chunking strategy
    """

    def __init__(self, config: Optional[ChunkingConfig] = None):
        # Fall back to default chunking parameters when none are supplied.
        self.config = config or ChunkingConfig()

    @abstractmethod
    def load_parse_and_chunk(
        self,
        source: Any,
        source_id: str,
        source_column: str,
        source_type: Optional[str] = None,
    ) -> list[dict]:
        """
        Load, parse, and chunk a document.

        Args:
            source: File path, raw text, bytes, etc.
            source_id: Document identifier.
            source_column: The column name under which chunk text is stored.
            source_type: Optional type hint.

        Returns:
            List of chunk dicts with keys:
            - chunk_id: str
            - original_id: str
            - <source_column>: str (the chunk text)
            - chunk_index: int
            - (any additional metadata)
        """
        pass

    def chunk_dataframe(
        self,
        df: pd.DataFrame,
        id_column: str,
        source_column: str,
        type_column: Optional[str] = None,
    ) -> pd.DataFrame:
        """
        Chunk all documents in a DataFrame.

        Args:
            df: The DataFrame containing the documents to chunk.
            id_column: The column containing the document IDs.
            source_column: The column containing the document sources.
            type_column: The column containing the document types.

        Returns:
            One row per chunk. Input with no rows (or that yields no
            chunks) produces an empty DataFrame with the columns
            ["chunk_id", "original_id", source_column, "chunk_index"].
        """
        empty_result = pd.DataFrame(
            columns=["chunk_id", "original_id", source_column, "chunk_index"]
        )
        # Guard: DataFrame.apply(axis=1) on a zero-row frame returns an
        # empty DataFrame rather than a Series, and DataFrame.explode()
        # requires a column argument, so the pipeline below would raise
        # a TypeError on empty input.
        if df.empty:
            return empty_result

        chunks_per_row = df.apply(
            lambda row: self.load_parse_and_chunk(
                row[source_column],
                str(row[id_column]),
                source_column,
                row[type_column] if type_column else None,
            ),
            axis=1,
        )
        # One row per chunk; documents that produced an empty chunk list
        # explode to NaN and are dropped.
        exploded = chunks_per_row.explode().dropna()
        if exploded.empty:
            return empty_result
        return pd.DataFrame(exploded.tolist())
88+
89+
90+
class TextChunker(BaseChunker):
    """Default chunker for plain text. Chunks by word count."""

    def load_parse_and_chunk(
        self,
        source: Any,
        source_id: str,
        source_column: str,
        source_type: Optional[str] = None,
    ) -> list[dict]:
        """Resolve *source* to raw text, then split it into overlapping
        word-window chunks keyed by *source_column*."""
        raw_text = self._load(source)
        return self._chunk_by_words(raw_text, source_id, source_column)

    def _load(self, source: Any) -> str:
        """Return the text behind *source*.

        Existing Path objects and existing ``.txt`` path strings are read
        from disk; anything else is coerced with ``str`` and treated as the
        document text itself.
        """
        from pathlib import Path

        if isinstance(source, Path) and source.exists():
            return Path(source).read_text()
        if (
            isinstance(source, str)
            and source.endswith(".txt")
            and Path(source).exists()
        ):
            return Path(source).read_text()
        return str(source)

    def _chunk_by_words(
        self, text: str, source_id: str, source_column: str
    ) -> list[dict]:
        """Slide a chunk_size-word window over *text* in stride steps,
        skipping windows below min_chunk_size words."""
        cfg = self.config
        words = text.split()

        stride = cfg.chunk_size - cfg.chunk_overlap
        if stride <= 0:
            raise ValueError(
                f"chunk_overlap ({cfg.chunk_overlap}) must be less than "
                f"chunk_size ({cfg.chunk_size})"
            )

        results: list[dict] = []
        next_index = 0
        for start in range(0, len(words), stride):
            window = words[start : start + cfg.chunk_size]
            if len(window) < cfg.min_chunk_size:
                continue

            body = " ".join(window)
            if cfg.max_chunk_chars:
                body = body[: cfg.max_chunk_chars]

            results.append(
                {
                    "chunk_id": f"{source_id}_{next_index}",
                    "original_id": source_id,
                    source_column: body,
                    "chunk_index": next_index,
                }
            )
            next_index += 1

        return results

0 commit comments

Comments
 (0)