Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 67 additions & 10 deletions buckaroo/dataflow/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ def __init__(self, raw_df):
autocleaning_klass = SentinelAutocleaning
autoclean_conf = tuple()

# Generic per-scope stat skiplist. Maps scope name → set of stat
# names that should NOT run when computing that scope's SD. Used
# to keep expensive stats (e.g. ``histogram`` on xorq, where per-
# column engine queries dominate latency) out of the hot path
# while still rendering on the cached raw scope. Honoured by
# ``_summary_sd`` (filt scope) and ``_populate_sd_cache`` (raw +
# clean scopes). Default empty = run everything; XorqDataflow
# overrides this to skip histograms on filt + clean.
skip_stats_by_scope: TDict[str, set] = {}

command_config = Dict({}).tag(sync=True)
operation_results = Dict({'transformed_df':None,
'generated_py_code': ""})
Expand Down Expand Up @@ -237,7 +247,7 @@ def processed_sd(self) -> SDType:
return self.processed_result[1]
return {}

def _get_summary_sd(self, df:pd.DataFrame) -> Tuple[SDType, TAny]:
def _get_summary_sd(self, df:pd.DataFrame, skip_stat_names=None) -> Tuple[SDType, TAny]:
analysis_klasses = self.analysis_klasses
if analysis_klasses == "foo":
return {'some-col': {'foo':8}}, {}
Expand All @@ -264,7 +274,20 @@ def _summary_sd(self, change):
if (id(df), id(klasses)) == self._summary_sd_cache_key:
return
self._summary_sd_cache_key = (id(df), id(klasses))
result_summary_sd, errs = self._get_summary_sd(df)
# filt scope: ``summary_sd`` is what gets stored under the filt
# cache key in ``_populate_sd_cache``, so apply the filt skip
# here when a filter is active. We can't use ``operations`` for
# the filter check — it hasn't been updated yet in the cascade
# (``_operation_result`` sets ``self.cleaned`` first, triggering
# this observer via ``processed_result``, *then* sets
# ``self.operations``). ``quick_command_args`` is the upstream
# trait, set before ``_operation_result`` runs.
filter_active = bool(self.quick_command_args)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Determine filt-skip from realized filter ops

Using bool(self.quick_command_args) here can misclassify "no filter" states as filtered. generate_quick_ops explicitly treats payloads like {'search': ['']} as a no-op, so quick_command_args may be non-empty even though no quick-command op exists. In that case _summary_sd still applies the filt skip. Because the raw/clean/filt chains then collapse to a single key, _populate_sd_cache can store the skipped summary_sd under that shared key and skip recomputing raw, dropping bare stats (e.g. the raw histogram) after the search is cleared.

Useful? React with 👍 / 👎.

filt_skip = (
self.skip_stats_by_scope.get('filt')
if (filter_active and self.skip_stats_by_scope)
else None)
result_summary_sd, errs = self._get_summary_sd(df, skip_stat_names=filt_skip)
self.summary_sd = result_summary_sd
self.errs = errs

Expand Down Expand Up @@ -500,13 +523,35 @@ def _compute_scope_df(self, scope: str):
return base
return pp_result[0] if pp_result else base

def _scope_cache_key(self, chain):
def _effective_skip(self, scope, chains):
"""The skip list to apply for ``scope`` given the current
``chains`` shape. Returns ``None`` when the scope is effectively
identical to its parent scope (no filter active → filt is filt-
of-clean; no cleaning → clean is just raw). In those degenerate
cases applying a separate per-scope skip would create a phantom
distinct cache entry, triggering unnecessary recomputes (e.g.
lazy postprocessor re-runs from ``_compute_scope_df``)."""
if not self.skip_stats_by_scope:
return None
if scope == 'raw':
return self.skip_stats_by_scope.get('raw')
if scope == 'clean':
if chains['clean'] == chains['raw']:
return None
return self.skip_stats_by_scope.get('clean')
if scope == 'filt':
if chains['filt'] == chains['clean']:
return None
return self.skip_stats_by_scope.get('filt')
return None

