
Flow→Flux graphql subscription: PropagationContextElement opens Observation scopes asymmetrically, tripping ObservationThreadLocalAccessor.restore assertion #36929

@jludvice

Description


Summary

In Spring Framework 7.0.8, GraphQL subscriptions returning a Kotlin Flow<T> over a WebSocket trip ObservationThreadLocalAccessor.restore's contract assertion:

java.lang.AssertionError: Observation <secured.requests> to which we're restoring is not the same as
the one set as this scope's parent observation <graphql.datafetcher>.
Most likely a manually created Observation has a scope opened that was never closed.

Downgrading to Spring Boot 4.0.6 (which brings Spring Framework back to 7.0.7) makes the failure disappear. The regression was introduced by #36667, which added PropagationContextElement to the Flow → Flux adapter.

Environment

  • Spring Boot: 4.0.7
  • Spring Framework: 7.0.8 (7.0.7 works)
  • Spring GraphQL: 2.0.4
  • Micrometer Observation: 1.16.6
  • Micrometer context-propagation: 1.2.1
  • kotlinx-coroutines-core / -reactor: 1.11.0
  • JDK: Temurin 25
  • WebSocket transport (graphql-transport-ws)
  • Spring Security on the request chain

Root cause

A client opens a GraphQL-over-WebSocket connection and sends a subscribe frame:

{"id": "1", "type": "subscribe", "payload": {"query": "subscription { events }"}}
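The subscribe frame above is one message in the graphql-transport-ws protocol. The client first sends connection_init and waits for connection_ack. It then sends subscribe and receives results as next frames carrying the same id, and it can send complete to cancel. Below is a minimal stdlib-only sketch of those client frames. The helper names (jsonString, connectionInitFrame, subscribeFrame, completeFrame) are hypothetical. The hand-rolled escaping is only for illustration; a real client would use a JSON library.

```kotlin
// Hypothetical helpers that build graphql-transport-ws client frames as JSON strings.
// Escaping is deliberately minimal (backslash, quote, newline); use a JSON library in practice.

/** Wraps [value] in double quotes, escaping backslashes, quotes and newlines. */
fun jsonString(value: String): String =
    "\"" + value
        .replace("\\", "\\\\")   // escape backslashes first so later escapes are not doubled
        .replace("\"", "\\\"")
        .replace("\n", "\\n") + "\""

/** First frame on a new connection; the server replies with connection_ack. */
fun connectionInitFrame(): String = """{"type": "connection_init"}"""

/** Starts subscription [id]; results arrive as "next" frames with the same id. */
fun subscribeFrame(id: String, query: String): String =
    """{"id": ${jsonString(id)}, "type": "subscribe", "payload": {"query": ${jsonString(query)}}}"""

/** Client-side cancellation of subscription [id]. */
fun completeFrame(id: String): String =
    """{"id": ${jsonString(id)}, "type": "complete"}"""
```

subscribeFrame("1", "subscription { events }") produces exactly the frame quoted in this report.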

The server's handler returns a Kotlin Flow:

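import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import org.springframework.graphql.data.method.annotation.SubscriptionMapping
import org.springframework.stereotype.Controller

// Event, EventGql (placeholder for whatever toGql() returns), toGql() and filterValue are app code not shown here.
@Controller
class EventsController(private val handler: UpdateHandler) {
    @SubscriptionMapping
    fun subscribeEvents(): Flow<EventGql> =
        handler.updatedEventFlow              // SharedFlow<Event>
            .filter { it.eventProperty == filterValue }
            .map { it.toGql() }               // Event -> GraphQL-facing type
}

class UpdateHandler {
    // Hot flow: no replay, 64-slot extra buffer, oldest events dropped when the buffer is full.
    private val _messageFlow = MutableSharedFlow<Event>(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val updatedEventFlow: SharedFlow<Event> = _messageFlow.asSharedFlow()
}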

Spring GraphQL hands the returned Flow to kotlinx.coroutines.reactor.FlowAsFlux. When the WebSocket subscriber calls request(n) on the resulting FlowSubscription, the following happens synchronously on the WebSocket worker thread:

  1. The coroutine resume invokes PropagationContextElement.updateThreadContext via DispatchedContinuation.resumeCancellableWithInternal. This is call #1, and it is expected.
  2. Inside FlowSubscription.consumeFlow, the .filter/.map chain has wrapped the upstream SharedFlow in a ChannelFlowOperator, so collection routes through ChannelFlowKt.withContextUndispatched, which invokes updateThreadContext again. This is call #2, and it is unexpected: the Scope it returns is not closed before the outer thread-context restore runs.
  3. Each call routes through DefaultContextSnapshot.setThreadLocals → ObservationThreadLocalAccessor.setValue → SimpleObservation.openScope and pushes a fresh SimpleScope for the captured graphql.datafetcher Observation onto the thread-local stack.
  4. Only one PropagationContextElement.restoreThreadContext call runs to unwind. With two pushes and one pop, the scope that is actually current on the stack no longer matches what the snapshot recorded, and ObservationThreadLocalAccessor.restore(value) throws the assertion.
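The two-pushes/one-pop sequence in steps 1 to 4 can be reproduced with a small stdlib-only model. The model rests on two assumptions. First, based on the assertion text, a secured.requests scope is current on the worker thread before call #1. Second, each setValue remembers the observation it replaced, and the matching restore checks that value against the current scope's parent. Obs, ModelScope, ScopeModel and runScenario are hypothetical names. This is a simplification, not Micrometer's actual SimpleScope or ObservationThreadLocalAccessor code.

```kotlin
// Simplified model of a parent-linked Observation scope stack (not Micrometer's implementation).

/** Hypothetical observation handle; only the name matters here. */
data class Obs(val name: String)

/** A scope remembers the scope that was current when it was opened (its parent). */
class ModelScope(val obs: Obs, val parent: ModelScope?)

/** Hypothetical per-thread scope stack, linked through parent pointers. */
class ScopeModel(root: Obs) {
    // Base scope: assumed to be secured.requests, per the assertion message.
    var current: ModelScope? = ModelScope(root, null)
        private set

    /** Models setValue: remembers the observation being replaced and opens a new scope. */
    fun setValue(obs: Obs): Obs? {
        val previous = current?.obs
        current = ModelScope(obs, current)
        return previous
    }

    /** Models restore(value): closes the current scope after checking its parent carries [value]. */
    fun restore(value: Obs?) {
        val scope = current ?: error("no scope to close")
        val parentObs = scope.parent?.obs
        if (parentObs != value) {
            throw AssertionError(
                "Observation <${value?.name}> to which we're restoring is not the same as " +
                    "the one set as this scope's parent observation <${parentObs?.name}>."
            )
        }
        current = scope.parent
    }
}

/** Returns null if every restore matched its scope's parent, otherwise the assertion message. */
fun runScenario(balanced: Boolean): String? {
    val securedRequests = Obs("secured.requests")
    val dataFetcher = Obs("graphql.datafetcher")
    val model = ScopeModel(securedRequests)
    return try {
        val first = model.setValue(dataFetcher)  // updateThreadContext #1 -> LEAK 1
        val second = model.setValue(dataFetcher) // updateThreadContext #2 -> LEAK 2
        if (balanced) model.restore(second)      // the restore for #2 that never runs in the capture
        model.restore(first)                     // the single restore that does run
        null
    } catch (e: AssertionError) {
        e.message
    }
}
```

With balanced pairing (restore #2, then restore #1) both checks pass. With only the outer restore, the current scope is LEAK 2, whose parent is the graphql.datafetcher scope from LEAK 1, and the model yields the same message as the real failure.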

The kotlinx.coroutines machinery then wraps the resulting AssertionError in kotlinx.coroutines.CoroutinesInternalError, which kills the FlowSubscription coroutine and stops event delivery to the WebSocket client.

We could not reproduce the assertion in a stripped-down standalone project, despite matching all known dependency versions, the parent observation chain (graphql.datafetcher → graphql.request → spring.security.http.secured.requests → spring.security.filterchains → http.server.requests), the security setup, and the WebSocket transport.
The problem does reproduce reliably (the WARN is logged on every subscription) in our full Spring Boot + Testcontainers integration test environment.

Evidence

Suspending-debugger capture on the failing thread, taken just before ObservationThreadLocalAccessor.java:186 trips (t = 0 ms):

t (ms)  event                                             note
  -166  datafetcher.openScope                             legit: ContextDataFetcherDecorator.get → ContextSnapshot.wrap
   -79  datafetcher.close                                 legit: data fetcher returns
   -20  PropagationContextElement.updateThreadContext #1  from FlowSubscription.request
   -17  datafetcher.openScope                             LEAK 1
    -9  PropagationContextElement.updateThreadContext #2  from ChannelFlowKt.withContextUndispatched
    -8  datafetcher.openScope                             LEAK 2
    -1  PropagationContextElement.restoreThreadContext    only ONE restore runs
     0  AssertionError fires
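Tallying the rows of the capture makes the imbalance explicit. This stdlib-only sketch copies the event labels from the table in shortened form. TimelineEvent, Tally and tally are hypothetical names. It counts datafetcher scope opens minus closes, alongside updateThreadContext and restoreThreadContext calls.

```kotlin
/** One row of the debugger capture: time relative to the assertion and a shortened event label. */
data class TimelineEvent(val tMs: Int, val label: String)

val timeline = listOf(
    TimelineEvent(-166, "datafetcher.openScope"),
    TimelineEvent(-79, "datafetcher.close"),
    TimelineEvent(-20, "updateThreadContext"),   // #1, from FlowSubscription.request
    TimelineEvent(-17, "datafetcher.openScope"), // LEAK 1
    TimelineEvent(-9, "updateThreadContext"),    // #2, from ChannelFlowKt.withContextUndispatched
    TimelineEvent(-8, "datafetcher.openScope"),  // LEAK 2
    TimelineEvent(-1, "restoreThreadContext"),   // the only restore
)

data class Tally(val openScopes: Int, val updates: Int, val restores: Int)

/** Counts net open datafetcher scopes and update/restore calls, in time order. */
fun tally(events: List<TimelineEvent>): Tally {
    var openScopes = 0
    var updates = 0
    var restores = 0
    for (event in events.sortedBy { it.tMs }) {
        when (event.label) {
            "datafetcher.openScope" -> openScopes++
            "datafetcher.close" -> openScopes--
            "updateThreadContext" -> updates++
            "restoreThreadContext" -> restores++
        }
    }
    return Tally(openScopes = openScopes, updates = updates, restores = restores)
}
```

tally(timeline) yields Tally(openScopes = 2, updates = 2, restores = 1). Two graphql.datafetcher scopes are still open, and one updateThreadContext has no matching restore.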

Net imbalance on the failing thread: PropagationContextElement.updateThreadContext runs twice, but PropagationContextElement.restoreThreadContext runs only once.
Each unbalanced updateThreadContext call invokes ObservationThreadLocalAccessor.setValue, which in turn calls SimpleObservation.openScope and pushes a new SimpleScope for the captured graphql.datafetcher Observation onto the thread-local Observation-scope stack.
When restoreThreadContext finally runs, the stack contains two leaked graphql.datafetcher scopes that the snapshot did not expect.

Single-call-chain stack at the assertion:

ObservationThreadLocalAccessor.restore:186
  ← DefaultContextSnapshot$DefaultScope.close:143
  ← PropagationContextElement.restoreThreadContext:83
  ← ThreadContextKt.restoreThreadContext:90
  ← DispatchedContinuation.resumeCancellableWithInternal
  ← FlowSubscription.request:257
  ← FlowAsFlux.subscribe:34
  ← ... ← GraphQlWebSocketHandler.handleInternal:273

Symptom timing

Our tests log the WARN (Observation to which we're restoring) on every subscription. Whether it escalates to a fatal coroutine failure (kotlinx.coroutines.CoroutinesInternalError) is timing-sensitive: the same test on the same code fails in roughly 50% of runs, but passes when a debugger is attached.

Expected behavior

PropagationContextElement.updateThreadContext should be idempotent across nested ThreadContextElement invocations within a single coroutine resume sequence on the same thread. Alternatively, FlowSubscription.consumeFlow should not re-apply the captured snapshot a second time when the upstream context is already current on the thread.
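The first option can be read as a guard that turns a nested update into a no-op when the same snapshot is already applied on the thread. Under that guard, a skipped inner restore cannot leak a scope. The stdlib-only model below illustrates the idea under stated assumptions. Snapshot, GuardedElement and twoUpdatesOneRestore are hypothetical names, and this is neither the kotlinx.coroutines ThreadContextElement API nor Spring's PropagationContextElement. Comparing snapshots by identity is also an assumption: a separately captured snapshot with equal contents would still be re-applied.

```kotlin
/** Hypothetical stand-in for a captured context snapshot. */
class Snapshot(val name: String)

/** Hypothetical model of a thread-context element whose update is idempotent per thread. */
object GuardedElement {
    // Token returned by a nested update that changed nothing.
    private object NoOp

    // Snapshot currently applied on this thread, if any.
    private val applied = ThreadLocal<Snapshot?>()

    // Number of updates that actually changed thread state (for observability only).
    val realApplications: ThreadLocal<Int> = ThreadLocal.withInitial { 0 }

    /** Applies [snapshot] unless it is already current; returns the state to pass to [restore]. */
    fun update(snapshot: Snapshot): Any? {
        val previous = applied.get()
        if (previous === snapshot) return NoOp
        applied.set(snapshot)
        realApplications.set(realApplications.get() + 1)
        return previous
    }

    /** Undoes a matching [update]; the NoOp token means there is nothing to undo. */
    fun restore(oldState: Any?) {
        if (oldState === NoOp) return
        applied.set(oldState as Snapshot?)
    }

    fun current(): Snapshot? = applied.get()
}

/** Mirrors the captured sequence: update #1, nested update #2, and only the outer restore. */
fun twoUpdatesOneRestore(): Boolean {
    val snapshot = Snapshot("graphql.datafetcher")
    val before = GuardedElement.realApplications.get()
    val outer = GuardedElement.update(snapshot) // stands in for call #1 (coroutine resume)
    GuardedElement.update(snapshot)             // stands in for call #2; its restore never runs
    GuardedElement.restore(outer)               // the single restore observed in the capture
    val applications = GuardedElement.realApplications.get() - before
    return GuardedElement.current() == null && applications == 1
}
```

On a clean thread, twoUpdatesOneRestore() returns true: only one real application happens and nothing is left applied afterwards. The guard trades strict update/restore pairing for tolerance of skipped restores. The second option, not re-applying the snapshot in FlowSubscription.consumeFlow, would keep strict pairing instead.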
