1818// Python bindings release GIL during the run.
1919
2020use std:: f64:: consts:: PI ;
21+ use std:: path:: Path ;
22+ use std:: sync:: Mutex ;
2123use std:: time:: Instant ;
2224
2325use crate :: QdpEngine ;
2426use crate :: dlpack:: DLManagedTensor ;
25- use crate :: error:: Result ;
27+ use crate :: error:: { MahoutError , Result } ;
28+ use crate :: io;
29+ use crate :: reader:: StreamingDataReader ;
30+ use crate :: readers:: ParquetStreamingReader ;
2631
2732/// Configuration for throughput/latency pipeline runs (Python run_throughput_pipeline_py).
2833#[ derive( Clone , Debug ) ]
@@ -58,14 +63,111 @@ pub struct PipelineRunResult {
5863 pub latency_ms_per_vector : f64 ,
5964}
6065
/// Data source for the pipeline iterator (Phase 1: Synthetic; Phase 2a: InMemory; Phase 2b: Streaming).
///
/// `Debug` is implemented manually (below) rather than derived, since the
/// `Streaming` variant holds a reader and bulk buffers that should not be
/// dumped field-by-field.
pub enum DataSource {
    /// Phase 1: deterministic synthetic batches generated per index.
    Synthetic {
        seed: u64,
        /// Next batch index to generate; incremented on each pull.
        batch_index: usize,
        /// Iteration stops once `batch_index` reaches this count.
        total_batches: usize,
    },
    /// Phase 2a: full file loaded once; iterator slices by batch_size.
    InMemory {
        /// Flattened samples: `num_samples * sample_size` f64 values.
        data: Vec<f64>,
        /// Read position into `data`, in f64 elements.
        cursor: usize,
        num_samples: usize,
        sample_size: usize,
        /// Batches handed out so far; compared against `batch_limit`.
        batches_yielded: usize,
        /// Cap on batches yielded; `usize::MAX` means iterate until EOF.
        batch_limit: usize,
    },
    /// Phase 2b: stream from Parquet in chunks; iterator refills buffer and encodes by batch.
    /// Reader is in Mutex so PipelineIterator remains Sync (required by PyO3 pyclass).
    Streaming {
        reader: Mutex<ParquetStreamingReader>,
        /// Decoded-but-unconsumed values; refilled from `reader` on demand.
        buffer: Vec<f64>,
        /// Read position into `buffer`, in f64 elements.
        buffer_cursor: usize,
        /// Reusable scratch passed to `read_chunk` to avoid per-refill allocation.
        read_chunk_scratch: Vec<f64>,
        sample_size: usize,
        /// Cap on batches yielded; `usize::MAX` means iterate until EOF.
        batch_limit: usize,
        batches_yielded: usize,
    },
}
94+
95+ impl std:: fmt:: Debug for DataSource {
96+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
97+ match self {
98+ DataSource :: Synthetic {
99+ seed,
100+ batch_index,
101+ total_batches,
102+ } => f
103+ . debug_struct ( "Synthetic" )
104+ . field ( "seed" , seed)
105+ . field ( "batch_index" , batch_index)
106+ . field ( "total_batches" , total_batches)
107+ . finish ( ) ,
108+ DataSource :: InMemory {
109+ cursor,
110+ num_samples,
111+ sample_size,
112+ batches_yielded,
113+ batch_limit,
114+ ..
115+ } => f
116+ . debug_struct ( "InMemory" )
117+ . field ( "cursor" , cursor)
118+ . field ( "num_samples" , num_samples)
119+ . field ( "sample_size" , sample_size)
120+ . field ( "batches_yielded" , batches_yielded)
121+ . field ( "batch_limit" , batch_limit)
122+ . finish ( ) ,
123+ DataSource :: Streaming {
124+ buffer,
125+ buffer_cursor,
126+ sample_size,
127+ batch_limit,
128+ batches_yielded,
129+ ..
130+ } => f
131+ . debug_struct ( "Streaming" )
132+ . field ( "buffer_len" , & buffer. len ( ) )
133+ . field ( "buffer_cursor" , buffer_cursor)
134+ . field ( "sample_size" , sample_size)
135+ . field ( "batch_limit" , batch_limit)
136+ . field ( "batches_yielded" , batches_yielded)
137+ . finish ( ) ,
138+ }
139+ }
140+ }
141+
/// Default Parquet row group size for streaming reader (tunable).
const DEFAULT_PARQUET_ROW_GROUP_SIZE: usize = 2048;