def _scope_cache_key(self, chain, scope=None, chains=None):
"""Hash that identifies a scope's SD-input identity.

Includes the op chain *and* an identifier for the source
dataframe (``id(sampled_df)``) *and* the post-processing method
— all three are inputs to the scope df, and a cache hit must
mean "same SD-producing inputs" not just "same chain".
*and* the per-scope effective skip — a cache hit must mean
"same SD-producing inputs" not just "same chain".

- sampled_df identity addresses codex P1 on #783: a ``raw_df``
swap with an unchanged chain must invalidate.
Expand All @@ -515,13 +560,19 @@ def _scope_cache_key(self, chain):
post-processing replaces the df entirely (e.g. ``hide_post``
→ ``SENTINEL_DF``), the raw scope's SD must reflect that
new df, not the pre-post-processing one.
- effective skip: keeps the no-filter / no-cleaning case
collapsing raw + clean + filt under one key (see
``_effective_skip``) while still segregating when the skip
actually applies.

analysis_klasses is *not* included here; that's a separate
invariant (codex P2, deferred — see follow-up issue).
"""
sampled_id = id(self.sampled_df) if self.sampled_df is not None else 0
pp = self.post_processing_method or ''
return hash_chain(chain, extra=f"{sampled_id}|{pp}")
scope_skip = self._effective_skip(scope, chains) if (scope and chains is not None) else None
skip_part = '|'.join(sorted(scope_skip)) if scope_skip else ''
return hash_chain(chain, extra=f"{sampled_id}|{pp}|{skip_part}")

@observe('summary_sd', 'operations', 'analysis_klasses')
@exception_protect('sd-cache-protector')
Expand All @@ -548,7 +599,7 @@ def _populate_sd_cache(self, _change):
if self.processed_df is None:
return
chains = split_chain_by_scope(self.operations)
keys = {scope: self._scope_cache_key(chain)
keys = {scope: self._scope_cache_key(chain, scope=scope, chains=chains)
for scope, chain in chains.items()}
new_cache = dict(self.summary_stats_cache)
cache_grew = False
Expand All @@ -565,7 +616,8 @@ def _populate_sd_cache(self, _change):
scope_df = self._compute_scope_df(scope)
if scope_df is None:
continue
sd, _errs = self._get_summary_sd(scope_df)
scope_skip = self._effective_skip(scope, chains)
sd, _errs = self._get_summary_sd(scope_df, skip_stat_names=scope_skip)
new_cache[keys[scope]] = sd
cache_grew = True

Expand Down Expand Up @@ -605,11 +657,16 @@ def _build_error_dataframe(self, e):
### start summary stats block
#TAny closer to some error type
@override
def _get_summary_sd(self, processed_df:pd.DataFrame) -> Tuple[SDType, TDict[str, TAny]]:
def _get_summary_sd(self, processed_df:pd.DataFrame, skip_stat_names=None) -> Tuple[SDType, TDict[str, TAny]]:
# ``skip_stat_names`` is threaded through from the per-scope
# ``skip_stats_by_scope`` config — lets a dataflow declare
# "don't run stat X on scope Y" without touching analysis_klasses
# globally. DfStatsV2 / XorqDfStatsV2 forward this to the
# underlying pipeline.
stats = self.DFStatsClass(
processed_df,
self.analysis_klasses,
self.df_name, debug=self.debug)
self.df_name, debug=self.debug, skip_stat_names=skip_stat_names)
sdf = stats.sdf
if stats.errs:
if self.debug:
Expand Down
12 changes: 11 additions & 1 deletion buckaroo/pluggable_analysis_framework/analysis_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,24 @@ def verify_analysis_objects(kls, col_analysis_objs:AObjs):
kls.ap_class(col_analysis_objs)

