@@ -29,7 +29,7 @@ use crate::{
2929use arrow:: array:: { Array , RecordBatch , UInt32Array } ;
3030use arrow:: compute:: { take, TakeOptions } ;
3131use arrow:: datatypes:: DataType as ArrowDataType ;
32- use datafusion:: common:: ScalarValue ;
32+ use datafusion:: common:: { Result as DataFusionResult , ScalarValue } ;
3333use datafusion:: execution:: disk_manager:: DiskManagerMode ;
3434use datafusion:: execution:: memory_pool:: MemoryPool ;
3535use datafusion:: execution:: runtime_env:: RuntimeEnvBuilder ;
@@ -73,6 +73,7 @@ use std::path::PathBuf;
7373use std:: time:: { Duration , Instant } ;
7474use std:: { sync:: Arc , task:: Poll } ;
7575use tokio:: runtime:: Runtime ;
76+ use tokio:: sync:: mpsc;
7677
7778use crate :: execution:: memory_pools:: {
7879 create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config, MemoryPoolConfig ,
@@ -136,6 +137,8 @@ struct ExecutionContext {
136137 pub input_sources : Vec < Arc < GlobalRef > > ,
137138 /// The record batch stream to pull results from
138139 pub stream : Option < SendableRecordBatchStream > ,
140+ /// Receives batches from a spawned tokio task (async I/O path)
141+ pub batch_receiver : Option < mpsc:: Receiver < DataFusionResult < RecordBatch > > > ,
139142 /// Native metrics
140143 pub metrics : Arc < GlobalRef > ,
141144 // The interval in milliseconds to update metrics
@@ -287,6 +290,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
287290 scans : vec ! [ ] ,
288291 input_sources,
289292 stream : None ,
293+ batch_receiver : None ,
290294 metrics,
291295 metrics_update_interval,
292296 metrics_last_update_time : Instant :: now ( ) ,
@@ -530,21 +534,62 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
530534 // Each Comet native execution corresponds to a single Spark partition,
531535 // so we should always execute partition 0.
532536 let stream = root_op. native_plan . execute ( 0 , task_ctx) ?;
533- exec_context. stream = Some ( stream) ;
537+
538+ if exec_context. scans . is_empty ( ) {
539+ // No JVM data sources — spawn onto tokio so the executor
540+ // thread parks in blocking_recv instead of busy-polling.
541+ //
542+ // Channel capacity of 2 allows the producer to work one batch
543+ // ahead while the consumer processes the current one via JNI,
544+ // without buffering excessive memory. Increasing this would
545+ // trade memory for latency hiding if JNI/FFI overhead dominates;
546+ // decreasing to 1 would serialize production and consumption.
547+ let ( tx, rx) = mpsc:: channel ( 2 ) ;
548+ let mut stream = stream;
549+ get_runtime ( ) . spawn ( async move {
550+ while let Some ( batch) = stream. next ( ) . await {
551+ if tx. send ( batch) . await . is_err ( ) {
552+ break ;
553+ }
554+ }
555+ } ) ;
556+ exec_context. batch_receiver = Some ( rx) ;
557+ } else {
558+ exec_context. stream = Some ( stream) ;
559+ }
534560 } else {
535561 // Pull input batches
536562 pull_input_batches ( exec_context) ?;
537563 }
538564
539- // Enter the runtime once for the entire polling loop to avoid repeated
540- // Runtime::enter() overhead
565+ if let Some ( rx) = & mut exec_context. batch_receiver {
566+ match rx. blocking_recv ( ) {
567+ Some ( Ok ( batch) ) => {
568+ update_metrics ( & mut env, exec_context) ?;
569+ return prepare_output (
570+ & mut env,
571+ array_addrs,
572+ schema_addrs,
573+ batch,
574+ exec_context. debug_native ,
575+ ) ;
576+ }
577+ Some ( Err ( e) ) => {
578+ return Err ( e. into ( ) ) ;
579+ }
580+ None => {
581+ log_plan_metrics ( exec_context, stage_id, partition) ;
582+ return Ok ( -1 ) ;
583+ }
584+ }
585+ }
586+
587+ // ScanExec path: busy-poll to interleave JVM batch pulls with stream polling
541588 get_runtime ( ) . block_on ( async {
542589 loop {
543- // Polling the stream.
544590 let next_item = exec_context. stream . as_mut ( ) . unwrap ( ) . next ( ) ;
545591 let poll_output = poll ! ( next_item) ;
546592
547- // update metrics at interval
548593 // Only check time every 100 polls to reduce syscall overhead
549594 if let Some ( interval) = exec_context. metrics_update_interval {
550595 exec_context. poll_count_since_metrics_check += 1 ;
@@ -560,7 +605,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
560605
561606 match poll_output {
562607 Poll :: Ready ( Some ( output) ) => {
563- // prepare output for FFI transfer
564608 return prepare_output (
565609 & mut env,
566610 array_addrs,
@@ -570,43 +614,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
570614 ) ;
571615 }
572616 Poll :: Ready ( None ) => {
573- // Reaches EOF of output.
574- if exec_context. explain_native {
575- if let Some ( plan) = & exec_context. root_op {
576- let formatted_plan_str = DisplayableExecutionPlan :: with_metrics (
577- plan. native_plan . as_ref ( ) ,
578- )
579- . indent ( true ) ;
580- info ! (
581- "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
582- \n plan creation took {:?}:\
583- \n {formatted_plan_str:}",
584- plan. plan_id, stage_id, partition, exec_context. plan_creation_time
585- ) ;
586- }
587- }
617+ log_plan_metrics ( exec_context, stage_id, partition) ;
588618 return Ok ( -1 ) ;
589619 }
590- // A poll pending means the stream is not ready yet.
591620 Poll :: Pending => {
592- if exec_context. scans . is_empty ( ) {
593- // Pure async I/O (e.g., IcebergScanExec, DataSourceExec)
594- // Yield to let the executor drive I/O instead of busy-polling
595- tokio:: task:: yield_now ( ) . await ;
596- } else {
597- // Has ScanExec operators
598- // Busy-poll to pull batches from JVM
599- // TODO: Investigate if JNI calls are safe without block_in_place.
600- // block_in_place prevents Tokio from migrating this task to another thread,
601- // which is necessary because JNI env is thread-local. If we can guarantee
602- // thread safety another way, we could remove this wrapper for better perf.
603- tokio:: task:: block_in_place ( || {
604- pull_input_batches ( exec_context)
605- } ) ?;
606- }
607-
608- // Output not ready yet
609- continue ;
621+ // JNI call to pull batches from JVM into ScanExec operators.
622+ // block_in_place lets tokio move other tasks off this worker
623+ // while we wait for JVM data.
624+ tokio:: task:: block_in_place ( || pull_input_batches ( exec_context) ) ?;
610625 }
611626 }
612627 }
@@ -648,6 +663,21 @@ fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> Come
648663 }
649664}
650665
666+ fn log_plan_metrics ( exec_context : & ExecutionContext , stage_id : jint , partition : jint ) {
667+ if exec_context. explain_native {
668+ if let Some ( plan) = & exec_context. root_op {
669+ let formatted_plan_str =
670+ DisplayableExecutionPlan :: with_metrics ( plan. native_plan . as_ref ( ) ) . indent ( true ) ;
671+ info ! (
672+ "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
673+ \n plan creation took {:?}:\
674+ \n {formatted_plan_str:}",
675+ plan. plan_id, stage_id, partition, exec_context. plan_creation_time
676+ ) ;
677+ }
678+ }
679+ }
680+
651681fn convert_datatype_arrays (
652682 env : & ' _ mut JNIEnv < ' _ > ,
653683 serialized_datatypes : JObjectArray ,
0 commit comments