[server] Add server-side filter execution and client-side integration #2951

Open
platinumhamburg wants to merge 6 commits into apache:main from platinumhamburg:filter-server-client-integration

Conversation

@platinumhamburg
Contributor

Purpose

Linked issue: close #2950

Brief change log

Tests

API and Format

Documentation

@platinumhamburg platinumhamburg force-pushed the filter-server-client-integration branch 2 times, most recently from 43fc542 to 3cd1f6d Compare March 29, 2026 13:45
@platinumhamburg platinumhamburg force-pushed the filter-server-client-integration branch from 3cd1f6d to 9502649 Compare March 30, 2026 05:55
- Fix filter variable reset in Replica catch block to ensure graceful
  fallback on partial initialization failure
- Remove unused fullRowType field in LogFetcher
- Move RecordBatchFilterTest to fluss-server to use PredicateSchemaResolver
- Add Javadoc for LogTablet.read()
- Defer MultiBytesView.Builder allocation until first included batch
- Rate-limit filter evaluation warnings in LogSegment
- Fix Yoda condition style in LogFetcher
- Rename ambiguous identifiers (schemaId -> filterSchemaId,
  lastFilteredSkipOffset -> lastFilteredEndOffset,
  getSchemaId -> getTargetSchemaId)
- Extract sentinel -1 to NO_FILTERED_END_OFFSET constant
- Add null check for schema lookup in Replica filter init
- Remove redundant maxPosition guard in readWithFilter
- Narrow 7-param LogSegment.read() to package-private
- Add @NotThreadSafe on PredicateSchemaResolver
- Add debug log for filtered segment skip in LocalLog
- Strengthen test assertions to concrete values
- Remove redundant test, add Javadoc and proto comments
        projectedColumns,
-       schemaGetter);
+       schemaGetter,
+       recordBatchFilter);
Member

Throw an exception in all the other createXxxScanner methods if recordBatchFilter is not null, because none of them support the filter.
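A minimal sketch of the suggested guard, assuming a factory shape like the one below (ScannerFactory, createRecordScanner, and RecordBatchFilter are illustrative names, not Fluss's exact API):

```java
// Illustrative sketch only: eagerly reject a filter in scanner factories that
// do not support it, instead of silently ignoring it.
public class ScannerFactory {

    public interface RecordBatchFilter {}

    public interface BatchScanner {}

    // Hypothetical factory for a scanner type that does NOT support filters.
    public static BatchScanner createRecordScanner(RecordBatchFilter recordBatchFilter) {
        if (recordBatchFilter != null) {
            throw new UnsupportedOperationException(
                    "recordBatchFilter is not supported by this scanner type.");
        }
        return new BatchScanner() {};
    }
}
```

Failing at construction time surfaces the unsupported combination immediately, rather than at some later read path.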

Comment on lines +73 to +74
default Scan filter(@Nullable Predicate predicate) {
    throw new UnsupportedOperationException("Filter pushdown is not supported by this Scan.");
}
Member

We don't need a default implementation, because this is not a user-defined interface. All implementations of this interface are provided by Fluss itself, so a simple interface without a default implementation is much cleaner here.

@ParameterizedTest
@ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
void testProjectRecordBatchNoStatisticsClearing(byte recordBatchMagic) throws Exception {
// Test that statistics clearing only happens for V2+ versions
Member

Since we have changed the stats to V1, we should update the tests to reflect that. IIUC, this should test only V0, and we may also need to update other tests.

MultiBytesView.Builder builder = null;
int accumulatedSize = 0;
FileChannelLogRecordBatch firstIncludedBatch = null;
FileChannelLogRecordBatch lastIncludedBatch = null;
Member

lastIncludedBatch is not used; remove it.

accumulatedSize += batchSize;
} else {
// With projection: project first, then check size with projected size
BytesView projectedBytesView = projection.projectRecordBatch(batch);
Member

Throw an exception if logFormat != LogFormat.ARROW, like we do in readWithoutFilter.

if (result == null) {
result = new HashMap<>();
}
int schemaId = tableReq.hasFilterSchemaId() ? tableReq.getFilterSchemaId() : -1;
Member

Is -1 valid? If not, we should fail fast by throwing an exception.
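If -1 is indeed invalid here, the fail-fast check could look like the sketch below (FilterSchemaIds and requireValidFilterSchemaId are hypothetical names, and it assumes an unset or negative id is invalid at this point):

```java
// Illustrative sketch: validate the filter schema id up front instead of
// carrying a -1 sentinel forward into later code paths.
public class FilterSchemaIds {

    public static int requireValidFilterSchemaId(boolean hasFilterSchemaId, int filterSchemaId) {
        if (!hasFilterSchemaId || filterSchemaId < 0) {
            throw new IllegalArgumentException(
                    "Invalid or missing filterSchemaId: "
                            + (hasFilterSchemaId ? String.valueOf(filterSchemaId) : "unset"));
        }
        return filterSchemaId;
    }
}
```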

Comment on lines +205 to +209
if (respForBucket.hasFilteredEndOffset()
&& respForBucket.getFilteredEndOffset() >= 0) {
fetchLogResultForBucket =
new FetchLogResultForBucket(
tb,
Member

When filteredEndOffset is set, any records in the response are currently silently ignored. Although records are typically unset when filteredEndOffset is present, a specific edge case leads to inefficiency: if the first batch in a log file matches the filter but subsequent batches do not, the system returns only the next offset of the matched batch without including the FilteredEndOffset. Consequently, these already-filtered batches must be re-evaluated in the next request, resulting in unnecessary resource consumption and performance degradation.

}

@Override
public Long[] getNullCounts() {
Member

I think we can optimize getNullCounts() to return a primitive int[].

  1. An int count is enough, as you only use 4 bytes for each field's null count.
  2. Use int rather than the boxed type Integer to avoid boxing (double the size), since you use -1 for non-existent values.

This can reduce memory usage significantly for cachedNullCounts and statsNullCounts, especially cachedNullCounts when the total field count is very large (1000+).
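The boxed-to-primitive change could look roughly like this sketch (NullCounts and compact are hypothetical names; -1 takes over the role of null as the "missing" marker):

```java
// Illustrative sketch: store per-field null counts as a primitive int[]
// with -1 marking a missing count, instead of a boxed Long[] with nulls.
public class NullCounts {

    public static final int MISSING = -1;

    // Convert the boxed form (null = missing) into the compact primitive form.
    public static int[] compact(Long[] boxed) {
        int[] out = new int[boxed.length];
        for (int i = 0; i < boxed.length; i++) {
            out[i] = (boxed[i] == null) ? MISSING : Math.toIntExact(boxed[i]);
        }
        return out;
    }
}
```

Each entry then costs 4 bytes instead of an object header plus reference, which is where the savings for very wide schemas (1000+ fields) come from.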

* FileChannelLogRecordBatch and DefaultLogRecordBatch to read null counts from Arrow metadata
* instead of the statistics binary format (V2).
*/
class ArrowNullCountReader {
Member

@wuchong wuchong Apr 2, 2026

Could you revert the commit that reads the null count from the Arrow metadata? This pull request is already very large; I think we can review that in a separate pull request. Besides, I have some concerns about the cache and deserialization performance.

Comment on lines +143 to +145
static final class FieldNodeMappingKey {
private final int schemaId;
private final int[] statsIndexMapping;
Member

The cache key lacks tableId; it will cause collisions when there are multiple tables.
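A sketch of a key that includes tableId (field names follow the snippet above; the long tableId field and the exact equals/hashCode shape are assumptions):

```java
import java.util.Arrays;
import java.util.Objects;

// Illustrative sketch: include tableId in the cache key so that entries from
// different tables with the same schemaId cannot collide.
final class FieldNodeMappingKey {
    private final long tableId;
    private final int schemaId;
    private final int[] statsIndexMapping;

    FieldNodeMappingKey(long tableId, int schemaId, int[] statsIndexMapping) {
        this.tableId = tableId;
        this.schemaId = schemaId;
        this.statsIndexMapping = statsIndexMapping;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FieldNodeMappingKey)) {
            return false;
        }
        FieldNodeMappingKey that = (FieldNodeMappingKey) o;
        return tableId == that.tableId
                && schemaId == that.schemaId
                && Arrays.equals(statsIndexMapping, that.statsIndexMapping);
    }

    @Override
    public int hashCode() {
        return 31 * Objects.hash(tableId, schemaId) + Arrays.hashCode(statsIndexMapping);
    }
}
```

Note that the array field needs Arrays.equals/Arrays.hashCode; Objects.hash on an int[] would compare by reference.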



Development

Successfully merging this pull request may close these issues.

Add server-side record batch filter execution

2 participants