Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package datadog.trace.util;

import static java.util.concurrent.TimeUnit.MICROSECONDS;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

/**
* Compares {@link ConcurrentHashtable.D2} against {@link ConcurrentHashMap} and {@link
* ConcurrentSkipListMap} for shared, concurrent composite-key lookups.
*
* <p>The table is shared across all threads ({@link Scope#Benchmark}) and pre-populated before the
* measurement iteration — modelling the steady-state read-mostly pattern that the tracer uses (a
* per-class or per-method instrumentation cache consulted on every invocation).
*
* <ul>
* <li><b>get</b> — pure read: D2.get(k1, k2) vs CHM.get(new Key2(k1, k2)). D2 sidesteps the
* composite key allocation entirely; CHM.get does not store the key, but the allocation still
* happens before the call.
* <li><b>getOrCreate (hit)</b> — the dominant call-site pattern: try to fetch an existing entry,
* create only on first access. On subsequent calls D2 takes the lock-free fast path (same as
* get); CHM.computeIfAbsent with a get-first pattern avoids the lambda capture allocation on
* hits, but still allocates the composite key.
* </ul>
*
* <p>ConcurrentSkipListMap is included as a second baseline: it is entirely lock-free for reads
* (CAS-based) but pays for tree traversal and Comparable overhead on every operation.
*/
@Fork(2)
@Warmup(iterations = 2)
@Measurement(iterations = 3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(MICROSECONDS)
@Threads(8)
public class ConcurrentHashtableD2Benchmark {

static final int N_KEYS = 64;
static final int CAPACITY = 128;

static final String[] SOURCE_K1 = new String[N_KEYS];
static final Integer[] SOURCE_K2 = new Integer[N_KEYS];

static {
for (int i = 0; i < N_KEYS; ++i) {
SOURCE_K1[i] = "key-" + i;
SOURCE_K2[i] = i * 31 + 17;
}
}

static final class D2Entry extends Hashtable.D2.Entry<String, Integer> {
final long value;

D2Entry(String k1, Integer k2) {
super(k1, k2);
this.value = 1L;
}
}

/** Composite key for ConcurrentHashMap and ConcurrentSkipListMap baselines. */
static final class Key2 implements Comparable<Key2> {
final String k1;
final Integer k2;
final int hash;

Key2(String k1, Integer k2) {
this.k1 = k1;
this.k2 = k2;
this.hash = Objects.hash(k1, k2);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Key2)) {
return false;
}
Key2 other = (Key2) o;
return Objects.equals(k1, other.k1) && Objects.equals(k2, other.k2);
}

@Override
public int hashCode() {
return hash;
}

@Override
public int compareTo(Key2 other) {
int c = k1.compareTo(other.k1);
return c != 0 ? c : k2.compareTo(other.k2);
}
}

/**
* Shared state ({@link Scope#Benchmark}): one table instance across all threads, modelling a
* shared instrumentation cache.
*/
@State(Scope.Benchmark)
public static class SharedState {
ConcurrentHashtable.D2<String, Integer, D2Entry> table;
ConcurrentHashMap<Key2, Long> concurrentHashMap;
ConcurrentSkipListMap<Key2, Long> skipListMap;

@Setup(Level.Iteration)
public void setUp() {
table = new ConcurrentHashtable.D2<>(CAPACITY);
concurrentHashMap = new ConcurrentHashMap<>(CAPACITY);
skipListMap = new ConcurrentSkipListMap<>();
for (int i = 0; i < N_KEYS; ++i) {
table.getOrCreate(SOURCE_K1[i], SOURCE_K2[i], D2Entry::new);
Key2 key = new Key2(SOURCE_K1[i], SOURCE_K2[i]);
concurrentHashMap.put(key, (long) i);
skipListMap.put(key, (long) i);
}
}
}

/** Per-thread cursor so each thread cycles through keys independently. */
@State(Scope.Thread)
public static class ThreadState {
int cursor;

int next() {
int i = cursor;
cursor = (i + 1) & (N_KEYS - 1);
return i;
}
}

@Benchmark
public D2Entry get_concurrentHashtable(SharedState s, ThreadState t) {
int i = t.next();
return s.table.get(SOURCE_K1[i], SOURCE_K2[i]);
}

@Benchmark
public Long get_concurrentHashMap(SharedState s, ThreadState t) {
int i = t.next();
return s.concurrentHashMap.get(new Key2(SOURCE_K1[i], SOURCE_K2[i]));
}

