Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions benchmarks/threading/parallel_map_single_item.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#!/usr/bin/env ruby
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

# Benchmarks the single-item fast path in ElasticGraph::Support::Threading.parallel_map
# using local copies of the before/after implementations.
#
# Run with:
# bundle exec ruby benchmarks/threading/parallel_map_single_item.rb

require "benchmark/ips"

# Baseline ("before") implementation: unconditionally spawns one thread per
# item, even for empty and single-item collections.
module OriginalThreadingImplementation
  # Maps over `items` in parallel, one thread per item, and returns the
  # mapped results in order. Exceptions raised inside a thread propagate
  # to the caller.
  def self.parallel_map(items)
    workers = items.map do |item|
      ::Thread.new do
        # Silence the default $stderr exception dump; `Thread#value` below
        # re-raises any worker exception on the calling thread anyway.
        ::Thread.current.report_on_exception = false

        yield item
      end
    end

    # `Thread#value` joins and returns the block's result, or re-raises the
    # exception that killed the worker.
    workers.map(&:value)
  rescue => error
    # Splice the caller's frames onto the backtrace so failures point back
    # across the thread boundary.
    error.set_backtrace(error.backtrace + caller)
    raise error
  end
end

# Candidate ("after") implementation: identical to the original except that
# collections with fewer than two entries are mapped inline on the calling
# thread, skipping thread creation entirely.
module UpdatedThreadingImplementation
  # Maps over `items`, in parallel when there are 2+ items, inline otherwise.
  # Exceptions raised inside a worker thread propagate to the caller.
  def self.parallel_map(items)
    # Fast path: nothing to parallelize for zero or one items.
    if items.size < 2
      return items.map { |single| yield single }
    end

    begin
      workers = items.map do |item|
        ::Thread.new do
          # Silence the default $stderr exception dump; `Thread#value` below
          # re-raises any worker exception on the calling thread anyway.
          ::Thread.current.report_on_exception = false

          yield item
        end
      end

      # `Thread#value` joins and returns the block's result, or re-raises
      # the exception that killed the worker.
      workers.map(&:value)
    rescue => error
      # Splice the caller's frames onto the backtrace so failures point back
      # across the thread boundary.
      error.set_backtrace(error.backtrace + caller)
      raise error
    end
  end
end

# Frozen fixture inputs covering the shapes exercised below: empty, single,
# and multi-item arrays plus the hash equivalents.
SINGLE_ARRAY = ["a"].freeze
EMPTY_ARRAY = [].freeze
MULTI_ITEM_ARRAY = %w[a b c d].freeze
# Hash fixtures pair a string key with an array payload; per the benchmark
# titles below, the single-entry case presumably stands in for one
# datastore-client msearch fanout — confirm against real callers.
SINGLE_HASH = {"orders" => [1, 2, 3]}.freeze
EMPTY_HASH = {}.freeze
MULTI_ENTRY_HASH = {
  "orders" => [1, 2, 3],
  "payments" => [4, 5, 6],
  "refunds" => [7, 8, 9],
  "disputes" => [10, 11, 12]
}.freeze

# Thin delegator so benchmark report blocks read symmetrically for the
# "before" and "after" implementations.
def updated_parallel_map(items, &block) = UpdatedThreadingImplementation.parallel_map(items, &block)

# Yields each implementation module to the block and aborts the script if
# the two produce different results, so we never benchmark mismatched
# behavior.
def assert_same_result(label)
  before_result = yield OriginalThreadingImplementation
  after_result = yield UpdatedThreadingImplementation

  return if before_result == after_result

  abort "#{label} produced different results: #{before_result.inspect} != #{after_result.inspect}"
end

# Parity checks: verify both implementations agree on every fixture shape
# before any benchmarking happens.
# (Fix: stray review-UI artifact lines that had been pasted into the middle
# of this section — breaking the script's syntax — are removed.)
assert_same_result("single array") { |implementation| implementation.parallel_map(SINGLE_ARRAY, &:next) }
assert_same_result("empty array") { |implementation| implementation.parallel_map(EMPTY_ARRAY, &:next) }
assert_same_result("multi item array") { |implementation| implementation.parallel_map(MULTI_ITEM_ARRAY, &:next) }
assert_same_result("single hash") { |implementation| implementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } }
assert_same_result("empty hash") { |implementation| implementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } }
assert_same_result("multi entry hash") do |implementation|
  implementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] }
end

# Prints a banner for `title` and runs a benchmark-ips comparison, yielding
# the job object so the caller can register its reports.
def run_ips(title)
  banner = "=" * 80
  puts "", banner, title, banner

  Benchmark.ips do |job|
    job.config(time: 5, warmup: 2)
    yield job
    job.compare!
  end
end

