From 75c613305b7425789aa3a3a39040061e986cfb56 Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Mon, 30 Mar 2026 14:56:19 -0700 Subject: [PATCH 01/10] feat(smurf_util): Fast cython implementation of stream data reader --- .gitignore | 5 + pyproject.toml | 9 + python/pysmurf/client/util/smurf_util.py | 231 +++++++----- .../client/util/stream_data_reader.pyx | 347 ++++++++++++++++++ 4 files changed, 491 insertions(+), 101 deletions(-) create mode 100644 python/pysmurf/client/util/stream_data_reader.pyx diff --git a/.gitignore b/.gitignore index 30df1179..b5e1e048 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,11 @@ python/pysmurf/_version.py *.dat *.png +# Cython build artifacts +*.c +*.so +*.html + # Editor *~ \#* diff --git a/pyproject.toml b/pyproject.toml index 21267b79..98743930 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,9 @@ requires = [ "hatch-vcs", "hatchling", + "hatch-cython", + "Cython>=0.29.0", + "numpy>=1.19.0", ] build-backend = "hatchling.build" @@ -73,3 +76,9 @@ include = [ packages = [ "/python/pysmurf", ] + +[tool.hatch.build.hooks.cython] +include_numpy = true + +# Cython compiler directives +directives = { boundscheck = false, wraparound = false, nonecheck = false, language_level = 3 } \ No newline at end of file diff --git a/python/pysmurf/client/util/smurf_util.py b/python/pysmurf/client/util/smurf_util.py index 553e73e5..b262972e 100644 --- a/python/pysmurf/client/util/smurf_util.py +++ b/python/pysmurf/client/util/smurf_util.py @@ -29,6 +29,13 @@ from pysmurf.client.util.SmurfFileReader import SmurfStreamReader from pysmurf.client.util.pub import set_action +# Try to import optimized Cython version, fall back to pure Python if not available +try: + from pysmurf.client.util.stream_data_reader import read_stream_data_cython + CYTHON_AVAILABLE = True +except ImportError: + CYTHON_AVAILABLE = False + class SmurfUtilMixin(SmurfBase): @set_action() @@ -1181,108 +1188,130 @@ def read_stream_data(self, datafile, channel=None, if channel is not None: self.log(f'Only reading channel {channel}') - # Flag to indicate we are about the read the fist frame from the disk - # The number of channel will be extracted from the first frame and the - # data structures will be build based on that - first_read = True - phase = [] - t = [] - with SmurfStreamReader(datafile, - isRogue=True, metaEnable=True) as file: - for header, data in file.records(): - if first_read: - # Update flag, so that we don't do this code again - first_read = False - - # Read in all used channels by default - if channel is None: - channel = np.arange(header.number_of_channels) - - channel = np.ravel(np.asarray(channel)) - n_chan = len(channel) - - # Indexes for input channels - channel_mask = np.zeros(n_chan, dtype=int) - for i, c in enumerate(channel): - channel_mask[i] = c - - #initialize data structure - phase=list() - if IQ_mode: - if n_chan % 2 != 0: - self.log("WARNING: it seems unlikely this dataset was taken in IQ streaming mode: there are an odd number of channels stored.") - self.log("removing last channel") - channel = channel[:-1] - for _,_ in enumerate(channel[::2]): - phase.append(list()) - ## IQ mode will log consecutive channels, - ## with channel A & channel A+1 corresponding to I and Q - ## want to condense those 2 channels into the original IQ data. - for i,j in enumerate(channel[::2]): - phase[i].append(data[j]+1j*data[j+1]) - else: - for _,_ in enumerate(channel): - phase.append(list()) - for i,_ in enumerate(channel): - phase[i].append(data[i]) - - t = [header.timestamp] - if return_header or return_tes_bias: - tmp_tes_bias = np.array(header.tesBias) - tes_bias = np.zeros((0,16)) - - # Get header values if requested - if return_header or return_tes_bias: - tmp_header_dict = {} - header_dict = {} - for i, h in enumerate(header._fields): - tmp_header_dict[h] = np.array(header[i]) - header_dict[h] = np.array([], - dtype=type(header[i])) - tmp_header_dict['tes_bias'] = np.array([header.tesBias]) - - - # Already loaded 1 element - counter = 1 - else: - if IQ_mode: - for i,j in enumerate(channel[::2]): - phase[i].append(data[j]+1j*data[j+1]) + # Use fully Cython-optimized version if available + if CYTHON_AVAILABLE: + if write_log: + self.log('Using Cython-optimized data reader (full C implementation)') + + # This version does ALL file I/O in C + t, phase, headers = read_stream_data_cython( + datafile, + channel=channel, + IQ_mode=IQ_mode, + return_header=return_header, + return_tes_bias=return_tes_bias, + write_log=write_log + ) + header_dict = {} + if return_header: + for k in headers[0].keys(): + header_dict[k] = np.array([h[k] for h in headers]) + + # Pure Python fallback + else: + if write_log: + self.log('Using pure Python data reader (Cython not available)') + # Fall back to original Python implementation + # Flag to indicate we are about the read the fist frame from the disk + # The number of channel will be extracted from the first frame and the + # data structures will be build based on that + first_read = True + with SmurfStreamReader(datafile, + isRogue=True, metaEnable=True) as file: + for header, data in file.records(): + if first_read: + # Update flag, so that we don't do this code again + first_read = False + + # Read in all used channels by default + if channel is None: + channel = np.arange(header.number_of_channels) + + channel = np.ravel(np.asarray(channel)) + n_chan = len(channel) + + # Indexes for input channels + channel_mask = np.zeros(n_chan, dtype=int) + for i, c in enumerate(channel): + channel_mask[i] = c + + #initialize data structure + phase=list() + if IQ_mode: + if n_chan % 2 != 0: + self.log("WARNING: it seems unlikely this dataset was taken in IQ streaming mode: there are an odd number of channels stored.") + self.log("removing last channel") + channel = channel[:-1] + for _,_ in enumerate(channel[::2]): + phase.append(list()) + ## IQ mode will log consecutive channels, + ## with channel A & channel A+1 corresponding to I and Q + ## want to condense those 2 channels into the original IQ data. + for i,j in enumerate(channel[::2]): + phase[i].append(data[j]+1j*data[j+1]) + else: + for _,_ in enumerate(channel): + phase.append(list()) + for i,_ in enumerate(channel): + phase[i].append(data[i]) + + t = [header.timestamp] + if return_header or return_tes_bias: + tmp_tes_bias = np.array(header.tesBias) + tes_bias = np.zeros((0,16)) + + # Get header values if requested + if return_header or return_tes_bias: + tmp_header_dict = {} + header_dict = {} + for i, h in enumerate(header._fields): + tmp_header_dict[h] = np.array(header[i]) + header_dict[h] = np.array([], + dtype=type(header[i])) + tmp_header_dict['tes_bias'] = np.array([header.tesBias]) + + + # Already loaded 1 element + counter = 1 else: - for i in range(n_chan): - phase[i].append(data[i]) - - t.append(header.timestamp) - - if return_header or return_tes_bias: - for i, h in enumerate(header._fields): - tmp_header_dict[h] = np.append(tmp_header_dict[h], - header[i]) - tmp_tes_bias = np.vstack((tmp_tes_bias, header.tesBias)) - - if counter % n_max == n_max - 1: - if write_log: - self.log(f'{counter+1} elements loaded') - - if return_header: - for k in header_dict.keys(): - header_dict[k] = np.append(header_dict[k], - tmp_header_dict[k]) - tmp_header_dict[k] = \ - np.array([], - dtype=type(header_dict[k][0])) - print(np.shape(tes_bias), np.shape(tmp_tes_bias)) - tes_bias = np.vstack((tes_bias, tmp_tes_bias)) - tmp_tes_bias = np.zeros((0, 16)) - - elif return_tes_bias: - tes_bias = np.vstack((tes_bias, tmp_tes_bias)) - tmp_tes_bias = np.zeros((0, 16)) - - counter += 1 - - phase=np.array(phase) - t=np.array(t) + if IQ_mode: + for i,j in enumerate(channel[::2]): + phase[i].append(data[j]+1j*data[j+1]) + else: + for i in range(n_chan): + phase[i].append(data[i]) + + t.append(header.timestamp) + + if return_header or return_tes_bias: + for i, h in enumerate(header._fields): + tmp_header_dict[h] = np.append(tmp_header_dict[h], + header[i]) + tmp_tes_bias = np.vstack((tmp_tes_bias, header.tesBias)) + + if counter % n_max == n_max - 1: + if write_log: + self.log(f'{counter+1} elements loaded') + + if return_header: + for k in header_dict.keys(): + header_dict[k] = np.append(header_dict[k], + tmp_header_dict[k]) + tmp_header_dict[k] = \ + np.array([], + dtype=type(header_dict[k][0])) + print(np.shape(tes_bias), np.shape(tmp_tes_bias)) + tes_bias = np.vstack((tes_bias, tmp_tes_bias)) + tmp_tes_bias = np.zeros((0, 16)) + + elif return_tes_bias: + tes_bias = np.vstack((tes_bias, tmp_tes_bias)) + tmp_tes_bias = np.zeros((0, 16)) + + counter += 1 + + phase=np.array(phase) + t=np.array(t) if return_header: for k in header_dict.keys(): diff --git a/python/pysmurf/client/util/stream_data_reader.pyx b/python/pysmurf/client/util/stream_data_reader.pyx new file mode 100644 index 00000000..ee93b6b3 --- /dev/null +++ b/python/pysmurf/client/util/stream_data_reader.pyx @@ -0,0 +1,347 @@ +# cython: language_level=3 +# cython: boundscheck=False +# cython: wraparound=False +# cython: cdivision=True +# cython: initializedcheck=False + +""" +Cython-optimized stream data reader for pysmurf +This module provides high-performance data reading for SMuRF stream data. +All file I/O and binary parsing is done in C for maximum performance. +""" + +import numpy as np +cimport numpy as cnp +from libc.stdlib cimport malloc, free +from libc.math cimport pi +from libc.stdio cimport (FILE, fopen, fclose, fread, fseek, ftell, + SEEK_END, SEEK_SET, SEEK_CUR, feof) +from libc.string cimport memcpy +from libc.stdint cimport uint8_t, uint16_t, uint32_t, uint64_t, int32_t + +cnp.import_array() + +# Frame Format Constants +cdef int SMURF_HEADER_SIZE = 128 +cdef int ROGUE_HEADER_SIZE = 8 +cdef int SMURF_CHANNEL_SIZE = 4 + +# C structures for binary parsing +cdef struct RogueHeader: + uint32_t size + uint16_t flags + uint8_t error + uint8_t channel + +cdef struct SmurfHeader: + uint8_t protocol_version + uint8_t crate_id + uint8_t slot_number + uint8_t timing_cond + uint32_t number_of_channels + uint64_t timestamp + int32_t flux_ramp_increment + int32_t flux_ramp_offset + uint32_t counter_0 + uint32_t counter_1 + uint64_t counter_2 + uint32_t reset_bits + uint32_t frame_counter + uint32_t tes_relays_config + uint64_t external_time_raw + uint8_t control_field + uint8_t test_params + uint16_t num_rows + uint16_t num_rows_reported + uint16_t row_length + uint16_t data_rate + + +cdef inline void parse_rogue_header(uint8_t* data, RogueHeader* header) noexcept nogil: + """Parse Rogue header from raw bytes - inline for speed""" + # Little-endian parsing + header.size = ((data[0]) | + (data[1] << 8) | + (data[2] << 16) | + (data[3] << 24)) + header.flags = (data[4]) | (data[5] << 8) + header.error = data[6] + header.channel = data[7] + + +cdef inline void parse_smurf_header(uint8_t* data, SmurfHeader* header) noexcept nogil: + """Parse SMURF header from raw bytes - inline for speed""" + cdef int pos = 0 + + # Parse basic fields (4 bytes) + header.protocol_version = data[0] + header.crate_id = data[1] + header.slot_number = data[2] + header.timing_cond = data[3] + pos = 4 + + # number_of_channels (4 bytes, little-endian uint32) + header.number_of_channels = ((data[pos]) | + (data[pos+1] << 8) | + (data[pos+2] << 16) | + (data[pos+3] << 24)) + pos += 4 + + # Skip 40 bytes (TesBias) + pos += 40 + + # timestamp (8 bytes, little-endian uint64) + header.timestamp = ((data[pos]) | + (data[pos+1] << 8) | + (data[pos+2] << 16) | + (data[pos+3] << 24) | + (data[pos+4] << 32) | + (data[pos+5] << 40) | + (data[pos+6] << 48) | + (data[pos+7] << 56)) + pos += 8 + + # flux_ramp_increment (4 bytes, int32) + header.flux_ramp_increment = (((data[pos]) | + (data[pos+1] << 8) | + (data[pos+2] << 16) | + (data[pos+3] << 24))) + pos += 4 + + # flux_ramp_offset (4 bytes, int32) + header.flux_ramp_offset = (((data[pos]) | + (data[pos+1] << 8) | + (data[pos+2] << 16) | + (data[pos+3] << 24))) + pos += 4 + + # Remaining fields follow similar pattern... + # For brevity, we'll just extract the essentials needed for processing + + +cdef class FastSmurfReader: + """ + Fast Cython-based SMURF stream data reader + Performs all file I/O and parsing in C for maximum speed + """ + cdef FILE* file_ptr + cdef bytes filename + cdef long file_size + cdef bint is_rogue + cdef long records_read + cdef uint8_t header_buffer[128] + cdef uint8_t rogue_buffer[8] + + def __cinit__(self, str filename, bint is_rogue=True): + self.filename = filename.encode('utf-8') + self.is_rogue = is_rogue + self.records_read = 0 + self.file_ptr = NULL + + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + cdef void open(self) except *: + """Open the file and get size""" + self.file_ptr = fopen(self.filename, "rb") + if self.file_ptr == NULL: + raise IOError(f"Cannot open file: {self.filename.decode('utf-8')}") + + # Get file size + fseek(self.file_ptr, 0, SEEK_END) + self.file_size = ftell(self.file_ptr) + fseek(self.file_ptr, 0, SEEK_SET) + + cdef void close(self) noexcept nogil: + """Close the file""" + if self.file_ptr != NULL: + fclose(self.file_ptr) + self.file_ptr = NULL + + def read_all_records(self): + """ + Read all records from file and return as numpy arrays + This is the main high-performance function + + Returns + ------- + timestamps : ndarray + Array of timestamps + data : ndarray + 2D array of channel data [n_records, n_channels] + n_channels : int + Number of channels + """ + # Pre-allocate with estimates + cdef list timestamps = [] + cdef list data_list = [] + cdef list headers = [] + + cdef RogueHeader rogue_hdr + cdef cnp.ndarray[cnp.int32_t, ndim=1] channel_data + cdef size_t bytes_read + cdef long rec_end + cdef uint32_t rogue_payload + cdef int n_channels + cdef bint found_data + + # Read records + while True: + # Check if at end of file + if ftell(self.file_ptr) >= self.file_size: + break + + # Process Rogue header if needed + if self.is_rogue: + found_data = False + while not found_data: + # Check for EOF + if ftell(self.file_ptr) >= self.file_size: + break + + # Read Rogue header + bytes_read = fread(self.rogue_buffer, 1, ROGUE_HEADER_SIZE, self.file_ptr) + if bytes_read != ROGUE_HEADER_SIZE: + break + + # Parse header + with nogil: + parse_rogue_header(self.rogue_buffer, &rogue_hdr) + + rogue_payload = rogue_hdr.size - 4 + rec_end = ftell(self.file_ptr) + rogue_payload + + # If data channel, process it + if rogue_hdr.channel == 0: + found_data = True + break + else: + # Skip non-data channels + fseek(self.file_ptr, rec_end, SEEK_SET) + + if not found_data: + break + + # Read SMURF header + bytes_read = fread(self.header_buffer, 1, SMURF_HEADER_SIZE, self.file_ptr) + if bytes_read != SMURF_HEADER_SIZE: + break + + # Parse SMURF header + cdef SmurfHeader smurf_hdr + with nogil: + parse_smurf_header(self.header_buffer, &smurf_hdr) + + n_channels = smurf_hdr.number_of_channels + + # Read channel data using numpy fromfile for efficiency + channel_data = np.fromfile(self.filename, + dtype=np.int32, + count=n_channels, + offset=ftell(self.file_ptr)) + + # Advance file pointer + fseek(self.file_ptr, n_channels * SMURF_CHANNEL_SIZE, SEEK_CUR) + + # Store data + timestamps.append(smurf_hdr.timestamp) + data_list.append(channel_data) + headers.append(smurf_hdr) + + self.records_read += 1 + + # Convert to numpy arrays + timestamps_array = np.array(timestamps, dtype=np.uint64) + data_array = np.array(data_list, dtype=np.int32) + + return timestamps_array, data_array, headers, n_channels + + +def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False, + bint return_tes_bias=False, bint write_log=True): + """ + Ultra-fast Cython implementation that reads entire file in C. + This completely replaces the Python loop over SmurfStreamReader.records() + + Parameters + ---------- + datafile : str + Path to the data file + channel : array-like or None + Channels to read (if None, reads all) + IQ_mode : bool + Whether data is in IQ streaming mode + return_tes_bias : bool + Whether to return TES bias data + write_log : bool + Whether to print progress messages + + Returns + ------- + t : ndarray + Timestamps + phase : ndarray + Phase data (or IQ data if in IQ mode) + headers : list + SMuRF headers. + """ + cdef: + FastSmurfReader reader + cnp.ndarray[cnp.uint64_t, ndim=1] timestamps + cnp.ndarray[cnp.int32_t, ndim=2] data_array + cnp.ndarray[cnp.float64_t, ndim=2] phase + cnp.ndarray[cnp.complex128_t, ndim=2] iq_data + int n_channels, n_records + int i, j, chan_idx + cnp.ndarray[cnp.int64_t, ndim=1] channel_indices + + if write_log: + print(f"Reading {datafile} with Cython optimization...") + + # Open file and read all records in C + reader = FastSmurfReader(datafile, is_rogue=True) + with reader: + timestamps, data_array, headers, n_channels = reader.read_all_records() + + n_records = len(timestamps) + + if write_log: + print(f"Read {n_records} records with {n_channels} channels") + + # Process channel selection + if channel is None: + channel_indices = np.arange(n_channels, dtype=np.int64) + else: + channel_indices = np.asarray(channel, dtype=np.int64) + + # Extract selected channels and convert to phase + if IQ_mode: + # IQ mode: pair consecutive channels + # TODO add some checks + n_output_channels = len(channel_indices) // 2 + iq_data = np.empty((n_output_channels, n_records), dtype=np.complex128) + + for i in range(n_records): + for j in range(n_output_channels): + chan_idx = channel_indices[j * 2] + iq_data[j, i] = data_array[i, chan_idx] + 1j * data_array[i, chan_idx + 1] + + return timestamps, iq_data, headers + else: + # Normal mode: convert to phase + n_output_channels = len(channel_indices) + phase = np.empty((n_output_channels, n_records), dtype=np.float64) + + for i in range(n_records): + for j in range(n_output_channels): + chan_idx = channel_indices[j] + phase[j, i] = data_array[i, chan_idx] / (2.0**15) * pi + + return timestamps, phase, headers + + + From be93c37961c3dd48b2b0b351dde9e7732450b265 Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Tue, 7 Apr 2026 11:51:03 -0700 Subject: [PATCH 02/10] Add a cython build step to pyproject.toml --- .gitignore | 1 + hatch_build.py | 41 +++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 16 ++++++++++------ 3 files changed, 52 insertions(+), 6 deletions(-) create mode 100644 hatch_build.py diff --git a/.gitignore b/.gitignore index b5e1e048..f331a2f2 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ python/pysmurf/_version.py # Cython build artifacts *.c +*.cpp *.so *.html diff --git a/hatch_build.py b/hatch_build.py new file mode 100644 index 00000000..749fec1f --- /dev/null +++ b/hatch_build.py @@ -0,0 +1,41 @@ +"""Hook for hatchling to build Cython extensions""" +from hatchling.builders.hooks.plugin.interface import BuildHookInterface +from Cython.Build import cythonize +from setuptools import Extension +from setuptools.command.build_ext import build_ext +from setuptools import Distribution +import numpy + + +class CythonHook(BuildHookInterface): + + def initialize(self, version, build_data): + """Build Cython extensions""" + print("[CythonHook] Building Cython extensions...") + + # Define extension + ext = Extension( + "pysmurf.client.util.stream_data_reader", + ["python/pysmurf/client/util/stream_data_reader.pyx"], + include_dirs=[numpy.get_include()], + define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")], + language="c++", + ) + + # Cythonize + ext_modules = cythonize([ext], compiler_directives={ + 'language_level': "3", 'boundscheck': False, 'wraparound': False, + 'cdivision': True, 'initializedcheck': False, + }) + + # Build + dist = Distribution({'ext_modules': ext_modules}) + cmd = build_ext(dist) + # point to root of python source + # this will place the compiled extension where editable installs can find it + cmd.build_lib = "python" + cmd.finalize_options() + cmd.run() + + build_data['pure_python'] = False + build_data['infer_tag'] = True \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 98743930..6c5b539a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,9 +12,9 @@ requires = [ "hatch-vcs", "hatchling", - "hatch-cython", "Cython>=0.29.0", "numpy>=1.19.0", + "setuptools", ] build-backend = "hatchling.build" @@ -48,6 +48,7 @@ dependencies = [ "schema", "scipy", "seaborn", + "Cython", ] [project.urls] @@ -76,9 +77,12 @@ include = [ packages = [ "/python/pysmurf", ] +# include cython extensions +artifacts = [ + "python/pysmurf/**/*.so", + "python/pysmurf/**/*.pyd", +] -[tool.hatch.build.hooks.cython] -include_numpy = true - -# Cython compiler directives -directives = { boundscheck = false, wraparound = false, nonecheck = false, language_level = 3 } \ No newline at end of file +# build cython extensions +[tool.hatch.build.hooks.custom] +path = "hatch_build.py" \ No newline at end of file From 0abf432c2c87d7f0ffb6df2463ff2f45b1fcd3e7 Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Tue, 7 Apr 2026 15:31:28 -0700 Subject: [PATCH 03/10] fix(util.stream_data_reader): Reimplement functionality. Added to fast reader: - parse TES bias - format headers - read selection of channels - handle IQ_mode --- python/pysmurf/client/util/smurf_util.py | 34 +- .../client/util/stream_data_reader.pyx | 291 +++++++++--------- 2 files changed, 168 insertions(+), 157 deletions(-) diff --git a/python/pysmurf/client/util/smurf_util.py b/python/pysmurf/client/util/smurf_util.py index b262972e..98caa2c5 100644 --- a/python/pysmurf/client/util/smurf_util.py +++ b/python/pysmurf/client/util/smurf_util.py @@ -31,7 +31,7 @@ # Try to import optimized Cython version, fall back to pure Python if not available try: - from pysmurf.client.util.stream_data_reader import read_stream_data_cython + from pysmurf.client.util.stream_data_reader import read_stream_data_cython, parse_tes_bias_from_headers CYTHON_AVAILABLE = True except ImportError: CYTHON_AVAILABLE = False @@ -1126,7 +1126,7 @@ def read_stream_data(self, datafile, channel=None, return_header=False, return_tes_bias=False, write_log=True, n_max=2048, make_freq_mask=False, - gcp_mode=False, IQ_mode=False): + gcp_mode=False, IQ_mode=False, fast_reader=False): """ Loads data taken with the function stream_data_on. Gives back the resonator data in units of phase. Also @@ -1189,29 +1189,29 @@ def read_stream_data(self, datafile, channel=None, self.log(f'Only reading channel {channel}') # Use fully Cython-optimized version if available - if CYTHON_AVAILABLE: - if write_log: - self.log('Using Cython-optimized data reader (full C implementation)') + if CYTHON_AVAILABLE and fast_reader: # This version does ALL file I/O in C t, phase, headers = read_stream_data_cython( datafile, channel=channel, IQ_mode=IQ_mode, - return_header=return_header, - return_tes_bias=return_tes_bias, - write_log=write_log ) header_dict = {} if return_header: - for k in headers[0].keys(): - header_dict[k] = np.array([h[k] for h in headers]) + # parse into dict for backwards compatibility + for k in headers.dtype.fields: + if k[:8] != "_padding": + header_dict[k] = headers[k] + tes_bias = parse_tes_bias_from_headers(headers) # Pure Python fallback else: - if write_log: - self.log('Using pure Python data reader (Cython not available)') - # Fall back to original Python implementation + if fast_reader: + self.log( + 'Fast Cython reader not available. Falling back on slow reader.', + self.LOG_ERROR + ) # Flag to indicate we are about the read the fist frame from the disk # The number of channel will be extracted from the first frame and the # data structures will be build based on that @@ -1313,19 +1313,19 @@ def read_stream_data(self, datafile, channel=None, phase=np.array(phase) t=np.array(t) - if return_header: + if return_header and not fast_reader: for k in header_dict.keys(): header_dict[k] = np.append(header_dict[k], tmp_header_dict[k]) tes_bias = np.vstack((tes_bias, tmp_tes_bias)) tes_bias = np.transpose(tes_bias) - - elif return_tes_bias: + elif return_tes_bias and not fast_reader: tes_bias = np.vstack((tes_bias, tmp_tes_bias)) tes_bias = np.transpose(tes_bias) # rotate and transform to phase - if not IQ_mode: + # the cython reader handles this already + if not IQ_mode and not fast_reader: phase = phase.astype(float) / 2**15 * np.pi if np.size(phase) == 0: diff --git a/python/pysmurf/client/util/stream_data_reader.pyx b/python/pysmurf/client/util/stream_data_reader.pyx index ee93b6b3..1de6777b 100644 --- a/python/pysmurf/client/util/stream_data_reader.pyx +++ b/python/pysmurf/client/util/stream_data_reader.pyx @@ -25,6 +25,41 @@ cnp.import_array() cdef int SMURF_HEADER_SIZE = 128 cdef int ROGUE_HEADER_SIZE = 8 cdef int SMURF_CHANNEL_SIZE = 4 +SMURF_DATA_DTYPE = np.dtype(np.int32) + +# SMuRF Header as NumPy structured dtype +SMURF_HEADER_DTYPE = np.dtype([ + ('protocol_version', np.uint8), # Offset 0 + ('crate_id', np.uint8), # Offset 1 + ('slot_number', np.uint8), # Offset 2 + ('timing_cond', np.uint8), # Offset 3 + ('number_of_channels', np.uint32), # Offset 4 + ('tes_bias', np.uint8, 40), # Offset 8-47 + ('timestamp', np.uint64), # Offset 48 + ('flux_ramp_increment', np.int32), # Offset 56 + ('flux_ramp_offset', np.int32), # Offset 60 + ('counter_0', np.uint32), # Offset 64 + ('counter_1', np.uint32), # Offset 68 + ('counter_2', np.uint64), # Offset 72 + ('reset_bits', np.uint32), # Offset 80 + ('frame_counter', np.uint32), # Offset 84 + ('tes_relays_config', np.uint32), # Offset 88 + ('_padding_92_95', np.uint8, 4), # Offset 92 (unused) + ('external_time_raw', np.uint64), # Offset 96 (only 5 bytes used) + ('control_field', np.uint8), # Offset 104 + ('test_params', np.uint8), # Offset 105 + ('_padding_106_111', np.uint8, 6), # Offset 106 (unused) + ('num_rows', np.uint16), # Offset 112 + ('num_rows_reported', np.uint16), # Offset 114 + ('_padding_116_119', np.uint8, 4), # Offset 116 (unused) + ('row_length', np.uint16), # Offset 120 + ('data_rate', np.uint16), # Offset 122 + ('_padding_124_127', np.uint8, 4), # Offset 124 (unused) +]) + +# Validate dtype size +assert SMURF_HEADER_DTYPE.itemsize == 128, \ + f"SMURF_HEADER_DTYPE must be 128 bytes, got {SMURF_HEADER_DTYPE.itemsize}" # C structures for binary parsing cdef struct RogueHeader: @@ -33,91 +68,10 @@ cdef struct RogueHeader: uint8_t error uint8_t channel -cdef struct SmurfHeader: - uint8_t protocol_version - uint8_t crate_id - uint8_t slot_number - uint8_t timing_cond - uint32_t number_of_channels - uint64_t timestamp - int32_t flux_ramp_increment - int32_t flux_ramp_offset - uint32_t counter_0 - uint32_t counter_1 - uint64_t counter_2 - uint32_t reset_bits - uint32_t frame_counter - uint32_t tes_relays_config - uint64_t external_time_raw - uint8_t control_field - uint8_t test_params - uint16_t num_rows - uint16_t num_rows_reported - uint16_t row_length - uint16_t data_rate - cdef inline void parse_rogue_header(uint8_t* data, RogueHeader* header) noexcept nogil: """Parse Rogue header from raw bytes - inline for speed""" - # Little-endian parsing - header.size = ((data[0]) | - (data[1] << 8) | - (data[2] << 16) | - (data[3] << 24)) - header.flags = (data[4]) | (data[5] << 8) - header.error = data[6] - header.channel = data[7] - - -cdef inline void parse_smurf_header(uint8_t* data, SmurfHeader* header) noexcept nogil: - """Parse SMURF header from raw bytes - inline for speed""" - cdef int pos = 0 - - # Parse basic fields (4 bytes) - header.protocol_version = data[0] - header.crate_id = data[1] - header.slot_number = data[2] - header.timing_cond = data[3] - pos = 4 - - # number_of_channels (4 bytes, little-endian uint32) - header.number_of_channels = ((data[pos]) | - (data[pos+1] << 8) | - (data[pos+2] << 16) | - (data[pos+3] << 24)) - pos += 4 - - # Skip 40 bytes (TesBias) - pos += 40 - - # timestamp (8 bytes, little-endian uint64) - header.timestamp = ((data[pos]) | - (data[pos+1] << 8) | - (data[pos+2] << 16) | - (data[pos+3] << 24) | - (data[pos+4] << 32) | - (data[pos+5] << 40) | - (data[pos+6] << 48) | - (data[pos+7] << 56)) - pos += 8 - - # flux_ramp_increment (4 bytes, int32) - header.flux_ramp_increment = (((data[pos]) | - (data[pos+1] << 8) | - (data[pos+2] << 16) | - (data[pos+3] << 24))) - pos += 4 - - # flux_ramp_offset (4 bytes, int32) - header.flux_ramp_offset = (((data[pos]) | - (data[pos+1] << 8) | - (data[pos+2] << 16) | - (data[pos+3] << 24))) - pos += 4 - - # Remaining fields follow similar pattern... - # For brevity, we'll just extract the essentials needed for processing - + header[0] = (data)[0] cdef class FastSmurfReader: """ @@ -162,33 +116,44 @@ cdef class FastSmurfReader: fclose(self.file_ptr) self.file_ptr = NULL - def read_all_records(self): + def read_all_records(self, channel=None): """ Read all records from file and return as numpy arrays This is the main high-performance function + Arguments + --------- + channel : array-like or None + Channels to read (if None, reads all) + Returns ------- - timestamps : ndarray - Array of timestamps - data : ndarray - 2D array of channel data [n_records, n_channels] - n_channels : int - Number of channels + timestamps : ndarray (uint64) + Array of timestamps extracted from headers + data : ndarray (int32, shape=[n_records, n_channels]) + 2D array of channel data + headers : ndarray (structured array) + Structured array of SMuRF headers with dtype SMURF_HEADER_DTYPE + Access fields like: headers['timestamp'], headers['frame_counter'] + Individual headers: headers[i] returns a single header record """ # Pre-allocate with estimates - cdef list timestamps = [] + cdef bytearray header_bytes = bytearray() cdef list data_list = [] - cdef list headers = [] cdef RogueHeader rogue_hdr cdef cnp.ndarray[cnp.int32_t, ndim=1] channel_data cdef size_t bytes_read cdef long rec_end cdef uint32_t rogue_payload - cdef int n_channels + cdef int n_channels = -1 + cdef uint32_t current_num_channels cdef bint found_data + # only save requested channels to memory + if channel is not None: + channel = np.atleast_1d(np.asarray(channel, dtype=np.int64)) + # Read records while True: # Check if at end of file @@ -220,6 +185,7 @@ cdef class FastSmurfReader: found_data = True break else: + # TODO support reading metadata channels # Skip non-data channels fseek(self.file_ptr, rec_end, SEEK_SET) @@ -231,38 +197,51 @@ cdef class FastSmurfReader: if bytes_read != SMURF_HEADER_SIZE: break - # Parse SMURF header - cdef SmurfHeader smurf_hdr - with nogil: - parse_smurf_header(self.header_buffer, &smurf_hdr) + # Store raw header bytes for later parsing + header_bytes.extend(self.header_buffer[:SMURF_HEADER_SIZE]) + + # Read number_of_channels directly from buffer for validation + current_num_channels = (&self.header_buffer[4])[0] - n_channels = smurf_hdr.number_of_channels + if n_channels == -1: + n_channels = current_num_channels + if n_channels != current_num_channels: + raise ValueError( + f"Channel count mismatch: previous frame had {n_channels}, got {current_num_channels}" + ) # Read channel data using numpy fromfile for efficiency channel_data = np.fromfile(self.filename, - dtype=np.int32, + dtype=SMURF_DATA_DTYPE, count=n_channels, offset=ftell(self.file_ptr)) + # subset if requested + if channel is not None: + channel_data = channel_data[channel] + # Advance file pointer - fseek(self.file_ptr, n_channels * SMURF_CHANNEL_SIZE, SEEK_CUR) + fseek(self.file_ptr, n_channels * SMURF_DATA_DTYPE.itemsize, SEEK_CUR) # Store data - timestamps.append(smurf_hdr.timestamp) data_list.append(channel_data) - headers.append(smurf_hdr) self.records_read += 1 - # Convert to numpy arrays - timestamps_array = np.array(timestamps, dtype=np.uint64) + # Parse all headers at once from collected bytes + headers_array = np.frombuffer(header_bytes, dtype=SMURF_HEADER_DTYPE) + + # Mask external_time_raw to 5 bytes (40 bits) + headers_array['external_time_raw'] &= 0xFFFFFFFFFF + + # Convert other data to numpy arrays + timestamps_array = headers_array['timestamp'] data_array = np.array(data_list, dtype=np.int32) - return timestamps_array, data_array, headers, n_channels + return timestamps_array, data_array, headers_array -def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False, - bint return_tes_bias=False, bint write_log=True): +def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False): """ Ultra-fast Cython implementation that reads entire file in C. This completely replaces the Python loop over SmurfStreamReader.records() @@ -271,14 +250,10 @@ def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False, ---------- datafile : str Path to the data file - channel : array-like or None + channel : int or array-like or None Channels to read (if None, reads all) IQ_mode : bool Whether data is in IQ streaming mode - return_tes_bias : bool - Whether to return TES bias data - write_log : bool - Whether to print progress messages Returns ------- @@ -299,49 +274,85 @@ def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False, int i, j, chan_idx cnp.ndarray[cnp.int64_t, ndim=1] channel_indices - if write_log: - print(f"Reading {datafile} with Cython optimization...") - # Open file and read all records in C - reader = FastSmurfReader(datafile, is_rogue=True) - with reader: - timestamps, data_array, headers, n_channels = reader.read_all_records() + with FastSmurfReader(datafile, is_rogue=True) as reader: + timestamps, data_array, headers = reader.read_all_records(channel=channel) n_records = len(timestamps) - - if write_log: - print(f"Read {n_records} records with {n_channels} channels") - - # Process channel selection - if channel is None: - channel_indices = np.arange(n_channels, dtype=np.int64) - else: - channel_indices = np.asarray(channel, dtype=np.int64) + n_chan = data_array.shape[1] # Extract selected channels and convert to phase if IQ_mode: # IQ mode: pair consecutive channels - # TODO add some checks - n_output_channels = len(channel_indices) // 2 - iq_data = np.empty((n_output_channels, n_records), dtype=np.complex128) - - for i in range(n_records): - for j in range(n_output_channels): - chan_idx = channel_indices[j * 2] - iq_data[j, i] = data_array[i, chan_idx] + 1j * data_array[i, chan_idx + 1] + if n_chan % 2 != 0: + raise ValueError(f"In IQ mode, number of channels should be even. Found {n_chan}.") + iq_data = np.empty((n_chan // 2, n_records), dtype=np.complex128) + iq_data = (data_array[:, ::2] + 1j * data_array[:, 1::2]).T return timestamps, iq_data, headers else: # Normal mode: convert to phase - n_output_channels = len(channel_indices) - phase = np.empty((n_output_channels, n_records), dtype=np.float64) + phase = np.empty((n_chan, n_records), dtype=np.float64) - for i in range(n_records): - for j in range(n_output_channels): - chan_idx = channel_indices[j] - phase[j, i] = data_array[i, chan_idx] / (2.0**15) * pi + phase = data_array.T / (2.0**15) * pi return timestamps, phase, headers +def parse_tes_bias_from_headers(headers): + """ + Extract TES bias data from headers and parse into 16 TES bias values. + + The tes_bias field contains 40 bytes encoding 16 TES bias values as 20-bit + signed integers. Each pair of values fits in 5 bytes: + - Even index (0, 2, 4, ...): bytes 0-2, lower 20 bits + - Odd index (1, 3, 5, ...): bytes 2-4, upper 20 bits (shifted right 4) + + Parameters + ---------- + headers : ndarray (structured array) + Headers array with 'tes_bias' field (40 bytes per header) + Returns + ------- + tes_bias_array : ndarray (int32, shape=[16, n_headers]) + Parsed TES bias values, 16 values per header + """ + cdef int n_headers = len(headers) + cdef int i, j, b + cdef cnp.ndarray[cnp.int32_t, ndim=2] tes_bias_array = np.empty((16, n_headers), dtype=np.int32) + cdef cnp.ndarray[cnp.uint8_t, ndim=1] raw_bytes + cdef uint32_t tmp + + # Process each header + for j in range(n_headers): + raw_bytes = headers['tes_bias'][j] + + # Parse 16 TES bias values from 40 bytes + for i in range(16): + b = i // 2 # Which 5-byte group (0-7) + + # 2 TES values fit in 5 bytes + # Each pair (bytes 0-4): 00 00 01 11 11 + # Even (i%2==0): bytes 0-2, mask to 20 bits: 00 00 0x + # Odd (i%2==1): bytes 2-4, shift right 4, mask to 20 bits: x1 11 11 + + if i % 2 == 0: # Even index + # Read bytes 0-2 of the 5-byte group + tmp = ((raw_bytes[b*5]) | + (raw_bytes[b*5 + 1] << 8) | + (raw_bytes[b*5 + 2] << 16)) & 0xFFFFF + else: # Odd index + # Read bytes 2-4 of the 5-byte group, shift right 4 bits + tmp = (((raw_bytes[b*5 + 2]) | + (raw_bytes[b*5 + 3] << 8) | + (raw_bytes[b*5 + 4] << 16)) >> 4) & 0xFFFFF + + # Convert to signed 20-bit value + # If bit 19 is set, it's negative (two's complement) + if tmp >= 0x80000: + tmp -= 0x100000 + + tes_bias_array[i, j] = tmp + + return tes_bias_array From a0875436f1ae39912fa69ecbacd670c598761619 Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Tue, 7 Apr 2026 16:51:46 -0700 Subject: [PATCH 04/10] feat(util.stream_data_reader): Read metadata if requested. --- python/pysmurf/client/util/smurf_util.py | 3 +- .../client/util/stream_data_reader.pyx | 38 ++++++++++++++----- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/python/pysmurf/client/util/smurf_util.py b/python/pysmurf/client/util/smurf_util.py index 98caa2c5..2ce87b41 100644 --- a/python/pysmurf/client/util/smurf_util.py +++ b/python/pysmurf/client/util/smurf_util.py @@ -1192,7 +1192,7 @@ def read_stream_data(self, datafile, channel=None, if CYTHON_AVAILABLE and fast_reader: # This version does ALL file I/O in C - t, phase, headers = read_stream_data_cython( + t, phase, headers, _meta = read_stream_data_cython( datafile, channel=channel, IQ_mode=IQ_mode, @@ -1313,6 +1313,7 @@ def read_stream_data(self, datafile, channel=None, phase=np.array(phase) t=np.array(t) + # These are handled earlier in the Cython reader if return_header and not fast_reader: for k in header_dict.keys(): header_dict[k] = np.append(header_dict[k], diff --git a/python/pysmurf/client/util/stream_data_reader.pyx b/python/pysmurf/client/util/stream_data_reader.pyx index 1de6777b..a2fe6cfa 100644 --- a/python/pysmurf/client/util/stream_data_reader.pyx +++ b/python/pysmurf/client/util/stream_data_reader.pyx @@ -10,6 +10,7 @@ This module provides high-performance data reading for SMuRF stream data. All file I/O and binary parsing is done in C for maximum performance. """ +import yaml import numpy as np cimport numpy as cnp from libc.stdlib cimport malloc, free @@ -116,7 +117,7 @@ cdef class FastSmurfReader: fclose(self.file_ptr) self.file_ptr = NULL - def read_all_records(self, channel=None): + def read_all_records(self, channel=None, skip_meta=True): """ Read all records from file and return as numpy arrays This is the main high-performance function @@ -125,6 +126,8 @@ cdef class FastSmurfReader: --------- channel : array-like or None Channels to read (if None, reads all) + skip_meta : bool, optional, default True + Whether to skip metadata parsing Returns ------- @@ -136,10 +139,13 @@ cdef class FastSmurfReader: Structured array of SMuRF headers with dtype SMURF_HEADER_DTYPE Access fields like: headers['timestamp'], headers['frame_counter'] Individual headers: headers[i] returns a single header record + metadata : dict + Dictionary of metadata read from Rogue headers (if skip_meta=False) """ # Pre-allocate with estimates cdef bytearray header_bytes = bytearray() cdef list data_list = [] + cdef list meta_list = [] cdef RogueHeader rogue_hdr cdef cnp.ndarray[cnp.int32_t, ndim=1] channel_data @@ -160,7 +166,7 @@ cdef class FastSmurfReader: if ftell(self.file_ptr) >= self.file_size: break - # Process Rogue header if needed + # Read Rogue headers to find data channel if self.is_rogue: found_data = False while not found_data: @@ -178,16 +184,19 @@ cdef class FastSmurfReader: parse_rogue_header(self.rogue_buffer, &rogue_hdr) rogue_payload = rogue_hdr.size - 4 - rec_end = ftell(self.file_ptr) + rogue_payload # If data channel, process it if rogue_hdr.channel == 0: found_data = True break else: - # TODO support reading metadata channels - # Skip non-data channels - fseek(self.file_ptr, rec_end, SEEK_SET) + # read into metadata buffer + if skip_meta: + fseek(self.file_ptr, rogue_payload, SEEK_CUR) + else: + meta_buffer = bytearray(rogue_payload) + fread(meta_buffer, 1, rogue_payload, self.file_ptr) + meta_list.append((self.records_read, meta_buffer.decode('utf-8'))) if not found_data: break @@ -238,10 +247,13 @@ cdef class FastSmurfReader: timestamps_array = headers_array['timestamp'] data_array = np.array(data_list, dtype=np.int32) - return timestamps_array, data_array, headers_array + # parse metadata stream from YAML + metadata = {i: yaml.safe_load(m) for i, m in meta_list} + return timestamps_array, data_array, headers_array, metadata -def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False): + +def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False, bint skip_meta=True): """ Ultra-fast Cython implementation that reads entire file in C. This completely replaces the Python loop over SmurfStreamReader.records() @@ -254,6 +266,8 @@ def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False): Channels to read (if None, reads all) IQ_mode : bool Whether data is in IQ streaming mode + skip_meta : bool, optional, default True + Whether to skip metadata parsing Returns ------- @@ -263,6 +277,8 @@ def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False): Phase data (or IQ data if in IQ mode) headers : list SMuRF headers. + meta : dict + Metadata read from file. dict keys are the corresponding data index. """ cdef: FastSmurfReader reader @@ -276,7 +292,9 @@ def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False): # Open file and read all records in C with FastSmurfReader(datafile, is_rogue=True) as reader: - timestamps, data_array, headers = reader.read_all_records(channel=channel) + timestamps, data_array, headers, meta = reader.read_all_records( + skip_meta=skip_meta, channel=channel + ) n_records = len(timestamps) n_chan = data_array.shape[1] @@ -296,7 +314,7 @@ def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False): phase = data_array.T / (2.0**15) * pi - return timestamps, phase, headers + return timestamps, phase, headers, meta def parse_tes_bias_from_headers(headers): """ From a55ccd663d181d83d01382897da71e993ade7aab Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Wed, 8 Apr 2026 10:17:42 -0700 Subject: [PATCH 05/10] fix(hatch_build.py): Rename build dir to not conflict with make --- hatch_build.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hatch_build.py b/hatch_build.py index 749fec1f..bb5d32ff 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -34,8 +34,9 @@ def initialize(self, version, build_data): # point to root of python source # this will place the compiled extension where editable installs can find it cmd.build_lib = "python" + cmd.build_temp = "build_cython" cmd.finalize_options() cmd.run() build_data['pure_python'] = False - build_data['infer_tag'] = True \ No newline at end of file + build_data['infer_tag'] = True From e0e1f95a9acbdf8c5582bb1fd758562ff239d119 Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Wed, 8 Apr 2026 13:33:18 -0700 Subject: [PATCH 06/10] fix(smurf_data_reader): Make more efficient. Use memory views for speed and less memory use. Also get dimensions from file to pre-allocate read arrays. --- .../client/util/stream_data_reader.pyx | 106 ++++++++++++++---- 1 file changed, 85 insertions(+), 21 deletions(-) diff --git a/python/pysmurf/client/util/stream_data_reader.pyx b/python/pysmurf/client/util/stream_data_reader.pyx index a2fe6cfa..13e18473 100644 --- a/python/pysmurf/client/util/stream_data_reader.pyx +++ b/python/pysmurf/client/util/stream_data_reader.pyx @@ -117,6 +117,48 @@ cdef class FastSmurfReader: fclose(self.file_ptr) self.file_ptr = NULL + cdef _get_dimensions(self, int32_t* n_chan, int32_t* n_record): + cdef uint32_t rogue_payload + cdef RogueHeader rogue_hdr + n_chan[0] = -1 + + # set file pointer back to beginning + fseek(self.file_ptr, 0, SEEK_SET) + + # Read records + while True: + # Check if at end of file + if ftell(self.file_ptr) >= self.file_size: + break + + # Check for EOF + if ftell(self.file_ptr) >= self.file_size: + break + + # Read Rogue header + bytes_read = fread(self.rogue_buffer, 1, ROGUE_HEADER_SIZE, self.file_ptr) + if bytes_read != ROGUE_HEADER_SIZE: + break + + # Parse header + parse_rogue_header(self.rogue_buffer, &rogue_hdr) + + rogue_payload = rogue_hdr.size - 4 + + # If data channel, increment counter + if rogue_hdr.channel == 0: + n_record[0] += 1 + + # read number of channels once + if n_chan[0] == -1: + bytes_read = fread(self.header_buffer, 1, SMURF_HEADER_SIZE, self.file_ptr) + if bytes_read != SMURF_HEADER_SIZE: + break + n_chan[0] = (&self.header_buffer[4])[0] + + # advance file pointer + fseek(self.file_ptr, rogue_payload, SEEK_CUR) + def read_all_records(self, channel=None, skip_meta=True): """ Read all records from file and return as numpy arrays @@ -142,25 +184,45 @@ cdef class FastSmurfReader: metadata : dict Dictionary of metadata read from Rogue headers (if skip_meta=False) """ - # Pre-allocate with estimates cdef bytearray header_bytes = bytearray() - cdef list data_list = [] + cdef int32_t[:, :] data_view + cdef int32_t[:] channel_data cdef list meta_list = [] cdef RogueHeader rogue_hdr - cdef cnp.ndarray[cnp.int32_t, ndim=1] channel_data cdef size_t bytes_read - cdef long rec_end cdef uint32_t rogue_payload - cdef int n_channels = -1 + cdef int32_t n_channels, n_records cdef uint32_t current_num_channels cdef bint found_data + cdef int32_t i, j, ch + + print("reading file dimensions") + # first pass over file to get dimensions + self._get_dimensions(&n_channels, &n_records) + print(f"File has {n_channels} channels and {n_records} records") + + # allocate data buffer for reading from disk + channel_data_array = np.zeros(n_channels, dtype=np.int32) + channel_data = channel_data_array # only save requested channels to memory if channel is not None: channel = np.atleast_1d(np.asarray(channel, dtype=np.int64)) + else: + channel = np.arange(n_channels) + + # allocate data array for storing in memory + data_array = np.empty((n_records, channel.size), dtype=np.int32) + data_view = data_array + + print("reading from file") + + # set file pointer back to beginning + fseek(self.file_ptr, 0, SEEK_SET) # Read records + i = 0 while True: # Check if at end of file if ftell(self.file_ptr) >= self.file_size: @@ -195,7 +257,9 @@ cdef class FastSmurfReader: fseek(self.file_ptr, rogue_payload, SEEK_CUR) else: meta_buffer = bytearray(rogue_payload) - fread(meta_buffer, 1, rogue_payload, self.file_ptr) + bytes_read = fread(meta_buffer, 1, rogue_payload, self.file_ptr) + if bytes_read != rogue_payload: + break meta_list.append((self.records_read, meta_buffer.decode('utf-8'))) if not found_data: @@ -219,23 +283,23 @@ cdef class FastSmurfReader: f"Channel count mismatch: previous frame had {n_channels}, got {current_num_channels}" ) - # Read channel data using numpy fromfile for efficiency - channel_data = np.fromfile(self.filename, - dtype=SMURF_DATA_DTYPE, - count=n_channels, - offset=ftell(self.file_ptr)) - - # subset if requested - if channel is not None: - channel_data = channel_data[channel] - - # Advance file pointer - fseek(self.file_ptr, n_channels * SMURF_DATA_DTYPE.itemsize, SEEK_CUR) + # read all channels into buffer + bytes_read = fread(&channel_data[0], 4, n_channels, self.file_ptr) + if bytes_read != n_channels: + print("Failed to read data from file") + print(f"read {bytes_read} bytes. Expected {n_channels * 4}.") + break - # Store data - data_list.append(channel_data) + # copy into array with subsetting + for j in range(channel.size): + ch = channel[j] + data_view[i, j] = channel_data[ch] self.records_read += 1 + i += 1 + + print("done reading") + print("converting into arrays") # Parse all headers at once from collected bytes headers_array = np.frombuffer(header_bytes, dtype=SMURF_HEADER_DTYPE) @@ -245,10 +309,10 @@ cdef class FastSmurfReader: # Convert other data to numpy arrays timestamps_array = headers_array['timestamp'] - data_array = np.array(data_list, dtype=np.int32) # parse metadata stream from YAML metadata = {i: yaml.safe_load(m) for i, m in meta_list} + print("returning") return timestamps_array, data_array, headers_array, metadata From 175ac5d1ae20603d014a088b4a59af1845c448c9 Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Wed, 8 Apr 2026 13:34:39 -0700 Subject: [PATCH 07/10] fix(hatch_build.py): Remove c++ requirement --- hatch_build.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hatch_build.py b/hatch_build.py index bb5d32ff..05e4a2ab 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -19,7 +19,6 @@ def initialize(self, version, build_data): ["python/pysmurf/client/util/stream_data_reader.pyx"], include_dirs=[numpy.get_include()], define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")], - language="c++", ) # Cythonize From 35ade864588f50dd933bfd8de295aaf12c976d5e Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Wed, 8 Apr 2026 13:35:25 -0700 Subject: [PATCH 08/10] fix(read_stream_data_cython): Make more efficient with memory views --- .../client/util/stream_data_reader.pyx | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/python/pysmurf/client/util/stream_data_reader.pyx b/python/pysmurf/client/util/stream_data_reader.pyx index 13e18473..fae46574 100644 --- a/python/pysmurf/client/util/stream_data_reader.pyx +++ b/python/pysmurf/client/util/stream_data_reader.pyx @@ -347,18 +347,18 @@ def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False, bint cdef: FastSmurfReader reader cnp.ndarray[cnp.uint64_t, ndim=1] timestamps - cnp.ndarray[cnp.int32_t, ndim=2] data_array - cnp.ndarray[cnp.float64_t, ndim=2] phase - cnp.ndarray[cnp.complex128_t, ndim=2] iq_data - int n_channels, n_records - int i, j, chan_idx - cnp.ndarray[cnp.int64_t, ndim=1] channel_indices + cnp.int32_t[:, :] data_view + cnp.float64_t[:, :] phase_view + cnp.complex128_t[:, :] iq_data_view + int n_chan, n_records + int i, j # Open file and read all records in C with FastSmurfReader(datafile, is_rogue=True) as reader: timestamps, data_array, headers, meta = reader.read_all_records( skip_meta=skip_meta, channel=channel ) + data_view = data_array # point memory view to array n_records = len(timestamps) n_chan = data_array.shape[1] @@ -368,15 +368,23 @@ def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False, bint # IQ mode: pair consecutive channels if n_chan % 2 != 0: raise ValueError(f"In IQ mode, number of channels should be even. Found {n_chan}.") + iq_data = np.empty((n_chan // 2, n_records), dtype=np.complex128) - iq_data = (data_array[:, ::2] + 1j * data_array[:, 1::2]).T + iq_data_view = iq_data # point memory view to array + + for i in range(n_records): + for j in range(n_chan // 2): + iq_data_view[j, i] = (data_view[i, 2 * j] + 1j * data_view[i, 2 * j + 1]) return timestamps, iq_data, headers else: # Normal mode: convert to phase phase = np.empty((n_chan, n_records), dtype=np.float64) + phase_view = phase # point memory view to array - phase = data_array.T / (2.0**15) * pi + for i in range(n_records): + for j in range(n_chan): + phase_view[j, i] = data_view[i, j] / (2.0**15) * pi return timestamps, phase, headers, meta From f056bc03e21049a919e317cca8283613a86bdf7a Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Wed, 8 Apr 2026 15:10:18 -0700 Subject: [PATCH 09/10] fix(stream_data_reader): Fix bugs and performance issues --- .../client/util/stream_data_reader.pyx | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/python/pysmurf/client/util/stream_data_reader.pyx b/python/pysmurf/client/util/stream_data_reader.pyx index fae46574..97ca92ae 100644 --- a/python/pysmurf/client/util/stream_data_reader.pyx +++ b/python/pysmurf/client/util/stream_data_reader.pyx @@ -25,8 +25,6 @@ cnp.import_array() # Frame Format Constants cdef int SMURF_HEADER_SIZE = 128 cdef int ROGUE_HEADER_SIZE = 8 -cdef int SMURF_CHANNEL_SIZE = 4 -SMURF_DATA_DTYPE = np.dtype(np.int32) # SMuRF Header as NumPy structured dtype SMURF_HEADER_DTYPE = np.dtype([ @@ -117,10 +115,11 @@ cdef class FastSmurfReader: fclose(self.file_ptr) self.file_ptr = NULL - cdef _get_dimensions(self, int32_t* n_chan, int32_t* n_record): - cdef uint32_t rogue_payload + cdef void _get_dimensions(self, int32_t* n_chan, int32_t* n_record): + cdef uint32_t rogue_payload, bytes_read cdef RogueHeader rogue_hdr n_chan[0] = -1 + n_record[0] = 0 # set file pointer back to beginning fseek(self.file_ptr, 0, SEEK_SET) @@ -131,10 +130,6 @@ cdef class FastSmurfReader: if ftell(self.file_ptr) >= self.file_size: break - # Check for EOF - if ftell(self.file_ptr) >= self.file_size: - break - # Read Rogue header bytes_read = fread(self.rogue_buffer, 1, ROGUE_HEADER_SIZE, self.file_ptr) if bytes_read != ROGUE_HEADER_SIZE: @@ -155,6 +150,9 @@ cdef class FastSmurfReader: if bytes_read != SMURF_HEADER_SIZE: break n_chan[0] = (&self.header_buffer[4])[0] + # Adjust seek since we read the header + fseek(self.file_ptr, rogue_payload - SMURF_HEADER_SIZE, SEEK_CUR) + continue # Skip the normal seek below # advance file pointer fseek(self.file_ptr, rogue_payload, SEEK_CUR) @@ -184,11 +182,14 @@ cdef class FastSmurfReader: metadata : dict Dictionary of metadata read from Rogue headers (if skip_meta=False) """ - cdef bytearray header_bytes = bytearray() + # declare memory views for storing data cdef int32_t[:, :] data_view cdef int32_t[:] channel_data + cdef char[:] headers_view + # just use a list for yaml metadata cdef list meta_list = [] + # variables for controlling read loop cdef RogueHeader rogue_hdr cdef size_t bytes_read cdef uint32_t rogue_payload @@ -197,27 +198,32 @@ cdef class FastSmurfReader: cdef bint found_data cdef int32_t i, j, ch - print("reading file dimensions") # first pass over file to get dimensions self._get_dimensions(&n_channels, &n_records) - print(f"File has {n_channels} channels and {n_records} records") # allocate data buffer for reading from disk channel_data_array = np.zeros(n_channels, dtype=np.int32) channel_data = channel_data_array + # allocate buffer for headers + headers_array = np.zeros(n_records, dtype=SMURF_HEADER_DTYPE) + headers_view = headers_array.view(np.int8) + cdef int header_offset = 0 + # only save requested channels to memory if channel is not None: - channel = np.atleast_1d(np.asarray(channel, dtype=np.int64)) + channel = np.atleast_1d(np.asarray(channel, dtype=np.int32)) else: - channel = np.arange(n_channels) + channel = np.arange(n_channels, dtype=np.int32) + + # Cache size and create memory view + cdef int32_t n_sel_channels = channel.size + cdef int32_t[:] channel_view = channel # allocate data array for storing in memory - data_array = np.empty((n_records, channel.size), dtype=np.int32) + data_array = np.empty((n_records, n_sel_channels), dtype=np.int32) data_view = data_array - print("reading from file") - # set file pointer back to beginning fseek(self.file_ptr, 0, SEEK_SET) @@ -271,7 +277,8 @@ cdef class FastSmurfReader: break # Store raw header bytes for later parsing - header_bytes.extend(self.header_buffer[:SMURF_HEADER_SIZE]) + memcpy(&headers_view[header_offset], self.header_buffer, SMURF_HEADER_SIZE) + header_offset += SMURF_HEADER_SIZE # Read number_of_channels directly from buffer for validation current_num_channels = (&self.header_buffer[4])[0] @@ -286,33 +293,23 @@ cdef class FastSmurfReader: # read all channels into buffer bytes_read = fread(&channel_data[0], 4, n_channels, self.file_ptr) if bytes_read != n_channels: - print("Failed to read data from file") - print(f"read {bytes_read} bytes. Expected {n_channels * 4}.") break # copy into array with subsetting - for j in range(channel.size): - ch = channel[j] + for j in range(n_sel_channels): + ch = channel_view[j] data_view[i, j] = channel_data[ch] self.records_read += 1 i += 1 - print("done reading") - print("converting into arrays") - - # Parse all headers at once from collected bytes - headers_array = np.frombuffer(header_bytes, dtype=SMURF_HEADER_DTYPE) - # Mask external_time_raw to 5 bytes (40 bits) headers_array['external_time_raw'] &= 0xFFFFFFFFFF - # Convert other data to numpy arrays timestamps_array = headers_array['timestamp'] # parse metadata stream from YAML metadata = {i: yaml.safe_load(m) for i, m in meta_list} - print("returning") return timestamps_array, data_array, headers_array, metadata From 97caace1a38e01dfde34b59d4cf43649f1f10056 Mon Sep 17 00:00:00 2001 From: Tristan Pinsonneault-Marotte Date: Thu, 9 Apr 2026 10:02:04 -0700 Subject: [PATCH 10/10] fix(stream_data_reader): Remove is_rogue flag --- .../client/util/stream_data_reader.pyx | 65 +++++++++---------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/python/pysmurf/client/util/stream_data_reader.pyx b/python/pysmurf/client/util/stream_data_reader.pyx index 97ca92ae..1506d02c 100644 --- a/python/pysmurf/client/util/stream_data_reader.pyx +++ b/python/pysmurf/client/util/stream_data_reader.pyx @@ -80,14 +80,12 @@ cdef class FastSmurfReader: cdef FILE* file_ptr cdef bytes filename cdef long file_size - cdef bint is_rogue cdef long records_read cdef uint8_t header_buffer[128] cdef uint8_t rogue_buffer[8] - def __cinit__(self, str filename, bint is_rogue=True): + def __cinit__(self, str filename): self.filename = filename.encode('utf-8') - self.is_rogue = is_rogue self.records_read = 0 self.file_ptr = NULL @@ -235,41 +233,40 @@ cdef class FastSmurfReader: break # Read Rogue headers to find data channel - if self.is_rogue: - found_data = False - while not found_data: - # Check for EOF - if ftell(self.file_ptr) >= self.file_size: - break + found_data = False + while not found_data: + # Check for EOF + if ftell(self.file_ptr) >= self.file_size: + break - # Read Rogue header - bytes_read = fread(self.rogue_buffer, 1, ROGUE_HEADER_SIZE, self.file_ptr) - if bytes_read != ROGUE_HEADER_SIZE: - break + # Read Rogue header + bytes_read = fread(self.rogue_buffer, 1, ROGUE_HEADER_SIZE, self.file_ptr) + if bytes_read != ROGUE_HEADER_SIZE: + break - # Parse header - with nogil: - parse_rogue_header(self.rogue_buffer, &rogue_hdr) + # Parse header + with nogil: + parse_rogue_header(self.rogue_buffer, &rogue_hdr) - rogue_payload = rogue_hdr.size - 4 + rogue_payload = rogue_hdr.size - 4 - # If data channel, process it - if rogue_hdr.channel == 0: - found_data = True - break - else: - # read into metadata buffer - if skip_meta: - fseek(self.file_ptr, rogue_payload, SEEK_CUR) - else: - meta_buffer = bytearray(rogue_payload) - bytes_read = fread(meta_buffer, 1, rogue_payload, self.file_ptr) - if bytes_read != rogue_payload: - break - meta_list.append((self.records_read, meta_buffer.decode('utf-8'))) - - if not found_data: + # If data channel, process it + if rogue_hdr.channel == 0: + found_data = True break + else: + # read into metadata buffer + if skip_meta: + fseek(self.file_ptr, rogue_payload, SEEK_CUR) + else: + meta_buffer = bytearray(rogue_payload) + bytes_read = fread(meta_buffer, 1, rogue_payload, self.file_ptr) + if bytes_read != rogue_payload: + break + meta_list.append((self.records_read, meta_buffer.decode('utf-8'))) + + if not found_data: + break # Read SMURF header bytes_read = fread(self.header_buffer, 1, SMURF_HEADER_SIZE, self.file_ptr) @@ -351,7 +348,7 @@ def read_stream_data_cython(str datafile, channel=None, bint IQ_mode=False, bint int i, j # Open file and read all records in C - with FastSmurfReader(datafile, is_rogue=True) as reader: + with FastSmurfReader(datafile) as reader: timestamps, data_array, headers, meta = reader.read_all_records( skip_meta=skip_meta, channel=channel )