From c9d763f48f86931b91051eea004107ece1426f61 Mon Sep 17 00:00:00 2001 From: Joshua Wilson Date: Thu, 30 Apr 2026 09:06:23 -0500 Subject: [PATCH] Avoid thread fanout for single-item parallel maps --- .../threading/parallel_map_single_item.rb | 195 ++++++++++++++++++ .../parallel_map_single_item.results.txt | 127 ++++++++++++ .../lib/elastic_graph/support/threading.rb | 41 ++-- .../sig/elastic_graph/support/threading.rbs | 2 +- .../elastic_graph/support/threading_spec.rb | 50 +++++ 5 files changed, 396 insertions(+), 19 deletions(-) create mode 100644 benchmarks/threading/parallel_map_single_item.rb create mode 100644 benchmarks/threading/parallel_map_single_item.results.txt diff --git a/benchmarks/threading/parallel_map_single_item.rb b/benchmarks/threading/parallel_map_single_item.rb new file mode 100644 index 000000000..02f78f246 --- /dev/null +++ b/benchmarks/threading/parallel_map_single_item.rb @@ -0,0 +1,195 @@ +#!/usr/bin/env ruby +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +# Benchmarks the single-item fast path in ElasticGraph::Support::Threading.parallel_map +# using local copies of the before/after implementations. +# +# Run with: +# bundle exec ruby benchmarks/threading/parallel_map_single_item.rb + +require "benchmark/ips" + +module OriginalThreadingImplementation + def self.parallel_map(items) + threads = _ = items.map do |item| + ::Thread.new do + ::Thread.current.report_on_exception = false + + yield item + end + end + + threads.map(&:value) + rescue => e + e.set_backtrace(e.backtrace + caller) + raise e + end +end + +module UpdatedThreadingImplementation + def self.parallel_map(items) + return _ = items.map { |item| yield item } if items.size < 2 + + begin + threads = _ = items.map do |item| + ::Thread.new do + ::Thread.current.report_on_exception = false + + yield item + end + end + + threads.map(&:value) + rescue => e + e.set_backtrace(e.backtrace + caller) + raise e + end + end +end + +SINGLE_ARRAY = ["a"].freeze +EMPTY_ARRAY = [].freeze +MULTI_ITEM_ARRAY = %w[a b c d].freeze +SINGLE_HASH = {"orders" => [1, 2, 3]}.freeze +EMPTY_HASH = {}.freeze +MULTI_ENTRY_HASH = { + "orders" => [1, 2, 3], + "payments" => [4, 5, 6], + "refunds" => [7, 8, 9], + "disputes" => [10, 11, 12] +}.freeze + +def updated_parallel_map(items, &block) + UpdatedThreadingImplementation.parallel_map(items, &block) +end + +def assert_same_result(label) + original = yield OriginalThreadingImplementation + updated = yield UpdatedThreadingImplementation + + abort "#{label} produced different results: #{original.inspect} != #{updated.inspect}" unless original == updated +end + +assert_same_result("single array") { |implementation| implementation.parallel_map(SINGLE_ARRAY, &:next) } +assert_same_result("empty array") { |implementation| implementation.parallel_map(EMPTY_ARRAY, &:next) } +assert_same_result("multi item array") { |implementation| implementation.parallel_map(MULTI_ITEM_ARRAY, &:next) } +assert_same_result("single hash") { |implementation| implementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } } +assert_same_result("empty hash") { |implementation| implementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } } +assert_same_result("multi entry hash") do |implementation| + implementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } +end + +def run_ips(title) + puts + puts "=" * 80 + puts title + puts "=" * 80 + + Benchmark.ips do |x| + x.config(time: 5, warmup: 2) + yield x + x.compare! + end +end + +run_ips("single item array") do |x| + x.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(SINGLE_ARRAY, &:next) } + x.report("after: fast path") { updated_parallel_map(SINGLE_ARRAY, &:next) } +end + +run_ips("single entry hash, like one datastore-client msearch fanout") do |x| + x.report("before: always spawn thread") do + OriginalThreadingImplementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } + end + + x.report("after: fast path") do + updated_parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } + end +end + +run_ips("empty array") do |x| + x.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(EMPTY_ARRAY, &:next) } + x.report("after: fast path") { updated_parallel_map(EMPTY_ARRAY, &:next) } +end + +run_ips("empty hash") do |x| + x.report("before: always spawn thread") do + OriginalThreadingImplementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } + end + + x.report("after: fast path") do + updated_parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } + end +end + +run_ips("multi item array, expected to stay on the threaded path") do |x| + x.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(MULTI_ITEM_ARRAY, &:next) } + x.report("after: current implementation") { updated_parallel_map(MULTI_ITEM_ARRAY, &:next) } +end + +run_ips("multi entry hash, expected to stay on the threaded path") do |x| + x.report("before: always spawn thread") do + OriginalThreadingImplementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } + end + + x.report("after: current implementation") do + updated_parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } + end +end + +module ThreadNewCounter + def self.count + @count ||= 0 + end + + def self.count=(count) + @count = count + end + + def new(...) + ThreadNewCounter.count += 1 + super + end +end + +class << Thread + prepend ThreadNewCounter +end + +def count_thread_new_calls + ThreadNewCounter.count = 0 + yield + ThreadNewCounter.count +end + +thread_count_iterations = 1_000 + +puts +puts "Thread.new calls for #{thread_count_iterations} map calls" +puts "-" * 80 +{ + "before single array" => -> { OriginalThreadingImplementation.parallel_map(SINGLE_ARRAY, &:next) }, + "after single array" => -> { updated_parallel_map(SINGLE_ARRAY, &:next) }, + "before single hash" => -> { OriginalThreadingImplementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } }, + "after single hash" => -> { updated_parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } }, + "before empty array" => -> { OriginalThreadingImplementation.parallel_map(EMPTY_ARRAY, &:next) }, + "after empty array" => -> { updated_parallel_map(EMPTY_ARRAY, &:next) }, + "before empty hash" => -> { OriginalThreadingImplementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } }, + "after empty hash" => -> { updated_parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } }, + "before multi item array" => -> { OriginalThreadingImplementation.parallel_map(MULTI_ITEM_ARRAY, &:next) }, + "after multi item array" => -> { updated_parallel_map(MULTI_ITEM_ARRAY, &:next) }, + "before multi entry hash" => -> { OriginalThreadingImplementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } }, + "after multi entry hash" => -> { updated_parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } } +}.each do |label, callable| + count = count_thread_new_calls do + thread_count_iterations.times { callable.call } + end + + puts "#{label.ljust(28)} #{count}" +end diff --git a/benchmarks/threading/parallel_map_single_item.results.txt b/benchmarks/threading/parallel_map_single_item.results.txt new file mode 100644 index 000000000..b38df2561 --- /dev/null +++ b/benchmarks/threading/parallel_map_single_item.results.txt @@ -0,0 +1,127 @@ + +================================================================================ +single item array +================================================================================ +ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] +Warming up -------------------------------------- +before: always spawn thread + 2.938k i/100ms + after: fast path 408.510k i/100ms +Calculating ------------------------------------- +before: always spawn thread + 25.825k (± 6.5%) i/s (38.72 μs/i) - 129.272k in 5.027934s + after: fast path 3.993M (± 3.9%) i/s (250.43 ns/i) - 20.017M in 5.021850s + +Comparison: + after: fast path: 3993083.5 i/s +before: always spawn thread: 25824.7 i/s - 154.62x slower + + +================================================================================ +single entry hash, like one datastore-client msearch fanout +================================================================================ +ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] +Warming up -------------------------------------- +before: always spawn thread + 2.668k i/100ms + after: fast path 318.552k i/100ms +Calculating ------------------------------------- +before: always spawn thread + 21.520k (±19.5%) i/s (46.47 μs/i) - 104.052k in 5.098624s + after: fast path 3.278M (± 5.4%) i/s (305.03 ns/i) - 16.565M in 5.069629s + +Comparison: + after: fast path: 3278402.5 i/s +before: always spawn thread: 21519.8 i/s - 152.34x slower + + +================================================================================ +empty array +================================================================================ +ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] +Warming up -------------------------------------- +before: always spawn thread + 735.659k i/100ms + after: fast path 723.657k i/100ms +Calculating ------------------------------------- +before: always spawn thread + 7.120M (± 8.2%) i/s (140.45 ns/i) - 35.312M in 5.009859s + after: fast path 7.094M (± 4.8%) i/s (140.96 ns/i) - 35.459M in 5.011149s + +Comparison: +before: always spawn thread: 7119941.7 i/s + after: fast path: 7094392.7 i/s - same-ish: difference falls within error + + +================================================================================ +empty hash +================================================================================ +ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] +Warming up -------------------------------------- +before: always spawn thread + 519.319k i/100ms + after: fast path 538.244k i/100ms +Calculating ------------------------------------- +before: always spawn thread + 5.077M (±10.5%) i/s (196.95 ns/i) - 25.447M in 5.094679s + after: fast path 5.290M (± 8.2%) i/s (189.05 ns/i) - 26.374M in 5.036762s + +Comparison: + after: fast path: 5289538.9 i/s +before: always spawn thread: 5077366.9 i/s - same-ish: difference falls within error + + +================================================================================ +multi item array, expected to stay on the threaded path +================================================================================ +ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] +Warming up -------------------------------------- +before: always spawn thread + 734.000 i/100ms +after: current implementation + 704.000 i/100ms +Calculating ------------------------------------- +before: always spawn thread + 7.063k (± 8.0%) i/s (141.58 μs/i) - 35.232k in 5.025075s +after: current implementation + 6.363k (± 8.0%) i/s (157.16 μs/i) - 31.680k in 5.010779s + +Comparison: +before: always spawn thread: 7063.1 i/s +after: current implementation: 6363.0 i/s - same-ish: difference falls within error + + +================================================================================ +multi entry hash, expected to stay on the threaded path +================================================================================ +ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] +Warming up -------------------------------------- +before: always spawn thread + 455.000 i/100ms +after: current implementation + 652.000 i/100ms +Calculating ------------------------------------- +before: always spawn thread + 5.907k (±14.3%) i/s (169.29 μs/i) - 28.665k in 5.019039s +after: current implementation + 6.306k (±11.1%) i/s (158.59 μs/i) - 31.296k in 5.042971s + +Comparison: +after: current implementation: 6305.7 i/s +before: always spawn thread: 5906.9 i/s - same-ish: difference falls within error + + +Thread.new calls for 1000 map calls +-------------------------------------------------------------------------------- +before single array 1000 +after single array 0 +before single hash 1000 +after single hash 0 +before empty array 0 +after empty array 0 +before empty hash 0 +after empty hash 0 +before multi item array 4000 +after multi item array 4000 +before multi entry hash 4000 +after multi entry hash 4000 diff --git a/elasticgraph-support/lib/elastic_graph/support/threading.rb b/elasticgraph-support/lib/elastic_graph/support/threading.rb index f20e0599a..2bdec1e2e 100644 --- a/elasticgraph-support/lib/elastic_graph/support/threading.rb +++ b/elasticgraph-support/lib/elastic_graph/support/threading.rb @@ -10,33 +10,38 @@ module ElasticGraph module Support # @private module Threading - # Like Enumerable#map, but performs the map in parallel using one thread per list item. + # Like Enumerable#map, but performs the map in parallel using one thread per list item + # when there are multiple items. # Exceptions that happen in the threads will propagate to the caller at the end. # Due to Ruby's GVL, this will never be helpful for pure computation, but can be # quite helpful when dealing with blocking I/O. However, the cost of threads is # such that this method should not be used when you have a large list of items to # map over (say, hundreds or thousands of items or more). def self.parallel_map(items) - threads = _ = items.map do |item| - ::Thread.new do - # Disable reporting of exceptions. We use `value` at the end of this method, which - # propagates any exception that happened in the thread to the calling thread. If - # this is true (the default), then the exception is also printed to $stderr which - # is quite noisy. - ::Thread.current.report_on_exception = false + return _ = items.map { |item| yield item } if items.size < 2 - yield item + begin + threads = _ = items.map do |item| + ::Thread.new do + # Disable reporting of exceptions. We use `value` at the end of this method, which + # propagates any exception that happened in the thread to the calling thread. If + # this is true (the default), then the exception is also printed to $stderr which + # is quite noisy. + ::Thread.current.report_on_exception = false + + yield item + end end - end - # `value` here either returns the value of the final expression in the thread, or raises - # whatever exception happened in the thread. `join` doesn't propagate the exception in - # the same way, so we always want to use `Thread#value` even if we are just using threads - # for side effects. - threads.map(&:value) - rescue => e - e.set_backtrace(e.backtrace + caller) - raise e + # `value` here either returns the value of the final expression in the thread, or raises + # whatever exception happened in the thread. `join` doesn't propagate the exception in + # the same way, so we always want to use `Thread#value` even if we are just using threads + # for side effects. + threads.map(&:value) + rescue => e + e.set_backtrace(e.backtrace + caller) + raise e + end end end end diff --git a/elasticgraph-support/sig/elastic_graph/support/threading.rbs b/elasticgraph-support/sig/elastic_graph/support/threading.rbs index 39ceb7cea..e47fd57ba 100644 --- a/elasticgraph-support/sig/elastic_graph/support/threading.rbs +++ b/elasticgraph-support/sig/elastic_graph/support/threading.rbs @@ -3,7 +3,7 @@ module ElasticGraph module Threading def self.parallel_map: [K, V, O] (::Hash[K, V]) { ([K, V]) -> O } -> ::Array[O] - | [I, O] (::Enumerable[I]) { (I) -> O } -> ::Array[O] + | [I, O] (::Array[I]) { (I) -> O } -> ::Array[O] end end end diff --git a/elasticgraph-support/spec/unit/elastic_graph/support/threading_spec.rb b/elasticgraph-support/spec/unit/elastic_graph/support/threading_spec.rb index a57bee5ac..abd5eab0a 100644 --- a/elasticgraph-support/spec/unit/elastic_graph/support/threading_spec.rb +++ b/elasticgraph-support/spec/unit/elastic_graph/support/threading_spec.rb @@ -17,6 +17,56 @@ module Support expect(result).to eq %w[b c d] end + it "does not spawn a thread when there is only one item to map" do + calling_thread = Thread.current + + result = Threading.parallel_map(["a"]) do |value| + expect(Thread.current).to be(calling_thread) + value.next + end + + expect(result).to eq ["b"] + end + + it "preserves hash entry destructuring when there is only one item to map" do + calling_thread = Thread.current + + result = Threading.parallel_map({"a" => 1}) do |key, value| + expect(Thread.current).to be(calling_thread) + [key.next, value.next] + end + + expect(result).to eq [["b", 2]] + end + + it "preserves the backtrace on exceptions when mapping over one item" do + exception1 = ["a"].map { raise it } rescue $! # standard:disable Style/RescueModifier + exception2 = Threading.parallel_map(["a"]) { raise it } rescue $! # standard:disable Style/RescueModifier + + spec_file = File.basename(__FILE__) + + suffix1, suffix2 = [exception1, exception2].map do |ex| + ex.backtrace + .reject { |frame| frame.include?(spec_file) || frame.include?("threading.rb") } + .join("\n") + end + + # Threading.parallel_map updates exception backtraces normally. + # Here we confirm it has not changed backtraces for the non-threaded fast path. + expect(suffix2).to eq(suffix1) + end + + it "uses threads when mapping over multiple hash entries" do + calling_thread = Thread.current + + result = Threading.parallel_map({"a" => 1, "b" => 2}) do |key, value| + expect(Thread.current).not_to be(calling_thread) + [key.next, value.next] + end + + expect(result).to eq [["b", 2], ["c", 3]] + end + it "propagates exceptions to the calling thread properly, even preserving the calling thread's stacktrace in the exception" do expected_trace_frames = caller