diff --git a/bindings/c/src/result.rs b/bindings/c/src/result.rs index a4fd62b..19d523c 100644 --- a/bindings/c/src/result.rs +++ b/bindings/c/src/result.rs @@ -66,6 +66,12 @@ pub struct paimon_result_record_batch_reader { pub error: *mut paimon_error, } +#[repr(C)] +pub struct paimon_result_predicate { + pub predicate: *mut paimon_predicate, + pub error: *mut paimon_error, +} + #[repr(C)] pub struct paimon_result_next_batch { pub batch: paimon_arrow_batch, diff --git a/bindings/c/src/table.rs b/bindings/c/src/table.rs index 6f6637d..c6496f6 100644 --- a/bindings/c/src/table.rs +++ b/bindings/c/src/table.rs @@ -20,23 +20,18 @@ use std::ffi::c_void; use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; use arrow_array::{Array, StructArray}; use futures::StreamExt; +use paimon::spec::{DataField, DataType, Datum, Predicate, PredicateBuilder}; use paimon::table::{ArrowRecordBatchStream, Table}; use paimon::Plan; -use crate::error::{check_non_null, paimon_error}; +use crate::error::{check_non_null, paimon_error, validate_cstr, PaimonErrorCode}; use crate::result::{ - paimon_result_new_read, paimon_result_next_batch, paimon_result_plan, + paimon_result_new_read, paimon_result_next_batch, paimon_result_plan, paimon_result_predicate, paimon_result_read_builder, paimon_result_record_batch_reader, paimon_result_table_scan, }; use crate::runtime; use crate::types::*; -// Helper to box a Table clone into a wrapper struct and return a raw pointer. -unsafe fn box_table_wrapper(table: &Table, make: impl FnOnce(*mut c_void) -> T) -> *mut T { - let inner = Box::into_raw(Box::new(table.clone())) as *mut c_void; - Box::into_raw(Box::new(make(inner))) -} - // Helper to free a wrapper struct that contains a Table clone. 
unsafe fn free_table_wrapper(ptr: *mut T, get_inner: impl FnOnce(&T) -> *mut c_void) { if !ptr.is_null() { @@ -89,6 +84,7 @@ pub unsafe extern "C" fn paimon_table_new_read_builder( let state = ReadBuilderState { table: table_ref.clone(), projected_columns: None, + filter: None, }; paimon_result_read_builder { read_builder: box_read_builder_state(state), @@ -157,6 +153,36 @@ pub unsafe extern "C" fn paimon_read_builder_with_projection( std::ptr::null_mut() } +/// Set a filter predicate for scan planning. +/// +/// The predicate is consumed (ownership transferred to the read builder). +/// Pass null to clear any previously set filter. +/// +/// # Safety +/// `rb` must be a valid pointer from `paimon_table_new_read_builder`, or null (returns error). +/// `predicate` must be a valid pointer from a `paimon_predicate_*` function, or null. +#[no_mangle] +pub unsafe extern "C" fn paimon_read_builder_with_filter( + rb: *mut paimon_read_builder, + predicate: *mut paimon_predicate, +) -> *mut paimon_error { + if let Err(e) = check_non_null(rb, "rb") { + return e; + } + + let state = &mut *((*rb).inner as *mut ReadBuilderState); + + if predicate.is_null() { + state.filter = None; + return std::ptr::null_mut(); + } + + let pred_wrapper = Box::from_raw(predicate); + let pred = Box::from_raw(pred_wrapper.inner as *mut Predicate); + state.filter = Some(*pred); + std::ptr::null_mut() +} + /// Create a new TableScan from a ReadBuilder. 
/// /// # Safety @@ -172,8 +198,13 @@ pub unsafe extern "C" fn paimon_read_builder_new_scan( }; } let state = &*((*rb).inner as *const ReadBuilderState); + let scan_state = TableScanState { + table: state.table.clone(), + filter: state.filter.clone(), + }; + let inner = Box::into_raw(Box::new(scan_state)) as *mut c_void; paimon_result_table_scan { - scan: box_table_wrapper(&state.table, |inner| paimon_table_scan { inner }), + scan: Box::into_raw(Box::new(paimon_table_scan { inner })), error: std::ptr::null_mut(), } } @@ -201,11 +232,17 @@ pub unsafe extern "C" fn paimon_read_builder_new_read( rb_rust.with_projection(&col_refs); } + // Apply filter if set + if let Some(ref filter) = state.filter { + rb_rust.with_filter(filter.clone()); + } + match rb_rust.new_read() { Ok(table_read) => { let read_state = TableReadState { table: state.table.clone(), read_type: table_read.read_type().to_vec(), + data_predicates: table_read.data_predicates().to_vec(), }; paimon_result_new_read { read: box_table_read_state(read_state), @@ -227,7 +264,12 @@ pub unsafe extern "C" fn paimon_read_builder_new_read( /// Only call with a scan returned from `paimon_read_builder_new_scan`. #[no_mangle] pub unsafe extern "C" fn paimon_table_scan_free(scan: *mut paimon_table_scan) { - free_table_wrapper(scan, |s| s.inner); + if !scan.is_null() { + let wrapper = Box::from_raw(scan); + if !wrapper.inner.is_null() { + drop(Box::from_raw(wrapper.inner as *mut TableScanState)); + } + } } /// Execute a scan plan to get splits. 
@@ -244,8 +286,11 @@ pub unsafe extern "C" fn paimon_table_scan_plan( error: e, }; } - let table = &*((*scan).inner as *const Table); - let rb = table.new_read_builder(); + let scan_state = &*((*scan).inner as *const TableScanState); + let mut rb = scan_state.table.new_read_builder(); + if let Some(ref filter) = scan_state.filter { + rb.with_filter(filter.clone()); + } let table_scan = rb.new_scan(); match runtime().block_on(table_scan.plan()) { @@ -349,10 +394,11 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow( let end = (offset.saturating_add(length)).min(all_splits.len()); let selected = &all_splits[start..end]; - // C bindings currently persist only the projection, so reconstructing the - // read uses an empty predicate set. - let table_read = - paimon::table::TableRead::new(&state.table, state.read_type.clone(), Vec::new()); + let table_read = paimon::table::TableRead::new( + &state.table, + state.read_type.clone(), + state.data_predicates.clone(), + ); match table_read.to_arrow(selected) { Ok(stream) => { @@ -476,3 +522,511 @@ pub unsafe extern "C" fn paimon_arrow_batch_free(batch: paimon_arrow_batch) { drop(Box::from_raw(batch.schema as *mut FFI_ArrowSchema)); } } + +// ======================= Predicate =============================== + +/// Convert a C datum to a Rust Datum. 
+///
+/// Narrow integer tags (TinyInt, SmallInt, Int, Date, Time) and the `nanos` half of
+/// the timestamp tags are range-checked against their target width. A payload that
+/// does not fit is rejected with `InvalidInput` rather than silently wrapped.
+///
+/// # Safety
+/// For the String and Bytes tags with a non-zero `str_len`, `str_data` must point to
+/// at least `str_len` readable bytes.
+unsafe fn datum_from_c(d: &paimon_datum) -> Result<Datum, *mut paimon_error> {
+    // Error for an integer payload that does not fit the width implied by the tag.
+    fn out_of_range(kind: &str, val: i64) -> *mut paimon_error {
+        paimon_error::new(
+            PaimonErrorCode::InvalidInput,
+            format!("value {val} out of range for {kind} datum"),
+        )
+    }
+
+    // Checked i64 -> i32 narrowing shared by Int, Date, Time and timestamp nanos.
+    fn narrow_i32(kind: &str, val: i64) -> Result<i32, *mut paimon_error> {
+        i32::try_from(val).map_err(|_| out_of_range(kind, val))
+    }
+
+    match d.tag {
+        0 => Ok(Datum::Bool(d.int_val != 0)),
+        1 => i8::try_from(d.int_val)
+            .map(Datum::TinyInt)
+            .map_err(|_| out_of_range("TinyInt", d.int_val)),
+        2 => i16::try_from(d.int_val)
+            .map(Datum::SmallInt)
+            .map_err(|_| out_of_range("SmallInt", d.int_val)),
+        3 => narrow_i32("Int", d.int_val).map(Datum::Int),
+        4 => Ok(Datum::Long(d.int_val)),
+        // Floats travel as f64 across the FFI boundary; narrowing back is intentional.
+        5 => Ok(Datum::Float(d.double_val as f32)),
+        6 => Ok(Datum::Double(d.double_val)),
+        7 => {
+            // A zero length never dereferences `str_data`, so null is acceptable there.
+            if d.str_len == 0 {
+                return Ok(Datum::String(String::new()));
+            }
+            if d.str_data.is_null() {
+                return Err(paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    "null string data in datum with non-zero length".to_string(),
+                ));
+            }
+            let bytes = std::slice::from_raw_parts(d.str_data, d.str_len);
+            let s = std::str::from_utf8(bytes).map_err(|e| {
+                paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    format!("invalid UTF-8 in datum string: {e}"),
+                )
+            })?;
+            Ok(Datum::String(s.to_string()))
+        }
+        8 => narrow_i32("Date", d.int_val).map(Datum::Date),
+        9 => narrow_i32("Time", d.int_val).map(Datum::Time),
+        10 => Ok(Datum::Timestamp {
+            millis: d.int_val,
+            nanos: narrow_i32("Timestamp nanos", d.int_val2)?,
+        }),
+        11 => Ok(Datum::LocalZonedTimestamp {
+            millis: d.int_val,
+            nanos: narrow_i32("LocalZonedTimestamp nanos", d.int_val2)?,
+        }),
+        12 => {
+            // `int_val` holds the low 64 bits (reinterpreted as unsigned) and `int_val2`
+            // the high 64 bits, which carry the sign of the i128.
+            let unscaled = ((d.int_val2 as i128) << 64) | (d.int_val as u64 as i128);
+            Ok(Datum::Decimal {
+                unscaled,
+                precision: d.uint_val,
+                scale: d.uint_val2,
+            })
+        }
+        13 => {
+            if d.str_data.is_null() && d.str_len > 0 {
+                return Err(paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    "null bytes data in datum".to_string(),
+                ));
+            }
+            let bytes = if d.str_len > 0 {
+                std::slice::from_raw_parts(d.str_data, d.str_len).to_vec()
+            } else {
+                Vec::new()
+            };
+            Ok(Datum::Bytes(bytes))
+        }
+        _ => Err(paimon_error::new(
+            PaimonErrorCode::InvalidInput,
+            format!("unknown datum tag: {}", d.tag),
+        )),
+    }
+}
+
+/// Coerce an integer-family datum to the integer type of the target column.
+///
+/// FFI callers (e.g. Go) often pass a narrower integer literal (Int) for a
+/// wider column (BigInt). The datum is widened or narrowed to match the column;
+/// narrowing is range-checked and fails with `InvalidInput` when the value does
+/// not fit.
+///
+/// Non-integer datums, non-integer columns and unknown columns are returned as-is
+/// (an unknown column is left for `PredicateBuilder` to report).
+fn coerce_integer_datum(
+    datum: Datum,
+    fields: &[DataField],
+    column: &str,
+) -> Result<Datum, *mut paimon_error> {
+    let val = match &datum {
+        Datum::TinyInt(v) => i64::from(*v),
+        Datum::SmallInt(v) => i64::from(*v),
+        Datum::Int(v) => i64::from(*v),
+        Datum::Long(v) => *v,
+        _ => return Ok(datum),
+    };
+
+    let Some(field) = fields.iter().find(|f| f.name() == column) else {
+        // Column not found; let PredicateBuilder produce the proper error.
+        return Ok(datum);
+    };
+
+    let out_of_range = |ty: &str| {
+        paimon_error::new(
+            PaimonErrorCode::InvalidInput,
+            format!("value {val} out of range for {ty} column '{column}'"),
+        )
+    };
+
+    // Each arm only fires when the datum is not already of the column's type;
+    // a datum that already matches falls through to the final arm untouched.
+    match field.data_type() {
+        DataType::TinyInt(_) if !matches!(datum, Datum::TinyInt(_)) => i8::try_from(val)
+            .map(Datum::TinyInt)
+            .map_err(|_| out_of_range("TinyInt")),
+        DataType::SmallInt(_) if !matches!(datum, Datum::SmallInt(_)) => i16::try_from(val)
+            .map(Datum::SmallInt)
+            .map_err(|_| out_of_range("SmallInt")),
+        DataType::Int(_) if !matches!(datum, Datum::Int(_)) => i32::try_from(val)
+            .map(Datum::Int)
+            .map_err(|_| out_of_range("Int")),
+        DataType::BigInt(_) if !matches!(datum, Datum::Long(_)) => Ok(Datum::Long(val)),
+        _ => Ok(datum),
+    }
+}
+
+/// Helper to build a leaf predicate that takes a datum, via PredicateBuilder.
+///
+/// Steps, in order: validate `table` and `column`, decode the C datum, coerce an
+/// integer literal to the column's integer type, then hand off to `build_fn`.
+/// Any failure is reported through the `error` field with a null `predicate`.
+unsafe fn build_leaf_predicate_datum(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: &paimon_datum,
+    build_fn: impl FnOnce(&PredicateBuilder, &str, Datum) -> paimon::Result<Predicate>,
+) -> paimon_result_predicate {
+    // Every failure has the same shape: no predicate, only the error pointer.
+    let fail = |error: *mut paimon_error| paimon_result_predicate {
+        predicate: std::ptr::null_mut(),
+        error,
+    };
+
+    if let Err(err) = check_non_null(table, "table") {
+        return fail(err);
+    }
+    let col_name = match validate_cstr(column, "column") {
+        Ok(name) => name,
+        Err(err) => return fail(err),
+    };
+    let literal = match datum_from_c(datum) {
+        Ok(value) => value,
+        Err(err) => return fail(err),
+    };
+
+    let table_ref = &*((*table).inner as *const Table);
+    let fields = table_ref.schema().fields();
+
+    let literal = match coerce_integer_datum(literal, fields, &col_name) {
+        Ok(value) => value,
+        Err(err) => return fail(err),
+    };
+
+    let builder = PredicateBuilder::new(fields);
+    match build_fn(&builder, &col_name, literal) {
+        Ok(pred) => {
+            // Double boxing: the Predicate lives behind `inner`, the handle behind the result.
+            let inner = Box::into_raw(Box::new(pred)) as *mut c_void;
+            paimon_result_predicate {
+                predicate: Box::into_raw(Box::new(paimon_predicate { inner })),
+                error: std::ptr::null_mut(),
+            }
+        }
+        Err(err) => fail(paimon_error::from_paimon(err)),
+    }
+}
+
+/// Helper to build a leaf predicate without a datum (IS NULL / IS NOT NULL).
+///
+/// Validates `table` and `column`, then lets `build_fn` create the predicate from a
+/// `PredicateBuilder` over the table's schema fields. Failures are reported through
+/// the `error` field with a null `predicate`.
+unsafe fn build_leaf_predicate(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    build_fn: impl FnOnce(&PredicateBuilder, &str) -> paimon::Result<Predicate>,
+) -> paimon_result_predicate {
+    let fail = |error: *mut paimon_error| paimon_result_predicate {
+        predicate: std::ptr::null_mut(),
+        error,
+    };
+
+    if let Err(err) = check_non_null(table, "table") {
+        return fail(err);
+    }
+    let col_name = match validate_cstr(column, "column") {
+        Ok(name) => name,
+        Err(err) => return fail(err),
+    };
+
+    let table_ref = &*((*table).inner as *const Table);
+    let builder = PredicateBuilder::new(table_ref.schema().fields());
+    match build_fn(&builder, &col_name) {
+        Ok(pred) => {
+            let inner = Box::into_raw(Box::new(pred)) as *mut c_void;
+            paimon_result_predicate {
+                predicate: Box::into_raw(Box::new(paimon_predicate { inner })),
+                error: std::ptr::null_mut(),
+            }
+        }
+        Err(err) => fail(paimon_error::from_paimon(err)),
+    }
+}
+
+/// Build the predicate `column = datum`.
+///
+/// On failure the returned `predicate` is null and `error` is set.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.equal(name, literal)
+    })
+}
+
+/// Build the predicate `column != datum`.
+///
+/// On failure the returned `predicate` is null and `error` is set.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_not_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.not_equal(name, literal)
+    })
+}
+
+/// Create a less-than predicate: `column < datum`.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_less_than(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.less_than(name, literal)
+    })
+}
+
+/// Build the predicate `column <= datum`.
+///
+/// On failure the returned `predicate` is null and `error` is set.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_less_or_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.less_or_equal(name, literal)
+    })
+}
+
+/// Build the predicate `column > datum`.
+///
+/// On failure the returned `predicate` is null and `error` is set.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_greater_than(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.greater_than(name, literal)
+    })
+}
+
+/// Build the predicate `column >= datum`.
+///
+/// On failure the returned `predicate` is null and `error` is set.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_greater_or_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.greater_or_equal(name, literal)
+    })
+}
+
+/// Build the predicate `column IS NULL`.
+///
+/// On failure the returned `predicate` is null and `error` is set.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_null(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+) -> paimon_result_predicate {
+    build_leaf_predicate(table, column, |builder, name| builder.is_null(name))
+}
+
+/// Create an IS NOT NULL predicate.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_not_null(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+) -> paimon_result_predicate {
+    build_leaf_predicate(table, column, |builder, name| builder.is_not_null(name))
+}
+
+/// Build the predicate `column IN (datum1, datum2, ...)`.
+///
+/// On failure the returned `predicate` is null and `error` is set.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers. `datums` must point to `datums_len`
+/// readable `paimon_datum` values; it may be null only when `datums_len` is zero.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_in(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datums: *const paimon_datum,
+    datums_len: usize,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datums(table, column, datums, datums_len, |builder, name, literals| {
+        builder.is_in(name, literals)
+    })
+}
+
+/// Build the predicate `column NOT IN (datum1, datum2, ...)`.
+///
+/// On failure the returned `predicate` is null and `error` is set.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers. `datums` must point to `datums_len`
+/// readable `paimon_datum` values; it may be null only when `datums_len` is zero.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_not_in(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datums: *const paimon_datum,
+    datums_len: usize,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datums(table, column, datums, datums_len, |builder, name, literals| {
+        builder.is_not_in(name, literals)
+    })
+}
+
+/// Helper to build an IN/NOT IN predicate with a datum array.
+///
+/// Steps, in order: validate `table` and `column`, reject a null `datums` pointer
+/// with a non-zero length, decode every C datum, coerce integer literals to the
+/// column's integer type, then hand the list to `build_fn`. Any failure is reported
+/// through the `error` field with a null `predicate`.
+unsafe fn build_leaf_predicate_datums(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datums: *const paimon_datum,
+    datums_len: usize,
+    build_fn: impl FnOnce(&PredicateBuilder, &str, Vec<Datum>) -> paimon::Result<Predicate>,
+) -> paimon_result_predicate {
+    let fail = |error: *mut paimon_error| paimon_result_predicate {
+        predicate: std::ptr::null_mut(),
+        error,
+    };
+
+    if let Err(err) = check_non_null(table, "table") {
+        return fail(err);
+    }
+    let col_name = match validate_cstr(column, "column") {
+        Ok(name) => name,
+        Err(err) => return fail(err),
+    };
+
+    if datums_len > 0 && datums.is_null() {
+        return fail(paimon_error::new(
+            PaimonErrorCode::InvalidInput,
+            "null datums pointer with non-zero length".to_string(),
+        ));
+    }
+
+    // A zero-length request never reads through `datums`, so null is acceptable there.
+    let raw: &[paimon_datum] = if datums_len == 0 {
+        &[]
+    } else {
+        std::slice::from_raw_parts(datums, datums_len)
+    };
+
+    // Pass 1: decode every literal before the table is touched.
+    let decoded = match raw
+        .iter()
+        .map(|d| datum_from_c(d))
+        .collect::<Result<Vec<Datum>, _>>()
+    {
+        Ok(values) => values,
+        Err(err) => return fail(err),
+    };
+
+    let table_ref = &*((*table).inner as *const Table);
+    let fields = table_ref.schema().fields();
+
+    // Pass 2: align integer literals with the column's integer type.
+    let coerced = match decoded
+        .into_iter()
+        .map(|d| coerce_integer_datum(d, fields, &col_name))
+        .collect::<Result<Vec<Datum>, _>>()
+    {
+        Ok(values) => values,
+        Err(err) => return fail(err),
+    };
+
+    let builder = PredicateBuilder::new(fields);
+    match build_fn(&builder, &col_name, coerced) {
+        Ok(pred) => {
+            let inner = Box::into_raw(Box::new(pred)) as *mut c_void;
+            paimon_result_predicate {
+                predicate: Box::into_raw(Box::new(paimon_predicate { inner })),
+                error: std::ptr::null_mut(),
+            }
+        }
+        Err(err) => fail(paimon_error::from_paimon(err)),
+    }
+}
+
+/// Combine two predicates with AND.
+/// Consumes both inputs.
+///
+/// Returns null when either input is null or when `a` and `b` are the same handle.
+/// Every valid input is still consumed in that case, so the caller must not free
+/// or reuse it.
+///
+/// # Safety
+/// `a` and `b` must each be null or a valid pointer from predicate functions.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_and(
+    a: *mut paimon_predicate,
+    b: *mut paimon_predicate,
+) -> *mut paimon_predicate {
+    match take_predicate_pair(a, b) {
+        Some((lhs, rhs)) => box_predicate_handle(Predicate::and(vec![lhs, rhs])),
+        None => std::ptr::null_mut(),
+    }
+}
+
+/// Combine two predicates with OR. Consumes both inputs.
+///
+/// Returns null when either input is null or when `a` and `b` are the same handle.
+/// Every valid input is still consumed in that case, so the caller must not free
+/// or reuse it.
+///
+/// # Safety
+/// `a` and `b` must each be null or a valid pointer from predicate functions.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_or(
+    a: *mut paimon_predicate,
+    b: *mut paimon_predicate,
+) -> *mut paimon_predicate {
+    match take_predicate_pair(a, b) {
+        Some((lhs, rhs)) => box_predicate_handle(Predicate::or(vec![lhs, rhs])),
+        None => std::ptr::null_mut(),
+    }
+}
+
+/// Negate a predicate with NOT. Consumes the input.
+///
+/// Returns null when `p` is null.
+///
+/// # Safety
+/// `p` must be null or a valid pointer from a predicate function.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_not(p: *mut paimon_predicate) -> *mut paimon_predicate {
+    match take_predicate(p) {
+        Some(pred) => box_predicate_handle(Predicate::negate(pred)),
+        None => std::ptr::null_mut(),
+    }
+}
+
+/// Move a `Predicate` onto the heap and wrap it in a freshly allocated C handle.
+fn box_predicate_handle(pred: Predicate) -> *mut paimon_predicate {
+    let inner = Box::into_raw(Box::new(pred)) as *mut c_void;
+    Box::into_raw(Box::new(paimon_predicate { inner }))
+}
+
+/// Take ownership of the `Predicate` behind a C handle and free the handle itself.
+///
+/// Returns `None` when `p` or its `inner` pointer is null.
+///
+/// # Safety
+/// `p` must be null or a live handle from a predicate function that has not yet
+/// been freed or consumed.
+unsafe fn take_predicate(p: *mut paimon_predicate) -> Option<Predicate> {
+    if p.is_null() {
+        return None;
+    }
+    let wrapper = Box::from_raw(p);
+    if wrapper.inner.is_null() {
+        return None;
+    }
+    Some(*Box::from_raw(wrapper.inner as *mut Predicate))
+}
+
+/// Take ownership of both operands of a binary combinator.
+///
+/// Returns `None` when either handle is null or both are the same handle; every
+/// valid handle is consumed regardless, matching the "consumes both inputs" contract.
+///
+/// # Safety
+/// Same requirements as [`take_predicate`], for each of `a` and `b`.
+unsafe fn take_predicate_pair(
+    a: *mut paimon_predicate,
+    b: *mut paimon_predicate,
+) -> Option<(Predicate, Predicate)> {
+    if a == b {
+        // Same handle twice (or both null): release it once. Taking it twice would
+        // be a double free.
+        drop(take_predicate(a));
+        return None;
+    }
+    // Take both before inspecting either, so a valid operand is still released when
+    // the other one turns out to be null.
+    let lhs = take_predicate(a);
+    let rhs = take_predicate(b);
+    lhs.zip(rhs)
+}
+
+/// Free a paimon_predicate.
+///
+/// # Safety
+/// Only call with a predicate returned from paimon predicate functions.
+#[no_mangle] +pub unsafe extern "C" fn paimon_predicate_free(p: *mut paimon_predicate) { + if !p.is_null() { + let wrapper = Box::from_raw(p); + if !wrapper.inner.is_null() { + drop(Box::from_raw(wrapper.inner as *mut Predicate)); + } + } +} diff --git a/bindings/c/src/types.rs b/bindings/c/src/types.rs index 6f09042..4df28de 100644 --- a/bindings/c/src/types.rs +++ b/bindings/c/src/types.rs @@ -17,7 +17,7 @@ use std::ffi::c_void; -use paimon::spec::DataField; +use paimon::spec::{DataField, Predicate}; use paimon::table::Table; /// C-compatible key-value pair for options. @@ -78,10 +78,17 @@ pub struct paimon_read_builder { pub inner: *mut c_void, } -/// Internal state for ReadBuilder that stores table and projection columns. +/// Internal state for ReadBuilder that stores table, projection columns, and filter. pub(crate) struct ReadBuilderState { pub table: Table, pub projected_columns: Option>, + pub filter: Option, +} + +/// Internal state for TableScan that stores table and filter. +pub(crate) struct TableScanState { + pub table: Table, + pub filter: Option, } #[repr(C)] @@ -94,10 +101,11 @@ pub struct paimon_table_read { pub inner: *mut c_void, } -/// Internal state for TableRead that stores table and projected read type. +/// Internal state for TableRead that stores table, projected read type, and data predicates. pub(crate) struct TableReadState { pub table: Table, pub read_type: Vec, + pub data_predicates: Vec, } #[repr(C)] @@ -110,6 +118,53 @@ pub struct paimon_record_batch_reader { pub inner: *mut c_void, } +/// Opaque wrapper around a Predicate. +#[repr(C)] +pub struct paimon_predicate { + pub inner: *mut c_void, +} + +/// A typed literal value for predicate comparison, passed across FFI. +/// +/// # Design +/// +/// We use a tagged flat struct instead of opaque heap-allocated handles +/// (like DuckDB's `duckdb_value`). 
The trade-off: +/// +/// - **Pro**: Zero allocation — the entire datum is passed by value on the +/// stack, with no heap round-trips or free calls needed. This keeps the +/// FFI surface minimal and the Go/C caller simple. +/// - **Con**: The struct is larger than any single variant needs, wasting +/// some bytes per datum (currently ~56 bytes vs. ~16 for the largest +/// single variant). +/// +/// Since datums are only used for predicate construction (not a hot path), +/// the extra size is acceptable. +/// +/// # Tags +/// +/// - 0: Bool, 1: TinyInt, 2: SmallInt, 3: Int, 4: Long +/// - 5: Float, 6: Double, 7: String, 8: Date, 9: Time +/// - 10: Timestamp, 11: LocalZonedTimestamp, 12: Decimal, 13: Bytes +/// +/// `tag` determines which value fields are valid: +/// - `Bool`/`TinyInt`/`SmallInt`/`Int`/`Long`/`Date`/`Time` → `int_val` +/// - `Float`/`Double` → `double_val` +/// - `String`/`Bytes` → `str_data` + `str_len` +/// - `Timestamp`/`LocalZonedTimestamp` → `int_val` (millis) + `int_val2` (nanos) +/// - `Decimal` → `int_val` + `int_val2` (unscaled i128) + `uint_val` (precision) + `uint_val2` (scale) +#[repr(C)] +pub struct paimon_datum { + pub tag: i32, + pub int_val: i64, + pub double_val: f64, + pub str_data: *const u8, + pub str_len: usize, + pub int_val2: i64, + pub uint_val: u32, + pub uint_val2: u32, +} + /// A single Arrow record batch exported via the Arrow C Data Interface. /// /// `array` and `schema` point to heap-allocated ArrowArray and ArrowSchema diff --git a/bindings/go/predicate.go b/bindings/go/predicate.go new file mode 100644 index 0000000..d39b684 --- /dev/null +++ b/bindings/go/predicate.go @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "fmt" + "math" + "runtime" + "sync" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// Datum type tags (must match paimon-c datum_from_c). +const ( + datumTagBool int32 = 0 + datumTagTinyInt int32 = 1 + datumTagSmallInt int32 = 2 + datumTagInt int32 = 3 + datumTagLong int32 = 4 + datumTagFloat int32 = 5 + datumTagDouble int32 = 6 + datumTagString int32 = 7 + datumTagDate int32 = 8 + datumTagTime int32 = 9 + datumTagTimestamp int32 = 10 + datumTagLocalZonedTimestamp int32 = 11 + datumTagDecimal int32 = 12 + datumTagBytes int32 = 13 +) + +// Datum is a typed literal value for predicate comparison. +// The internal representation is hidden to allow future changes +// (e.g. switching to opaque handles) without breaking callers. +type Datum struct { + inner paimonDatumC +} + +// BoolDatum creates a boolean datum. +func BoolDatum(v bool) Datum { + var iv int64 + if v { + iv = 1 + } + return Datum{inner: paimonDatumC{tag: datumTagBool, intVal: iv}} +} + +// TinyIntDatum creates a tinyint datum. +func TinyIntDatum(v int8) Datum { + return Datum{inner: paimonDatumC{tag: datumTagTinyInt, intVal: int64(v)}} +} + +// SmallIntDatum creates a smallint datum. +func SmallIntDatum(v int16) Datum { + return Datum{inner: paimonDatumC{tag: datumTagSmallInt, intVal: int64(v)}} +} + +// IntDatum creates an int datum. 
+func IntDatum(v int32) Datum {
+	raw := paimonDatumC{tag: datumTagInt, intVal: int64(v)}
+	return Datum{inner: raw}
+}
+
+// LongDatum wraps an int64 as a long (bigint) datum.
+func LongDatum(v int64) Datum {
+	raw := paimonDatumC{tag: datumTagLong, intVal: v}
+	return Datum{inner: raw}
+}
+
+// FloatDatum wraps a float32 as a float datum. The value crosses the FFI
+// boundary widened to float64.
+func FloatDatum(v float32) Datum {
+	raw := paimonDatumC{tag: datumTagFloat, dblVal: float64(v)}
+	return Datum{inner: raw}
+}
+
+// DoubleDatum wraps a float64 as a double datum.
+func DoubleDatum(v float64) Datum {
+	raw := paimonDatumC{tag: datumTagDouble, dblVal: v}
+	return Datum{inner: raw}
+}
+
+// StringDatum wraps a string as a string datum. The bytes are copied, and the
+// datum points at that copy.
+func StringDatum(v string) Datum {
+	buf := []byte(v)
+	raw := paimonDatumC{tag: datumTagString, strLen: uintptr(len(buf))}
+	// An empty string leaves strData nil; there is no first byte to point at.
+	if len(buf) != 0 {
+		raw.strData = &buf[0]
+	}
+	return Datum{inner: raw}
+}
+
+// Date is a date expressed as epoch days since 1970-01-01.
+// Usage: pb.Eq("dt", paimon.Date(19000))
+type Date int32
+
+// Time is a time of day expressed as milliseconds since midnight.
+// Usage: pb.Eq("t", paimon.Time(3600000))
+type Time int32
+
+// Timestamp is a timestamp without timezone (millis + sub-millis nanos).
+// Usage: pb.Eq("ts", paimon.Timestamp{Millis: 1700000000000, Nanos: 0})
+type Timestamp struct {
+	Millis int64
+	Nanos  int32
+}
+
+// LocalZonedTimestamp is a timestamp with local timezone semantics.
+// Usage: pb.Eq("lzts", paimon.LocalZonedTimestamp{Millis: 1700000000000, Nanos: 0})
+type LocalZonedTimestamp struct {
+	Millis int64
+	Nanos  int32
+}
+
+// Decimal is a fixed-precision decimal value up to DECIMAL(38, s).
+//
+// The unscaled value is a little-endian i128 split into two int64 halves:
+// Lo (low 64 bits, unsigned interpretation) and Hi (high 64 bits,
+// sign-extended). For values that fit in int64, use [NewDecimal].
+//
+// Usage:
+//
+//	paimon.NewDecimal(12345, 10, 2)     // 123.45 as DECIMAL(10,2)
+//	paimon.Decimal{Lo: lo, Hi: hi, ...} // full i128
+type Decimal struct {
+	Lo        int64 // low 64 bits of unscaled i128 (unsigned interpretation)
+	Hi        int64 // high 64 bits of unscaled i128 (sign extension)
+	Precision uint32
+	Scale     uint32
+}
+
+// NewDecimal builds a Decimal from an int64 unscaled value.
+// For unscaled values outside the int64 range, fill in the Lo/Hi fields of
+// Decimal directly.
+func NewDecimal(unscaled int64, precision, scale uint32) Decimal {
+	// Arithmetic shift yields 0 for non-negative values and -1 (all ones) for
+	// negative ones, which is exactly the sign extension of the high half.
+	signExt := unscaled >> 63
+	return Decimal{Lo: unscaled, Hi: signExt, Precision: precision, Scale: scale}
+}
+
+// Bytes is a binary value.
+// Usage: pb.Eq("data", paimon.Bytes(someSlice))
+type Bytes []byte
+
+// toDatum converts a Go value into a Datum for predicate comparison.
+//
+// Supported Go types and the Paimon type they map to:
+//   - bool                → Bool
+//   - int8                → TinyInt
+//   - int16               → SmallInt
+//   - int32               → Int
+//   - int                 → Int (if it fits int32), otherwise Long
+//   - int64               → Long
+//   - float32             → Float
+//   - float64             → Double
+//   - string              → String
+//   - Date                → Date
+//   - Time                → Time
+//   - Timestamp           → Timestamp
+//   - LocalZonedTimestamp → LocalZonedTimestamp
+//   - Decimal             → Decimal
+//   - Bytes               → Bytes
+//   - Datum               → passed through unchanged
+//
+// Any other type yields an error.
+func toDatum(v any) (Datum, error) {
+	// wrap turns a raw C datum into the (Datum, nil) pair the switch returns.
+	wrap := func(raw paimonDatumC) (Datum, error) {
+		return Datum{inner: raw}, nil
+	}
+
+	switch val := v.(type) {
+	case Datum:
+		return val, nil
+	case bool:
+		return BoolDatum(val), nil
+	case int8:
+		return TinyIntDatum(val), nil
+	case int16:
+		return SmallIntDatum(val), nil
+	case int32:
+		return IntDatum(val), nil
+	case int:
+		if val < math.MinInt32 || val > math.MaxInt32 {
+			return LongDatum(int64(val)), nil
+		}
+		return IntDatum(int32(val)), nil
+	case int64:
+		return LongDatum(val), nil
+	case float32:
+		return FloatDatum(val), nil
+	case float64:
+		return DoubleDatum(val), nil
+	case string:
+		return StringDatum(val), nil
+	case Date:
+		return wrap(paimonDatumC{tag: datumTagDate, intVal: int64(val)})
+	case Time:
+		return wrap(paimonDatumC{tag: datumTagTime, intVal: int64(val)})
+	case Timestamp:
+		return wrap(paimonDatumC{
+			tag:     datumTagTimestamp,
+			intVal:  val.Millis,
+			intVal2: int64(val.Nanos),
+		})
+	case LocalZonedTimestamp:
+		return wrap(paimonDatumC{
+			tag:     datumTagLocalZonedTimestamp,
+			intVal:  val.Millis,
+			intVal2: int64(val.Nanos),
+		})
+	case Decimal:
+		return wrap(paimonDatumC{
+			tag:      datumTagDecimal,
+			intVal:   val.Lo,
+			intVal2:  val.Hi,
+			uintVal:  val.Precision,
+			uintVal2: val.Scale,
+		})
+	case Bytes:
+		raw := paimonDatumC{tag: datumTagBytes, strLen: uintptr(len(val))}
+		// The datum points at the caller's slice; an empty slice leaves strData nil.
+		if len(val) != 0 {
+			raw.strData = &val[0]
+		}
+		return wrap(raw)
+	default:
+		return Datum{}, fmt.Errorf("unsupported datum type: %T", v)
+	}
+}
+
+// Predicate is an opaque filter predicate for scan planning.
+type Predicate struct {
+	ctx       context.Context
+	lib       *libRef
+	inner     *paimonPredicate
+	closeOnce sync.Once
+}
+
+// Close releases the predicate resources. It is safe to call multiple times.
+// Note: predicates passed to WithFilter or to combinators (And/Or/Not) are
+// consumed and should NOT be closed by the caller.
+func (p *Predicate) Close() {
+	p.closeOnce.Do(func() {
+		// A consumed predicate has no handle left; nothing to free or release.
+		if p.inner == nil {
+			return
+		}
+		free := ffiPredicateFree.symbol(p.ctx)
+		free(p.inner)
+		p.inner = nil
+		p.lib.release()
+	})
+}
+
+// errConsumedPredicate is returned when a consumed or nil predicate is reused.
+var errConsumedPredicate = fmt.Errorf("paimon: predicate already consumed or nil")
+
+// PredicateBuilder creates filter predicates for a table.
+// It only holds a Go-level reference to the Table and owns no C resources,
+// which is why it has no Close() method.
+type PredicateBuilder struct {
+	table *Table
+}
+
+// Eq creates an equality predicate: column = value.
+func (pb *PredicateBuilder) Eq(column string, value any) (*Predicate, error) {
+	return pb.compare(ffiPredicateEqual, column, value)
+}
+
+// NotEq builds the predicate column != value.
+func (pb *PredicateBuilder) NotEq(column string, value any) (*Predicate, error) {
+	return pb.compare(ffiPredicateNotEqual, column, value)
+}
+
+// Lt builds the predicate column < value.
+func (pb *PredicateBuilder) Lt(column string, value any) (*Predicate, error) {
+	return pb.compare(ffiPredicateLessThan, column, value)
+}
+
+// Le builds the predicate column <= value.
+func (pb *PredicateBuilder) Le(column string, value any) (*Predicate, error) {
+	return pb.compare(ffiPredicateLessOrEqual, column, value)
+}
+
+// Gt builds the predicate column > value.
+func (pb *PredicateBuilder) Gt(column string, value any) (*Predicate, error) {
+	return pb.compare(ffiPredicateGreaterThan, column, value)
+}
+
+// Ge builds the predicate column >= value.
+func (pb *PredicateBuilder) Ge(column string, value any) (*Predicate, error) {
+	return pb.compare(ffiPredicateGreaterOrEqual, column, value)
+}
+
+// compare is the shared body of the comparison builders: it converts value to a
+// Datum (failing on unsupported Go types) and builds the predicate selected by
+// ffiVar.
+func (pb *PredicateBuilder) compare(
+	ffiVar *FFI[func(*paimonTable, *byte, paimonDatumC) (*paimonPredicate, error)],
+	column string, value any,
+) (*Predicate, error) {
+	literal, convErr := toDatum(value)
+	if convErr != nil {
+		return nil, convErr
+	}
+	return pb.buildLeafPredicate(ffiVar, column, literal)
+}
+
+// IsNull builds the predicate column IS NULL.
+func (pb *PredicateBuilder) IsNull(column string) (*Predicate, error) {
+	return pb.buildNullPredicate(ffiPredicateIsNull, column)
+}
+
+// IsNotNull creates an IS NOT NULL predicate.
+func (pb *PredicateBuilder) IsNotNull(column string) (*Predicate, error) {
+	return pb.buildNullPredicate(ffiPredicateIsNotNull, column)
+}
+
+// In creates an IN predicate: column IN (values...).
+// The values are forwarded to buildInPredicate, which converts each one with toDatum.
+func (pb *PredicateBuilder) In(column string, values ...any) (*Predicate, error) {
+	return pb.buildInPredicate(ffiPredicateIsIn, column, values)
+}
+
+// NotIn creates a NOT IN predicate: column NOT IN (values...).
+// The values are forwarded to buildInPredicate, which converts each one with toDatum.
+func (pb *PredicateBuilder) NotIn(column string, values ...any) (*Predicate, error) {
+	return pb.buildInPredicate(ffiPredicateIsNotIn, column, values)
+}
+
+// buildLeafPredicate creates a comparison predicate through an FFI constructor
+// that takes (table, column, datum).
+//
+// It returns ErrClosed when the table handle is nil and forwards any error
+// reported by the constructor. On success the new Predicate shares the table's
+// ctx and lib, and lib.acquire() is called once on its behalf.
+func (pb *PredicateBuilder) buildLeafPredicate(
+	ffiVar *FFI[func(*paimonTable, *byte, paimonDatumC) (*paimonPredicate, error)],
+	column string, datum Datum,
+) (*Predicate, error) {
+	tbl := pb.table
+	if tbl.inner == nil {
+		return nil, ErrClosed
+	}
+
+	// The constructor takes a *byte, so hand it a NUL-terminated copy of the name.
+	name := make([]byte, len(column)+1)
+	copy(name, column)
+
+	handle, err := ffiVar.symbol(tbl.ctx)(tbl.inner, &name[0], datum.inner)
+	// Keep the Go-owned buffers reachable until the foreign call has returned.
+	runtime.KeepAlive(name)
+	runtime.KeepAlive(datum)
+	if err != nil {
+		return nil, err
+	}
+
+	tbl.lib.acquire()
+	return &Predicate{ctx: tbl.ctx, lib: tbl.lib, inner: handle}, nil
+}
+
+// buildNullPredicate creates an IS NULL / IS NOT NULL predicate through an FFI
+// constructor that takes (table, column).
+//
+// It returns ErrClosed when the table handle is nil and forwards any error
+// reported by the constructor. On success the new Predicate shares the table's
+// ctx and lib, and lib.acquire() is called once on its behalf.
+func (pb *PredicateBuilder) buildNullPredicate(
+	ffiVar *FFI[func(*paimonTable, *byte) (*paimonPredicate, error)],
+	column string,
+) (*Predicate, error) {
+	tbl := pb.table
+	if tbl.inner == nil {
+		return nil, ErrClosed
+	}
+
+	// NUL-terminated copy of the column name for the foreign side.
+	name := make([]byte, len(column)+1)
+	copy(name, column)
+
+	handle, err := ffiVar.symbol(tbl.ctx)(tbl.inner, &name[0])
+	runtime.KeepAlive(name)
+	if err != nil {
+		return nil, err
+	}
+
+	tbl.lib.acquire()
+	return &Predicate{ctx: tbl.ctx, lib: tbl.lib, inner: handle}, nil
+}
+
+// buildInPredicate is a helper for IS IN / IS NOT IN predicates.
+func (pb *PredicateBuilder) buildInPredicate(
+	ffiVar *FFI[func(*paimonTable, *byte, unsafe.Pointer, uintptr) (*paimonPredicate, error)],
+	column string, values []any,
+) (*Predicate, error) {
+	tbl := pb.table
+	if tbl.inner == nil {
+		return nil, ErrClosed
+	}
+
+	// Convert every value up front; the first conversion failure aborts the
+	// call before anything is handed to the foreign side.
+	raw := make([]paimonDatumC, 0, len(values))
+	for _, v := range values {
+		d, err := toDatum(v)
+		if err != nil {
+			return nil, err
+		}
+		raw = append(raw, d.inner)
+	}
+
+	// An empty list is passed as a nil pointer with length 0.
+	var first unsafe.Pointer
+	if len(raw) != 0 {
+		first = unsafe.Pointer(&raw[0])
+	}
+
+	name := append([]byte(column), 0) // NUL-terminated column name
+	handle, err := ffiVar.symbol(tbl.ctx)(tbl.inner, &name[0], first, uintptr(len(raw)))
+	// Keep the Go-owned buffers reachable until the foreign call has returned.
+	runtime.KeepAlive(name)
+	runtime.KeepAlive(raw)
+	runtime.KeepAlive(values)
+	if err != nil {
+		return nil, err
+	}
+
+	tbl.lib.acquire()
+	return &Predicate{ctx: tbl.ctx, lib: tbl.lib, inner: handle}, nil
+}
+
+// combinePredicate is the shared implementation of And and Or.
+//
+// It returns errConsumedPredicate when either side is nil or has no native
+// handle, and a separate error when both arguments are the same predicate.
+// On success the receiver is updated in place to hold the combined native
+// predicate and is itself returned; other is emptied (inner set to nil) and
+// its library reference is released.
+func (p *Predicate) combinePredicate(
+	other *Predicate,
+	ffiVar *FFI[func(*paimonPredicate, *paimonPredicate) *paimonPredicate],
+) (*Predicate, error) {
+	if p == nil || p.inner == nil || other == nil || other.inner == nil {
+		return nil, errConsumedPredicate
+	}
+	if other == p {
+		return nil, fmt.Errorf("paimon: cannot combine a predicate with itself")
+	}
+
+	merged := ffiVar.symbol(p.ctx)(p.inner, other.inner)
+	p.inner = merged
+
+	// other's native handle now belongs to the combined predicate.
+	other.inner = nil
+	other.lib.release()
+	return p, nil
+}
+
+// And combines this predicate with another using AND. Both inputs are consumed:
+// the receiver is updated in place and returned as the result, and other is
+// left empty. Manage the returned predicate from here on, not the inputs.
+func (p *Predicate) And(other *Predicate) (*Predicate, error) {
+	return p.combinePredicate(other, ffiPredicateAnd)
+}
+
+// Or combines this predicate with another using OR. Consumes both predicates
+// (callers must NOT close either after this call).
+func (p *Predicate) Or(other *Predicate) (*Predicate, error) {
+	return p.combinePredicate(other, ffiPredicateOr)
+}
+
+// Not negates this predicate. The receiver is consumed: its native handle is
+// replaced in place by the negated one and the receiver itself is returned.
+// A nil or already-consumed receiver yields errConsumedPredicate.
+func (p *Predicate) Not() (*Predicate, error) {
+	if p != nil && p.inner != nil {
+		p.inner = ffiPredicateNot.symbol(p.ctx)(p.inner)
+		return p, nil
+	}
+	return nil, errConsumedPredicate
+}
+
+// FFI wrappers for predicate functions.
+
+// ffiPredicateFree binds paimon_predicate_free, which takes one predicate
+// pointer and returns nothing.
+var ffiPredicateFree = newFFI(ffiOpts{
+	sym:    "paimon_predicate_free",
+	rType:  &ffi.TypeVoid,
+	aTypes: []*ffi.Type{&ffi.TypePointer},
+}, func(_ context.Context, call ffiCall) func(p *paimonPredicate) {
+	return func(handle *paimonPredicate) {
+		// No return slot: the foreign function is declared void.
+		call(nil, unsafe.Pointer(&handle))
+	}
+})
+
+// Comparison constructors, all with the (table, column, datum) shape.
+var (
+	ffiPredicateEqual          = newPredicateLeafFFI("paimon_predicate_equal")
+	ffiPredicateNotEqual       = newPredicateLeafFFI("paimon_predicate_not_equal")
+	ffiPredicateLessThan       = newPredicateLeafFFI("paimon_predicate_less_than")
+	ffiPredicateLessOrEqual    = newPredicateLeafFFI("paimon_predicate_less_or_equal")
+	ffiPredicateGreaterThan    = newPredicateLeafFFI("paimon_predicate_greater_than")
+	ffiPredicateGreaterOrEqual = newPredicateLeafFFI("paimon_predicate_greater_or_equal")
+)
+
+// newPredicateLeafFFI creates an FFI wrapper for comparison predicate functions
+// with signature: (table, column, datum) -> result_predicate.
+func newPredicateLeafFFI(sym string) *FFI[func(*paimonTable, *byte, paimonDatumC) (*paimonPredicate, error)] {
+	opts := ffiOpts{
+		sym:   contextKey(sym),
+		rType: &typeResultPredicate,
+		// table pointer, column pointer, datum struct passed by value.
+		aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer, &typePaimonDatum},
+	}
+	return newFFI(opts, func(ctx context.Context, call ffiCall) func(*paimonTable, *byte, paimonDatumC) (*paimonPredicate, error) {
+		return func(table *paimonTable, column *byte, datum paimonDatumC) (*paimonPredicate, error) {
+			// The foreign function fills a {predicate, error} pair; a non-nil
+			// error pointer means the call failed.
+			var out resultPredicate
+			call(
+				unsafe.Pointer(&out),
+				unsafe.Pointer(&table),
+				unsafe.Pointer(&column),
+				unsafe.Pointer(&datum),
+			)
+			if out.error == nil {
+				return out.predicate, nil
+			}
+			return nil, parseError(ctx, out.error)
+		}
+	})
+}
+
+// Null-check constructors, both with the (table, column) shape.
+var (
+	ffiPredicateIsNull    = newPredicateNullFFI("paimon_predicate_is_null")
+	ffiPredicateIsNotNull = newPredicateNullFFI("paimon_predicate_is_not_null")
+)
+
+// newPredicateNullFFI creates an FFI wrapper for null-check predicate functions
+// with signature: (table, column) -> result_predicate.
+func newPredicateNullFFI(sym string) *FFI[func(*paimonTable, *byte) (*paimonPredicate, error)] {
+	opts := ffiOpts{
+		sym:    contextKey(sym),
+		rType:  &typeResultPredicate,
+		aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+	}
+	return newFFI(opts, func(ctx context.Context, call ffiCall) func(*paimonTable, *byte) (*paimonPredicate, error) {
+		return func(table *paimonTable, column *byte) (*paimonPredicate, error) {
+			// A non-nil error pointer in the result pair means the call failed.
+			var out resultPredicate
+			call(
+				unsafe.Pointer(&out),
+				unsafe.Pointer(&table),
+				unsafe.Pointer(&column),
+			)
+			if out.error == nil {
+				return out.predicate, nil
+			}
+			return nil, parseError(ctx, out.error)
+		}
+	})
+}
+
+// IN / NOT IN constructors, both with the (table, column, datums, datums_len) shape.
+var (
+	ffiPredicateIsIn    = newPredicateInFFI("paimon_predicate_is_in")
+	ffiPredicateIsNotIn = newPredicateInFFI("paimon_predicate_is_not_in")
+)
+
+// newPredicateInFFI creates an FFI wrapper for IN/NOT IN predicate functions
+// with signature: (table, column, datums, datums_len) -> result_predicate.
+func newPredicateInFFI(sym string) *FFI[func(*paimonTable, *byte, unsafe.Pointer, uintptr) (*paimonPredicate, error)] {
+	opts := ffiOpts{
+		sym:   contextKey(sym),
+		rType: &typeResultPredicate,
+		// All four arguments, including the length, are declared pointer-typed.
+		aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer, &ffi.TypePointer, &ffi.TypePointer},
+	}
+	return newFFI(opts, func(ctx context.Context, call ffiCall) func(*paimonTable, *byte, unsafe.Pointer, uintptr) (*paimonPredicate, error) {
+		return func(table *paimonTable, column *byte, datums unsafe.Pointer, datumsLen uintptr) (*paimonPredicate, error) {
+			var out resultPredicate
+			call(
+				unsafe.Pointer(&out),
+				unsafe.Pointer(&table),
+				unsafe.Pointer(&column),
+				unsafe.Pointer(&datums),
+				unsafe.Pointer(&datumsLen),
+			)
+			if out.error == nil {
+				return out.predicate, nil
+			}
+			return nil, parseError(ctx, out.error)
+		}
+	})
+}
+
+// Binary combinators: (predicate, predicate) -> predicate.
+var (
+	ffiPredicateAnd = newPredicateBinaryFFI("paimon_predicate_and")
+	ffiPredicateOr  = newPredicateBinaryFFI("paimon_predicate_or")
+)
+
+// newPredicateBinaryFFI creates an FFI wrapper for combinator functions with
+// signature: (predicate, predicate) -> predicate. The result is a bare
+// pointer; there is no error slot in the return value.
+func newPredicateBinaryFFI(sym string) *FFI[func(*paimonPredicate, *paimonPredicate) *paimonPredicate] {
+	opts := ffiOpts{
+		sym:    contextKey(sym),
+		rType:  &ffi.TypePointer,
+		aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+	}
+	return newFFI(opts, func(_ context.Context, call ffiCall) func(*paimonPredicate, *paimonPredicate) *paimonPredicate {
+		return func(lhs, rhs *paimonPredicate) *paimonPredicate {
+			var combined *paimonPredicate
+			call(
+				unsafe.Pointer(&combined),
+				unsafe.Pointer(&lhs),
+				unsafe.Pointer(&rhs),
+			)
+			return combined
+		}
+	})
+}
+
+// ffiPredicateNot binds paimon_predicate_not: (predicate) -> predicate.
+var ffiPredicateNot = newFFI(ffiOpts{
+	sym:    "paimon_predicate_not",
+	rType:  &ffi.TypePointer,
+	aTypes: []*ffi.Type{&ffi.TypePointer},
+}, func(_ context.Context, call ffiCall) func(*paimonPredicate) *paimonPredicate {
+	return func(operand *paimonPredicate) *paimonPredicate {
+		var negated *paimonPredicate
+		call(
+			unsafe.Pointer(&negated),
+			unsafe.Pointer(&operand),
+		)
+		return negated
+	}
+})
diff --git a/bindings/go/predicate/predicate.go b/bindings/go/predicate/predicate.go
new file mode 100644
index 0000000..046fa78
--- /dev/null
+++ b/bindings/go/predicate/predicate.go
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Package predicate provides convenience functions for combining Paimon filter predicates.
+package predicate
+
+import (
+	"fmt"
+
+	paimon "github.com/apache/paimon-rust/bindings/go"
+)
+
+// Predicate is an alias for paimon.Predicate.
+type Predicate = paimon.Predicate
+
+// And combines two or more predicates with AND, folding left to right.
+//
+// On success all inputs are consumed: they are merged into the returned
+// predicate, which is the first argument updated in place. If combining fails
+// part-way, every non-nil input is closed before the error is returned, so
+// nothing is leaked and the caller has nothing left to clean up. If fewer than
+// two predicates are given, an error is returned and the inputs are left
+// untouched.
+func And(preds ...*Predicate) (*Predicate, error) {
+	return combineAll("And", (*Predicate).And, preds)
+}
+
+// Or combines two or more predicates with OR, folding left to right.
+//
+// Ownership follows the same rules as And: inputs are consumed on success,
+// closed on a failed combination, and left untouched when fewer than two
+// predicates are given.
+func Or(preds ...*Predicate) (*Predicate, error) {
+	return combineAll("Or", (*Predicate).Or, preds)
+}
+
+// combineAll folds preds pairwise with combine; name is used only in the
+// arity error message.
+func combineAll(
+	name string,
+	combine func(*Predicate, *Predicate) (*Predicate, error),
+	preds []*Predicate,
+) (*Predicate, error) {
+	if len(preds) < 2 {
+		return nil, fmt.Errorf("predicate.%s requires at least 2 predicates, got %d", name, len(preds))
+	}
+	result := preds[0]
+	for _, next := range preds[1:] {
+		merged, err := combine(result, next)
+		if err != nil {
+			// Earlier inputs are already merged into preds[0] and later ones
+			// were never reached; release them all rather than leak them.
+			closeAll(preds)
+			return nil, err
+		}
+		result = merged
+	}
+	return result, nil
+}
+
+// closeAll closes every non-nil predicate in preds. Close is safe to call more
+// than once and does nothing for a predicate whose handle was already consumed.
+func closeAll(preds []*Predicate) {
+	for _, p := range preds {
+		if p != nil {
+			p.Close()
+		}
+	}
+}
+
+// Not negates a predicate. Consumes the input
+// (caller must NOT close it after this call).
+func Not(p *Predicate) (*Predicate, error) { + return p.Not() +} diff --git a/bindings/go/read_builder.go b/bindings/go/read_builder.go index fde6bf5..177c595 100644 --- a/bindings/go/read_builder.go +++ b/bindings/go/read_builder.go @@ -56,6 +56,47 @@ func (rb *ReadBuilder) WithProjection(columns []string) error { return projFn(rb.inner, columns) } +// WithFilter sets a filter predicate for scan planning and read-side pruning. +// +// The predicate is used in two phases: +// - Scan planning: prunes partitions, buckets, and data files based on +// file-level statistics (min/max). This is conservative — files whose +// statistics are inconclusive are kept. +// - Read-side: applies row-level filtering via Parquet native row filters +// for supported leaf predicates (Eq, NotEq, Lt, Le, Gt, Ge, IsNull, +// IsNotNull, In, NotIn). +// +// Row-level filtering is exact for most common types (Bool, Int, Long, Float, +// Double, String, Date, Decimal, Binary). However, the following cases are NOT +// filtered at the row level and may return non-matching rows: +// - Compound predicates (And/Or/Not) — not yet implemented for row-level filtering. +// - Time, Timestamp, and LocalZonedTimestamp columns (not yet implemented). +// - Schema-evolution: the predicate column does not exist in older data files. +// - Data-evolution mode (data-evolution.enabled = true). +// +// In these cases callers should apply residual filtering on the returned records. +// +// The predicate is consumed (ownership transferred to the read builder); +// the caller must NOT close it after this call. +// Passing nil is a no-op. +func (rb *ReadBuilder) WithFilter(p *Predicate) error { + if rb.inner == nil { + return ErrClosed + } + if p == nil { + return nil + } + if p.inner == nil { + return errConsumedPredicate + } + filterFn := ffiReadBuilderWithFilter.symbol(rb.ctx) + err := filterFn(rb.inner, p.inner) + // Ownership transferred; prevent double-free. 
+ p.inner = nil + p.lib.release() + return err +} + // NewScan creates a TableScan for planning which data files to read. func (rb *ReadBuilder) NewScan() (*TableScan, error) { if rb.inner == nil { @@ -136,6 +177,25 @@ var ffiReadBuilderWithProjection = newFFI(ffiOpts{ } }) +var ffiReadBuilderWithFilter = newFFI(ffiOpts{ + sym: "paimon_read_builder_with_filter", + rType: &ffi.TypePointer, + aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder, p *paimonPredicate) error { + return func(rb *paimonReadBuilder, p *paimonPredicate) error { + var errPtr *paimonError + ffiCall( + unsafe.Pointer(&errPtr), + unsafe.Pointer(&rb), + unsafe.Pointer(&p), + ) + if errPtr != nil { + return parseError(ctx, errPtr) + } + return nil + } +}) + var ffiReadBuilderNewScan = newFFI(ffiOpts{ sym: "paimon_read_builder_new_scan", rType: &typeResultTableScan, diff --git a/bindings/go/table.go b/bindings/go/table.go index 17ff7af..e6008a2 100644 --- a/bindings/go/table.go +++ b/bindings/go/table.go @@ -44,6 +44,11 @@ func (t *Table) Close() { }) } +// PredicateBuilder returns a builder for creating filter predicates on this table. +func (t *Table) PredicateBuilder() *PredicateBuilder { + return &PredicateBuilder{table: t} +} + // NewReadBuilder creates a ReadBuilder for this table. func (t *Table) NewReadBuilder() (*ReadBuilder, error) { if t.inner == nil { diff --git a/bindings/go/tests/paimon_test.go b/bindings/go/tests/paimon_test.go index e83f151..e9ee196 100644 --- a/bindings/go/tests/paimon_test.go +++ b/bindings/go/tests/paimon_test.go @@ -30,41 +30,14 @@ import ( paimon "github.com/apache/paimon-rust/bindings/go" ) -// TestReadLogTable reads the test table and verifies the data matches expected values. 
-// -// The table was populated by Docker provisioning with: -// -// (1, 'alice'), (2, 'bob'), (3, 'carol') -func TestReadLogTable(t *testing.T) { - warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE") - if warehouse == "" { - warehouse = "/tmp/paimon-warehouse" - } - - if _, err := os.Stat(warehouse); os.IsNotExist(err) { - t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse) - } - - // Use NewCatalog with options - catalog, err := paimon.NewCatalog(map[string]string{ - "warehouse": warehouse, - }) - if err != nil { - t.Fatalf("Failed to create catalog: %v", err) - } - defer catalog.Close() +type row struct { + id int32 + name string +} - table, err := catalog.GetTable(paimon.NewIdentifier("default", "simple_log_table")) - if err != nil { - t.Fatalf("Failed to get table: %v", err) - } - defer table.Close() - - rb, err := table.NewReadBuilder() - if err != nil { - t.Fatalf("Failed to create read builder: %v", err) - } - defer rb.Close() +// readRows scans and reads all (id, name) rows from a ReadBuilder. +func readRows(t *testing.T, rb *paimon.ReadBuilder) []row { + t.Helper() scan, err := rb.NewScan() if err != nil { @@ -80,7 +53,7 @@ func TestReadLogTable(t *testing.T) { splits := plan.Splits() if len(splits) == 0 { - t.Fatal("Expected at least one split") + return nil } read, err := rb.NewRead() @@ -95,13 +68,6 @@ func TestReadLogTable(t *testing.T) { } defer reader.Close() - // Import Arrow batches via C Data Interface and collect rows. - // Strings are copied before Release because arrow-go's String.Value() - // returns zero-copy references into the Arrow buffer. 
- type row struct { - id int32 - name string - } var rows []row batchIdx := 0 for { @@ -132,58 +98,130 @@ func TestReadLogTable(t *testing.T) { record.Release() batchIdx++ } - - if len(rows) == 0 { - t.Fatal("Expected at least one row, got 0") - } - - sort.Slice(rows, func(i, j int) bool { - return rows[i].id < rows[j].id - }) - - expected := []row{ - {1, "alice"}, - {2, "bob"}, - {3, "carol"}, - } - - if len(rows) != len(expected) { - t.Fatalf("Expected %d rows, got %d: %v", len(expected), len(rows), rows) - } - - for i, exp := range expected { - if rows[i] != exp { - t.Errorf("Row %d: expected %v, got %v", i, exp, rows[i]) - } - } + return rows } -// TestReadWithProjection reads only the "id" column via WithProjection and -// verifies that only the projected column is returned with correct values. -func TestReadWithProjection(t *testing.T) { +// openTestTable creates a catalog, opens the simple_log_table, and returns +// the table along with a cleanup function. Skips the test if the warehouse +// does not exist. +func openTestTable(t *testing.T) *paimon.Table { + t.Helper() + warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE") if warehouse == "" { warehouse = "/tmp/paimon-warehouse" } - if _, err := os.Stat(warehouse); os.IsNotExist(err) { t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse) } - // Use NewCatalog with options catalog, err := paimon.NewCatalog(map[string]string{ "warehouse": warehouse, }) if err != nil { t.Fatalf("Failed to create catalog: %v", err) } - defer catalog.Close() + t.Cleanup(func() { catalog.Close() }) table, err := catalog.GetTable(paimon.NewIdentifier("default", "simple_log_table")) if err != nil { t.Fatalf("Failed to get table: %v", err) } - defer table.Close() + t.Cleanup(func() { table.Close() }) + + return table +} + +// TestReadLogTable reads the test table and verifies the data matches expected values. 
+// +// The table was populated by Docker provisioning with: +// +// (1, 'alice'), (2, 'bob'), (3, 'carol') +func TestReadLogTable(t *testing.T) { + table := openTestTable(t) + + rb, err := table.NewReadBuilder() + if err != nil { + t.Fatalf("Failed to create read builder: %v", err) + } + defer rb.Close() + + rows := readRows(t, rb) + if len(rows) == 0 { + t.Fatal("Expected at least one row, got 0") + } + + sort.Slice(rows, func(i, j int) bool { return rows[i].id < rows[j].id }) + + expected := []row{{1, "alice"}, {2, "bob"}, {3, "carol"}} + if len(rows) != len(expected) { + t.Fatalf("Expected %d rows, got %d: %v", len(expected), len(rows), rows) + } + for i, exp := range expected { + if rows[i] != exp { + t.Errorf("Row %d: expected %v, got %v", i, exp, rows[i]) + } + } +} + +// TestReadWithFilter exercises filter push-down through several sub-tests. +func TestReadWithFilter(t *testing.T) { + table := openTestTable(t) + + t.Run("EqualById", func(t *testing.T) { + rb, err := table.NewReadBuilder() + if err != nil { + t.Fatalf("Failed to create read builder: %v", err) + } + defer rb.Close() + + // id = 1 + pb := table.PredicateBuilder() + pred, err := pb.Eq("id", 1) + if err != nil { + t.Fatalf("Failed to create predicate: %v", err) + } + if err := rb.WithFilter(pred); err != nil { + t.Fatalf("Failed to set filter: %v", err) + } + + rows := readRows(t, rb) + expected := []row{{1, "alice"}} + if len(rows) != len(expected) { + t.Fatalf("Expected %d rows, got %d: %v", len(expected), len(rows), rows) + } + if rows[0] != expected[0] { + t.Errorf("Expected %v, got %v", expected[0], rows[0]) + } + }) + + t.Run("EmptyStringEqual", func(t *testing.T) { + rb, err := table.NewReadBuilder() + if err != nil { + t.Fatalf("Failed to create read builder: %v", err) + } + defer rb.Close() + + pb := table.PredicateBuilder() + pred, err := pb.Eq("name", "") + if err != nil { + t.Fatalf("Eq with empty string failed: %v", err) + } + if err := rb.WithFilter(pred); err != nil { + 
t.Fatalf("WithFilter failed: %v", err) + } + + rows := readRows(t, rb) + if len(rows) != 0 { + t.Fatalf("Expected 0 rows for empty string filter, got %d: %v", len(rows), rows) + } + }) +} + +// TestReadWithProjection reads only the "id" column via WithProjection and +// verifies that only the projected column is returned with correct values. +func TestReadWithProjection(t *testing.T) { + table := openTestTable(t) rb, err := table.NewReadBuilder() if err != nil { @@ -191,7 +229,6 @@ func TestReadWithProjection(t *testing.T) { } defer rb.Close() - // Set projection to only read "id" column if err := rb.WithProjection([]string{"id"}); err != nil { t.Fatalf("Failed to set projection: %v", err) } @@ -236,7 +273,6 @@ func TestReadWithProjection(t *testing.T) { t.Fatalf("Batch %d: failed to read next record: %v", batchIdx, err) } - // Verify schema only contains the projected column schema := record.Schema() if schema.NumFields() != 1 { record.Release() diff --git a/bindings/go/types.go b/bindings/go/types.go index fdc8990..7d943cb 100644 --- a/bindings/go/types.go +++ b/bindings/go/types.go @@ -120,6 +120,34 @@ var ( }[0], } + // paimon_result_predicate { predicate: *paimon_predicate, error: *paimon_error } + typeResultPredicate = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_datum { tag: i32, int_val: i64, double_val: f64, str_data: *u8, str_len: usize, + // int_val2: i64, uint_val: u32, uint_val2: u32 } + typePaimonDatum = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypeSint32, // tag + &ffi.TypeSint32, // padding + &ffi.TypeSint64, // int_val + &ffi.TypeDouble, // double_val + &ffi.TypePointer, // str_data + &ffi.TypePointer, // str_len (usize) + &ffi.TypeSint64, // int_val2 + &ffi.TypeUint32, // uint_val + &ffi.TypeUint32, // uint_val2 + nil, + }[0], + } + // paimon_result_next_batch { batch: paimon_arrow_batch, error: *paimon_error } typeResultNextBatch = ffi.Type{ 
Type: ffi.Struct, @@ -153,6 +181,7 @@ type paimonTableScan struct{} type paimonTableRead struct{} type paimonPlan struct{} type paimonRecordBatchReader struct{} +type paimonPredicate struct{} // Result types matching the C repr structs type resultCatalogNew struct { @@ -195,6 +224,24 @@ type resultRecordBatchReader struct { error *paimonError } +type resultPredicate struct { + predicate *paimonPredicate + error *paimonError +} + +// paimonDatumC mirrors the C paimon_datum struct. +type paimonDatumC struct { + tag int32 + _pad0 [4]byte // padding for alignment + intVal int64 + dblVal float64 + strData *byte + strLen uintptr + intVal2 int64 + uintVal uint32 + uintVal2 uint32 +} + // arrowBatch holds a single Arrow record batch via the Arrow C Data Interface. type arrowBatch struct { ctx context.Context diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 4a405ea..a931230 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -249,6 +249,11 @@ impl<'a> TableRead<'a> { &self.read_type } + /// Data predicates for read-side pruning. + pub fn data_predicates(&self) -> &[Predicate] { + &self.data_predicates + } + /// Table for this read. pub fn table(&self) -> &Table { self.table