
feat(dispatch): turn-boundary batching dispatcher v2#686

Open
brettchien wants to merge 1 commit into openabdev:main from brettchien:feature/turn-boundary-batching-v2

Conversation

@brettchien
Contributor

@brettchien brettchien commented May 1, 2026

Summary

Implements the turn-boundary message batching dispatcher per ADR v0.3 (docs/adr/turn-boundary-batching.md).

Changes

  • src/dispatch.rs (new): Dispatcher, BufferedMessage, ThreadHandle, consumer_loop, dispatch_batch, estimate_tokens, unit tests
  • src/config.rs: MessageProcessingMode enum, max_buffered_messages, max_batch_tokens config fields for Discord/Slack/Gateway
  • src/adapter.rs: AdapterRouter::pack_arrival_event uniform packing (§3.3), ChannelRef/MessageRef types
  • src/discord.rs: branch on message_processing_mode — per-message vs batched dispatch path
  • src/slack.rs: same branching; KeyedAsyncQueue replaced by Dispatcher consumer task
  • src/gateway.rs: same branching
  • src/main.rs: wire Dispatcher instances per adapter

ADR compliance

Implements Phase 1 scope from ADR §4.4: I1 zero-latency first message, I2 at-most-one-in-flight-turn, I3 broker structural fidelity, SendError eviction (§2.5), other_bot_present freshness (§2.6), batch reaction UX (§6.7), graceful shutdown (§6.8).

Testing

Unit tests in src/dispatch.rs: estimate_tokens, pack_arrival_event single/batch/extra-blocks scenarios.

https://discord.com/channels/1491295327620169908/1497977225314832536

@brettchien brettchien requested a review from thepagent as a code owner May 1, 2026 15:49
@github-actions github-actions Bot added closing-soon (PR missing Discord Discussion URL — will auto-close in 3 days), pending-screening (PR awaiting automated screening), needs-rebase, pending-contributor, and removed closing-soon, needs-rebase, pending-contributor labels May 1, 2026
@brettchien brettchien force-pushed the feature/turn-boundary-batching-v2 branch 6 times, most recently from c408a5a to 00d7b25, May 2, 2026 04:16
@brettchien brettchien force-pushed the feature/turn-boundary-batching-v2 branch from 00d7b25 to 5b7e08c, May 2, 2026 05:17
@brettchien
Contributor Author

Status update — SendError testing approach

Added in latest amend (5b7e08c):

  • Extracted Dispatcher::try_evict_locked(map, key, my_generation) -> bool from submit()'s inline eviction.
  • Fixed the §2.5 generation mechanism: ThreadHandle.generation was hardcoded to 0, making race-safe eviction degenerate (any stale producer could remove a freshly-inserted handle). Now backed by Dispatcher.next_generation: AtomicU64, pre-fetched per submit() and consumed only on lazy insert. Wasted values are fine — generations need only be monotonic, not contiguous.
  • 3 unit tests covering the eviction predicate: match → remove, mismatch → keep, absent → false.
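A minimal std-only sketch of the eviction predicate described above (names mirror the PR's description; the map value is reduced to just the generation field, so this is illustrative shape, not the real ThreadHandle):

```rust
use std::collections::HashMap;

// Stand-in for the PR's ThreadHandle: only generation matters here.
struct ThreadHandle {
    generation: u64,
}

// §2.5 eviction predicate: remove the handle only if the caller's
// generation matches the one currently in the map, so a stale producer
// can never evict a freshly-inserted handle.
fn try_evict_locked(
    map: &mut HashMap<String, ThreadHandle>,
    key: &str,
    my_generation: u64,
) -> bool {
    let matches = map
        .get(key)
        .map_or(false, |h| h.generation == my_generation);
    if matches {
        map.remove(key);
    }
    matches // mismatch → keep; absent → false
}

fn main() {
    let mut map = HashMap::new();
    map.insert("thread-1".to_string(), ThreadHandle { generation: 7 });
    assert!(!try_evict_locked(&mut map, "thread-1", 6)); // stale producer: keep
    assert!(try_evict_locked(&mut map, "thread-1", 7));  // match: remove
    assert!(!try_evict_locked(&mut map, "thread-1", 7)); // absent: false
}
```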

Not in this PR — full SendError end-to-end recovery:
The user-visible path (submit → tx.send → SendError → try_evict_locked → adapter.add_reaction + send_message → return Err) needs a Dispatcher → DispatchTarget trait seam to mock AdapterRouter (currently a concrete struct with SessionPool + real CLI subprocesses). That refactor is wider than Phase 1 scope.

Phase 1 coverage strategy: unit-test the predicate (here), defer the integrated path to a manual staging smoke matrix entry in the ADR (PR #598). Smoke procedure draft (TBD landing decision):

  1. SIGKILL the agent CLI subprocess for an active session.
  2. Send a new mention into the same thread.
  3. Verify ❌ on trigger_msg + a ⚠️ "dispatch consumer exited unexpectedly" reply + the next mention running cleanly on a fresh consumer (logs show generation = N+1).

Decision pending: land the §6.10 smoke entry in PR #598's ADR doc, or co-locate here.

@brettchien
Contributor Author

What's in PR 686 — feature checklist

Core architecture

  • New per-thread Dispatcher (src/dispatch.rs, 722 lines) — bounded tokio::sync::mpsc::channel + dedicated consumer task per active thread. Replaces v0.8.2-beta.1's per-message direct dispatch and Slack's KeyedAsyncQueue (now removed).
  • BufferedMessage carries sender_json, prompt, extra_blocks, trigger_msg, arrived_at, estimated_tokens.
  • consumer_loop blocks on rx.recv() for the first message (zero added latency), then greedy try_recv drain up to max_buffered_messages or max_batch_tokens.
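The recv-then-drain shape above can be sketched with std::sync::mpsc standing in for tokio's channel (the carry slot for a message that would exceed the token cap is an assumption about how the split-across-turns case is handled; names and field shapes are illustrative):

```rust
use std::sync::mpsc::Receiver;

// Stand-in for the PR's BufferedMessage; only the token estimate matters.
struct BufferedMessage {
    estimated_tokens: usize,
}

fn next_batch(
    rx: &Receiver<BufferedMessage>,
    carry: &mut Option<BufferedMessage>,
    max_buffered_messages: usize,
    max_batch_tokens: usize,
) -> Option<Vec<BufferedMessage>> {
    // First message: either the carry-over from the previous batch, or a
    // blocking recv() — zero added latency after idle (I1).
    let first = match carry.take() {
        Some(m) => m,
        None => rx.recv().ok()?, // channel closed → consumer shuts down
    };
    let mut tokens = first.estimated_tokens;
    let mut batch = vec![first];
    // Greedy drain: take whatever is already queued, up to both caps.
    while batch.len() < max_buffered_messages {
        match rx.try_recv() {
            Ok(msg) => {
                if tokens + msg.estimated_tokens > max_batch_tokens {
                    *carry = Some(msg); // opens the next batch; FIFO preserved
                    break;
                }
                tokens += msg.estimated_tokens;
                batch.push(msg);
            }
            Err(_) => break, // buffer empty: dispatch what we have
        }
    }
    Some(batch)
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    tx.send(BufferedMessage { estimated_tokens: 100 }).unwrap();
    tx.send(BufferedMessage { estimated_tokens: 100 }).unwrap();
    let mut carry = None;
    let batch = next_batch(&rx, &mut carry, 10, 24_000).unwrap();
    assert_eq!(batch.len(), 2); // both drained greedily after the blocking recv
}
```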

Three invariants enforced (ADR §1)

  • I1 Zero added latency on the first message after idle.
  • I2 At most one in-flight ACP turn per thread (consumer task serializes).
  • I3 Broker structural fidelity — no merge/split/reorder; each BufferedMessage becomes its own pack_arrival_event block sequence concatenated in arrival order.

Modes & config (src/config.rs)

  • New MessageProcessingMode enum: PerMessage (default, v0.8.2-beta.1 behaviour) / Batched (opt-in).
  • Per-platform on DiscordConfig / SlackConfig / GatewayConfig: message_processing_mode, max_buffered_messages (default 10), max_batch_tokens (default 24000).
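Illustratively, that config surface might be shaped like this (a sketch only; the real structs in src/config.rs carry serde attributes and many more fields, and the same three fields repeat on SlackConfig and GatewayConfig):

```rust
// Defaults follow the PR description: PerMessage mode, 10 messages, 24000 tokens.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
enum MessageProcessingMode {
    #[default]
    PerMessage, // v0.8.2-beta.1 behaviour
    Batched,    // opt-in
}

#[derive(Debug)]
struct DiscordConfig {
    message_processing_mode: MessageProcessingMode,
    max_buffered_messages: usize,
    max_batch_tokens: usize,
}

impl Default for DiscordConfig {
    fn default() -> Self {
        Self {
            message_processing_mode: MessageProcessingMode::default(),
            max_buffered_messages: 10,
            max_batch_tokens: 24_000,
        }
    }
}

fn main() {
    let cfg = DiscordConfig::default();
    // Batched mode is strictly opt-in; defaults stay backward-compatible.
    assert_eq!(cfg.message_processing_mode, MessageProcessingMode::PerMessage);
}
```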

Adapter wiring

  • Discord (src/discord.rs), Slack (src/slack.rs), Gateway (src/gateway.rs) all route through the same Dispatcher when in Batched mode.
  • Dispatcher instances are tracked in main.rs and shut down ahead of pool.shutdown() on SIGTERM.
  • Per-message mode unchanged.

Slash commands

  • /reset now also cancels in-flight buffered messages — merges the originally-planned standalone /cancel-all into the existing reset path. Calls Dispatcher::cancel_buffered(thread_key) before pool.reset_session(...). Reports the dropped count when non-zero (e.g. 🔄 Session reset. Dropped 3 buffered message(s). Start a new conversation!).
  • Discord-only; Slack has no /reset to extend.

Packing (AdapterRouter::pack_arrival_event, ADR §3.6 Scenarios A-D)

  • One BufferedMessage → [<sender_context> + prompt header, ...extra_blocks]. Multi-message batches concatenate these per-message sequences in arrival order.
  • Scenario D ordering fix: voice transcript text now follows <sender_context> (was prepended in v0.8.2-beta.1) — matches the position used for image attachments.
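The I3 fidelity rule reduces to a concatenation that never merges, splits, or reorders. A toy model using strings for blocks (block content and field names are illustrative, not the real ACP types):

```rust
// Each buffered message expands to its own block sequence; a batch is the
// plain concatenation of those sequences in arrival order.
struct BufferedMessage {
    sender_context: String,
    prompt: String,
    extra_blocks: Vec<String>, // e.g. image or voice-transcript blocks
}

fn pack_arrival_event(batch: &[BufferedMessage]) -> Vec<String> {
    let mut blocks = Vec::new();
    for msg in batch {
        blocks.push(msg.sender_context.clone());
        blocks.push(msg.prompt.clone());
        // extra_blocks follow the sender_context + prompt of *their own*
        // message — attribution is never lost across the batch.
        blocks.extend(msg.extra_blocks.iter().cloned());
    }
    blocks
}

fn main() {
    let batch = vec![
        BufferedMessage {
            sender_context: "<sender_context>alice</sender_context>".into(),
            prompt: "first".into(),
            extra_blocks: vec!["[image]".into()],
        },
        BufferedMessage {
            sender_context: "<sender_context>bob</sender_context>".into(),
            prompt: "second".into(),
            extra_blocks: vec![],
        },
    ];
    let blocks = pack_arrival_event(&batch);
    assert_eq!(blocks.len(), 5); // 3 blocks from alice, 2 from bob, in order
}
```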

Observability (ADR §6.6)

  • info_span!("dispatch", channel = %channel_id, adapter = %platform) per dispatched batch.
  • Five fields per span: events_per_dispatch, packed_block_count, agent_dispatch_ms, tokens_per_event, wait_ms.

Reaction UX (ADR §6.7)

  • 👀 applied to every message in a batch before dispatch (not just the trigger).

Error handling (ADR §2.5)

  • tokio::sync::mpsc::error::SendError (consumer task died) → ❌ on trigger_msg + a ⚠️ "dispatch consumer exited unexpectedly" reply + DispatchError::ConsumerDead returned.
  • Race-safe eviction: try_evict_locked helper + monotonic Dispatcher.next_generation: AtomicU64. Replaces previously-broken hardcoded generation: 0 (which made any stale producer able to remove a freshly-inserted handle).

Graceful shutdown (ADR §6.8)

  • Dispatcher::shutdown() invoked on SIGTERM ahead of pool.shutdown(). Logs buffered_lost per thread for any messages dropped before they could dispatch.

Token estimation

  • estimate_tokens(prompt, extra_blocks): ~4 chars/token for text, fixed 512 per image block. Coarse — guard rail, not pre-flight.
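As a sketch of that heuristic (the Block enum here is illustrative, not the ACP block type):

```rust
// Coarse estimate per the description: ~4 chars per token for text,
// a fixed 512 per image block. A guard rail for the batch cap, not a
// pre-flight token count.
enum Block {
    Text(String),
    Image, // payload elided
}

fn estimate_tokens(prompt: &str, extra_blocks: &[Block]) -> usize {
    // Round up: any partial 4-char chunk still costs a token.
    let chars_to_tokens = |n: usize| (n + 3) / 4;
    let mut tokens = chars_to_tokens(prompt.chars().count());
    for b in extra_blocks {
        tokens += match b {
            Block::Text(t) => chars_to_tokens(t.chars().count()),
            Block::Image => 512,
        };
    }
    tokens
}

fn main() {
    assert_eq!(estimate_tokens("abcdefgh", &[]), 2); // 8 chars → 2 tokens
    let blocks = [Block::Text("hi".to_string()), Block::Image];
    assert_eq!(estimate_tokens("", &blocks), 513); // 1 text token + 512
}
```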

Tests (11 unit tests in src/dispatch.rs)

  • Token estimation: empty / text / image / oversized single message / cumulative cap.
  • Pack arrival event Scenarios A–D: single text, separate-message image (B), multi-author interleaved (C), voice transcript ordering (D).
  • try_evict_locked: generation match → remove, mismatch → keep, key absent → false.

Removed

  • Slack KeyedAsyncQueue and its acquire() call sites in app_mention / message handlers — per-thread serialization moves to the Dispatcher consumer task.

Deferred to follow-up PRs

  • Async integration tests for consumer_loop (mid-turn arrival, fragmentation, buffer-full parking) — needs a Dispatcher → DispatchTarget trait seam that's wider than Phase 1 scope.
  • Full SendError end-to-end coverage — same trait-seam blocker. Manual staging smoke procedure being added to the ADR (PR docs: add ADR for turn-boundary message batching #598).
  • Cross-agent Scenario D runtime smoke (ADR §6.9) — needs per-agent API credentials not present in CI.

@brettchien
Contributor Author

brettchien commented May 2, 2026

ADR Phase 1 (§4.4) — item-by-item checklist

Mapping each deliverable in ADR §4.4 to the code in 16bed85. Legend: ✅ done · ⚠️ done with caveat · ❌ deferred.

Mechanism deliverables

  • ✅ New module src/dispatch.rs — Dispatcher + ThreadHandle + consumer_loop (pack_arrival_event lives in src/adapter.rs as AdapterRouter::pack_arrival_event, not in dispatch.rs — the ADR allows this; same function, different home).
  • ✅ MessageProcessingMode enum in config.rs (default PerMessage).
  • ✅ Discord wiring — branch on mode at discord.rs:629+.
  • ✅ Slack wiring — KeyedAsyncQueue removed; dispatcher.submit() at slack.rs:1107.
  • ✅ Gateway wiring — dispatcher.submit() at gateway.rs:484.
  • ✅ Packing — SenderContext.timestamp is additive; pack_arrival_event is uniform for batch.len() == 1 and ≥ 2; extra_blocks are interleaved after each <sender_context>.
  • ✅ SendError handling (§2.5) — evict + ❌ on trigger_msg + ⚠️ reply + Err(DispatchError::ConsumerDead).
  • ✅ submit does NOT touch last_active — verified; last_active is only updated in acp/connection.rs on session prompts.
  • ✅ other_bot_present freshness (§2.6) — fixed in 16bed85. Replaced the broken Arc<AtomicBool> mirror with a per-message bool snapshot on BufferedMessage. dispatch_batch reads batch.last().unwrap().other_bot_present — the freshest snapshot in the batch. This mirrors v0.8.2-beta.1's per-message by-value pattern (the same bool is computed at the same place in discord.rs/slack.rs/gateway.rs; the only difference vs. released code is that it's parked in BufferedMessage until dispatch instead of consumed inline). No Arc, no shared mutable state, no spawn-time vs. dispatch-time gap. As a side benefit, Dispatcher::submit and consumer_loop each lost a parameter.
  • ✅ std::sync::Mutex + ThreadHandle.generation: u64 — Mutex<HashMap<...>> confirmed; generation is now backed by the monotonic Dispatcher.next_generation: AtomicU64 (was hardcoded 0; fixed in 5b7e08c).
  • ⚠️ max_buffered_messages configurable (default 10) — done. The ADR also mentions a "multi-bot 30" default; not implemented as a separate auto-bumped value, but reachable via per-channel config override (which §4.4 lists under Phase 2 anyway).
  • ✅ max_batch_tokens soft cap (default 24000).
  • ✅ 👀 reaction on ALL messages in the batch before dispatch (§6.7).
  • ✅ Trailing message anchors StatusReactionController — dispatch.rs:363 uses batch.last().unwrap().trigger_msg.
  • ⚠️ /cancel-all command + Dispatcher::cancel_buffered — cancel_buffered exists; /cancel-all was merged into /reset per direction, not landed as a standalone command. Discord-only (Slack has no /reset to extend). The implementation diverges from the ADR text by design — cancel_buffered removes the entry unconditionally before aborting (rationale at dispatch.rs:199-201: removal precedes abort, so a fresh submit after cancel_buffered returns lands on a new handle without observing SendError; the generation field is not consulted on this path).
  • ✅ Tracing spans (§6.6) — info_span!("dispatch", channel, adapter) with five fields: events_per_dispatch, packed_block_count, agent_dispatch_ms, tokens_per_event, wait_ms.
  • ✅ SIGTERM: log per-thread buffered count before drop (§6.8) — Dispatcher::shutdown() invoked from main.rs ahead of pool.shutdown().
  • ❌ Cross-agent recognition smoke fixture (Claude Code / Cursor / Copilot — Scenario D) — deferred; needs per-agent API credentials not present in CI. Captured as a manual staging smoke (ADR §6.9).
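The removal-precedes-abort ordering in cancel_buffered can be sketched std-only (the abort closure stands in for aborting the tokio consumer task, and the buffered count stands in for the real queued state; names are illustrative):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Stand-in for the PR's ThreadHandle: just the queued-message count.
struct ThreadHandle {
    buffered: usize,
}

// Remove the thread's handle from the map *before* aborting its consumer,
// so a submit that races in right after this returns finds no handle,
// lazily inserts a fresh one, and never observes SendError from the dying
// consumer. Removal is unconditional; generation is not consulted here.
fn cancel_buffered(
    threads: &Mutex<HashMap<String, ThreadHandle>>,
    thread_key: &str,
    abort_consumer: impl FnOnce(),
) -> usize {
    let dropped = threads
        .lock()
        .unwrap()
        .remove(thread_key)
        .map(|h| h.buffered)
        .unwrap_or(0);
    abort_consumer(); // abort only after the map no longer points at us
    dropped // reported to the user when non-zero
}

fn main() {
    let threads = Mutex::new(HashMap::new());
    threads
        .lock()
        .unwrap()
        .insert("t1".to_string(), ThreadHandle { buffered: 3 });
    assert_eq!(cancel_buffered(&threads, "t1", || {}), 3);
    assert_eq!(cancel_buffered(&threads, "t1", || {}), 0); // already removed
}
```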

Tests required by §4.4

  • ⚠️ single-message batch — proxied via pack_arrival_event_single; the full consumer_loop round-trip is not exercised.
  • ⚠️ 3-message fragmentation merges into one batch — proxied via pack_arrival_event_batch_n2 (2 msgs, structural only); full consumer-loop fragmentation deferred.
  • ❌ new message arrives mid-turn → joins next batch — consumer-loop integration test, deferred (needs the trait seam).
  • ❌ /cancel during batched turn does not drop buffer — deferred.
  • ❌ /cancel-all (now /reset) drops buffered + aborts consumer — deferred (would test full handle removal + reset_session interaction).
  • ⚠️ SendError → evict + ❌ + ⚠️ + return Err — the predicate is unit-tested (try_evict_locked_*); the full user-visible path is deferred to a staging smoke (ADR §6.10 proposed).
  • ✅ concurrent SendError → only one eviction; both observers react on own trigger — covered by try_evict_locked_keeps_when_generation_differs (the predicate guarantees only the matching-generation observer evicts; per-observer ❌ + ⚠️ wired in submit() and reviewed structurally).
  • ❌ buffer-full → submit parks (no Err, no reaction, no ⚠️) — deferred.
  • ❌ other_bot_present freshness (3-turn timeline) — deferred (the behaviour is now a property of the immutable per-message snapshot — easier to test if/when the trait seam lands; the freshness mechanism itself is fixed).
  • ❌ oversized batch (cumulative tokens > cap) splits across two ACP turns; FIFO preserved — only the math is unit-tested (estimate_tokens_cumulative_exceeds_cap); the consumer-loop split-across-turns behaviour is not.
  • ✅ single message > max_batch_tokens dispatches alone — estimate_tokens_oversized_single_message.
  • ✅ voice-only Scenario D pack output — pack_arrival_event_scenario_d_voice_only.
  • ❌ queued reaction applied to all batch messages before dispatch — code path verified by reading (dispatch.rs:331-333); no unit test (would need an adapter mock).
  • ✅ Scenario B packing (image in separate message, same author) — pack_arrival_event_scenario_b_image_in_separate_message.
  • ✅ Scenario C packing (multi-author interleaved) — pack_arrival_event_scenario_c_multi_author_interleaved.

Tally

  • Mechanism: 17/19 ✅, 1 ⚠️ (/cancel-all intentionally folded into /reset; max_buffered_messages multi-bot 30 default left to per-channel config override / Phase 2), 1 ❌ (cross-agent smoke).
  • Tests: 5/15 ✅, 3 ⚠️ (packing-side proxies for full consumer-loop coverage), 7 ❌ (consumer-loop integration / freshness behaviour / queued-reaction — all blocked by the same Dispatcher → DispatchTarget trait seam).

Recommended follow-ups (separate PRs)

  1. Trait seam for Dispatcher (introduce DispatchTarget trait wrapping the AdapterRouter-facing surface) → unlocks the 7 ❌ tests and the §6.10 SendError end-to-end test in CI.
  2. Cross-agent Scenario D smoke — once CI fixture for per-agent credentials is in place.


@brettchien brettchien force-pushed the feature/turn-boundary-batching-v2 branch from 5b7e08c to 16bed85, May 2, 2026 06:21
@shaun-agent
Contributor

OpenAB PR Screening

This is auto-generated by the OpenAB project-screening flow for context collection and reviewer handoff.
Click 👍 if you find this useful. Human review will be done within 24 hours. We appreciate your support and contribution 🙏

Screening report

Intent

PR #686 aims to add turn-boundary message batching across Discord, Slack, and Gateway adapters. The operator-visible problem is that rapid follow-up messages can currently be processed as separate turns, creating unnecessary agent runs, fragmented context, and awkward response timing. This PR introduces a dispatcher that starts the first message immediately, buffers later messages while a turn is in flight, and dispatches them as a batch at the next turn boundary.

Feat

Feature: a new dispatch layer for batched message processing.

Behavioral change: adapters can run either in existing per-message mode or in batched mode via configuration. Batched mode enforces one active turn per thread/channel context, preserves structured arrival metadata, evicts failed send paths, and adds buffer/token limits.

Who It Serves

Primary beneficiaries: Discord and Slack end users who send multi-message bursts, plus agent runtime operators who need lower duplicate-run pressure and cleaner turn semantics.

Secondary beneficiaries: maintainers and reviewers, because adapter behavior becomes more explicit through Dispatcher, ThreadHandle, BufferedMessage, and shared packing logic.

Rewritten Prompt

Implement Phase 1 of the turn-boundary batching ADR.

Add a shared dispatcher that supports configurable per-message or batched processing for Discord, Slack, and Gateway. In batched mode, the first message for a thread should dispatch immediately, subsequent messages should buffer while the turn is active, and exactly one buffered batch should dispatch when the active turn completes. Preserve message ordering, channel/message references, adapter routing metadata, and existing per-message behavior when batching is disabled.

Add config fields for processing mode, max buffered messages, and max batch tokens. Add focused tests for token estimation, arrival-event packing, batching shape, buffer limits, SendError eviction, and no overlapping turns for the same thread.

Merge Pitch

This is worth advancing because it addresses a real interaction quality issue: users often send thoughts in bursts, but agents should usually answer once per coherent turn. The PR also centralizes dispatch behavior instead of leaving each adapter to approximate batching independently.

Risk profile: moderate to high. It adds a new concurrency primitive and touches Discord, Slack, Gateway, config, and main wiring. Likely reviewer concerns are message loss, duplicate dispatch, ordering guarantees, shutdown behavior, backpressure, and whether the test coverage is strong enough for async edge cases.

Best-Practice Comparison

OpenClaw principles that apply:

  • Gateway-owned scheduling: partially relevant. This is not scheduled-job execution, but OpenAB benefits from having dispatch ownership centralized rather than spread across adapters.
  • Durable job persistence: not implemented here. Buffered messages appear in-memory, so process crashes can lose pending batches. Acceptable for Phase 1 only if documented.
  • Isolated executions: relevant. The at-most-one-in-flight-turn invariant is aligned with isolated per-thread execution.
  • Explicit delivery routing: relevant. ChannelRef, MessageRef, and uniform arrival packing move in this direction.
  • Retry/backoff and run logs: only partly covered. SendError eviction helps, but reviewers should look for observability around dropped batches, buffer overflow, and dispatch failures.

Hermes Agent principles that apply:

  • Gateway daemon tick model: mostly not relevant; this is event-driven message dispatch, not periodic scheduled execution.
  • File locking to prevent overlap: conceptually relevant. The dispatcher’s per-thread in-flight guard is the equivalent overlap-prevention mechanism.
  • Atomic writes for persisted state: not relevant unless batching state becomes durable later.
  • Fresh session per scheduled run: not relevant to message batching.
  • Self-contained prompts for scheduled tasks: partly relevant through structured batch packing; each batched dispatch should include enough arrival context to be independently understandable.

Implementation Options

Conservative option: merge only the shared packing/config groundwork and keep adapters in per-message mode by default. Land MessageProcessingMode, ChannelRef, MessageRef, and pack_arrival_event, but defer the dispatcher until tests and rollout semantics are tighter.

Balanced option: merge the dispatcher behind opt-in config. Keep current behavior as the default, enable batched mode per adapter, and require async tests for ordering, no overlap, overflow handling, and shutdown before merge.

Ambitious option: evolve the dispatcher into a durable runtime queue. Persist buffered messages, add retry/backoff, structured run logs, metrics, and recovery after restart. This would align more closely with OpenClaw-style durable execution but is larger than the current PR.

Comparison Table

Option | Speed to ship | Complexity | Reliability | Maintainability | User impact | Fit for OpenAB right now
--- | --- | --- | --- | --- | --- | ---
Conservative groundwork only | High | Low | Medium | High | Low | Good if reviewers are unsure about async behavior
Opt-in dispatcher behind config | Medium | Medium | Medium-High with tests | Medium-High | High for bursty chat users | Best current fit
Durable dispatcher/runtime queue | Low | High | High | Medium | High | Better as follow-up work

Recommendation

Advance the balanced option: review PR #686 as an opt-in Phase 1 dispatcher, with current per-message behavior preserved as the safe default.

Before merge discussion, require focused async coverage for the core invariants: first-message zero latency, at-most-one in-flight turn, ordered batch dispatch, buffer/token limits, SendError eviction, and graceful shutdown. Split durable persistence, retry/backoff, and richer run logs into a follow-up PR so this change stays reviewable.

Collaborator

@chaodu-agent chaodu-agent left a comment


CHANGES_REQUESTED — Strong architecture with thorough ADR backing and good test coverage, but three issues need attention before merge.

Baseline Check

Main has per-message dispatch (each inbound → one tokio::spawn → one ACP turn), inline block-packing in adapter.rs:131-152 that reorders extra_blocks (text blocks before sender_context, images after), Slack KeyedAsyncQueue for FIFO ordering, and no batching mechanism.

PR adds: src/dispatch.rs (724 lines) with Dispatcher, BufferedMessage, consumer_loop, dispatch_batch; MessageProcessingMode enum; pack_arrival_event() unified packing; SenderContext.timestamp; ReactionsConfig gains Clone; Slack KeyedAsyncQueue removed; /reset integration.

Four-Question Framework
  1. What problem? Multiple messages arriving during an in-flight ACP turn become separate sequential turns — wasting tokens, losing attachment attribution, non-deterministic ordering.
  2. How? Per-thread bounded mpsc::channel with consumer task. First message fires immediately (I1), subsequent messages buffer and batch at turn boundary. Config-gated: per-message (default) or batched.
  3. Alternatives? Pre-turn debouncing (rejected: adds latency), mutex-level coalescing (rejected: opaque), <message index=N> wrapper (rejected: attribution loss). All documented in ADR #598.
  4. Best approach? Architecture is sound. Three issues below need resolution.
Traffic Light

🔴 SUGGESTED CHANGES

1. Block ordering semantic change in pack_arrival_event

On main, adapter.rs:131-152 reorders extra_blocks: text blocks (voice transcripts) go before the sender_context header, image blocks go after. The new pack_arrival_event places all extra_blocks after the header in arrival order. This changes behavior even in PerMessage mode (which also calls pack_arrival_event). Voice transcript blocks will now appear after the prompt instead of before it.

Recommendation: Document this ordering change explicitly. If intentional (ADR says it is), confirm existing agents handle transcripts appearing after the prompt correctly. Consider whether this warrants a CHANGELOG entry.

2. ReactionsConfig::default() used for queued emoji in dispatch_batch

let queued_emoji = crate::config::ReactionsConfig::default().emojis.queued;

This ignores the user's actual reactions config (custom emojis, enabled flag). The router's config is available via router.reactions_config() (which this PR itself adds). Should use the actual config.

3. Slack KeyedAsyncQueue removal affects PerMessage mode

The PR removes KeyedAsyncQueue entirely from Slack, but in PerMessage mode there's now no per-thread serialization — messages go directly to router.handle_message() without the semaphore guard. On main, KeyedAsyncQueue ensured FIFO ordering even in per-message mode. This is a regression for PerMessage Slack users.

Recommendation: Either keep KeyedAsyncQueue for PerMessage mode, or document that PerMessage mode on Slack no longer guarantees strict FIFO ordering.

🟡 NIT

  • Duplicated days_to_ymd / timestamp conversion between slack.rs and gateway.rs — extract to shared utility
  • sender_name field from ADR §2.3 missing in BufferedMessage — note the divergence
  • DispatchError doesn't implement std::error::Error — limits composability with anyhow

🟢 INFO

  • Excellent test coverage for packing logic (single message, batch of 2, extra blocks, all four ADR §3.6 scenarios)
  • Clean config gating — default PerMessage is fully backward-compatible
  • Graceful shutdown with buffered_lost counts per thread
  • /reset integration drops pending messages and reports count to user
  • stream_prompt_blocks extraction enables clean reuse

