Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 131 additions & 29 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ pub struct NestedLoopJoinExec {
/// Each output stream waits on the `OnceAsync` to signal the completion of
/// the build(left) side data, and buffer them all for later joining.
build_side_data: OnceAsync<JoinLeftData>,
/// Shared left-side spill data for OOM fallback.
///
/// When `build_side_data` fails with OOM, the first partition to
/// initiate fallback spills the entire left side to disk. Other
/// partitions share the same spill file via this `OnceAsync`,
/// avoiding redundant re-execution of the left child.
left_spill_data: Arc<OnceAsync<LeftSpillData>>,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
/// Projection to apply to the output of the join
Expand Down Expand Up @@ -290,6 +297,7 @@ impl NestedLoopJoinExecBuilder {
join_type,
join_schema,
build_side_data: Default::default(),
left_spill_data: Arc::new(OnceAsync::default()),
column_indices,
projection,
metrics: Default::default(),
Expand Down Expand Up @@ -492,6 +500,7 @@ impl NestedLoopJoinExec {
right,
metrics: ExecutionPlanMetricsSet::new(),
build_side_data: Default::default(),
left_spill_data: Arc::new(OnceAsync::default()),
cache: Arc::clone(&self.cache),
filter: self.filter.clone(),
join_type: self.join_type,
Expand Down Expand Up @@ -655,6 +664,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
SpillState::Pending {
left_plan: Arc::clone(&self.left),
task_context: Arc::clone(&context),
left_spill_data: Arc::clone(&self.left_spill_data),
}
} else {
SpillState::Disabled
Expand Down Expand Up @@ -863,6 +873,20 @@ enum NLJState {
EmitLeftUnmatched,
Done,
}
/// Shared data for the left-side spill fallback.
///
/// When the in-memory `OnceFut` path fails with OOM, the first partition
/// spills the entire left side to disk. This struct holds the spill file
/// reference so other partitions can read from the same file.
///
/// Produced once per query (behind an `OnceAsync`) and then shared by all
/// output partitions: each partition clones `spill_file` and reads it back
/// as a stream via `spill_manager.read_spill_as_stream`.
pub(crate) struct LeftSpillData {
    /// SpillManager used to read the spill file (has the left schema).
    /// Created with the session's spill compression setting at write time,
    /// so reads decode with the matching configuration.
    spill_manager: SpillManager,
    /// The spill file containing all left-side batches.
    /// Reference-counted: cloning the handle lets multiple partitions open
    /// independent read streams over the same temp file without re-spilling.
    spill_file: RefCountedTempFile,
    /// Left-side schema, captured from the left child's stream when the
    /// spill was written; used for `concat_batches` when chunks are rebuilt.
    schema: SchemaRef,
}

/// Tracks the state of the memory-limited spill fallback for NLJ.
///
/// The NLJ always starts with the standard OnceFut path. If the in-memory
Expand All @@ -882,6 +906,9 @@ pub(crate) enum SpillState {
left_plan: Arc<dyn ExecutionPlan>,
/// TaskContext for re-execution and SpillManager creation
task_context: Arc<TaskContext>,
/// Shared OnceAsync for left-side spill data. The first partition
/// to initiate fallback spills the left side; others share the file.
left_spill_data: Arc<OnceAsync<LeftSpillData>>,
},

/// Fallback has been triggered. Left data is being loaded in chunks
Expand All @@ -892,16 +919,20 @@ pub(crate) enum SpillState {
/// State for active memory-limited spill execution.
/// Boxed inside [`SpillState::Active`] to reduce enum size.
pub(crate) struct SpillStateActive {
/// Left input stream for incremental buffering
left_stream: SendableRecordBatchStream,
/// Shared future for left-side spill data. All partitions wait on
/// the same future — the first to poll triggers the actual spill.
left_spill_fut: OnceFut<LeftSpillData>,
/// Left input stream for incremental chunk reading (from spill file).
/// None until `left_spill_fut` resolves.
left_stream: Option<SendableRecordBatchStream>,
/// Left-side schema (set once `left_spill_fut` resolves)
left_schema: Option<SchemaRef>,
/// Memory reservation for left-side buffering
reservation: MemoryReservation,
/// Accumulated left batches for the current chunk
pending_batches: Vec<RecordBatch>,
/// Left-side schema (for concat_batches)
left_schema: SchemaRef,
/// SpillManager for right-side spilling
spill_manager: SpillManager,
right_spill_manager: SpillManager,
/// In-progress spill file for writing right batches during first pass
right_spill_in_progress: Option<InProgressSpillFile>,
/// Completed right-side spill file (available after first pass)
Expand Down Expand Up @@ -1263,29 +1294,61 @@ impl NestedLoopJoinStream {

/// Switch from the standard OnceFut path to memory-limited mode.
///
/// Re-executes the left child to get a fresh stream, creates a
/// SpillManager for right-side spilling, and transitions the spill
/// state from `Pending` to `Active`. The next call to
/// `handle_buffering_left` will dispatch to
/// `handle_buffering_left_memory_limited`.
/// Uses the shared `left_spill_data` OnceAsync so that only the first
/// partition to reach this point re-executes the left child and spills
/// it to disk. Other partitions share the same spill file.
fn initiate_fallback(&mut self) -> Result<()> {
// Take ownership of Pending state
let (left_plan, context) =
let (left_plan, context, left_spill_data) =
match std::mem::replace(&mut self.spill_state, SpillState::Disabled) {
SpillState::Pending {
left_plan,
task_context,
} => (left_plan, task_context),
left_spill_data,
} => (left_plan, task_context, left_spill_data),
_ => {
return internal_err!(
"initiate_fallback called in non-Pending spill state"
);
}
};

// Re-execute left child to get a fresh stream
let left_stream = left_plan.execute(0, Arc::clone(&context))?;
let left_schema = left_stream.schema();
// Use OnceAsync to ensure only the first partition spills the left
// side. Other partitions will get the same OnceFut that resolves
// to the shared spill file.
let left_spill_fut = left_spill_data.try_once(|| {
let plan = Arc::clone(&left_plan);
let ctx = Arc::clone(&context);
let spill_metrics = self.metrics.spill_metrics.clone();
Ok(async move {
let mut stream = plan.execute(0, Arc::clone(&ctx))?;
let schema = stream.schema();
let left_spill_manager = SpillManager::new(
ctx.runtime_env(),
spill_metrics,
Arc::clone(&schema),
)
.with_compression_type(ctx.session_config().spill_compression());

let result = left_spill_manager
.spill_record_batch_stream_and_return_max_batch_memory(
&mut stream,
"NestedLoopJoin left spill",
)
.await?;

match result {
Some((file, _max_batch_memory)) => Ok(LeftSpillData {
spill_manager: left_spill_manager,
spill_file: file,
schema,
}),
None => {
internal_err!("Left side produced no data to spill")
}
}
})
})?;

// Create reservation with can_spill for fair memory allocation
let reservation = MemoryConsumer::new("NestedLoopJoinLoad[fallback]".to_string())
Expand All @@ -1294,19 +1357,20 @@ impl NestedLoopJoinStream {

// Create SpillManager for right-side spilling
let right_schema = self.right_data.schema();
let spill_manager = SpillManager::new(
let right_spill_manager = SpillManager::new(
context.runtime_env(),
self.metrics.spill_metrics.clone(),
right_schema,
)
.with_compression_type(context.session_config().spill_compression());

self.spill_state = SpillState::Active(Box::new(SpillStateActive {
left_stream,
left_spill_fut,
left_stream: None,
left_schema: None,
reservation,
pending_batches: Vec::new(),
left_schema,
spill_manager,
right_spill_manager,
right_spill_in_progress: None,
right_spill_file: None,
right_max_batch_memory: 0,
Expand Down Expand Up @@ -1378,11 +1442,44 @@ impl NestedLoopJoinStream {
);
};

// On first entry (or after re-entry for a new chunk pass when
// left_stream was consumed), wait for the shared left spill
// future to resolve and then open a stream from the spill file.
if active.left_stream.is_none() {
match active.left_spill_fut.get_shared(cx) {
Poll::Ready(Ok(spill_data)) => {
match spill_data
.spill_manager
.read_spill_as_stream(spill_data.spill_file.clone(), None)
{
Ok(stream) => {
active.left_schema = Some(Arc::clone(&spill_data.schema));
active.left_stream = Some(stream);
}
Err(e) => {
return ControlFlow::Break(Poll::Ready(Some(Err(e))));
}
}
}
Poll::Ready(Err(e)) => {
return ControlFlow::Break(Poll::Ready(Some(Err(e))));
}
Poll::Pending => {
return ControlFlow::Break(Poll::Pending);
}
}
}

let left_stream = active
.left_stream
.as_mut()
.expect("left_stream must be set after spill future resolves");

// Poll left stream for more batches.
// Note: pending_batches may already contain a batch from the
// previous chunk iteration (the batch that triggered the memory limit).
loop {
match active.left_stream.poll_next_unpin(cx) {
match left_stream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(batch))) => {
if batch.num_rows() == 0 {
continue;
Expand Down Expand Up @@ -1431,13 +1528,18 @@ impl NestedLoopJoinStream {
return ControlFlow::Continue(());
}

let merged_batch =
match concat_batches(&active.left_schema, &active.pending_batches) {
Ok(batch) => batch,
Err(e) => {
return ControlFlow::Break(Poll::Ready(Some(Err(e.into()))));
}
};
let merged_batch = match concat_batches(
active
.left_schema
.as_ref()
.expect("left_schema must be set"),
&active.pending_batches,
) {
Ok(batch) => batch,
Err(e) => {
return ControlFlow::Break(Poll::Ready(Some(Err(e.into()))));
}
};
active.pending_batches.clear();

// Build visited bitmap if needed for this join type
Expand Down Expand Up @@ -1472,7 +1574,7 @@ impl NestedLoopJoinStream {
// Set up right-side stream for this pass
if !active.is_first_right_pass {
if let Some(file) = active.right_spill_file.as_ref() {
match active.spill_manager.read_spill_as_stream(
match active.right_spill_manager.read_spill_as_stream(
file.clone(),
Some(active.right_max_batch_memory),
) {
Expand All @@ -1487,7 +1589,7 @@ impl NestedLoopJoinStream {
} else {
// First pass: create InProgressSpillFile for right side
match active
.spill_manager
.right_spill_manager
.create_in_progress_file("NestedLoopJoin right spill")
{
Ok(file) => {
Expand Down
7 changes: 4 additions & 3 deletions datafusion/sqllogictest/test_files/nested_loop_join_spill.slt
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ INNER JOIN generate_series(1, 1) AS t2(v2)
100000 1 100000

# --- Verify spill metrics via EXPLAIN ANALYZE ---
# The NestedLoopJoinExec line should show spill_count=1, confirming
# the memory-limited fallback path was taken and right side was spilled.
# The NestedLoopJoinExec line should show spill_count=2, confirming
# the memory-limited fallback path was taken (left side spilled once,
# right side spilled once).
query TT
EXPLAIN ANALYZE SELECT count(*)
FROM generate_series(1, 100000) AS t1(v1)
Expand All @@ -50,7 +51,7 @@ INNER JOIN generate_series(1, 1) AS t2(v2)
Plan with Metrics
01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)], metrics=[<slt:ignore>]
02)--AggregateExec: mode=Single, gby=[], aggr=[count(Int64(1))], metrics=[<slt:ignore>]
03)----NestedLoopJoinExec: join_type=Inner, filter=v1@0 + v2@1 > 0, projection=[], metrics=[output_rows=100.0 K, <slt:ignore> spill_count=1, <slt:ignore>]
03)----NestedLoopJoinExec: join_type=Inner, filter=v1@0 + v2@1 > 0, projection=[], metrics=[output_rows=100.0 K, <slt:ignore> spill_count=2, <slt:ignore>]
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now the left side spills during OOM, so spill_count increases from 1 to 2.

04)------ProjectionExec: expr=[value@0 as v1], metrics=[<slt:ignore>]
05)--------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192], metrics=[<slt:ignore>]
06)------ProjectionExec: expr=[value@0 as v2], metrics=[<slt:ignore>]
Expand Down
Loading