# Throughput comparison for every input shape. The first four scenarios hit
# the new fast path; the last two are expected to stay on the threaded path.
run_ips("single item array") do |bench|
  bench.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(SINGLE_ARRAY, &:next) }
  bench.report("after: fast path") { updated_parallel_map(SINGLE_ARRAY, &:next) }
end

run_ips("single entry hash, like one datastore-client msearch fanout") do |bench|
  bench.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } }
  bench.report("after: fast path") { updated_parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } }
end

run_ips("empty array") do |bench|
  bench.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(EMPTY_ARRAY, &:next) }
  bench.report("after: fast path") { updated_parallel_map(EMPTY_ARRAY, &:next) }
end

run_ips("empty hash") do |bench|
  bench.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } }
  bench.report("after: fast path") { updated_parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } }
end

run_ips("multi item array, expected to stay on the threaded path") do |bench|
  bench.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(MULTI_ITEM_ARRAY, &:next) }
  bench.report("after: current implementation") { updated_parallel_map(MULTI_ITEM_ARRAY, &:next) }
end

run_ips("multi entry hash, expected to stay on the threaded path") do |bench|
  bench.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } }
  bench.report("after: current implementation") { updated_parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } }
end

# Counts Thread.new invocations when prepended onto a class's singleton
# class, without otherwise changing thread-creation behavior.
module ThreadNewCounter
  class << self
    # Running tally of intercepted `new` calls; lazily initialized to 0.
    def count
      @count ||= 0
    end

    attr_writer :count
  end

  # Sits in front of `new` (via prepend), bumps the tally, and delegates
  # all arguments/block untouched to the real constructor.
  def new(...)
    ThreadNewCounter.count += 1
    super
  end
end

# Globally instrument Thread.new so the report below can count spawns.
::Thread.singleton_class.prepend(ThreadNewCounter)

# Resets the spawn tally, runs the given block, and returns how many
# Thread.new calls happened while it ran.
def count_thread_new_calls(&work)
  ThreadNewCounter.count = 0
  work.call
  ThreadNewCounter.count
end

# Spawn-count report: directly counts Thread.new invocations per scenario to
# confirm the fast path really avoids spawning for empty/single-item inputs.
iterations = 1_000

puts
puts "Thread.new calls for #{iterations} map calls"
puts "-" * 80
{
  "before single array" => -> { OriginalThreadingImplementation.parallel_map(SINGLE_ARRAY, &:next) },
  "after single array" => -> { updated_parallel_map(SINGLE_ARRAY, &:next) },
  "before single hash" => -> { OriginalThreadingImplementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } },
  "after single hash" => -> { updated_parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } },
  "before empty array" => -> { OriginalThreadingImplementation.parallel_map(EMPTY_ARRAY, &:next) },
  "after empty array" => -> { updated_parallel_map(EMPTY_ARRAY, &:next) },
  "before empty hash" => -> { OriginalThreadingImplementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } },
  "after empty hash" => -> { updated_parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } },
  "before multi item array" => -> { OriginalThreadingImplementation.parallel_map(MULTI_ITEM_ARRAY, &:next) },
  "after multi item array" => -> { updated_parallel_map(MULTI_ITEM_ARRAY, &:next) },
  "before multi entry hash" => -> { OriginalThreadingImplementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } },
  "after multi entry hash" => -> { updated_parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } }
}.each do |label, scenario|
  spawn_count = count_thread_new_calls do
    iterations.times { scenario.call }
  end

  puts "#{label.ljust(28)} #{spawn_count}"
end
127 changes: 127 additions & 0 deletions benchmarks/threading/parallel_map_single_item.results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@

================================================================================
single item array
================================================================================
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
Warming up --------------------------------------
before: always spawn thread
2.938k i/100ms
after: fast path 408.510k i/100ms
Calculating -------------------------------------
before: always spawn thread
25.825k (± 6.5%) i/s (38.72 μs/i) - 129.272k in 5.027934s
after: fast path 3.993M (± 3.9%) i/s (250.43 ns/i) - 20.017M in 5.021850s

Comparison:
after: fast path: 3993083.5 i/s
before: always spawn thread: 25824.7 i/s - 154.62x slower


================================================================================
single entry hash, like one datastore-client msearch fanout
================================================================================
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
Warming up --------------------------------------
before: always spawn thread
2.668k i/100ms
after: fast path 318.552k i/100ms
Calculating -------------------------------------
before: always spawn thread
21.520k (±19.5%) i/s (46.47 μs/i) - 104.052k in 5.098624s
after: fast path 3.278M (± 5.4%) i/s (305.03 ns/i) - 16.565M in 5.069629s

Comparison:
after: fast path: 3278402.5 i/s
before: always spawn thread: 21519.8 i/s - 152.34x slower