def __init__(self, df_stats_df:pd.DataFrame, col_analysis_objs:AObjs,
operating_df_name:str=None, debug:bool=False) -> None:
operating_df_name:str=None, debug:bool=False, skip_stat_names=None) -> None:
# ``skip_stat_names`` is accepted for API parity with DfStatsV2 /
# XorqDfStatsV2 (it's threaded through from the dataflow's
# ``skip_stats_by_scope`` config). The v1 ``AnalysisPipeline``
# doesn't support per-stat skipping, so we strip at the output
# level instead.
self.df = self.get_operating_df(df_stats_df, force_full_eval=False)
self.col_order = self.df.columns
self.ap = self.ap_class(col_analysis_objs)
self.operating_df_name = operating_df_name
self.debug = debug

self.sdf, self.errs = self.ap.process_df(self.df, self.debug)
if skip_stat_names:
for col_stats in self.sdf.values():
for k in list(col_stats.keys()):
if k in skip_stat_names:
del col_stats[k]
if self.errs:
output_full_reproduce(self.errs, self.sdf, operating_df_name)

Expand Down
14 changes: 9 additions & 5 deletions buckaroo/pluggable_analysis_framework/df_stats_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ def verify_analysis_objects(cls, col_analysis_objs: AObjs) -> None:
cls.ap_class(col_analysis_objs)

def __init__(self, df_stats_df: pd.DataFrame, col_analysis_objs: AObjs, operating_df_name: str = None,
debug: bool = False) -> None:
debug: bool = False, skip_stat_names=None) -> None:
self.df = self.get_operating_df(df_stats_df, force_full_eval=False)
self.col_order = self.df.columns
self.ap = self.ap_class(col_analysis_objs)
self.operating_df_name = operating_df_name
self.debug = debug

# Process using v1-compatible output format
self.sdf, self.errs = self.ap.process_df_v1_compat(self.df, self.debug)
# ``skip_stat_names`` is the per-scope skiplist threaded through
# from the dataflow's ``skip_stats_by_scope`` config.
self.sdf, self.errs = self.ap.process_df_v1_compat(self.df, self.debug,
skip_stat_names=skip_stat_names)
self.stat_errors = []

if self.errs:
Expand Down Expand Up @@ -94,10 +96,12 @@ def get_operating_df(self, df):
return df.sample(n=min(50_000, rows), seed=42)
return df

