
Split DistributedPhysicalOptimizerRule into multiple, more scoped ones.#357

Open
gabotechs wants to merge 4 commits into main from gabrielmusat/multiple-rule-split

Conversation

@gabotechs
Collaborator

@gabotechs gabotechs commented Feb 24, 2026

Closes #347

This PR ships two main things:

Split the DistributedPhysicalOptimizerRule into multiple, more scoped ones

Previously, all the planning in the project was done in one rule. Now, it's split into multiple, more composable ones, so that users are free to insert their own rules in the middle:

StartDistributedContext
AddCoalesceOnTop
InsertBroadcast
InjectNetworkBoundaryPlaceholders
ApplyNetworkBoundaries
BatchCoalesceBelowBoundaries
EndDistributedContext

The big bulk of the work happens in:

  • InjectNetworkBoundaryPlaceholders, which injects network boundary placeholders in the middle of the plan; these just contain information about the network boundary type and the number of input tasks.
  • ApplyNetworkBoundaries, which reads the network boundary placeholders and injects the actual network boundaries.
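To make the two-phase flow concrete, here is a minimal stand-in sketch. Every name here (`Node`, `Kind`, `inject`, `apply`) is hypothetical; the real rules operate on DataFusion `ExecutionPlan` trees, not this toy enum:

```rust
// Toy model of the two-phase planning: phase 1 injects placeholders that
// only record the boundary kind and input task count; phase 2 resolves
// them into concrete network boundary nodes.

#[derive(Debug, Clone, PartialEq)]
enum Node {
    Scan,
    Repartition(Box<Node>),
    // Planning-time placeholder: just a kind and an input task count.
    Placeholder { kind: Kind, input_task_count: usize, input: Box<Node> },
    // What an ApplyNetworkBoundaries-style pass would produce.
    NetworkShuffle { input_task_count: usize, input: Box<Node> },
}

#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind { Coalesce, Shuffle, Broadcast }

// Phase 1: place a Shuffle placeholder right above every Repartition node.
fn inject(node: Node, tasks: usize) -> Node {
    match node {
        Node::Repartition(input) => Node::Placeholder {
            kind: Kind::Shuffle,
            input_task_count: tasks,
            input: Box::new(Node::Repartition(Box::new(inject(*input, tasks)))),
        },
        other => other, // toy: no recursion into other node kinds
    }
}

// Phase 2: resolve Shuffle placeholders into concrete boundaries.
fn apply(node: Node) -> Node {
    match node {
        Node::Placeholder { kind: Kind::Shuffle, input_task_count, input } =>
            Node::NetworkShuffle { input_task_count, input: Box::new(apply(*input)) },
        other => other, // toy: other kinds and node types left untouched
    }
}

fn main() {
    let plan = Node::Repartition(Box::new(Node::Scan));
    let resolved = apply(inject(plan, 4));
    assert!(matches!(resolved, Node::NetworkShuffle { input_task_count: 4, .. }));
}
```

The point of the split is visible in the seam between the two functions: anything that runs between `inject` and `apply` sees only placeholders, not concrete boundaries.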

Expose the NetworkBoundaryPlaceholder as part of the public API

In #347, we discussed exposing the annotations system as part of the public API, and apache/datafusion#20396 upstream aims to provide such a public API in vanilla DataFusion.

While that's powerful, after some consideration, this project would have a hard time exposing a stable API if the full power of the annotations system were exposed, so another approach is attempted instead:

  • The whole annotation system (the AnnotatedPlan struct, etc.) is made private; it is not even exposed outside the rule_4_inject_network_boundary_placeholders.rs file, so it's private even within this project itself.
  • The new API for customizing distributed plans is given in the form of NetworkBoundaryPlaceholders, which are planning-time ExecutionPlan implementations with a very narrow API:
    • The NetworkBoundary kind to inject (coalesce, shuffle, or broadcast)
    • The input task count of the stage below

The chain of optimizer rules uses this same NetworkBoundaryPlaceholder API for injecting its network boundaries, which is what allows making AnnotatedPlan private even within the project, and at the same time allows users to inject their own rules between InjectNetworkBoundaryPlaceholders and ApplyNetworkBoundaries, or even replace InjectNetworkBoundaryPlaceholders altogether with their own custom implementation.

This test demonstrates how to use the NetworkBoundaryPlaceholder API for customizing a distributed plan:
https://github.com/datafusion-contrib/datafusion-distributed/blob/2dd97539bf736e83c1697b763c5ba1c724a1e69d/tests/custom_network_boundary_placeholders.rs

@gabrielkerr
Contributor

Thank you for making this! Looking to review this over the next day.

@gabotechs gabotechs force-pushed the gabrielmusat/multiple-rule-split branch from 070287d to 4512cb1 Compare February 25, 2026 06:31
Contributor

@gabrielkerr gabrielkerr left a comment


I like where this is headed! Looking forward to your thoughts on the feedback.

/// Note that there are restrictions around where can the [NetworkBoundaryPlaceholder]s be placed,
/// for example:
/// - A [NetworkBoundaryKind::Broadcast] needs to be placed right above a `BroadcastExec` node.
/// - A [NetworkBoundaryKind::Shuffle] needs to be placed right about a `RepartitionExec` node.
Contributor

Suggested change
/// - A [NetworkBoundaryKind::Shuffle] needs to be placed right about a `RepartitionExec` node.
/// - A [NetworkBoundaryKind::Shuffle] needs to be placed right above a `RepartitionExec` node.

/// where should the network boundaries be placed, and what task count should be used for the
/// stage below.
///
/// Note that there are restrictions around where can the [NetworkBoundaryPlaceholder]s be placed,
Contributor

Suggested change
/// Note that there are restrictions around where can the [NetworkBoundaryPlaceholder]s be placed,
/// Note that there are restrictions around where the [NetworkBoundaryPlaceholder]s can be placed,

pub kind: NetworkBoundaryKind,
/// The task count for the input stage of this network boundary.
///
/// Note that the task count for this network boundary is decided by the other network boundary
Contributor

If this placeholder sits between Stage N (below) and Stage N+1 (above):

Field                                      Refers to
input                                      The ExecutionPlan for Stage N
input_task_count                           Task count for Stage N
"task count for this network boundary"     Task count for Stage N+1 (output side)
"boundary immediately above"               The boundary between Stage N+1 and N+2

Could we clarify the comments with explicit stage references?

  • input_task_count: The number of tasks in Stage N (the stage below this boundary that executes input)
  • The task count for Stage N+1 (this boundary's output/consumer stage) is determined by the input_task_count of the
    NetworkBoundaryPlaceholder above this one, if any.

Collaborator Author

In this case input_task_count is the task count for Stage N+1, not Stage N. Network boundaries have no say in the amount of tasks they run on, they just control the amount of tasks below them.
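A toy restatement of these semantics (`stage_task_counts` is a hypothetical helper for illustration, not crate code):

```rust
// Toy restatement: boundaries are listed top-to-bottom, and each one's
// input_task_count fixes the task count of the stage immediately *below*
// it. A boundary never decides the task count of the stage it feeds into:
// that is set by the next boundary up (defaulting to 1 at the very top).
fn stage_task_counts(input_task_counts_top_down: &[usize]) -> Vec<usize> {
    let mut counts = vec![1]; // topmost stage, above all boundaries
    counts.extend_from_slice(input_task_counts_top_down);
    counts
}

fn main() {
    // Upper boundary reads from 4 tasks, lower boundary from 8:
    // stages top-down run with 1, 4, and 8 tasks respectively.
    assert_eq!(stage_task_counts(&[4, 8]), vec![1, 4, 8]);
}
```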

{
return Ok(Transformed::yes(Arc::new(NetworkBoundaryPlaceholder {
kind: NetworkBoundaryKind::Coalesce,
input_task_count: nb.input_task_count.div_ceil(2),
Contributor

Related to my comment on NetworkBoundaryPlaceholder above, task_count/2 is the number of tasks the Arc::new(NetworkBoundaryPlaceholder {...}) is expecting to receive from the (now) child coalesce boundary? If so, maybe it makes more sense to rename input_task_count to output_task_count?

Please correct me if I misunderstand 😄

Collaborator Author

🤔 I think input_task_count really is the name we are looking for. It's the amount of tasks that will input data to the network boundary.

Collaborator Author

Maybe with the comment above it's clearer?

use std::sync::Arc;

#[tokio::test]
async fn custom_network_boundary_placeholder() -> Result<(), Box<dyn Error>> {
Contributor

This is very helpful, thank you for adding this!

self
}

fn set_distributed_physical_optimizer_rules(&mut self) {
Contributor

It would be nice to have a method for inserting custom rules similarly to how you've done insert_before in the test. Could we have something like below? I imagine that it would automatically add rules between rules 4 and 5 since that is the intended order described in your comments.

fn with_custom_network_boundary_rules(
      self,
      rules: impl IntoIterator<Item = Arc<dyn PhysicalOptimizerRule + Send + Sync>>
  ) -> Self;

Used like

  let builder = SessionStateBuilder::new()
      .with_distributed_physical_optimizer_rules()
      .with_custom_network_boundary_rules([Arc::new(TreeReductionRule::new(type_checker))]);

Thoughts?

Collaborator Author

Yeah, I think this might be worth it

plan: Arc<dyn ExecutionPlan>,
_: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(|plan| {
Contributor

nit: If this is in the context of custom distributed rules, should this call Distributed::ensure? Even a comment describing that it should be called here might be enough to show the intended pattern if that is difficult to do in this test.

Collaborator Author

🤔 let's discuss the approach first and see if it makes sense to have a DistributedContext at all.

/// free to place their own, either by having more rules in between, or straight away replacing the
/// `InjectNetworkBoundaryPlaceholders` with a custom one.
#[derive(Debug)]
pub struct ApplyNetworkBoundaries;
Contributor

Checking my understanding here:
Before rule_5, the user can insert custom PhysicalOptimizerRules. These custom rules can insert NetworkBoundaryPlaceholders and any ExecutionPlan nodes it likes. Then, rule_5 will convert the network boundary placeholders to the appropriate network boundary exec and leave all other ExecutionPlan nodes untouched. Do I have that right?

Collaborator Author

Do I have that right?

Mostly yes, but converting to the appropriate network boundary exec does perform further modifications to the plans.

For example, NetworkShuffleExec will scale up the output partitions of its RepartitionExec below.
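For illustration only, one plausible reading of "scaling up the output partitions" in numbers. `scaled_partitions` and the exact formula are assumptions for the sketch, not the crate's actual implementation:

```rust
// Hypothetical illustration: when a shuffle boundary is materialized, the
// RepartitionExec below it may need its partition count scaled so that
// every consumer task gets its own share of the partitions.
fn scaled_partitions(partitions_per_task: usize, consumer_tasks: usize) -> usize {
    partitions_per_task * consumer_tasks
}

fn main() {
    // 3 partitions per consumer task, 4 consumer tasks -> 12 output partitions.
    assert_eq!(scaled_partitions(3, 4), 12);
}
```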

mod rule_6_batch_coalesce_below_boundaries;
mod rule_7_end_distributed_context;

pub use rule_1_start_distributed_context::StartDistributedContext;
Contributor

I like that you've split up the rule into smaller rules with clear responsibilities. However, I feel like splitting this up into 7 different rules has introduced a lot of complexity when there is really a single extension point between rules 4 and 5. I think swinging back towards a single optimizer rule makes more sense. I'll try to explain my thoughts below.

The current interface places the responsibility on the user to add custom physical optimizer rules between rules 4 and 5. Users should only worry about creating their custom rule, with the library inserting it in the correct position between rules 4 and 5.

I think we can keep the seven rules, but hide them (make them private to the crate) from the user. I think a cleaner interface would be to wrap these 7 rules in a single DistributedPhysicalOptimizerRule and add a single method for adding custom rules. This abstracts away the non-customizable details from the user and delegates inserting the custom rules in the correct place to the library.

Maybe something like this:

pub struct DistributedPhysicalOptimizerRule {
    custom_boundary_rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
}

impl DistributedPhysicalOptimizerRule {
    pub fn new() -> Self {
        Self { custom_boundary_rules: vec![] }
    }

    pub fn with_custom_boundary_rule(
        mut self,
        rule: impl PhysicalOptimizerRule + Send + Sync + 'static
    ) -> Self {
        self.custom_boundary_rules.push(Arc::new(rule));
        self
    }
}

The internal optimize() would then:

  1. Wrap in DistributedContext (current rule 1)
  2. Add coalesce on top (current rule 2)
  3. Insert broadcast (current rule 3)
  4. Run each custom boundary rule in order
  5. Apply network boundaries (current rule 5)
  6. Batch coalesce below boundaries (current rule 6)
  7. Unwrap DistributedContext (current rule 7)

You'll notice I've replaced rule 4 with the custom rules. The current rule 4 adds some placeholders that the user may not want, introducing complexity for removing these boundaries if the user wants to completely customize the network boundary placeholders from the start. In this case I think rule 4 is a reasonable default, but should be able to be replaced (or added to I suppose) by the users custom rules.

Collaborator Author

However, I feel like splitting this up into 7 different rules has introduced a lot of complexity when there is really a single extension point between rules 4 and 5

Yes... realistically speaking, I'd only expect users to either replace rule 4 with their own, or inject an additional one between 4 and 5.

I think we can keep the seven rules, but hide them (make them private to the crate) from the user. I think a cleaner interface would be to wrap these 7 rules in a single DistributedPhysicalOptimizerRule and add a single method for adding custom rules

👍 this sounds reasonable. It would significantly simplify the code, as we wouldn't need to do that DistributedContext workaround for propagating the original single-node plan.

Contributor

@kurtvolmar kurtvolmar left a comment

Thank you for tackling this. A concern I have is that a custom PhysicalOptimizerRule using the NetworkBoundaryPlaceholder paradigm will likely collide with InjectNetworkBoundaryPlaceholders. The way this is set up now, we would need to decide at plan-time whether to use the custom rule or the default InjectNetworkBoundaryPlaceholders (this would probably be yet another PhysicalOptimizerRule we need to implement). I would love to explore either further extensibility into rule 4, or some checks for whether a previous rule has already inserted NetworkBoundaryPlaceholders around a node before adding more.

Comment on lines +31 to +32
/// The input [ExecutionPlan] that will run remotely on the stage below.
pub input: Arc<dyn ExecutionPlan>,
Contributor

If I understand correctly, will this allow a custom PhysicalOptimizerRule using the NetworkBoundaryPlaceholder to rewrite its children?

For our extensibility use case, we are looking to replace the shuffle with a hierarchical aggregation, which will require us to rewrite the children below the placeholder. I'm not sure if this is what you're intending here, but we will likely use this field to do this.

Collaborator Author

@gabotechs gabotechs Mar 3, 2026

If I understand correctly, will this allow a custom PhysicalOptimizerRule using the NetworkBoundaryPlaceholder to rewrite its children?

The scope of the NetworkBoundaryPlaceholder is just to inject network boundaries in the plan so that the ApplyNetworkBoundaries rule can then transform them to the equivalent Network*Exec, but nothing else.

Taking execution plans unrelated to Distributed DataFusion, like the classical Aggregate(partial) + RepartitionExec + Aggregate(final) and replacing them with a hierarchical pattern sounds like something that is better done in a previous rule, and at that point it should be a matter of just using the existing DataFusion tools for rewriting plans. I don't think this project has a say about that.

#[derive(Debug)]
pub struct InjectNetworkBoundaryPlaceholders;

impl PhysicalOptimizerRule for InjectNetworkBoundaryPlaceholders {
Contributor

With this model, if I create a PhysicalOptimizerRule with a NetworkBoundaryPlaceholder targeting a hash RepartitionExec and insert it before InjectNetworkBoundaryPlaceholders, this will cause a "collision" between the custom NetworkBoundaryPlaceholder and the Shuffle logic in this rule. Both rules will be targeting the same node and attempting to inject NetworkBoundaryPlaceholders.

I'd be interested in seeing some detection whether a NetworkBoundaryPlaceholder already exists for a node, so that custom rules can play nicely with InjectNetworkBoundaryPlaceholders.

Collaborator Author

Yep, this would be a nice addition, although I'd argue that if you have a custom rule that is injecting its own shuffles in the plan... you probably don't want the default InjectNetworkBoundaryPlaceholders to kick in at all.

A valid stance that comes to mind is that it's the user's responsibility to ensure the plan is correct once it reaches ApplyNetworkBoundaries.

@gabotechs
Collaborator Author

Thanks for the feedback guys! I'll get back to this one shortly.

@gabotechs
Collaborator Author

Bringing back the discussion to #347

pub(super) children: Vec<AnnotatedPlan>,
children: Vec<AnnotatedPlan>,

// annotation fields
Collaborator

leftover comment I believe



Development

Successfully merging this pull request may close these issues.

Need for Customizable Plan Annotation and Network Boundary Logic in DFD

4 participants