================================================================================
empty array
================================================================================
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
Warming up --------------------------------------
before: always spawn thread
735.659k i/100ms
after: fast path 723.657k i/100ms
Calculating -------------------------------------
before: always spawn thread
7.120M (± 8.2%) i/s (140.45 ns/i) - 35.312M in 5.009859s
after: fast path 7.094M (± 4.8%) i/s (140.96 ns/i) - 35.459M in 5.011149s

Comparison:
before: always spawn thread: 7119941.7 i/s
after: fast path: 7094392.7 i/s - same-ish: difference falls within error


================================================================================
empty hash
================================================================================
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
Warming up --------------------------------------
before: always spawn thread
519.319k i/100ms
after: fast path 538.244k i/100ms
Calculating -------------------------------------
before: always spawn thread
5.077M (±10.5%) i/s (196.95 ns/i) - 25.447M in 5.094679s
after: fast path 5.290M (± 8.2%) i/s (189.05 ns/i) - 26.374M in 5.036762s

Comparison:
after: fast path: 5289538.9 i/s
before: always spawn thread: 5077366.9 i/s - same-ish: difference falls within error


================================================================================
multi item array, expected to stay on the threaded path
================================================================================
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
Warming up --------------------------------------
before: always spawn thread
734.000 i/100ms
after: current implementation
704.000 i/100ms
Calculating -------------------------------------
before: always spawn thread
7.063k (± 8.0%) i/s (141.58 μs/i) - 35.232k in 5.025075s
after: current implementation
6.363k (± 8.0%) i/s (157.16 μs/i) - 31.680k in 5.010779s

Comparison:
before: always spawn thread: 7063.1 i/s
after: current implementation: 6363.0 i/s - same-ish: difference falls within error


================================================================================
multi entry hash, expected to stay on the threaded path
================================================================================
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
Warming up --------------------------------------
before: always spawn thread
455.000 i/100ms
after: current implementation
652.000 i/100ms
Calculating -------------------------------------
before: always spawn thread
5.907k (±14.3%) i/s (169.29 μs/i) - 28.665k in 5.019039s
after: current implementation
6.306k (±11.1%) i/s (158.59 μs/i) - 31.296k in 5.042971s

Comparison:
after: current implementation: 6305.7 i/s
before: always spawn thread: 5906.9 i/s - same-ish: difference falls within error


Thread.new calls for 1000 map calls
--------------------------------------------------------------------------------
before single array 1000
after single array 0
before single hash 1000
after single hash 0
before empty array 0
after empty array 0
before empty hash 0
after empty hash 0
before multi item array 4000
after multi item array 4000
before multi entry hash 4000
after multi entry hash 4000
41 changes: 23 additions & 18 deletions elasticgraph-support/lib/elastic_graph/support/threading.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,38 @@ module ElasticGraph
module Support
# @private
module Threading
# Like Enumerable#map, but performs the map in parallel using one thread per list item.
# Like Enumerable#map, but performs the map in parallel using one thread per list item
# when there are multiple items.
# Exceptions that happen in the threads will propagate to the caller at the end.
# Due to Ruby's GVL, this will never be helpful for pure computation, but can be
# quite helpful when dealing with blocking I/O. However, the cost of threads is
# such that this method should not be used when you have a large list of items to
# map over (say, hundreds or thousands of items or more).
def self.parallel_map(items)
threads = _ = items.map do |item|
::Thread.new do
# Disable reporting of exceptions. We use `value` at the end of this method, which
# propagates any exception that happened in the thread to the calling thread. If
# this is true (the default), then the exception is also printed to $stderr which
# is quite noisy.
::Thread.current.report_on_exception = false
return _ = items.map { |item| yield item } if items.size < 2

yield item
begin
threads = _ = items.map do |item|
::Thread.new do
# Disable reporting of exceptions. We use `value` at the end of this method, which
# propagates any exception that happened in the thread to the calling thread. If
# this is true (the default), then the exception is also printed to $stderr which
# is quite noisy.
::Thread.current.report_on_exception = false

yield item
end
end
end

# `value` here either returns the value of the final expression in the thread, or raises
# whatever exception happened in the thread. `join` doesn't propagate the exception in
# the same way, so we always want to use `Thread#value` even if we are just using threads
# for side effects.
threads.map(&:value)
rescue => e
e.set_backtrace(e.backtrace + caller)
raise e
# `value` here either returns the value of the final expression in the thread, or raises
# whatever exception happened in the thread. `join` doesn't propagate the exception in
# the same way, so we always want to use `Thread#value` even if we are just using threads
# for side effects.
threads.map(&:value)
rescue => e
e.set_backtrace(e.backtrace + caller)
raise e
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module ElasticGraph
module Threading
def self.parallel_map:
[K, V, O] (::Hash[K, V]) { ([K, V]) -> O } -> ::Array[O]
| [I, O] (::Enumerable[I]) { (I) -> O } -> ::Array[O]
| [I, O] (::Array[I]) { (I) -> O } -> ::Array[O]
end
end
end
Loading
Loading