Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmarks/cdk/bin/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
use datafusion_distributed::{
ChannelResolver, DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule,
Worker, WorkerResolver, display_plan_ascii, get_distributed_channel_resolver,
ChannelResolver, DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, Worker,
WorkerResolver, display_plan_ascii, get_distributed_channel_resolver,
get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics,
};
use futures::{StreamExt, TryFutureExt};
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_default_features()
.with_runtime_env(Arc::clone(&runtime_env))
.with_distributed_worker_resolver(Ec2WorkerResolver::new())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.with_distributed_broadcast_joins(cmd.broadcast_joins)?
.build();
let ctx = SessionContext::from(state);
Expand Down
7 changes: 2 additions & 5 deletions benchmarks/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ use datafusion::prelude::*;
use datafusion_distributed::test_utils::benchmarks_common;
use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver;
use datafusion_distributed::test_utils::{clickbench, tpcds, tpch};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, NetworkBoundaryExt, Worker,
};
use datafusion_distributed::{DistributedExt, NetworkBoundaryExt, SessionStateBuilderExt, Worker};
use std::error::Error;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use structopt::StructOpt;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -179,7 +176,7 @@ impl RunOpt {
.with_default_features()
.with_config(self.config()?)
.with_distributed_worker_resolver(LocalHostWorkerResolver::new(self.workers.clone()))
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.with_distributed_files_per_task(
self.files_per_task.unwrap_or(get_available_parallelism()),
)?
Expand Down
4 changes: 2 additions & 2 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion_cli::{
use datafusion_distributed::test_utils::in_memory_channel_resolver::{
InMemoryChannelResolver, InMemoryWorkerResolver,
};
use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule};
use datafusion_distributed::{DistributedExt, SessionStateBuilderExt};
use std::env;
use std::path::Path;
use std::process::ExitCode;
Expand Down Expand Up @@ -148,7 +148,7 @@ async fn main_inner() -> Result<()> {
.with_default_features()
.with_config(session_config)
.with_runtime_env(runtime_env)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.with_distributed_worker_resolver(InMemoryWorkerResolver::new(16))
.with_distributed_channel_resolver(InMemoryChannelResolver::default())
.build();
Expand Down
4 changes: 2 additions & 2 deletions console/examples/console_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver, display_plan_ascii,
DistributedExt, SessionStateBuilderExt, WorkerResolver, display_plan_ascii,
};
use futures::TryStreamExt;
use std::error::Error;
Expand Down Expand Up @@ -39,7 +39,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.with_distributed_files_per_task(1)?
.build();

Expand Down
4 changes: 2 additions & 2 deletions console/examples/tpcds_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use async_trait::async_trait;
use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver};
use datafusion_distributed::{DistributedExt, SessionStateBuilderExt, WorkerResolver};
use std::error::Error;
use std::path::Path;
use std::sync::Arc;
Expand Down Expand Up @@ -74,7 +74,7 @@ async fn run_queries(
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.build();

let ctx = SessionContext::from(state);
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/channel-resolver.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() {
let state = SessionStateBuilder::new()
// these two are mandatory.
.with_distributed_worker_resolver(todo!())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
// the CustomChannelResolver needs to be passed here once...
.with_distributed_channel_resolver(channel_resolver.clone())
.build();
Expand Down
17 changes: 9 additions & 8 deletions docs/source/user-guide/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ You'll see these concepts mentioned extensively across the documentation and the

Some other more tangible concepts are the structs and traits exposed publicly, the most important are:

## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/distributed_physical_optimizer_rule.rs)

A physical optimizer rule that transforms single-node DataFusion query plans into distributed query plans. It reads
a fully formed physical plan and injects the appropriate nodes to execute the query in a distributed fashion.

It builds the distributed plan from bottom to top, injecting network boundaries at appropriate locations based on
the nodes present in the original plan.

## [Worker](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/flight_service/worker.rs)

Arrow Flight server implementation that integrates with the Tonic ecosystem and listens to serialized plans that get
Expand Down Expand Up @@ -70,3 +62,12 @@ data. If you are on the task with index 2, you might want to return the last 1/3

Optional extension trait that allows users to customize how connections are established to workers. Given one of the
URLs returned by the `WorkerResolver`, it builds an Arrow Flight client ready for serving queries.

## [NetworkBoundaryPlaceholder](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/network_boundary_placeholder.rs)

A custom ExecutionPlan implementation that acts as a placeholder where the distributed planner places network boundaries
that spawn the specified number of tasks.

Users can create their own physical optimizer rules and place them before this project's ApplyNetworkBoundaries rule
in order to customize their distributed plan.

4 changes: 2 additions & 2 deletions docs/source/user-guide/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl WorkerResolver for LocalhostWorkerResolver {
}
```

Register both the `WorkerResolver` implementation and the `DistributedPhysicalOptimizerRule` in DataFusion's
Register both the `WorkerResolver` implementation and the distributed physical optimization rules in DataFusion's
`SessionStateBuilder` to enable distributed query planning:

```rs
Expand All @@ -59,7 +59,7 @@ let localhost_worker_resolver = LocalhostWorkerResolver {

let state = SessionStateBuilder::new()
.with_distributed_worker_resolver(localhost_worker_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.build();

let ctx = SessionContext::from(state);
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/worker-resolver.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl WorkerResolver for CustomWorkerResolver {
async fn main() {
let state = SessionStateBuilder::new()
.with_distributed_worker_resolver(CustomWorkerResolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.build();
}
```
Expand Down
6 changes: 3 additions & 3 deletions examples/custom_execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use datafusion_distributed::test_utils::in_memory_channel_resolver::{
InMemoryChannelResolver, InMemoryWorkerResolver,
};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, DistributedTaskContext, TaskEstimation,
TaskEstimator, WorkerQueryContext, display_plan_ascii,
DistributedExt, DistributedTaskContext, SessionStateBuilderExt, TaskEstimation, TaskEstimator,
WorkerQueryContext, display_plan_ascii,
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf;
Expand Down Expand Up @@ -382,7 +382,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_config(config)
.with_distributed_worker_resolver(worker_resolver)
.with_distributed_channel_resolver(channel_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.with_distributed_user_codec(NumbersExecCodec)
.with_distributed_task_estimator(NumbersTaskEstimator)
.build();
Expand Down
5 changes: 2 additions & 3 deletions examples/in_memory_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, Worker,
BoxCloneSyncChannel, ChannelResolver, DistributedExt, SessionStateBuilderExt, Worker,
WorkerQueryContext, WorkerResolver, create_flight_client, display_plan_ascii,
};
use futures::TryStreamExt;
use hyper_util::rt::TokioIo;
use std::error::Error;
use std::sync::Arc;
use structopt::StructOpt;
use tonic::transport::{Endpoint, Server};

Expand All @@ -38,7 +37,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_default_features()
.with_distributed_worker_resolver(InMemoryWorkerResolver)
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.with_distributed_files_per_task(1)?
.build();

Expand Down
5 changes: 2 additions & 3 deletions examples/localhost_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver, display_plan_ascii,
DistributedExt, SessionStateBuilderExt, WorkerResolver, display_plan_ascii,
};
use futures::TryStreamExt;
use std::error::Error;
use std::sync::Arc;
use structopt::StructOpt;
use url::Url;

Expand Down Expand Up @@ -39,7 +38,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_physical_optimizer_rules()
.with_distributed_files_per_task(1)?
.build();

Expand Down
14 changes: 6 additions & 8 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ pub trait DistributedExt: Sized {
/// # use datafusion::prelude::SessionConfig;
/// # use url::Url;
/// # use std::sync::Arc;
/// # use datafusion_distributed::{BoxCloneSyncChannel, WorkerResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext};
/// # use datafusion_distributed::{BoxCloneSyncChannel, WorkerResolver, DistributedExt, SessionStateBuilderExt, WorkerQueryContext};
///
/// struct CustomWorkerResolver;
///
Expand All @@ -212,9 +212,8 @@ pub trait DistributedExt: Sized {
/// // This tweaks the SessionState so that it can plan for distributed queries and execute them.
/// let state = SessionStateBuilder::new()
/// .with_distributed_worker_resolver(CustomWorkerResolver)
/// // the DistributedPhysicalOptimizerRule also needs to be passed so that query plans
/// // get distributed.
/// .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
/// // Adds the necessary rules so that the query gets distributed.
/// .with_distributed_physical_optimizer_rules()
/// .build();
/// ```
fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
Expand Down Expand Up @@ -243,7 +242,7 @@ pub trait DistributedExt: Sized {
/// # use datafusion::prelude::SessionConfig;
/// # use url::Url;
/// # use std::sync::Arc;
/// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext};
/// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, WorkerQueryContext, SessionStateBuilderExt};
///
/// struct CustomChannelResolver;
///
Expand All @@ -258,9 +257,8 @@ pub trait DistributedExt: Sized {
/// // This tweaks the SessionState so that it can plan for distributed queries and execute them.
/// let state = SessionStateBuilder::new()
/// .with_distributed_channel_resolver(CustomChannelResolver)
/// // the DistributedPhysicalOptimizerRule also needs to be passed so that query plans
/// // get distributed.
/// .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
/// // Adds the necessary rules so that the query gets distributed.
/// .with_distributed_physical_optimizer_rules()
/// .build();
///
/// // This function can be provided to a Worker so that, upon receiving a distributed
Expand Down
4 changes: 2 additions & 2 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ impl ConfigExtension for DistributedConfig {
// FIXME: Ideally, both ChannelResolverExtension and TaskEstimators would be passed as
// extensions in SessionConfig's AnyMap instead of the ConfigOptions. However, we need
// to pass this as ConfigOptions as we need these two fields to be present during
// planning in the DistributedPhysicalOptimizerRule, and the signature of the optimize()
// method there accepts a ConfigOptions instead of a SessionConfig.
// distributed planning, and the signature of the optimize() method there accepts a
// ConfigOptions instead of a SessionConfig.
// The following PR addresses this: https://github.com/apache/datafusion/pull/18168
// but it still has not been accepted or merged.
// Because of this, all the boilerplate trait implementations below are needed.
Expand Down
Loading