Skip to content

feat(datafusion): support fetch contract in PaimonTableScan for limit pushdown#223

Closed
sundapeng wants to merge 1 commit intoapache:mainfrom
sundapeng:feat/datafusion-fetch-contract
Closed

feat(datafusion): support fetch contract in PaimonTableScan for limit pushdown#223
sundapeng wants to merge 1 commit intoapache:mainfrom
sundapeng:feat/datafusion-fetch-contract

Conversation

@sundapeng
Copy link
Copy Markdown
Member

Purpose

Implements the DataFusion fetch contract in PaimonTableScan, enabling DataFusion's optimizer to push LIMIT directly into the scan node and eliminate the separate GlobalLimitExec operator.

Closes #220.

Changes

crates/integrations/datafusion/src/physical_plan/scan.rs

  • Added supports_limit_pushdown() -> true to opt into DataFusion's limit pushdown optimization
  • Added with_fetch(limit) to return a new scan node with the given row limit
  • Added fetch() to return the current limit
  • Added apply_limit() helper function using async_stream to truncate the record batch stream at the row limit during execute(), with proper batch slicing for partial batches

crates/integrations/datafusion/Cargo.toml

  • Added async-stream = "0.3" dependency for clean async stream generation

crates/integrations/datafusion/tests/read_tables.rs

  • Added test_fetch_contract_eliminates_limit_exec integration test that verifies:
    • PaimonTableScan shows limit/fetch in plan display
    • GlobalLimitExec is eliminated from the physical plan
    • Correct row count is returned

How it works

When DataFusion encounters SELECT ... LIMIT N, the optimizer calls supports_limit_pushdown() on the scan node. Since we return true, it calls with_fetch(N) to create a new scan with the limit baked in, eliminating the need for a separate GlobalLimitExec.

During execution, apply_limit() wraps the underlying record batch stream and:

  1. Counts rows across batches
  2. Slices the last batch if it would exceed the limit
  3. Stops the stream once the limit is reached

Testing

  • cargo build -p paimon-datafusion — builds clean
  • cargo test -p paimon-datafusion --lib — 19 passed, 4 pre-existing failures (need warehouse data)

… pushdown

Implement the DataFusion fetch contract (supports_limit_pushdown,
with_fetch, fetch) on PaimonTableScan, allowing DataFusion to push
LIMIT directly into the scan node and eliminate the separate
GlobalLimitExec operator.

Changes:
- Add `supports_limit_pushdown() -> true` to opt into the contract
- Add `with_fetch()` to create a new scan node with the given limit
- Add `fetch()` to expose the current limit
- Add `apply_limit()` helper that truncates the record batch stream
  at the row-level limit per partition during execute()
- Add integration test verifying GlobalLimitExec is eliminated

Closes apache#220

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@sundapeng sundapeng closed this Apr 7, 2026
@sundapeng sundapeng deleted the feat/datafusion-fetch-contract branch April 7, 2026 06:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support fetch contract in PaimonTableScan for DataFusion limit pushdown

1 participant