Avoid thread fanout for single-item parallel maps #1160
Merged
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,195 @@ | ||
| #!/usr/bin/env ruby | ||
| # Copyright 2024 - 2026 Block, Inc. | ||
| # | ||
| # Use of this source code is governed by an MIT-style | ||
| # license that can be found in the LICENSE file or at | ||
| # https://opensource.org/licenses/MIT. | ||
| # | ||
| # frozen_string_literal: true | ||
| | ||
| # Benchmarks the single-item fast path in ElasticGraph::Support::Threading.parallel_map | ||
| # using local copies of the before/after implementations. | ||
| # | ||
| # Run with: | ||
| # bundle exec ruby benchmarks/threading/parallel_map_single_item.rb | ||
| | ||
| require "benchmark/ips" | ||
| | ||
| module OriginalThreadingImplementation | ||
| def self.parallel_map(items) | ||
| threads = _ = items.map do |item| | ||
| ::Thread.new do | ||
| ::Thread.current.report_on_exception = false | ||
| | ||
| yield item | ||
| end | ||
| end | ||
| | ||
| threads.map(&:value) | ||
| rescue => e | ||
| e.set_backtrace(e.backtrace + caller) | ||
| raise e | ||
| end | ||
| end | ||
| | ||
| module UpdatedThreadingImplementation | ||
| def self.parallel_map(items) | ||
| return _ = items.map { |item| yield item } if items.size < 2 | ||
| | ||
| begin | ||
| threads = _ = items.map do |item| | ||
| ::Thread.new do | ||
| ::Thread.current.report_on_exception = false | ||
| | ||
| yield item | ||
| end | ||
| end | ||
| | ||
| threads.map(&:value) | ||
| rescue => e | ||
| e.set_backtrace(e.backtrace + caller) | ||
| raise e | ||
| end | ||
| end | ||
| end | ||
| | ||
| SINGLE_ARRAY = ["a"].freeze | ||
| EMPTY_ARRAY = [].freeze | ||
| MULTI_ITEM_ARRAY = %w[a b c d].freeze | ||
| SINGLE_HASH = {"orders" => [1, 2, 3]}.freeze | ||
| EMPTY_HASH = {}.freeze | ||
| MULTI_ENTRY_HASH = { | ||
| "orders" => [1, 2, 3], | ||
| "payments" => [4, 5, 6], | ||
| "refunds" => [7, 8, 9], | ||
| "disputes" => [10, 11, 12] | ||
| }.freeze | ||
| | ||
| def updated_parallel_map(items, &block) | ||
| UpdatedThreadingImplementation.parallel_map(items, &block) | ||
| end | ||
| | ||
| def assert_same_result(label) | ||
| original = yield OriginalThreadingImplementation | ||
| updated = yield UpdatedThreadingImplementation | ||
| | ||
| abort "#{label} produced different results: #{original.inspect} != #{updated.inspect}" unless original == updated | ||
| end | ||
| | ||
| assert_same_result("single array") { |implementation| implementation.parallel_map(SINGLE_ARRAY, &:next) } | ||
| assert_same_result("empty array") { |implementation| implementation.parallel_map(EMPTY_ARRAY, &:next) } | ||
| assert_same_result("multi item array") { |implementation| implementation.parallel_map(MULTI_ITEM_ARRAY, &:next) } | ||
| assert_same_result("single hash") { |implementation| implementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } } | ||
| assert_same_result("empty hash") { |implementation| implementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } } | ||
| assert_same_result("multi entry hash") do |implementation| | ||
| implementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } | ||
| end | ||
| | ||
| def run_ips(title) | ||
| puts | ||
| puts "=" * 80 | ||
| puts title | ||
| puts "=" * 80 | ||
| | ||
| Benchmark.ips do |x| | ||
| x.config(time: 5, warmup: 2) | ||
| yield x | ||
| x.compare! | ||
| end | ||
| end | ||
| | ||
| run_ips("single item array") do |x| | ||
| x.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(SINGLE_ARRAY, &:next) } | ||
| x.report("after: fast path") { updated_parallel_map(SINGLE_ARRAY, &:next) } | ||
| end | ||
| | ||
| run_ips("single entry hash, like one datastore-client msearch fanout") do |x| | ||
| x.report("before: always spawn thread") do | ||
| OriginalThreadingImplementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } | ||
| end | ||
| | ||
| x.report("after: fast path") do | ||
| updated_parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } | ||
| end | ||
| end | ||
| | ||
| run_ips("empty array") do |x| | ||
| x.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(EMPTY_ARRAY, &:next) } | ||
| x.report("after: fast path") { updated_parallel_map(EMPTY_ARRAY, &:next) } | ||
| end | ||
| | ||
| run_ips("empty hash") do |x| | ||
| x.report("before: always spawn thread") do | ||
| OriginalThreadingImplementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } | ||
| end | ||
| | ||
| x.report("after: fast path") do | ||
| updated_parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } | ||
| end | ||
| end | ||
| | ||
| run_ips("multi item array, expected to stay on the threaded path") do |x| | ||
| x.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(MULTI_ITEM_ARRAY, &:next) } | ||
| x.report("after: current implementation") { updated_parallel_map(MULTI_ITEM_ARRAY, &:next) } | ||
| end | ||
| | ||
| run_ips("multi entry hash, expected to stay on the threaded path") do |x| | ||
| x.report("before: always spawn thread") do | ||
| OriginalThreadingImplementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } | ||
| end | ||
| | ||
| x.report("after: current implementation") do | ||
| updated_parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } | ||
| end | ||
| end | ||
| | ||
| module ThreadNewCounter | ||
| def self.count | ||
| @count ||= 0 | ||
| end | ||
| | ||
| def self.count=(count) | ||
| @count = count | ||
| end | ||
| | ||
| def new(...) | ||
| ThreadNewCounter.count += 1 | ||
| super | ||
| end | ||
| end | ||
| | ||
| class << Thread | ||
| prepend ThreadNewCounter | ||
| end | ||
| | ||
| def count_thread_new_calls | ||
| ThreadNewCounter.count = 0 | ||
| yield | ||
| ThreadNewCounter.count | ||
| end | ||
| | ||
| thread_count_iterations = 1_000 | ||
| | ||
| puts | ||
| puts "Thread.new calls for #{thread_count_iterations} map calls" | ||
| puts "-" * 80 | ||
| { | ||
| "before single array" => -> { OriginalThreadingImplementation.parallel_map(SINGLE_ARRAY, &:next) }, | ||
| "after single array" => -> { updated_parallel_map(SINGLE_ARRAY, &:next) }, | ||
| "before single hash" => -> { OriginalThreadingImplementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } }, | ||
| "after single hash" => -> { updated_parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } }, | ||
| "before empty array" => -> { OriginalThreadingImplementation.parallel_map(EMPTY_ARRAY, &:next) }, | ||
| "after empty array" => -> { updated_parallel_map(EMPTY_ARRAY, &:next) }, | ||
| "before empty hash" => -> { OriginalThreadingImplementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } }, | ||
| "after empty hash" => -> { updated_parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } }, | ||
| "before multi item array" => -> { OriginalThreadingImplementation.parallel_map(MULTI_ITEM_ARRAY, &:next) }, | ||
| "after multi item array" => -> { updated_parallel_map(MULTI_ITEM_ARRAY, &:next) }, | ||
| "before multi entry hash" => -> { OriginalThreadingImplementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } }, | ||
| "after multi entry hash" => -> { updated_parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } } | ||
| }.each do |label, callable| | ||
| count = count_thread_new_calls do | ||
| thread_count_iterations.times { callable.call } | ||
| end | ||
| | ||
| puts "#{label.ljust(28)} #{count}" | ||
| end | ||
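
One behavioral consequence of the fast path that the benchmark does not print directly is which thread runs the block. The sketch below is an illustration, not part of the PR: it reuses a local copy of the updated implementation under the hypothetical module name `FastPathSketch` and records `Thread.current` from inside the block. Under that assumption, a single-item input should run on the calling thread, while a multi-item input should still run on spawned threads.

```ruby
# Local copy of the updated implementation from the benchmark above.
# `FastPathSketch` is a hypothetical name used only for this illustration.
module FastPathSketch
  def self.parallel_map(items)
    # Fast path: zero or one item is mapped inline, without Thread.new.
    return items.map { |item| yield item } if items.size < 2

    begin
      threads = items.map do |item|
        ::Thread.new do
          ::Thread.current.report_on_exception = false
          yield item
        end
      end

      threads.map(&:value)
    rescue => e
      e.set_backtrace(e.backtrace + caller)
      raise e
    end
  end
end

caller_thread = Thread.current

# Each block call returns the thread it ran on.
single_item_threads = FastPathSketch.parallel_map(["a"]) { |_item| Thread.current }
multi_item_threads = FastPathSketch.parallel_map(%w[a b c]) { |_item| Thread.current }

ran_inline = single_item_threads.all? { |thread| thread.equal?(caller_thread) }
ran_threaded = multi_item_threads.none? { |thread| thread.equal?(caller_thread) }

puts "single item ran on the calling thread: #{ran_inline}"
puts "multi item ran on spawned threads:     #{ran_threaded}"
```

Code that depends on thread-local state inside the block may behave differently on the inline path, which is worth keeping in mind when reviewing callers.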
127 changes: 127 additions & 0 deletions
benchmarks/threading/parallel_map_single_item.results.txt
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| | ||
| ================================================================================ | ||
| single item array | ||
| ================================================================================ | ||
| ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] | ||
| Warming up -------------------------------------- | ||
| before: always spawn thread | ||
| 2.938k i/100ms | ||
| after: fast path 408.510k i/100ms | ||
| Calculating ------------------------------------- | ||
| before: always spawn thread | ||
| 25.825k (± 6.5%) i/s (38.72 μs/i) - 129.272k in 5.027934s | ||
| after: fast path 3.993M (± 3.9%) i/s (250.43 ns/i) - 20.017M in 5.021850s | ||
| | ||
| Comparison: | ||
| after: fast path: 3993083.5 i/s | ||
| before: always spawn thread: 25824.7 i/s - 154.62x slower | ||
| | ||
| | ||
| ================================================================================ | ||
| single entry hash, like one datastore-client msearch fanout | ||
| ================================================================================ | ||
| ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] | ||
| Warming up -------------------------------------- | ||
| before: always spawn thread | ||
| 2.668k i/100ms | ||
| after: fast path 318.552k i/100ms | ||
| Calculating ------------------------------------- | ||
| before: always spawn thread | ||
| 21.520k (±19.5%) i/s (46.47 μs/i) - 104.052k in 5.098624s | ||
| after: fast path 3.278M (± 5.4%) i/s (305.03 ns/i) - 16.565M in 5.069629s | ||
| | ||
| Comparison: | ||
| after: fast path: 3278402.5 i/s | ||
| before: always spawn thread: 21519.8 i/s - 152.34x slower | ||
| | ||
| | ||
| ================================================================================ | ||
| empty array | ||
| ================================================================================ | ||
| ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] | ||
| Warming up -------------------------------------- | ||
| before: always spawn thread | ||
| 735.659k i/100ms | ||
| after: fast path 723.657k i/100ms | ||
| Calculating ------------------------------------- | ||
| before: always spawn thread | ||
| 7.120M (± 8.2%) i/s (140.45 ns/i) - 35.312M in 5.009859s | ||
| after: fast path 7.094M (± 4.8%) i/s (140.96 ns/i) - 35.459M in 5.011149s | ||
| | ||
| Comparison: | ||
| before: always spawn thread: 7119941.7 i/s | ||
| after: fast path: 7094392.7 i/s - same-ish: difference falls within error | ||
| | ||
| | ||
| ================================================================================ | ||
| empty hash | ||
| ================================================================================ | ||
| ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] | ||
| Warming up -------------------------------------- | ||
| before: always spawn thread | ||
| 519.319k i/100ms | ||
| after: fast path 538.244k i/100ms | ||
| Calculating ------------------------------------- | ||
| before: always spawn thread | ||
| 5.077M (±10.5%) i/s (196.95 ns/i) - 25.447M in 5.094679s | ||
| after: fast path 5.290M (± 8.2%) i/s (189.05 ns/i) - 26.374M in 5.036762s | ||
| | ||
| Comparison: | ||
| after: fast path: 5289538.9 i/s | ||
| before: always spawn thread: 5077366.9 i/s - same-ish: difference falls within error | ||
| | ||
| | ||
| ================================================================================ | ||
| multi item array, expected to stay on the threaded path | ||
| ================================================================================ | ||
| ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] | ||
| Warming up -------------------------------------- | ||
| before: always spawn thread | ||
| 734.000 i/100ms | ||
| after: current implementation | ||
| 704.000 i/100ms | ||
| Calculating ------------------------------------- | ||
| before: always spawn thread | ||
| 7.063k (± 8.0%) i/s (141.58 μs/i) - 35.232k in 5.025075s | ||
| after: current implementation | ||
| 6.363k (± 8.0%) i/s (157.16 μs/i) - 31.680k in 5.010779s | ||
| | ||
| Comparison: | ||
| before: always spawn thread: 7063.1 i/s | ||
| after: current implementation: 6363.0 i/s - same-ish: difference falls within error | ||
| | ||
| | ||
| ================================================================================ | ||
| multi entry hash, expected to stay on the threaded path | ||
| ================================================================================ | ||
| ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20] | ||
| Warming up -------------------------------------- | ||
| before: always spawn thread | ||
| 455.000 i/100ms | ||
| after: current implementation | ||
| 652.000 i/100ms | ||
| Calculating ------------------------------------- | ||
| before: always spawn thread | ||
| 5.907k (±14.3%) i/s (169.29 μs/i) - 28.665k in 5.019039s | ||
| after: current implementation | ||
| 6.306k (±11.1%) i/s (158.59 μs/i) - 31.296k in 5.042971s | ||
| | ||
| Comparison: | ||
| after: current implementation: 6305.7 i/s | ||
| before: always spawn thread: 5906.9 i/s - same-ish: difference falls within error | ||
| | ||
| | ||
| Thread.new calls for 1000 map calls | ||
| -------------------------------------------------------------------------------- | ||
| before single array 1000 | ||
| after single array 0 | ||
| before single hash 1000 | ||
| after single hash 0 | ||
| before empty array 0 | ||
| after empty array 0 | ||
| before empty hash 0 | ||
| after empty hash 0 | ||
| before multi item array 4000 | ||
| after multi item array 4000 | ||
| before multi entry hash 4000 | ||
| after multi entry hash 4000 | ||
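
The "x slower" figures in the two single-item comparisons follow directly from the reported iterations per second. As a rough check, the sketch below recomputes the ratio and the approximate time saved per call from the numbers copied out of the results above. The constant `RESULTS` and the helper names `speedup` and `microseconds_saved_per_call` are introduced here for illustration and are not part of the benchmark script.

```ruby
# Iterations-per-second figures copied from the "Comparison" sections above.
# RESULTS and both helper methods are hypothetical names for this sketch.
RESULTS = {
  "single item array" => {before: 25_824.7, after: 3_993_083.5},
  "single entry hash" => {before: 21_519.8, after: 3_278_402.5}
}.freeze

# How many times more iterations per second the fast path achieves.
def speedup(before_ips, after_ips)
  after_ips / before_ips
end

# Difference in time per call, converted from i/s to microseconds.
def microseconds_saved_per_call(before_ips, after_ips)
  (1_000_000.0 / before_ips) - (1_000_000.0 / after_ips)
end

RESULTS.each do |label, ips|
  ratio = speedup(ips[:before], ips[:after])
  saved = microseconds_saved_per_call(ips[:before], ips[:after])

  puts format("%-20s %.2fx faster, about %.2f microseconds saved per call", label, ratio, saved)
end
```

The saving per call is on the order of tens of microseconds on this machine, so the practical impact depends on how often single-item maps occur relative to the work done inside the block.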