[server] Add server-side filter execution and client-side integration#2951
platinumhamburg wants to merge 6 commits into apache:main
Conversation
Force-pushed from 43fc542 to 3cd1f6d.
Force-pushed from 3cd1f6d to 9502649.
- Fix filter variable reset in Replica catch block to ensure graceful fallback on partial initialization failure
- Remove unused fullRowType field in LogFetcher
- Move RecordBatchFilterTest to fluss-server to use PredicateSchemaResolver
- Add Javadoc for LogTablet.read()
- Defer MultiBytesView.Builder allocation until first included batch
- Rate-limit filter evaluation warnings in LogSegment
- Fix Yoda condition style in LogFetcher
- Rename ambiguous identifiers (schemaId -> filterSchemaId, lastFilteredSkipOffset -> lastFilteredEndOffset, getSchemaId -> getTargetSchemaId)
- Extract sentinel -1 to NO_FILTERED_END_OFFSET constant
- Add null check for schema lookup in Replica filter init
- Remove redundant maxPosition guard in readWithFilter
- Narrow 7-param LogSegment.read() to package-private
- Add @NotThreadSafe on PredicateSchemaResolver
- Add debug log for filtered segment skip in LocalLog
- Strengthen test assertions to concrete values
- Remove redundant test, add Javadoc and proto comments
    projectedColumns,
    schemaGetter,
    recordBatchFilter);
Throw an exception in all the other createXxxScanner methods when recordBatchFilter is not null, because none of them supports the filter.
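The guard the reviewer asks for can be sketched as follows. The class and method names here are hypothetical stand-ins for Fluss's scanner factory methods, and `Object` stands in for the real filter and scanner types:

```java
// Hypothetical sketch: a scanner factory that does not support filter
// pushdown should reject a non-null recordBatchFilter up front.
final class ScannerFactorySketch {
    static Object createLogScanner(Object recordBatchFilter) {
        if (recordBatchFilter != null) {
            throw new UnsupportedOperationException(
                    "recordBatchFilter is not supported by this scanner type.");
        }
        return new Object(); // placeholder for the real scanner instance
    }
}
```

Failing fast here surfaces misuse at scanner creation time rather than producing silently unfiltered results.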
    default Scan filter(@Nullable Predicate predicate) {
        throw new UnsupportedOperationException("Filter pushdown is not supported by this Scan.");
We don't need a default implementation here, because this is not a user-defined interface. All implementations of this interface are provided by Fluss itself, so a plain interface without a default implementation is much cleaner.
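The suggested shape can be sketched like this, with simplified stand-in types (`ScanSketch` for `Scan`, `Object` for `Predicate`); the interface declares the method abstractly and each internal implementation decides how to behave:

```java
// Sketch of the reviewer's suggestion: no default body on the interface,
// since all implementations are internal to Fluss.
interface ScanSketch {
    ScanSketch filter(Object predicate); // abstract, no default implementation
}

// An implementation that does not support pushdown throws explicitly.
final class NonFilterableScan implements ScanSketch {
    @Override
    public ScanSketch filter(Object predicate) {
        throw new UnsupportedOperationException(
                "Filter pushdown is not supported by this Scan.");
    }
}
```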
    @ParameterizedTest
    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
    void testProjectRecordBatchNoStatisticsClearing(byte recordBatchMagic) throws Exception {
        // Test that statistics clearing only happens for V2+ versions
Since we have changed the stats to V1, we should update the tests to reflect that. IIUC, this should test only V0 and we may also need to update other tests.
    MultiBytesView.Builder builder = null;
    int accumulatedSize = 0;
    FileChannelLogRecordBatch firstIncludedBatch = null;
    FileChannelLogRecordBatch lastIncludedBatch = null;
lastIncludedBatch is not used; remove it.
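The `builder = null` in the snippet above reflects the deferred-allocation pattern from the change log ("Defer MultiBytesView.Builder allocation until first included batch"). A minimal sketch of the same idea, using `List<byte[]>` as a stand-in for `MultiBytesView.Builder` and a plain predicate as the filter:

```java
// Illustrative sketch only: allocate the builder lazily so that a read
// where every batch is filtered out allocates nothing at all.
final class LazyBuilderSketch {
    // Returns the included batches, or null when no batch matched.
    static java.util.List<byte[]> collectIncluded(
            java.util.List<byte[]> batches,
            java.util.function.Predicate<byte[]> filter) {
        java.util.List<byte[]> builder = null; // deferred allocation
        for (byte[] batch : batches) {
            if (!filter.test(batch)) {
                continue; // filtered out, no allocation happens
            }
            if (builder == null) {
                builder = new java.util.ArrayList<>(); // first included batch
            }
            builder.add(batch);
        }
        return builder;
    }
}
```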
        accumulatedSize += batchSize;
    } else {
        // With projection: project first, then check size with projected size
        BytesView projectedBytesView = projection.projectRecordBatch(batch);
Throw an exception if logFormat != LogFormat.ARROW, as we do in readWithoutFilter.
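A minimal sketch of that guard, with a local stand-in enum instead of Fluss's real `LogFormat` (names are assumptions, not the actual API):

```java
// Hypothetical sketch: reject projection for non-Arrow log formats,
// mirroring the check the reviewer says exists in readWithoutFilter.
final class ProjectionGuardSketch {
    enum LogFormat { ARROW, INDEXED } // stand-in for the real enum

    static void checkProjectable(LogFormat logFormat) {
        if (logFormat != LogFormat.ARROW) {
            throw new IllegalArgumentException(
                    "Record batch projection is only supported for the ARROW "
                            + "log format, but got: " + logFormat);
        }
    }
}
```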
    if (result == null) {
        result = new HashMap<>();
    }
    int schemaId = tableReq.hasFilterSchemaId() ? tableReq.getFilterSchemaId() : -1;
Is -1 a valid value here? If not, we should fail fast by throwing an exception.
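The fail-fast variant could look like the following sketch (method and parameter names are hypothetical; it models `hasFilterSchemaId()`/`getFilterSchemaId()` as plain arguments):

```java
// Hypothetical sketch: validate the request's filter schema id instead of
// letting a -1 sentinel propagate into the read path.
final class FilterSchemaIdGuardSketch {
    static int requireFilterSchemaId(boolean hasFilterSchemaId, int filterSchemaId) {
        if (!hasFilterSchemaId || filterSchemaId < 0) {
            throw new IllegalArgumentException(
                    "filterSchemaId must be present and non-negative");
        }
        return filterSchemaId;
    }
}
```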
    if (respForBucket.hasFilteredEndOffset()
            && respForBucket.getFilteredEndOffset() >= 0) {
        fetchLogResultForBucket =
                new FetchLogResultForBucket(
                        tb,
When filteredEndOffset is set, any records in the response are currently silently ignored. Although records are typically unset when filteredEndOffset is present, a specific edge case leads to inefficiency: if the first batch in a log file matches the filter but subsequent batches do not, the system returns only the next offset of the matched batch without including the FilteredEndOffset. Consequently, these already-filtered batches must be re-evaluated in the next request, resulting in unnecessary resource consumption and performance degradation.
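One way to address the edge case described above is to advance the next fetch position to whichever is larger: the offset implied by the returned records, or the filtered-end offset when set. This is only a sketch of the idea, not the Fluss implementation; `-1` stands for an unset filtered-end offset, matching the `NO_FILTERED_END_OFFSET` sentinel mentioned in the change log:

```java
// Illustrative sketch: combine the record-based next offset with the
// filtered-end offset so already-filtered batches are never re-fetched.
final class FetchOffsetSketch {
    static final long NO_FILTERED_END_OFFSET = -1L;

    static long nextFetchOffset(long nextOffsetFromRecords, long filteredEndOffset) {
        // When filteredEndOffset is unset (-1), max() simply keeps the
        // record-based offset; when set, it skips past filtered batches.
        return Math.max(nextOffsetFromRecords, filteredEndOffset);
    }
}
```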
    }

    @Override
    public Long[] getNullCounts() {
I think we can optimize getNullCounts() to return a primitive int[]. An int count is enough, since each field's null count only needs 4 bytes; using primitive int rather than boxed Integer avoids boxing (roughly double the size), and -1 can still serve as the "not present" sentinel. This can greatly reduce the memory footprint of cachedNullCounts and statsNullCounts, especially cachedNullCounts when the total field count is very large (1000+).
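A sketch of the suggested layout (class and method names are hypothetical stand-ins for the cached null-count holder):

```java
// Hypothetical sketch: store null counts as a primitive int[] with -1 as
// the "no statistics for this field" sentinel, instead of boxed Long[].
final class NullCountsSketch {
    private final int[] nullCounts;

    NullCountsSketch(int fieldCount) {
        nullCounts = new int[fieldCount];
        java.util.Arrays.fill(nullCounts, -1); // -1 = not present
    }

    void set(int fieldIndex, int count) {
        nullCounts[fieldIndex] = count;
    }

    int[] getNullCounts() {
        return nullCounts;
    }
}
```

With 1000+ fields, an `int[]` costs roughly 4 bytes per field plus one array header, versus a `Long[]` of object references plus a boxed `Long` per populated slot.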
     * FileChannelLogRecordBatch and DefaultLogRecordBatch to read null counts from Arrow metadata
     * instead of the statistics binary format (V2).
     */
    class ArrowNullCountReader {
Could you revert the commit that reads null counts from Arrow metadata? This pull request is already very large, and I think we can review that change in a separate pull request. Besides, I have some concerns about the cache and deserialization performance.
    static final class FieldNodeMappingKey {
        private final int schemaId;
        private final int[] statsIndexMapping;
The cache key lacks tableId, so it has collision issues when there are multiple tables.
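A sketch of the fix, adding `tableId` to the key alongside the existing fields. The field names mirror the snippet above, but the class shown here is a simplified stand-in, not the actual Fluss code:

```java
// Hypothetical sketch: include tableId in the cache key so mappings from
// different tables with the same schemaId cannot collide.
final class FieldNodeMappingKeySketch {
    private final long tableId;
    private final int schemaId;
    private final int[] statsIndexMapping;

    FieldNodeMappingKeySketch(long tableId, int schemaId, int[] statsIndexMapping) {
        this.tableId = tableId;
        this.schemaId = schemaId;
        this.statsIndexMapping = statsIndexMapping;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FieldNodeMappingKeySketch)) {
            return false;
        }
        FieldNodeMappingKeySketch that = (FieldNodeMappingKeySketch) o;
        return tableId == that.tableId
                && schemaId == that.schemaId
                && java.util.Arrays.equals(statsIndexMapping, that.statsIndexMapping);
    }

    @Override
    public int hashCode() {
        // Arrays.hashCode, not Object identity, so equal mappings hash equally.
        return java.util.Objects.hash(tableId, schemaId) * 31
                + java.util.Arrays.hashCode(statsIndexMapping);
    }
}
```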
This reverts commit 63db1b5.
Purpose
Linked issue: close #2950
Brief change log
Tests
API and Format
Documentation