diff --git a/bench/ndarray/multithreaded_matmul_bench.py b/bench/ndarray/multithreaded_matmul_bench.py new file mode 100644 index 00000000..810527c6 --- /dev/null +++ b/bench/ndarray/multithreaded_matmul_bench.py @@ -0,0 +1,48 @@ +import blosc2 +import numpy as np +import time + +N = 10000 +ndim = 2 +ashape = (N,) * ndim +bshape = ashape +dtype = np.float64 + +achunks = (1000, 1000) +bchunks = (achunks[1], achunks[0]) +ablocks = (200, 200) +bblocks = (ablocks[1], ablocks[0]) +outblocks = (ablocks[0], bblocks[1]) +outchunks = (achunks[0], bchunks[1]) +# a = blosc2.linspace(0, 1, dtype=dtype, shape=ashape, chunks=achunks, blocks=ablocks) +# b = blosc2.linspace(0, 1, dtype=dtype, shape=bshape, chunks=bchunks, blocks=bblocks) +a = blosc2.ones(dtype=dtype, shape=ashape, chunks=achunks, blocks=ablocks) +b = blosc2.full(fill_value=2, dtype=dtype, shape=bshape, chunks=bchunks, blocks=bblocks) + +a_np = a[:] +b_np = b[:] +tic = time.time() +np_res = np.matmul(a_np, b_np) +print(f'numpy finished in {time.time()-tic} s') + +tic = time.time() +b2_res = blosc2.matmul(a, b, blocks=outblocks, chunks=outchunks) +print(f'blosc2 multithreaded finished in {time.time()-tic} s') + +tic = time.time() +b2_res = blosc2.matmul(a, b) +print(f'blosc2 normal finished in {time.time()-tic} s') + +achunks = None #(1000, 1000) +bchunks = None #(achunks[1], achunks[0]) +ablocks = None #(200, 200) +bblocks = None #(ablocks[1], ablocks[0]) +outblocks = None #(ablocks[0], bblocks[1]) +outchunks = None #(achunks[0], bchunks[1]) +# a = blosc2.linspace(0, 1, dtype=dtype, shape=ashape, chunks=achunks, blocks=ablocks) +# b = blosc2.linspace(0, 1, dtype=dtype, shape=bshape, chunks=bchunks, blocks=bblocks) +a = blosc2.ones(dtype=dtype, shape=ashape, chunks=achunks, blocks=ablocks) +b = blosc2.full(fill_value=2, dtype=dtype, shape=bshape, chunks=bchunks, blocks=bblocks) +tic = time.time() +b2_res = blosc2.matmul(a, b, blocks=outblocks, chunks=outchunks) +print(f'blosc2 normal with default chunks etc. finished in {time.time()-tic} s') diff --git a/bench/ndarray/stringops_bench.py b/bench/ndarray/stringops_bench.py index c983a0f2..5f97aac3 100644 --- a/bench/ndarray/stringops_bench.py +++ b/bench/ndarray/stringops_bench.py @@ -12,7 +12,7 @@ import time import numpy as np import blosc2 -from blosc2.lazyexpr import _toggle_miniexpr +from blosc2.utils import _toggle_miniexpr # nparr = np.random.randint(low=0, high=128, size=(N, 10), dtype=np.uint32) # nparr = nparr.view('S40').astype('U10') diff --git a/src/blosc2/blosc2_ext.pyx b/src/blosc2/blosc2_ext.pyx index c56104ab..322f4d2b 100644 --- a/src/blosc2/blosc2_ext.pyx +++ b/src/blosc2/blosc2_ext.pyx @@ -42,7 +42,6 @@ cimport numpy as np np.import_array() - cdef extern from "": ctypedef signed char int8_t ctypedef signed short int16_t @@ -53,6 +52,12 @@ cdef extern from "": ctypedef unsigned int uint32_t ctypedef unsigned long long uint64_t +ctypedef fused T: + float + double + int32_t + int64_t + cdef extern from "": int printf(const char *format, ...) nogil @@ -669,6 +674,13 @@ ctypedef struct me_udata: int64_t blocks_in_chunk[B2ND_MAX_DIM] me_expr* miniexpr_handle +ctypedef struct mm_udata: + b2nd_array_t** inputs + b2nd_array_t* array + int64_t chunks_strides[3][B2ND_MAX_DIM] + int64_t blocks_strides[3][B2ND_MAX_DIM] + int64_t el_strides[3][B2ND_MAX_DIM] + MAX_TYPESIZE = BLOSC2_MAXTYPESIZE MAX_BUFFERSIZE = BLOSC2_MAX_BUFFERSIZE MAX_BLOCKSIZE = BLOSC2_MAXBLOCKSIZE @@ -982,7 +994,7 @@ cdef _check_cparams(blosc2_cparams *cparams): if ufilters[i] and cparams.filters[i] in blosc2.ufilters_registry.keys(): raise ValueError("Cannot use multi-threading with user defined Python filters") - if cparams.prefilter != NULL and cparams.prefilter != miniexpr_prefilter: + if cparams.prefilter != NULL and (cparams.prefilter != miniexpr_prefilter and cparams.prefilter != matmul_prefilter): # Note: miniexpr_prefilter uses miniexpr C API which is thread-friendly, raise ValueError("`nthreads` must be 1 when a prefilter is set") @@ -2017,7 +2029,6 @@ cdef int aux_miniexpr(me_udata *udata, int64_t nchunk, int32_t nblock, cdef b2nd_array_t* ndarr cdef int rc cdef void** input_buffers = malloc(udata.ninputs * sizeof(uint8_t*)) - cdef float *buf cdef uint8_t* src cdef uint8_t* chunk cdef c_bool needs_free @@ -2143,6 +2154,165 @@ cdef int aux_miniexpr(me_udata *udata, int64_t nchunk, int32_t nblock, return 0 +cdef int matmul_block_kernel(T* A, T* B, T* C, int M, int K, int N) nogil: + cdef int r, c, k + cdef T a + cdef int rowA, rowC, rowB + for r in range(M): + rowA = r * K + rowC = r * N + for k in range(K): + a = A[rowA + k] + rowB = k * N + for c in range(N): + C[rowC + c] += (a * B[rowB + c]) + return 0 + +cdef int aux_matmul(mm_udata *udata, int64_t nchunk, int32_t nblock, void *params_output, int32_t typesize, int typecode) nogil: + # Declare all C variables at the beginning + cdef b2nd_array_t* out_arr + cdef b2nd_array_t* ndarr + cdef c_bool first_run + cdef int rc, p, q, r + cdef void** input_buffers = malloc(2 * sizeof(uint8_t*)) + cdef uint8_t** src = malloc(2 * sizeof(uint8_t*)) + cdef int32_t chunk_nbytes[2] + cdef int32_t chunk_cbytes[2] + cdef int32_t block_nbytes[2] + cdef int blocknitems[2] + cdef int startA, startB, expected_blocknitems + cdef blosc2_context* dctx + cdef int i, j, block_i, block_j, ncols, block_ncols, Bblock_ncols, Bncols + cdef int nchunkA = 0, nchunkB = 0, nblockA = 0, nblockB = 0, offsetA = 0, offsetB = 0, offset = 0 + out_arr = udata.array + cdef int ndim = out_arr.ndim + cdef int nchunk_ = nchunk + cdef int coord, batch, batch_, batches = 1 + + # batches = sum(strides[i]*elcoords[i]) + for i in range(ndim - 2): + batches *= out_arr.blockshape[i] + + # nchunk = sum(strides[i]*chunkcoords[i]) + for i in range(ndim - 2): + coord = nchunk_ // udata.chunks_strides[0][i] + nchunk_ = nchunk_ % udata.chunks_strides[0][i] + nchunkA += coord * udata.chunks_strides[1][i] + nchunkB += coord * udata.chunks_strides[2][i] + + ncols = udata.chunks_strides[0][ndim - 2] + Bncols = udata.chunks_strides[2][ndim - 2] + + i = nchunk_ // ncols # ncols * i + j + j = nchunk_ % ncols + chunk_startA = nchunkA + i * ncols + chunk_startB = nchunkB + j + + # nblock = sum(strides[i]*blockcoords[i]) + cdef int nblock_ = nblock + for i in range(ndim - 2): + coord = nblock_ // udata.blocks_strides[0][i] + nblock_ = nblock_ % udata.blocks_strides[0][i] + nblockA += coord * udata.blocks_strides[1][i] + nblockB += coord * udata.blocks_strides[2][i] + + block_ncols = udata.blocks_strides[0][ndim - 2] + Bblock_ncols = udata.blocks_strides[2][ndim - 2] + + block_i = nblock_ // block_ncols + block_j = nblock_ % block_ncols + block_startA = nblockA + block_i * block_ncols + block_startB = nblockB + block_j + + dctx = blosc2_create_dctx(BLOSC2_DPARAMS_DEFAULTS) + + first_run = True + nchunkA = chunk_startA + nchunkB = chunk_startB + while True: # chunk loop + for i in range(2): + chunk_idx = nchunkA if i == 0 else nchunkB + ndarr = udata.inputs[i] + ndim = ndarr.ndim + src[i] = ndarr.sc.data[chunk_idx] + rc = blosc2_cbuffer_sizes(src[i], &chunk_nbytes[i], &chunk_cbytes[i], &block_nbytes[i]) + if rc < 0: + raise ValueError("miniexpr: error getting cbuffer sizes") + if block_nbytes[i] <= 0: + raise ValueError("miniexpr: invalid block size") + if first_run: + if i == 0: + q = ndarr.blockshape[ndim - 1] + p = ndarr.blockshape[ndim - 2] + else: # i = 1 + r = ndarr.blockshape[ndim - 1] + input_buffers[i] = malloc(block_nbytes[i]) + if input_buffers[i] == NULL: + raise MemoryError("miniexpr: cannot allocate input block buffer") + blocknitems[i] = block_nbytes[i] // ndarr.sc.typesize + if i == 0: + expected_blocknitems = blocknitems[i] + elif blocknitems[i] != expected_blocknitems: + raise ValueError("miniexpr: inconsistent block element counts across inputs") + + first_run = False + nblockA = block_startA + nblockB = block_startB + while True: # block loop + startA = nblockA * blocknitems[0] + startB = nblockB * blocknitems[1] + rc = blosc2_getitem_ctx(dctx, src[0], chunk_cbytes[0], startA, blocknitems[0], + input_buffers[0], block_nbytes[0]) + if rc < 0: + raise ValueError("matmul: error decompressing the A chunk") + rc = blosc2_getitem_ctx(dctx, src[1], chunk_cbytes[1], startB, blocknitems[1], + input_buffers[1], block_nbytes[1]) + if rc < 0: + raise ValueError("matmul: error decompressing the B chunk") + batch = 0 + offsetA = 0 + offsetB = 0 + offset = 0 + while batch < batches: + batch_ = batch + for i in range(ndim - 2): + coord = batch // udata.el_strides[0][i] + batch_ = batch_ % udata.el_strides[0][i] + offsetA += coord * udata.el_strides[1][i] + offsetB += coord * udata.el_strides[2][i] + offset += coord * udata.el_strides[0][i] + if typecode == 0: + if typesize == 4: + rc = matmul_block_kernel[float](input_buffers[0] + offsetA, input_buffers[1] + offsetB, params_output + offset, p, q, r) + else: + rc = matmul_block_kernel[double](input_buffers[0] + offsetA, input_buffers[1] + offsetB, params_output + offset, p, q, r) + elif typecode == 1: + if typesize == 4: + rc = matmul_block_kernel[int32_t](input_buffers[0] + offsetA, input_buffers[1] + offsetB, params_output + offset, p, q, r) + else: + rc = matmul_block_kernel[int64_t](input_buffers[0] + offsetA, input_buffers[1] + offsetB, params_output + offset, p, q, r) + else: + with gil: + raise ValueError("Unsupported dtype") + batch += 1 + nblockA += 1 + nblockB += Bblock_ncols + if (nblockA % block_ncols == 0): + break + nchunkA += 1 + nchunkB += Bncols + if (nchunkA % ncols == 0): + break + + + blosc2_free_ctx(dctx) + # Free resources + for i in range(2): + free(input_buffers[i]) + free(input_buffers) + free(src) + + return 0 # Aux function for prefilter and postfilter udf cdef int aux_udf(udf_udata *udata, int64_t nchunk, int32_t nblock, @@ -2221,6 +2391,19 @@ cdef int miniexpr_prefilter(blosc2_prefilter_params *params): return aux_miniexpr( params.user_data, params.nchunk, params.nblock, False, params.output, params.output_typesize) +cdef int matmul_prefilter(blosc2_prefilter_params *params): + cdef int typecode + + cdef mm_udata* udata = params.user_data + cdef b2nd_array_t* out_arr = udata.array + cdef char dtype_kind = out_arr.dtype[1] + if dtype_kind == 'f': + typecode = 0 + elif dtype_kind == 'i': + typecode = 1 + else: + raise ValueError("Unsupported dtype") + return aux_matmul(udata, params.nchunk, params.nblock, params.output, params.output_typesize, typecode) cdef int general_udf_prefilter(blosc2_prefilter_params *params): cdef udf_udata *udata = params.user_data @@ -3068,6 +3251,49 @@ cdef class NDArray: return udata + cdef mm_udata *_fill_mm_udata(self, inputs): + cdef mm_udata *udata = malloc(sizeof(mm_udata)) + cdef int cstrides, bstrides, estrides + cdef b2nd_array_t* inp + cdef b2nd_array_t** inputs_ = malloc(2 * sizeof(b2nd_array_t*)) + for i in range(2): + operand = inputs['x1'] if i == 0 else inputs['x2'] + inputs_[i] = operand.c_array + inputs_[i].chunk_cache.nchunk = -1 + inputs_[i].chunk_cache.data = NULL + udata.inputs = inputs_ + udata.array = self.array + + # Save these in udf_udata to avoid computing them for each block + for i in range(3): + udata.chunks_strides[i][self.array.ndim - 1] = 1 + udata.blocks_strides[i][self.array.ndim - 1] = 1 + udata.el_strides[i][self.array.ndim - 1] = 1 + for idx in range(2, self.array.ndim + 1): + i = self.array.ndim - idx + udata.chunks_strides[0][i] = udata.chunks_strides[0][i + 1] * udata.array.extshape[i + 1] // udata.array.chunkshape[i + 1] + udata.blocks_strides[0][i] = udata.blocks_strides[0][i + 1] * udata.array.extchunkshape[i + 1] // udata.array.blockshape[i + 1] + udata.el_strides[0][i] = udata.el_strides[0][i + 1] * udata.array.blockshape[i + 1] + + for j in range(1, 3): + inp = inputs_[j - 1] + cstrides = bstrides = estrides = 1 + for idx in range(2, self.array.ndim + 1): + i = inp.ndim - idx + if inp.shape[i + 1] == 1 or i < 0: + udata.chunks_strides[j][i] = 0 + udata.blocks_strides[j][i] = 0 + udata.el_strides[j][i] = 0 + else: + bstrides *= inp.extchunkshape[i + 1] // inp.blockshape[i + 1] + cstrides *= inp.extshape[i + 1] // inp.chunkshape[i + 1] + estrides *= inp.blockshape[i + 1] + udata.chunks_strides[j][i] = cstrides + udata.blocks_strides[j][i] = bstrides + udata.el_strides[j][i] = estrides + + return udata + def _set_pref_expr(self, expression, inputs, fp_accuracy, aux_reduc=None, jit=None): # Set prefilter for miniexpr cdef blosc2_cparams* cparams = self.array.sc.storage.cparams @@ -3153,6 +3379,26 @@ cdef class NDArray: if self.array.sc.cctx == NULL: raise RuntimeError("Could not create compression context") + def _set_pref_matmul(self, inputs, fp_accuracy): + # Set prefilter for miniexpr + cdef blosc2_cparams* cparams = self.array.sc.storage.cparams + cparams.prefilter = matmul_prefilter + + cdef mm_udata* udata = self._fill_mm_udata(inputs) + cdef b2nd_array_t* out_arr = udata.array + cdef blosc2_prefilter_params* preparams = calloc(1, sizeof(blosc2_prefilter_params)) + preparams.user_data = udata + preparams.output_is_disposable = False + cparams.preparams = preparams + _check_cparams(cparams) + + if self.array.sc.cctx != NULL: + # Freeing NULL context can lead to segmentation fault + blosc2_free_ctx(self.array.sc.cctx) + self.array.sc.cctx = blosc2_create_cctx(dereference(cparams)) + if self.array.sc.cctx == NULL: + raise RuntimeError("Could not create compression context") + def _set_pref_udf(self, func, inputs_id): if self.array.sc.storage.cparams.nthreads > 1: raise AttributeError("compress `nthreads` must be 1 when assigning a prefilter") diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 07a78247..15942b4e 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -68,6 +68,7 @@ process_key, reducers, safe_numpy_globals, + try_miniexpr, ) if not blosc2.IS_WASM: @@ -76,14 +77,6 @@ global safe_blosc2_globals safe_blosc2_globals = {} -# Set this to False if miniexpr should not be tried out -try_miniexpr = not blosc2.IS_WASM or getattr(blosc2, "_WASM_MINIEXPR_ENABLED", False) - - -def _toggle_miniexpr(FLAG): - global try_miniexpr - try_miniexpr = FLAG - def ne_evaluate(expression, local_dict=None, **kwargs): """Safely evaluate expressions using numexpr when possible, falling back to numpy.""" diff --git a/src/blosc2/linalg.py b/src/blosc2/linalg.py index b1bda5e7..36437bfa 100644 --- a/src/blosc2/linalg.py +++ b/src/blosc2/linalg.py @@ -17,13 +17,13 @@ import blosc2 -from .utils import get_intersecting_chunks, nptranspose, npvecdot, slice_to_chunktuple +from .utils import get_intersecting_chunks, nptranspose, npvecdot, slice_to_chunktuple, try_miniexpr if TYPE_CHECKING: from collections.abc import Sequence -def matmul(x1: blosc2.Array, x2: blosc2.NDArray, **kwargs: Any) -> blosc2.NDArray: +def matmul(x1: blosc2.Array, x2: blosc2.NDArray, **kwargs: Any) -> blosc2.NDArray: # noqa: C901 """ Computes the matrix product between two Blosc2 NDArrays. @@ -112,30 +112,99 @@ def matmul(x1: blosc2.Array, x2: blosc2.NDArray, **kwargs: Any) -> blosc2.NDArra kwargs["_chunksize_reduc_factor"] = 1 result = blosc2.zeros(result_shape, dtype=blosc2.result_type(x1, x2), **kwargs) + # multithreaded matmul + # TODO: handle a) type promotion, b) padding (explicitly), c) (improved) >2D + ops = (x1, x2, result) + all_ndarray = all(isinstance(value, blosc2.NDArray) and value.shape != () for value in ops) + global try_miniexpr + + # Use a local copy so we don't modify the global + use_miniexpr = try_miniexpr if 0 not in result.shape + x1.shape + x2.shape: # if any array is empty, return array of 0s - p, q = result.chunks[-2:] - r = x2.chunks[-1] - - intersecting_chunks = get_intersecting_chunks((), result.shape[:-2], result.chunks[:-2]) - for chunk in intersecting_chunks: - chunk = chunk.raw - for row in range(0, n, p): - row_end = builtins.min(row + p, n) - for col in range(0, m, q): - col_end = builtins.min(col + q, m) - for aux in range(0, k, r): - aux_end = builtins.min(aux + r, k) - bx1 = ( - x1[chunk[-x1.ndim + 2 :] + (slice(row, row_end), slice(aux, aux_end))] - if x1.ndim > 2 - else x1[row:row_end, aux:aux_end] - ) - bx2 = ( - x2[chunk[-x2.ndim + 2 :] + (slice(aux, aux_end), slice(col, col_end))] - if x2.ndim > 2 - else x2[aux:aux_end, col:col_end] - ) - result[chunk + (slice(row, row_end), slice(col, col_end))] += np.matmul(bx1, bx2) + if all_ndarray: + if any(op.dtype != ops[0].dtype for op in ops): # TODO: Remove this condition + use_miniexpr = False + + # Just force same chunk/block shapes + same_chunks = all(op.chunks == result.chunks for op in (x1, x2)) + same_blocks = all(op.blocks == result.blocks for op in (x1, x2)) + same_shape = all(op.shape == result.shape for op in (x1, x2)) + + use_miniexpr &= same_blocks & same_chunks & same_shape + + # TODO: We can relax this to even just load according to result blockshape, but that's difficult. + # Two easier cases are presented below + # Case 1: Might want to restrict loading across chunk boundaries, in which case would require: + # x1.chunks[-2] % result.blocks[-2] == 0 + # x2.chunks[-1] % result.blocks[-1] == 0 + # x2.chunks[-2] % x1.blocks[-1] == 0 + # Can then load in x1 as slices of size [result.blocks[-2], x1.blocks[-1]] + # and x2 in slices of [x1.blocks[-1], result.blocks[-1]] + + # Case 2: Slightly easier to implement this maybe + # Require that blocks are matmul compatible and broadcastable directly to result + # (M, K) x (K, N) = (M, N) + # so can load block-by-block for inputs and calculate block of output + # Also need to avoid loading across chunk boundaries + # chunks_aligned = x1.chunks[-2] % x1.blocks[-2] == 0 + # chunks_aligned &= x2.chunks[-1] % x2.blocks[-1] == 0 + # chunks_aligned &= x2.chunks[-2] % x1.blocks[-1] == 0 + # same_blocks = x2.blocks[-2] == x1.blocks[-1] + # same_blocks &= x2.blocks[-1] == result.blocks[-1] + # same_blocks &= result.blocks[-2] == x1.blocks[-2] + # try: + # result_blocks = np.broadcast_shapes(x1.blocks, x2.blocks) + # if not (same_blocks and chunks_aligned and result_blocks[:-2] == result.blocks[:-2]): + # use_miniexpr = False + # except ValueError: + # use_miniexpr = False + + use_miniexpr &= x1.dtype.kind in ("i", "f") + use_miniexpr &= x2.dtype.kind in ("i", "f") + use_miniexpr &= x1.dtype == x2.dtype + + else: + use_miniexpr = False + + if use_miniexpr: + prefilter_set = False + try: + result._set_pref_matmul({"x1": x1, "x2": x2}, fp_accuracy=blosc2.FPAccuracy.DEFAULT) + prefilter_set = True + # Data to compress is fetched from operands, so it can be uninitialized here + data = np.empty(result.schunk.chunksize, dtype=np.uint8) + for nchunk_out in range(result.schunk.nchunks): + result.schunk.update_data(nchunk_out, data, copy=False) + except Exception as e: + raise Exception from e + finally: + if prefilter_set: + result.schunk.remove_prefilter("miniexpr") + else: # couldn't do multithreading + print("multithreading failed :( ") + p, q = result.chunks[-2:] + r = x2.chunks[-1] + + intersecting_chunks = get_intersecting_chunks((), result.shape[:-2], result.chunks[:-2]) + for chunk in intersecting_chunks: + chunk = chunk.raw + for row in range(0, n, p): + row_end = builtins.min(row + p, n) + for col in range(0, m, q): + col_end = builtins.min(col + q, m) + for aux in range(0, k, r): + aux_end = builtins.min(aux + r, k) + bx1 = ( + x1[chunk[-x1.ndim + 2 :] + (slice(row, row_end), slice(aux, aux_end))] + if x1.ndim > 2 + else x1[row:row_end, aux:aux_end] + ) + bx2 = ( + x2[chunk[-x2.ndim + 2 :] + (slice(aux, aux_end), slice(col, col_end))] + if x2.ndim > 2 + else x2[aux:aux_end, col:col_end] + ) + result[chunk + (slice(row, row_end), slice(col, col_end))] += np.matmul(bx1, bx2) if x1_is_vector: result = result.squeeze(axis=-2) diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py index 6a72f2ef..46fc40a1 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -19,6 +19,15 @@ import blosc2 +# Set this to False if miniexpr should not be tried out +try_miniexpr = not blosc2.IS_WASM or getattr(blosc2, "_WASM_MINIEXPR_ENABLED", False) + + +def _toggle_miniexpr(FLAG): + global try_miniexpr + try_miniexpr = FLAG + + # NumPy version and a convenient boolean flag NUMPY_GE_2_0 = np.__version__ >= "2.0" # handle different numpy versions