/// When buffer_cursor >= buffer.len() / BUFFER_COMPACT_DENOM, compact by draining consumed prefix.
/// A denominator of 2 means: compact once at least half the buffer has been consumed.
const BUFFER_COMPACT_DENOM: usize = 2;
147+
/// Returns the path extension as lowercase ASCII (e.g. "parquet"), or None if missing/non-UTF8.
///
/// Uses `to_ascii_lowercase` rather than `to_lowercase`: the documented
/// contract is ASCII, the result is only ever compared against ASCII extension
/// literals, and full Unicode case mapping can change string length for
/// non-ASCII input (e.g. 'İ'), which would be surprising here.
fn path_extension_lower(path: &Path) -> Option<String> {
    path.extension()
        .and_then(|e| e.to_str())
        .map(|s| s.to_ascii_lowercase())
}
154+
155+ /// Dispatches by path extension to the appropriate io reader. Returns (data, num_samples, sample_size).
156+ /// Unsupported or missing extension returns Err with message listing supported formats.
157+ fn read_file_by_extension ( path : & Path ) -> Result < ( Vec < f64 > , usize , usize ) > {
158+ let ext_lower = path_extension_lower ( path) ;
159+ let ext = ext_lower. as_deref ( ) ;
160+ match ext {
161+ Some ( "parquet" ) => io:: read_parquet_batch ( path) ,
162+ Some ( "arrow" ) | Some ( "feather" ) | Some ( "ipc" ) => io:: read_arrow_ipc_batch ( path) ,
163+ Some ( "npy" ) => io:: read_numpy_batch ( path) ,
164+ Some ( "pt" ) | Some ( "pth" ) => io:: read_torch_batch ( path) ,
165+ Some ( "pb" ) => io:: read_tensorflow_batch ( path) ,
166+ _ => Err ( MahoutError :: InvalidInput ( format ! (
167+ "Unsupported file extension {:?}. Supported: .parquet, .arrow, .feather, .ipc, .npy, .pt, .pth, .pb" ,
168+ path. extension( )
169+ ) ) ) ,
170+ }
69171}
70172
71173/// Stateful iterator that yields one batch DLPack at a time for Python `for` loop consumption.
@@ -77,6 +179,9 @@ pub struct PipelineIterator {
77179 vector_len : usize ,
78180}
79181
/// (batch_data, batch_n, sample_size, num_qubits) from one source pull.
/// Invariant: `batch_data.len() == batch_n * sample_size`. The final batch from
/// a file source may be short (`batch_n < config.batch_size`) when fewer
/// samples remain.
type BatchFromSource = (Vec<f64>, usize, usize, usize);
184+
80185impl PipelineIterator {
81186 /// Create a new synthetic-data pipeline iterator.
82187 pub fn new_synthetic ( engine : QdpEngine , config : PipelineConfig ) -> Result < Self > {
@@ -94,26 +199,227 @@ impl PipelineIterator {
94199 } )
95200 }
96201
97- /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted.
98- pub fn next_batch ( & mut self ) -> Result < Option < * mut DLManagedTensor > > {
99- let ( batch_data, num_qubits) = match & mut self . source {
202+ /// Create a pipeline iterator from a file (Phase 2a: load full file then slice by batch).
203+ /// Dispatches by path extension; validates dimensions at construction.
204+ ///
205+ /// Supported extensions: .parquet, .arrow, .feather, .ipc, .npy, .pt, .pth, .pb.
206+ /// For file source, `batch_limit` caps batches yielded (e.g. for testing); use `usize::MAX` to iterate until EOF.
207+ pub fn new_from_file < P : AsRef < Path > > (
208+ engine : QdpEngine ,
209+ path : P ,
210+ config : PipelineConfig ,
211+ batch_limit : usize ,
212+ ) -> Result < Self > {
213+ let path = path. as_ref ( ) ;
214+ let ( data, num_samples, sample_size) = read_file_by_extension ( path) ?;
215+ let vector_len = vector_len ( config. num_qubits , & config. encoding_method ) ;
216+
217+ // Dimension validation at construction.
218+ if sample_size != vector_len {
219+ return Err ( MahoutError :: InvalidInput ( format ! (
220+ "File feature length {} does not match vector_len {} for num_qubits={}, encoding={}" ,
221+ sample_size, vector_len, config. num_qubits, config. encoding_method
222+ ) ) ) ;
223+ }
224+ if data. len ( ) != num_samples * sample_size {
225+ return Err ( MahoutError :: InvalidInput ( format ! (
226+ "File data length {} is not num_samples ({}) * sample_size ({})" ,
227+ data. len( ) ,
228+ num_samples,
229+ sample_size
230+ ) ) ) ;
231+ }
232+
233+ let source = DataSource :: InMemory {
234+ data,
235+ cursor : 0 ,
236+ num_samples,
237+ sample_size,
238+ batches_yielded : 0 ,
239+ batch_limit,
240+ } ;
241+ Ok ( Self {
242+ engine,
243+ config,
244+ source,
245+ vector_len,
246+ } )
247+ }
248+
    /// Create a pipeline iterator from a Parquet file using streaming read (Phase 2b).
    /// Only `.parquet` is supported; reduces memory for large files by reading in chunks.
    /// Validates sample_size == vector_len after the first chunk.
    pub fn new_from_file_streaming<P: AsRef<Path>>(
        engine: QdpEngine,
        path: P,
        config: PipelineConfig,
        batch_limit: usize,
    ) -> Result<Self> {
        let path = path.as_ref();
        // Extension gate: other formats must go through the in-memory loader.
        if path_extension_lower(path).as_deref() != Some("parquet") {
            return Err(MahoutError::InvalidInput(format!(
                "Streaming file loader supports only .parquet; got extension {:?}. Use .source_file(path) for other formats.",
                path.extension()
            )));
        }

        let mut reader = ParquetStreamingReader::new(path, Some(DEFAULT_PARQUET_ROW_GROUP_SIZE))?;
        let vector_len = vector_len(config.num_qubits, &config.encoding_method);

        // Read first chunk to learn sample_size; reuse as initial buffer.
        // NOTE(review): assumes read_chunk writes at most buffer.len() values and
        // returns the count written — confirm against the StreamingDataReader contract.
        const INITIAL_CHUNK_CAP: usize = 64 * 1024;
        let mut buffer = vec![0.0; INITIAL_CHUNK_CAP];
        let written = reader.read_chunk(&mut buffer)?;
        if written == 0 {
            return Err(MahoutError::InvalidInput(
                "Parquet file is empty or contains no data.".to_string(),
            ));
        }
        // sample_size is only known once the reader has decoded some data.
        let sample_size = reader.get_sample_size().ok_or_else(|| {
            MahoutError::InvalidInput(
                "Parquet streaming reader did not set sample_size after first chunk.".to_string(),
            )
        })?;

        if sample_size != vector_len {
            return Err(MahoutError::InvalidInput(format!(
                "File feature length {} does not match vector_len {} for num_qubits={}, encoding={}",
                sample_size, vector_len, config.num_qubits, config.encoding_method
            )));
        }

        // Drop the unwritten zero tail so buffer holds exactly the first chunk.
        buffer.truncate(written);
        // Scratch buffer the iterator reuses for every subsequent read_chunk call.
        let read_chunk_scratch = vec![0.0; INITIAL_CHUNK_CAP];

        let source = DataSource::Streaming {
            reader: Mutex::new(reader),
            buffer,
            buffer_cursor: 0,
            read_chunk_scratch,
            sample_size,
            batch_limit,
            batches_yielded: 0,
        };
        Ok(Self {
            engine,
            config,
            source,
            vector_len,
        })
    }
310+
    /// Yields the next batch data from the current source; `None` when exhausted.
    /// Returns (batch_data, batch_n, sample_size, num_qubits).
    fn take_batch_from_source(&mut self) -> Result<Option<BatchFromSource>> {
        Ok(match &mut self.source {
            DataSource::Synthetic {
                batch_index,
                total_batches,
                ..
            } => {
                if *batch_index >= *total_batches {
                    None
                } else {
                    let data = generate_batch(&self.config, *batch_index, self.vector_len);
                    *batch_index += 1;
                    // Synthetic batches are always full-sized (config.batch_size).
                    Some((
                        data,
                        self.config.batch_size,
                        self.vector_len,
                        self.config.num_qubits as usize,
                    ))
                }
            }
            DataSource::InMemory {
                data,
                cursor,
                sample_size,
                batches_yielded,
                batch_limit,
                ..
            } => {
                if *batches_yielded >= *batch_limit {
                    None
                } else {
                    // Whole samples still available beyond the cursor.
                    let remaining = (data.len() - *cursor) / *sample_size;
                    if remaining == 0 {
                        None
                    } else {
                        // Final batch may be short: batch_n <= config.batch_size.
                        let batch_n = remaining.min(self.config.batch_size);
                        let start = *cursor;
                        let end = start + batch_n * *sample_size;
                        *cursor = end;
                        *batches_yielded += 1;
                        let slice = data[start..end].to_vec();
                        Some((
                            slice,
                            batch_n,
                            *sample_size,
                            self.config.num_qubits as usize,
                        ))
                    }
                }
            }
            DataSource::Streaming {
                reader,
                buffer,
                buffer_cursor,
                read_chunk_scratch,
                sample_size,
                batch_limit,
                batches_yielded,
            } => {
                if *batches_yielded >= *batch_limit {
                    None
                } else {
                    // Refill until a full batch is buffered or the reader hits EOF.
                    let required = self.config.batch_size * *sample_size;
                    while (buffer.len() - *buffer_cursor) < required {
                        // get_mut needs no lock acquisition (&mut self proves
                        // exclusive access); Err only if the mutex was poisoned.
                        let r = reader.get_mut().map_err(|e| {
                            MahoutError::Io(format!("Streaming reader mutex poisoned: {}", e))
                        })?;
                        let written = r.read_chunk(read_chunk_scratch)?;
                        if written == 0 {
                            // EOF: fall through and yield whatever complete samples remain.
                            break;
                        }
                        buffer.extend_from_slice(&read_chunk_scratch[..written]);
                    }
                    let available = buffer.len() - *buffer_cursor;
                    let available_samples = available / *sample_size;
                    if available_samples == 0 {
                        None
                    } else {
                        // Partial batch at EOF is allowed: batch_n <= config.batch_size.
                        let batch_n = available_samples.min(self.config.batch_size);
                        let start = *buffer_cursor;
                        let end = start + batch_n * *sample_size;
                        *buffer_cursor = end;
                        *batches_yielded += 1;
                        let slice = buffer[start..end].to_vec();
                        // Amortized compaction: once at least half the buffer is
                        // consumed, drain the dead prefix so it cannot grow unboundedly.
                        if *buffer_cursor >= buffer.len() / BUFFER_COMPACT_DENOM {
                            buffer.drain(..*buffer_cursor);
                            *buffer_cursor = 0;
                        }
                        Some((
                            slice,
                            batch_n,
                            *sample_size,
                            self.config.num_qubits as usize,
                        ))
                    }
                }
            }
        })
    }
412+
413+ /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted.
414+ pub fn next_batch ( & mut self ) -> Result < Option < * mut DLManagedTensor > > {
415+ let Some ( ( batch_data, batch_n, sample_size, num_qubits) ) = self . take_batch_from_source ( ) ?
416+ else {
417+ return Ok ( None ) ;
112418 } ;
113419 let ptr = self . engine . encode_batch (
114420 & batch_data,
115- self . config . batch_size ,
116- self . vector_len ,
421+ batch_n ,
422+ sample_size ,
117423 num_qubits,
118424 & self . config . encoding_method ,
119425 ) ?;
0 commit comments