def __init__(self, df, col_analysis_objs, operating_df_name=None, debug=False):
def __init__(self, df, col_analysis_objs, operating_df_name=None, debug=False,
skip_stat_names=None):
self.df = self.get_operating_df(df)
self.ap = StatPipeline(col_analysis_objs, unit_test=False)
self.sdf, self.errs = self.ap.process_df_v1_compat(self.df, debug)
self.sdf, self.errs = self.ap.process_df_v1_compat(self.df, debug,
skip_stat_names=skip_stat_names)
self.stat_errors = []
if self.errs:
output_full_reproduce(self.errs, self.sdf, operating_df_name)
Expand Down
43 changes: 34 additions & 9 deletions buckaroo/pluggable_analysis_framework/stat_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,19 @@ def __init__(self, stat_funcs: list, unit_test: bool = True, record_timings: boo
self._unit_test_result = self.unit_test()

def process_column(self, column_name: str, column_dtype, raw_series=None, sampled_series=None, raw_dataframe=None,
initial_stats: Optional[Dict[str, Any]] = None) -> Tuple[Dict[str, Any], List[StatError]]:
initial_stats: Optional[Dict[str, Any]] = None,
skip_stat_names=None) -> Tuple[Dict[str, Any], List[StatError]]:
"""Process a single column through the stat DAG.

1. Filters stat functions by column dtype
2. Executes in topological order with Ok/Err accumulator
3. Returns (plain_dict, errors)
2. Filters by ``skip_stat_names`` — explicit per-stat-name
skiplist threaded through from the dataflow's
``skip_stats_by_scope`` config (e.g. histograms off on
filt/clean for xorq)
3. Executes in topological order with Ok/Err accumulator
4. Returns (plain_dict, errors)
"""
skip_stat_names = skip_stat_names or set()
# Build column-specific DAG (filters by dtype)
external = set(self.EXTERNAL_KEYS)
if initial_stats:
Expand All @@ -255,6 +261,8 @@ def process_column(self, column_name: str, column_dtype, raw_series=None, sample
accumulator[k] = Ok(v)
record_timings = self.record_timings
for sf in column_funcs:
if sf.name in skip_stat_names:
continue
if record_timings:
t0 = time.perf_counter()
_execute_stat_func(sf, accumulator, column_name, raw_series=raw_series, sampled_series=sampled_series,
Expand All @@ -268,11 +276,26 @@ def process_column(self, column_name: str, column_dtype, raw_series=None, sample
for sk in sf.provides:
col_key_to_func[sk.name] = sf

return resolve_accumulator(accumulator, column_name, col_key_to_func)

def process_df(self, df: pd.DataFrame, debug: bool = False) -> Tuple[SDType, List[StatError]]:
result, errors = resolve_accumulator(accumulator, column_name, col_key_to_func)
# Output-level strip — covers v1 ColAnalysis wrappers where one
# ``StatFunc.name`` (e.g. ``DefaultSummaryStats__series``)
# provides many keys (``mean``, ``max``, ...). The input-level
# skip above can't tell which wrapper to drop; the output-level
# strip removes any key the caller asked to skip regardless of
# which producing stat func it came from.
if skip_stat_names:
for k in list(result.keys()):
if k in skip_stat_names:
del result[k]
return result, errors

def process_df(self, df: pd.DataFrame, debug: bool = False,
skip_stat_names=None) -> Tuple[SDType, List[StatError]]:
"""Process all columns of a DataFrame.

``skip_stat_names`` is the per-stat-name skiplist threaded
through from the dataflow's ``skip_stats_by_scope`` config.

Returns:
(summary_dict, all_errors) where summary_dict is SDType-compatible
(column_name -> {stat_name -> value}).
Expand All @@ -292,22 +315,24 @@ def process_df(self, df: pd.DataFrame, debug: bool = False) -> Tuple[SDType, Lis

col_result, col_errors = self.process_column(column_name=rewritten_col_name, column_dtype=col_dtype,
raw_series=ser, sampled_series=ser, raw_dataframe=df,
initial_stats={'orig_col_name': orig_col_name, 'rewritten_col_name': rewritten_col_name})
initial_stats={'orig_col_name': orig_col_name, 'rewritten_col_name': rewritten_col_name},
skip_stat_names=skip_stat_names)

summary[rewritten_col_name] = col_result
all_errors.extend(col_errors)

return summary, all_errors

def process_df_v1_compat(self, df: pd.DataFrame, debug: bool = False) -> Tuple[SDType, ErrDict]:
def process_df_v1_compat(self, df: pd.DataFrame, debug: bool = False,
skip_stat_names=None) -> Tuple[SDType, ErrDict]:
"""Process DataFrame with v1-compatible error format.

Returns (SDType, ErrDict) matching the v1 AnalysisPipeline interface.
Used by DfStatsV2/PlDfStatsV2 (and via _find_v1_class, by any caller
that mixes ColAnalysis subclasses into the input list — DataFlow,
autocleaning, server.data_loading, polars_buckaroo).
"""
summary, errors = self.process_df(df, debug=debug)
summary, errors = self.process_df(df, debug=debug, skip_stat_names=skip_stat_names)

# Convert StatError list to v1 ErrDict format
errs: ErrDict = {}
Expand Down
29 changes: 23 additions & 6 deletions buckaroo/pluggable_analysis_framework/xorq_stat_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,12 @@ def unit_test(self) -> Tuple[bool, List[StatError]]:
finally:
self.backend = saved_backend

def process_table(self, table) -> Tuple[SDType, List[StatError]]:
def process_table(self, table, skip_stat_names=None) -> Tuple[SDType, List[StatError]]:
"""Run the pipeline. ``skip_stat_names`` is an explicit per-
stat-name skiplist — used by the dataflow's
``skip_stats_by_scope`` config (e.g. histograms on filt/clean
scopes for xorq)."""
skip_stat_names = skip_stat_names or set()
schema = table.schema()
columns = list(table.columns)

Expand All @@ -188,6 +193,8 @@ def process_table(self, table) -> Tuple[SDType, List[StatError]]:
for sf in self.ordered_stat_funcs:
if not _is_batch_func(sf):
continue
if sf.name in skip_stat_names:
continue
xorq_col_param = next(r.name for r in sf.requires if r.type is XorqColumn)
for col in columns:
col_dtype = schema[col]
Expand Down Expand Up @@ -253,6 +260,10 @@ def process_table(self, table) -> Tuple[SDType, List[StatError]]:
# (typically the batch-phase stats).
if sf.provides and all(sk.name in col_accum for sk in sf.provides):
continue
# Per-scope name skiplist (e.g. ``histogram`` not run
# on filt/clean scopes for xorq).
if sf.name in skip_stat_names:
continue
_execute_stat_func(sf, col_accum, col, raw_series=None, sampled_series=None, raw_dataframe=None,
xorq_expr=table, xorq_execute=self._execute)

Expand Down Expand Up @@ -306,14 +317,15 @@ def add_stat(self, stat_func_or_class) -> Tuple[bool, List[StatError]]:

return True, []

def process_table_v1_compat(self, table) -> Tuple[SDType, ErrDict]:
def process_table_v1_compat(self, table, skip_stat_names=None) -> Tuple[SDType, ErrDict]:
"""Run process_table and convert errors to v1 ErrDict shape.

Used by XorqDfStatsV2 / DataFlow consumers expecting the same
``{(col, stat): (Exception, kls)}`` shape that AnalysisPipeline
produced.
produced. ``skip_stat_names`` threads through the per-scope
skiplist.
"""
summary, errors = self.process_table(table)
summary, errors = self.process_table(table, skip_stat_names=skip_stat_names)
errs: ErrDict = {}
for se in errors:
kls = _find_v1_class(se.stat_func, self._original_inputs) if se.stat_func else None
Expand Down Expand Up @@ -346,7 +358,8 @@ def verify_analysis_objects(cls, objs):
# (issue #709). DAG validation still runs as part of __init__.
XorqStatPipeline(objs, unit_test=False)

def __init__(self, table, col_analysis_objs, operating_df_name=None, debug=False):
def __init__(self, table, col_analysis_objs, operating_df_name=None, debug=False,
skip_stat_names=None):
self.table = table
# Skip the unit_test PERVERSE_DF run on each widget construction —
# it doubles the SQL query count (issue #709). The DAG-validation
Expand All @@ -355,7 +368,11 @@ def __init__(self, table, col_analysis_objs, operating_df_name=None, debug=False
self.ap = XorqStatPipeline(col_analysis_objs, unit_test=False)
self.operating_df_name = operating_df_name
self.debug = debug
self.sdf, self.errs = self.ap.process_table_v1_compat(self.table)
# skip_stat_names is the per-scope "don't run this stat" filter
# the dataflow passes when e.g. histograms are excluded from the
# filt/clean scopes. None means run everything.
self.sdf, self.errs = self.ap.process_table_v1_compat(self.table,
skip_stat_names=skip_stat_names)
self.stat_errors = []
if self.errs:
output_full_reproduce(self.errs, self.sdf, operating_df_name)
Expand Down
Loading
Loading