Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 44 additions & 11 deletions include/paimon/read_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <string>
#include <vector>

#include "arrow/c/abi.h"
#include "paimon/cache/cache.h"
#include "paimon/predicate/predicate.h"
#include "paimon/result.h"
Expand All @@ -44,7 +45,7 @@ class FileSystem;
class PAIMON_EXPORT ReadContext {
public:
ReadContext(const std::string& path, const std::string& branch,
const std::vector<std::string>& read_schema,
const std::vector<std::string>& read_field_names,
const std::vector<int32_t>& read_field_ids,
const std::shared_ptr<Predicate>& predicate, bool enable_predicate_filter,
bool enable_prefetch, uint32_t prefetch_batch_count,
Expand Down Expand Up @@ -75,8 +76,8 @@ class PAIMON_EXPORT ReadContext {
return options_;
}

const std::vector<std::string>& GetReadSchema() const {
return read_schema_;
const std::vector<std::string>& GetReadFieldNames() const {
return read_field_names_;
}

const std::vector<int32_t>& GetReadFieldIds() const {
Expand Down Expand Up @@ -130,10 +131,25 @@ class PAIMON_EXPORT ReadContext {
return cache_;
}

/// Whether a read schema (C ArrowSchema) for nested column pruning was provided.
///
/// @return true if the stored read schema pointer is non-null, false otherwise.
bool HasReadSchema() const {
return read_schema_ != nullptr;
}

/// Get the read schema as a mutable C ArrowSchema pointer.
/// ImportSchema will consume (release) the schema content.
///
/// @return The stored ArrowSchema pointer, or nullptr if no read schema was set.
/// @note NOTE(review): the pointer is not owned by this context, so the pointed-to
///       schema presumably has to outlive every use of the returned pointer —
///       confirm against callers.
ArrowSchema* GetReadSchema() {
return read_schema_;
}

/// Set the read schema from a C ArrowSchema pointer. Does NOT take ownership.
/// Called internally by ReadContextBuilder.
void SetReadSchema(ArrowSchema* schema);
Comment thread
lucasfang marked this conversation as resolved.

private:
std::string path_;
std::string branch_;
std::vector<std::string> read_schema_;
std::vector<std::string> read_field_names_;
std::vector<int32_t> read_field_ids_;
std::shared_ptr<Predicate> predicate_;
bool enable_predicate_filter_;
Expand All @@ -151,6 +167,7 @@ class PAIMON_EXPORT ReadContext {
PrefetchCacheMode prefetch_cache_mode_;
CacheConfig cache_config_;
std::shared_ptr<Cache> cache_;
ArrowSchema* read_schema_ = nullptr;
};

/// `ReadContextBuilder` used to build a `ReadContext`, has input validation.
Expand All @@ -173,9 +190,9 @@ class PAIMON_EXPORT ReadContextBuilder {
///
/// @param read_field_names Vector of field names to read from the table.
/// @return Reference to this builder for method chaining.
/// @note Currently supports top-level field selection. Future versions may support
/// nested field selection using ArrowSchema for more granular projection
ReadContextBuilder& SetReadSchema(const std::vector<std::string>& read_field_names);
/// @note Currently supports top-level field selection. For nested field selection
/// use SetReadSchema(ArrowSchema*) instead.
ReadContextBuilder& SetReadFieldNames(const std::vector<std::string>& read_field_names);
/// Set the schema fields to read from the table.
///

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetReadSchema(const std::vector<std::string>&) is part of the exported ReadContextBuilder API, but this PR renames it to SetReadFieldNames(...). That breaks existing callers that use top-level projection, while the PR states that no public API is removed incompatibly. Please update the PR description.

/// If not set, all fields from the table schema will be read. This is useful for
Expand All @@ -184,12 +201,28 @@ class PAIMON_EXPORT ReadContextBuilder {
///
/// @param read_field_ids Vector of field ids to read from the table.
/// @return Reference to this builder for method chaining.
/// @note Currently supports top-level field selection. Future versions may support
/// nested field selection using ArrowSchema for more granular projection.
/// @note SetReadFieldIds() and SetReadSchema() are mutually exclusive.
/// Calling both will ignore the read schema set by SetReadSchema().
/// @note Currently supports top-level field selection.
/// @note SetReadFieldIds() and SetReadFieldNames() are mutually exclusive.
/// Calling both will ignore the field names set by SetReadFieldNames().
ReadContextBuilder& SetReadFieldIds(const std::vector<int32_t>& read_field_ids);

/// Set the read Arrow Schema for nested column pruning.
///
/// The read schema is an Arrow C Data Interface schema where STRUCT types
/// may contain only a subset of the original sub-fields, enabling nested column
/// pruning to reduce I/O. Field matching is based on field name: the system
/// looks up each field by name in the table schema and rebuilds the aligned
/// schema using the table schema's type and metadata. Metadata propagation
/// from the user-provided schema is whitelist-based: currently only
/// "paimon.map.selected-keys" is preserved and merged into the final aligned
/// schema.
///

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider rewording the comment about which metadata is "preserved".

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a usage example for paimon.map.selected-keys, or helper utilities for generating a proper read schema.

/// @param read_schema Arrow C Schema. The caller retains ownership.
/// @return Reference to this builder for method chaining.
/// @note Priority: read_schema > read_field_ids > read_field_names.
/// When set, read_field_ids and read_field_names are ignored.
ReadContextBuilder& SetReadSchema(ArrowSchema* read_schema);

/// Set a configuration options map to set some option entries which are not defined in the
/// table schema or whose values you want to overwrite.
/// @note The options map will clear the options added by `AddOption()` before.
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ set(PAIMON_CORE_SRCS
core/utils/blob_view_lookup.cpp
core/utils/consumer_manager.cpp
core/utils/field_mapping.cpp
core/utils/nested_projection_utils.cpp
core/utils/file_store_path_factory.cpp
core/utils/file_utils.cpp
core/utils/manifest_meta_reader.cpp
Expand Down Expand Up @@ -735,6 +736,7 @@ if(PAIMON_BUILD_TESTS)
core/utils/consumer_manager_test.cpp
core/utils/file_store_path_factory_cache_test.cpp
core/utils/field_mapping_test.cpp
core/utils/nested_projection_utils_test.cpp
core/utils/file_store_path_factory_test.cpp
core/utils/file_utils_test.cpp
core/utils/manifest_meta_reader_test.cpp
Expand Down
5 changes: 1 addition & 4 deletions src/paimon/common/memory/memory_segment_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,7 @@ TEST(MemorySegmentTest, TestDoubleAccess) {
delete[] occupied;
}

// ------------------------------------------------------------------------
// Bulk Byte Movements
// ------------------------------------------------------------------------

// Bulk Byte Movements
TEST(MemorySegmentTest, TestBulkByteAccess) {
auto pool = paimon::GetDefaultPool();
// test expected correct behavior with default offset / length
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/common/types/data_field.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class DataField : public Jsonizable<DataField> {

static constexpr char FIELD_ID[] = "paimon.id";
static constexpr char DESCRIPTION[] = "paimon.description";
/// Metadata key for map field selected keys. The value is a comma-separated
/// string of key names, e.g. 'key1,key2'. Only string-keyed maps are supported.
static constexpr char MAP_SELECTED_KEYS[] = "paimon.map.selected-keys";

public:
static std::shared_ptr<arrow::Field> ConvertDataFieldToArrowField(const DataField& field);
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/global_index/global_index_write_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Result<std::unique_ptr<BatchReader>> CreateBatchReader(
.WithFileSystem(core_options.GetFileSystem())
.EnablePrefetch(true)
.WithMemoryPool(pool)
.SetReadSchema({field_name, SpecialFields::RowId().Name()});
.SetReadFieldNames({field_name, SpecialFields::RowId().Name()});
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ReadContext> read_context,
read_context_builder.Finish());
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<TableRead> table_read,
Expand Down
67 changes: 50 additions & 17 deletions src/paimon/core/io/field_mapping_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <cassert>
#include <cstddef>
#include <set>
#include <utility>

#include "arrow/api.h"
Expand All @@ -35,6 +36,7 @@
#include "paimon/core/casting/cast_executor.h"
#include "paimon/core/casting/casting_utils.h"
#include "paimon/core/utils/field_mapping.h"
#include "paimon/core/utils/nested_projection_utils.h"
#include "paimon/memory/bytes.h"
#include "paimon/reader/batch_reader.h"

Expand Down Expand Up @@ -74,6 +76,23 @@ FieldMappingReader::FieldMappingReader(int32_t field_count,
non_partition_info_.non_partition_read_schema[i].Name()) {
need_mapping_ = true;
}
// Map selected-keys metadata also requires mapping so that
// FilterMapArrayBySelectedKeys can filter out unwanted entries.
if (!need_mapping_ &&
non_partition_info_.non_partition_read_schema[i].Type()->id() == arrow::Type::MAP) {
auto selected_keys_or = NestedProjectionUtils::GetMapSelectedKeys(
non_partition_info_.non_partition_read_schema[i].ArrowField());
if (!selected_keys_or.ok()) {
// Keep mapping enabled so the parse error can be surfaced in
// MappingFields where Status can be returned.
need_mapping_ = true;
continue;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we delay the error until the actual I/O read happens? Shouldn’t this be validated and reported during construction instead?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a Create() factory method for FieldMappingReader.

}
auto& selected_keys = selected_keys_or.value();
if (!selected_keys.empty()) {
need_mapping_ = true;
}
}
}
}

Expand Down Expand Up @@ -142,9 +161,9 @@ Result<BatchReader::ReadBatchWithBitmap> FieldMappingReader::NextBatchWithBitmap
// mapping non-partition array
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> casted_non_partition_array,
CastNonPartitionArrayIfNeed(non_partition_array));
Comment thread
lucasfang marked this conversation as resolved.
MappingFields(casted_non_partition_array, non_partition_info_.non_partition_read_schema,
non_partition_info_.idx_in_target_read_schema, &target_array,
&target_field_names);
PAIMON_RETURN_NOT_OK(MappingFields(
casted_non_partition_array, non_partition_info_.non_partition_read_schema,
non_partition_info_.idx_in_target_read_schema, &target_array, &target_field_names));

// mapping partition array
if (partition_info_ != std::nullopt) {
Expand All @@ -153,9 +172,9 @@ Result<BatchReader::ReadBatchWithBitmap> FieldMappingReader::NextBatchWithBitmap
GeneratePartitionArray(non_partition_array->length()));
}
auto trim_partition_array = partition_array_->Slice(0, non_partition_array->length());
MappingFields(trim_partition_array, partition_info_.value().partition_read_schema,
partition_info_.value().idx_in_target_read_schema, &target_array,
&target_field_names);
PAIMON_RETURN_NOT_OK(MappingFields(
trim_partition_array, partition_info_.value().partition_read_schema,
partition_info_.value().idx_in_target_read_schema, &target_array, &target_field_names));
}
// mapping non-exist array
if (non_exist_field_info_ != std::nullopt) {
Expand All @@ -164,9 +183,10 @@ Result<BatchReader::ReadBatchWithBitmap> FieldMappingReader::NextBatchWithBitmap
GenerateNonExistArray(non_partition_array->length()));
}
auto trim_non_exist_array = non_exist_array_->Slice(0, non_partition_array->length());
MappingFields(trim_non_exist_array, non_exist_field_info_.value().non_exist_read_schema,
non_exist_field_info_.value().idx_in_target_read_schema, &target_array,
&target_field_names);
PAIMON_RETURN_NOT_OK(MappingFields(trim_non_exist_array,
non_exist_field_info_.value().non_exist_read_schema,
non_exist_field_info_.value().idx_in_target_read_schema,
&target_array, &target_field_names));
}

// construct target array
Expand Down Expand Up @@ -283,20 +303,33 @@ Result<std::shared_ptr<arrow::Array>> FieldMappingReader::GenerateNonExistArray(
return arrow_array;
}

void FieldMappingReader::MappingFields(const std::shared_ptr<arrow::Array>& data_array,
const std::vector<DataField>& read_fields_of_data_array,
const std::vector<int32_t>& idx_in_target_schema,
arrow::ArrayVector* target_array,
std::vector<std::string>* target_field_names) {
Status FieldMappingReader::MappingFields(const std::shared_ptr<arrow::Array>& data_array,
const std::vector<DataField>& read_fields_of_data_array,
const std::vector<int32_t>& idx_in_target_schema,
arrow::ArrayVector* target_array,
std::vector<std::string>* target_field_names) {
auto* struct_array = arrow::internal::checked_cast<arrow::StructArray*>(data_array.get());
assert(struct_array);
assert(struct_array->fields().size() == idx_in_target_schema.size());
for (size_t i = 0; i < idx_in_target_schema.size(); i++) {
// target type may be string type, but after adapter transform, type may be dictionary,
// need reconstruct struct type
(*target_array)[idx_in_target_schema[i]] = struct_array->field(i);
std::shared_ptr<arrow::Array> field_array = struct_array->field(i);

// Filter map entries by selected keys if metadata is present.
if (field_array->type()->id() == arrow::Type::MAP) {
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> selected_keys,
NestedProjectionUtils::GetMapSelectedKeys(
read_fields_of_data_array[i].ArrowField()));
if (!selected_keys.empty()) {
PAIMON_ASSIGN_OR_RAISE(field_array,
NestedProjectionUtils::FilterMapArrayBySelectedKeys(
field_array, selected_keys));
}
}

(*target_array)[idx_in_target_schema[i]] = std::move(field_array);
(*target_field_names)[idx_in_target_schema[i]] = read_fields_of_data_array[i].Name();
}
return Status::OK();
}

} // namespace paimon
10 changes: 5 additions & 5 deletions src/paimon/core/io/field_mapping_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ class FieldMappingReader : public FileBatchReader {
Result<std::shared_ptr<arrow::Array>> CastNonPartitionArrayIfNeed(
const std::shared_ptr<arrow::Array>& src_array) const;

static void MappingFields(const std::shared_ptr<arrow::Array>& src_array,
const std::vector<DataField>& read_fields_of_data_array,
const std::vector<int32_t>& idx_in_target_schema,
arrow::ArrayVector* target_array,
std::vector<std::string>* target_field_names);
static Status MappingFields(const std::shared_ptr<arrow::Array>& src_array,
const std::vector<DataField>& read_fields_of_data_array,
const std::vector<int32_t>& idx_in_target_schema,
arrow::ArrayVector* target_array,
std::vector<std::string>* target_field_names);

private:
bool need_mapping_ = false;
Expand Down
1 change: 1 addition & 0 deletions src/paimon/core/io/field_mapping_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "paimon/memory/memory_pool.h"
#include "paimon/predicate/literal.h"
#include "paimon/predicate/predicate_builder.h"
#include "paimon/testing/mock/mock_file_batch_reader.h"
#include "paimon/testing/utils/binary_row_generator.h"
#include "paimon/testing/utils/read_result_collector.h"
#include "paimon/testing/utils/testharness.h"
Expand Down
Loading
Loading