@Benchmark
public Long get_concurrentSkipListMap(SharedState s, ThreadState t) {
int i = t.next();
return s.skipListMap.get(new Key2(SOURCE_K1[i], SOURCE_K2[i]));
}

@Benchmark
public D2Entry getOrCreate_concurrentHashtable(SharedState s, ThreadState t) {
int i = t.next();
return s.table.getOrCreate(SOURCE_K1[i], SOURCE_K2[i], D2Entry::new);
}

/**
* get-first pattern for CHM to avoid capturing-lambda allocation on hits — the idiomatic
* equivalent of D2.getOrCreate on a mostly-populated table.
*/
@Benchmark
public Long getOrCreate_concurrentHashMap(SharedState s, ThreadState t) {
int i = t.next();
Key2 key = new Key2(SOURCE_K1[i], SOURCE_K2[i]);
Long existing = s.concurrentHashMap.get(key);
if (existing != null) {
return existing;
}
return s.concurrentHashMap.computeIfAbsent(key, k -> 0L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package datadog.trace.util;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Concurrent counterpart to {@link Hashtable}. Provides lock-free reads and locked writes for
* {@link D1} (single-key) and {@link D2} (composite-key) tables.
*
* <p>Like {@link Hashtable}, capacity is fixed at construction and the table does not resize.
* Unlike {@link Hashtable}, all operations are safe for concurrent access without external
* synchronization.
*
* <p>The primary advantage over {@link java.util.concurrent.ConcurrentHashMap} for composite-key
* use cases is that {@link D2#get(Object, Object)} and {@link D2#getOrCreate(Object, Object,
* BiFunction)} accept key parts directly — no composite key object is allocated for the lookup.
* {@code ConcurrentHashMap} requires a wrapper object whose ownership may transfer to the map on
* insert; escape analysis must conservatively assume the key escapes even on hit paths, preventing
* scalar replacement.
*
* <p><b>Memory model.</b> Bucket slots are held in an {@link AtomicReferenceArray}, so each {@link
* #get} begins with a volatile read of the slot. Entries are inserted at the bucket head: the new
* entry's {@code next} pointer is set before the volatile slot write, so any subsequent volatile
* read of that slot carries happens-before over the full chain — chain {@code next} fields do not
* need to be volatile.
*/
public final class ConcurrentHashtable {
private ConcurrentHashtable() {}

/**
* Single-key concurrent hash table. Lock-free on hit; locked on miss.
*
* @param <K> the key type
* @param <TEntry> the user's {@link Hashtable.D1.Entry D1.Entry&lt;K&gt;} subclass
*/
public static final class D1<K, TEntry extends Hashtable.D1.Entry<K>> {

private final AtomicReferenceArray<Hashtable.Entry> buckets;
private final AtomicInteger size = new AtomicInteger();

public D1(int capacity) {
this.buckets = new AtomicReferenceArray<>(Hashtable.Support.sizeFor(capacity));
}

public int size() {
return size.get();
}

@SuppressWarnings("unchecked")
public TEntry get(K key) {
long keyHash = Hashtable.D1.Entry.hash(key);
for (TEntry te = (TEntry) buckets.get(Support.bucketIndex(buckets, keyHash));
te != null;
te = te.next()) {
if (te.keyHash == keyHash && te.matches(key)) {
return te;
}
}
return null;
}

/**
* Returns the entry for {@code key}, creating one via {@code creator} if absent. Lock-free on
* hit; acquires a table-level lock on miss. Re-checks under the lock to avoid duplicate entries
* under concurrent misses.
*/
@SuppressWarnings("unchecked")
public TEntry getOrCreate(K key, Function<? super K, ? extends TEntry> creator) {
long keyHash = Hashtable.D1.Entry.hash(key);
int index = Support.bucketIndex(buckets, keyHash);
for (TEntry te = (TEntry) buckets.get(index); te != null; te = te.next()) {
if (te.keyHash == keyHash && te.matches(key)) {
return te;
}
}
synchronized (this) {
for (TEntry te = (TEntry) buckets.get(index); te != null; te = te.next()) {
if (te.keyHash == keyHash && te.matches(key)) {
return te;
}
}
TEntry newEntry = creator.apply(key);
newEntry.setNext((TEntry) buckets.get(index));
buckets.set(index, newEntry);
size.incrementAndGet();
return newEntry;
}
}

public void forEach(Consumer<? super TEntry> consumer) {
Support.forEach(buckets, consumer);
}

/**
* Context-passing forEach. Avoids a capturing-lambda allocation — pass a non-capturing {@link
* BiConsumer} (typically a {@code static final}) plus whatever side-band state it needs.
*/
public <T> void forEach(T context, BiConsumer<? super T, ? super TEntry> consumer) {
Support.forEach(buckets, context, consumer);
}
}

/**
* Two-key (composite-key) concurrent hash table. Lock-free on hit; locked on miss.
*
* <p>Key parts are passed directly to {@link #get} and {@link #getOrCreate}, eliminating the
* per-lookup composite key object allocation that {@code ConcurrentHashMap<Pair<K1,K2>, V>}
* requires.
*
* @param <K1> first key type
* @param <K2> second key type
* @param <TEntry> the user's {@link Hashtable.D2.Entry D2.Entry&lt;K1, K2&gt;} subclass
*/
public static final class D2<K1, K2, TEntry extends Hashtable.D2.Entry<K1, K2>> {

private final AtomicReferenceArray<Hashtable.Entry> buckets;
private final AtomicInteger size = new AtomicInteger();

public D2(int capacity) {
this.buckets = new AtomicReferenceArray<>(Hashtable.Support.sizeFor(capacity));
}

public int size() {
return size.get();
}

@SuppressWarnings("unchecked")
public TEntry get(K1 key1, K2 key2) {
long keyHash = Hashtable.D2.Entry.hash(key1, key2);
for (TEntry te = (TEntry) buckets.get(Support.bucketIndex(buckets, keyHash));
te != null;
te = te.next()) {
if (te.keyHash == keyHash && te.matches(key1, key2)) {
return te;
}
}
return null;
}

/**
* Returns the entry for {@code (key1, key2)}, creating one via {@code creator} if absent.
* Lock-free on hit; acquires a table-level lock on miss. Re-checks under the lock to avoid
* duplicate entries under concurrent misses.
*
* <p>The {@code creator} should build an entry whose {@code keyHash} equals {@link
* Hashtable.D2.Entry#hash(Object, Object) D2.Entry.hash(key1, key2)}.
*/
@SuppressWarnings("unchecked")
public TEntry getOrCreate(
K1 key1, K2 key2, BiFunction<? super K1, ? super K2, ? extends TEntry> creator) {
long keyHash = Hashtable.D2.Entry.hash(key1, key2);
int index = Support.bucketIndex(buckets, keyHash);
for (TEntry te = (TEntry) buckets.get(index); te != null; te = te.next()) {
if (te.keyHash == keyHash && te.matches(key1, key2)) {
return te;
}
}
synchronized (this) {
for (TEntry te = (TEntry) buckets.get(index); te != null; te = te.next()) {
if (te.keyHash == keyHash && te.matches(key1, key2)) {
return te;
}
}
TEntry newEntry = creator.apply(key1, key2);
newEntry.setNext((TEntry) buckets.get(index));
buckets.set(index, newEntry);
size.incrementAndGet();
return newEntry;
}
}

public void forEach(Consumer<? super TEntry> consumer) {
Support.forEach(buckets, consumer);
}

/**
* Context-passing forEach. Avoids a capturing-lambda allocation — pass a non-capturing {@link
* BiConsumer} (typically a {@code static final}) plus whatever side-band state it needs.
*/
public <T> void forEach(T context, BiConsumer<? super T, ? super TEntry> consumer) {
Support.forEach(buckets, context, consumer);
}
}

/** Building blocks for concurrent hash-table operations, mirroring {@link Hashtable.Support}. */
public static final class Support {
private Support() {}

public static int bucketIndex(AtomicReferenceArray<Hashtable.Entry> buckets, long keyHash) {
return (int) (keyHash & (buckets.length() - 1));
}

@SuppressWarnings("unchecked")
public static <TEntry extends Hashtable.Entry> void forEach(
AtomicReferenceArray<Hashtable.Entry> buckets, Consumer<? super TEntry> consumer) {
for (int i = 0; i < buckets.length(); i++) {
for (TEntry te = (TEntry) buckets.get(i); te != null; te = te.next()) {
consumer.accept(te);
}
}
}

@SuppressWarnings("unchecked")
public static <T, TEntry extends Hashtable.Entry> void forEach(
AtomicReferenceArray<Hashtable.Entry> buckets,
T context,
BiConsumer<? super T, ? super TEntry> consumer) {
for (int i = 0; i < buckets.length(); i++) {
for (TEntry te = (TEntry) buckets.get(i); te != null; te = te.next()) {
consumer.accept(context, te);
}
}
}
}
}
Loading