feat(dispatch): turn-boundary batching dispatcher v2#686
brettchien wants to merge 1 commit into openabdev:main
Force-pushed c408a5a to 00d7b25
Force-pushed 00d7b25 to 5b7e08c
Status update — SendError testing approach

Added in latest amend.

Not in this PR — full Phase 1 coverage strategy: unit-test the predicate (here), defer the integrated path to a manual staging smoke-matrix entry in the ADR (PR #598). Smoke procedure draft (TBD landing decision).

Decision pending: land the §6.10 smoke entry in PR #598's ADR doc, or co-locate it here.
What's in PR 686 — feature checklist

- Core architecture
- Three invariants enforced (ADR §1)
- Modes & config
- Adapter wiring
- Slash commands
- Packing
- Observability (ADR §6.6)
- Reaction UX (ADR §6.7)
- Error handling (ADR §2.5)
- Graceful shutdown (ADR §6.8)
- Token estimation
- Tests (11 unit tests)
- Removed
- Deferred to follow-up PRs
ADR Phase 1 (§4.4) — item-by-item checklist

Mapping each deliverable in ADR §4.4 to the code in this PR:

- Mechanism deliverables
- Tests required by §4.4
- Tally
- Recommended follow-ups (separate PRs)
- Edits
Force-pushed 5b7e08c to 16bed85
OpenAB PR Screening

This is auto-generated by the OpenAB project-screening flow for context collection and reviewer handoff.
Screening report

## Intent

PR #686 aims to add turn-boundary message batching across Discord, Slack, and Gateway adapters. The operator-visible problem is that rapid follow-up messages can currently be processed as separate turns, creating unnecessary agent runs, fragmented context, and awkward response timing. This PR introduces a dispatcher that starts the first message immediately, buffers later messages while a turn is in flight, and dispatches them as a batch at the next turn boundary.

## Feature

A new dispatch layer for batched message processing. Behavioral change: adapters can run either in the existing per-message mode or in batched mode via configuration. Batched mode enforces one active turn per thread/channel context, preserves structured arrival metadata, evicts failed send paths, and adds buffer/token limits.

## Who It Serves

Primary beneficiaries: Discord and Slack end users who send multi-message bursts, plus agent runtime operators who need lower duplicate-run pressure and cleaner turn semantics. Secondary beneficiaries: maintainers and reviewers, because adapter behavior becomes more explicit.

## Rewritten Prompt

Implement Phase 1 of the turn-boundary batching ADR. Add a shared dispatcher that supports configurable per-message or batched processing for Discord, Slack, and Gateway. In batched mode, the first message for a thread should dispatch immediately, subsequent messages should buffer while the turn is active, and exactly one buffered batch should dispatch when the active turn completes. Preserve message ordering, channel/message references, adapter routing metadata, and existing per-message behavior when batching is disabled. Add config fields for processing mode, max buffered messages, and max batch tokens. Add focused tests for token estimation, arrival-event packing, batching shape, buffer limits, SendError eviction, and no overlapping turns for the same thread.
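The mode split described above can be sketched as a small config surface. The names `MessageProcessingMode`, `max_buffered_messages`, and `max_batch_tokens` come from the PR summary; the exact struct shape, default values, and the `should_buffer` helper are illustrative assumptions, not the PR's actual code:

```rust
// Sketch of the config surface; field names follow the PR summary, but the
// shapes and defaults here are illustrative assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageProcessingMode {
    PerMessage, // existing behavior: every inbound message starts a turn
    Batched,    // first message dispatches immediately; later ones buffer
}

#[derive(Debug, Clone)]
struct DispatchConfig {
    mode: MessageProcessingMode,
    max_buffered_messages: usize,
    max_batch_tokens: usize,
}

impl Default for DispatchConfig {
    // Backward-compatible default: batching is strictly opt-in.
    fn default() -> Self {
        Self {
            mode: MessageProcessingMode::PerMessage,
            max_buffered_messages: 32,
            max_batch_tokens: 4096,
        }
    }
}

// The decision point each adapter would branch on.
fn should_buffer(cfg: &DispatchConfig, turn_in_flight: bool) -> bool {
    cfg.mode == MessageProcessingMode::Batched && turn_in_flight
}

fn main() {
    let cfg = DispatchConfig::default();
    // PerMessage never buffers, even mid-turn.
    assert!(!should_buffer(&cfg, true));
    let batched = DispatchConfig { mode: MessageProcessingMode::Batched, ..cfg };
    assert!(!should_buffer(&batched, false)); // I1: first message is zero-latency
    assert!(should_buffer(&batched, true));   // later messages buffer
    println!("mode gating ok");
}
```

With this shape, flipping one config field per adapter toggles the new behavior while the default stays identical to main.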
## Merge Pitch

This is worth advancing because it addresses a real interaction-quality issue: users often send thoughts in bursts, but agents should usually answer once per coherent turn. The PR also centralizes dispatch behavior instead of leaving each adapter to approximate batching independently. Risk profile: moderate to high. It adds a new concurrency primitive and touches Discord, Slack, Gateway, config, and main wiring. Likely reviewer concerns are message loss, duplicate dispatch, ordering guarantees, shutdown behavior, backpressure, and whether the test coverage is strong enough for async edge cases.

## Best-Practice Comparison

OpenClaw principles that apply:
Hermes Agent principles that apply:
## Implementation Options

- Conservative option: merge only the shared packing/config groundwork and keep adapters in per-message mode by default.
- Balanced option: merge the dispatcher behind opt-in config. Keep current behavior as the default, enable batched mode per adapter, and require async tests for ordering, no overlap, overflow handling, and shutdown before merge.
- Ambitious option: evolve the dispatcher into a durable runtime queue. Persist buffered messages, add retry/backoff, structured run logs, metrics, and recovery after restart. This would align more closely with OpenClaw-style durable execution but is larger than the current PR.

## Comparison Table
## Recommendation

Advance the balanced option: review PR #686 as an opt-in Phase 1 dispatcher, with current per-message behavior preserved as the safe default. Before merge discussion, require focused async coverage for the core invariants: first-message zero latency, at-most-one in-flight turn, ordered batch dispatch, buffer/token limits, SendError eviction, and graceful shutdown. Split durable persistence, retry/backoff, and richer run logs into a follow-up PR so this change stays reviewable.
chaodu-agent left a comment:
CHANGES_REQUESTED — Strong architecture with thorough ADR backing and good test coverage, but three issues need attention before merge.
Baseline Check
Main has per-message dispatch (each inbound → one `tokio::spawn` → one ACP turn), inline block-packing in `adapter.rs:131-152` that reorders `extra_blocks` (text blocks before the `sender_context` header, images after), Slack `KeyedAsyncQueue` for FIFO ordering, and no batching mechanism.
PR adds: `src/dispatch.rs` (724 lines) with `Dispatcher`, `BufferedMessage`, `consumer_loop`, `dispatch_batch`; `MessageProcessingMode` enum; `pack_arrival_event()` unified packing; `SenderContext.timestamp`; `ReactionsConfig` gains `Clone`; Slack `KeyedAsyncQueue` removed; `/reset` integration.
Four-question framework
- What problem? Multiple messages arriving during an in-flight ACP turn become separate sequential turns — wasting tokens, losing attachment attribution, non-deterministic ordering.
- How? Per-thread bounded `mpsc::channel` with a consumer task. First message fires immediately (I1); subsequent messages buffer and batch at the turn boundary. Config-gated: `per-message` (default) or `batched`.
- Alternatives? Pre-turn debouncing (rejected: adds latency), mutex-level coalescing (rejected: opaque), `<message index=N>` wrapper (rejected: attribution loss). All documented in ADR #598.
- Best approach? Architecture is sound. Three issues below need resolution.
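The buffered-consumer shape described in the framework above can be illustrated with std's bounded channel standing in for tokio's `mpsc::channel`. `drain_turns` and everything else here is a hypothetical model of the I1/I2 behavior, not the PR's code:

```rust
use std::sync::mpsc::sync_channel;

// Illustrative model: the first queued message becomes its own turn (I1);
// everything already buffered behind it drains into one batched turn at the
// turn boundary (I2). Real code would do this asynchronously per thread key.
fn drain_turns(msgs: &[&str]) -> Vec<Vec<String>> {
    // Bounded channel models the per-thread buffer with a message cap.
    let (tx, rx) = sync_channel::<String>(msgs.len().max(1));
    for m in msgs {
        tx.send((*m).to_string()).unwrap();
    }
    drop(tx); // close the channel so the consumer loop exits

    let mut turns: Vec<Vec<String>> = Vec::new();
    while let Ok(first) = rx.recv() {
        turns.push(vec![first]); // zero-latency single-message turn
        let batch: Vec<String> = rx.try_iter().collect();
        if !batch.is_empty() {
            turns.push(batch); // exactly one ordered batch at the boundary
        }
    }
    turns
}

fn main() {
    let turns = drain_turns(&["m1", "m2", "m3"]);
    assert_eq!(turns[0], vec!["m1"]);       // I1: first message alone
    assert_eq!(turns[1], vec!["m2", "m3"]); // I2/I3: one ordered batch
    println!("{:?}", turns);
}
```

The key property the tests should pin down is visible even in this toy: a burst of three messages yields two turns, never three, and batch order matches arrival order.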
Traffic Light
🔴 SUGGESTED CHANGES
1. Block ordering semantic change in pack_arrival_event
On main, `adapter.rs:131-152` reorders `extra_blocks`: text blocks (voice transcripts) go before the `sender_context` header, image blocks go after. The new `pack_arrival_event` places all `extra_blocks` after the header in arrival order. This changes behavior even in PerMessage mode (which also calls `pack_arrival_event`). Voice transcript blocks will now appear after the prompt instead of before it.
Recommendation: Document this ordering change explicitly. If intentional (ADR says it is), confirm existing agents handle transcripts appearing after the prompt correctly. Consider whether this warrants a CHANGELOG entry.
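A toy model of the ordering difference may help reviewers see the change at a glance. `Block`, `pack_main`, and `pack_pr` are simplified stand-ins for the real block types and packing paths, not the PR's code:

```rust
// Simplified stand-ins: the real code packs rich content blocks, not enums.
#[derive(Debug, Clone, PartialEq)]
enum Block {
    Header,     // sender_context header
    Prompt,     // the user's message text
    ExtraText,  // e.g. a voice transcript
    ExtraImage, // an attachment
}

// main's inline packing (adapter.rs:131-152): text extras go before the
// header, image extras after the prompt.
fn pack_main(extras: &[Block]) -> Vec<Block> {
    let mut out: Vec<Block> =
        extras.iter().filter(|b| **b == Block::ExtraText).cloned().collect();
    out.push(Block::Header);
    out.push(Block::Prompt);
    out.extend(extras.iter().filter(|b| **b == Block::ExtraImage).cloned());
    out
}

// This PR's pack_arrival_event: every extra block follows the header in
// arrival order, in both PerMessage and Batched modes.
fn pack_pr(extras: &[Block]) -> Vec<Block> {
    let mut out = vec![Block::Header, Block::Prompt];
    out.extend(extras.iter().cloned());
    out
}

fn main() {
    let extras = [Block::ExtraText, Block::ExtraImage];
    // On main, the transcript precedes the header...
    assert_eq!(pack_main(&extras)[0], Block::ExtraText);
    // ...in the PR, it follows the prompt.
    assert_eq!(pack_pr(&extras)[2], Block::ExtraText);
}
```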
2. `ReactionsConfig::default()` used for queued emoji in `dispatch_batch`

```rust
let queued_emoji = crate::config::ReactionsConfig::default().emojis.queued;
```

This ignores the user's actual reactions config (custom emojis, enabled flag). The router's config is available via `router.reactions_config()` (which this PR itself adds). Should use the actual config.
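A minimal sketch of the suggested fix. The `ReactionsConfig`/`Router` shapes below are assumptions modeled on the review's description; only the `reactions_config()` accessor name is taken from the PR:

```rust
// Hypothetical shapes modeled on the review text, not the PR's real types.
#[derive(Debug, Clone)]
struct Emojis {
    queued: String,
}

#[derive(Debug, Clone)]
struct ReactionsConfig {
    enabled: bool,
    emojis: Emojis,
}

impl Default for ReactionsConfig {
    fn default() -> Self {
        Self { enabled: true, emojis: Emojis { queued: "⏳".to_string() } }
    }
}

struct Router {
    reactions: ReactionsConfig,
}

impl Router {
    // Accessor the PR itself adds, per the review.
    fn reactions_config(&self) -> &ReactionsConfig {
        &self.reactions
    }
}

// Read the operator's configured emoji instead of Default::default(),
// which silently discards custom emojis and the enabled flag.
fn queued_emoji(router: &Router) -> String {
    router.reactions_config().emojis.queued.clone()
}

fn main() {
    let router = Router {
        reactions: ReactionsConfig {
            enabled: true,
            emojis: Emojis { queued: "📬".to_string() },
        },
    };
    assert_eq!(queued_emoji(&router), "📬"); // custom emoji survives
}
```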
3. Slack KeyedAsyncQueue removal affects PerMessage mode
The PR removes `KeyedAsyncQueue` entirely from Slack, but in PerMessage mode there's now no per-thread serialization — messages go directly to `router.handle_message()` without the semaphore guard. On main, `KeyedAsyncQueue` ensured FIFO ordering even in per-message mode. This is a regression for PerMessage Slack users.
Recommendation: Either keep KeyedAsyncQueue for PerMessage mode, or document that PerMessage mode on Slack no longer guarantees strict FIFO ordering.
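The per-key FIFO guarantee at stake can be modeled minimally. `KeyedFifo` below is a hypothetical stand-in for `KeyedAsyncQueue`'s ordering property, not its async implementation:

```rust
use std::collections::{HashMap, VecDeque};

// Minimal model of the guarantee the review says KeyedAsyncQueue provided on
// main: messages sharing a thread key are processed strictly in arrival
// order, even when arrivals for different keys interleave.
struct KeyedFifo {
    queues: HashMap<String, VecDeque<String>>,
}

impl KeyedFifo {
    fn new() -> Self {
        Self { queues: HashMap::new() }
    }

    fn push(&mut self, key: &str, msg: &str) {
        self.queues
            .entry(key.to_string())
            .or_default()
            .push_back(msg.to_string());
    }

    // Drain one key's messages in FIFO order, as a serialized worker would.
    fn drain(&mut self, key: &str) -> Vec<String> {
        self.queues.remove(key).map(Vec::from).unwrap_or_default()
    }
}

fn main() {
    let mut q = KeyedFifo::new();
    q.push("thread-a", "first");
    q.push("thread-b", "other");
    q.push("thread-a", "second");
    // Per-key ordering holds despite interleaved arrivals.
    assert_eq!(q.drain("thread-a"), vec!["first", "second"]);
}
```

Without something like this in the PerMessage path, two rapid Slack messages in the same thread may race into `router.handle_message()` in either order.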
🟡 NIT
- Duplicated `days_to_ymd` / timestamp conversion between `slack.rs` and `gateway.rs` — extract to a shared utility
- `sender_name` field from ADR §2.3 missing in `BufferedMessage` — note the divergence
- `DispatchError` doesn't implement `std::error::Error` — limits composability with `anyhow`
🟢 INFO
- Excellent test coverage for packing logic (single message, batch of 2, extra blocks, all four ADR §3.6 scenarios)
- Clean config gating — default `PerMessage` is fully backward-compatible
- Graceful shutdown with `buffered_lost` counts per thread
- `/reset` integration drops pending messages and reports the count to the user
- `stream_prompt_blocks` extraction enables clean reuse
Summary

Implements the turn-boundary message batching dispatcher per ADR v0.3 (`docs/adr/turn-boundary-batching.md`).

Changes

- `src/dispatch.rs` (new): `Dispatcher`, `BufferedMessage`, `ThreadHandle`, `consumer_loop`, `dispatch_batch`, `estimate_tokens`, unit tests
- `src/config.rs`: `MessageProcessingMode` enum, `max_buffered_messages`, `max_batch_tokens` config fields for Discord/Slack/Gateway
- `src/adapter.rs`: `AdapterRouter::pack_arrival_event` uniform packing (§3.3), `ChannelRef`/`MessageRef` types
- `src/discord.rs`: branch on `message_processing_mode` — per-message vs batched dispatch path
- `src/slack.rs`: same branching; `KeyedAsyncQueue` replaced by `Dispatcher` consumer task
- `src/gateway.rs`: same branching
- `src/main.rs`: wire `Dispatcher` instances per adapter

ADR compliance

Implements Phase 1 scope from ADR §4.4: I1 zero-latency first message, I2 at-most-one-in-flight-turn, I3 broker structural fidelity, SendError eviction (§2.5), `other_bot_present` freshness (§2.6), batch reaction UX (§6.7), graceful shutdown (§6.8).

Testing

Unit tests in `src/dispatch.rs`: `estimate_tokens`, `pack_arrival_event` single/batch/extra-blocks scenarios.

https://discord.com/channels/1491295327620169908/1497977225314832536
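The PR's `estimate_tokens` implementation isn't shown in this thread. A sketch using the common ~4-characters-per-token heuristic illustrates how a `max_batch_tokens` budget could be enforced; `fits_batch` is hypothetical:

```rust
// Assumption: a coarse chars/4 heuristic, a common stand-in when no real
// tokenizer is available. The PR's actual estimator may differ.
fn estimate_tokens(text: &str) -> usize {
    // Round up so short non-empty messages still count as one token.
    (text.chars().count() + 3) / 4
}

// Hypothetical budget check a batched dispatcher could apply before packing.
fn fits_batch(messages: &[&str], max_batch_tokens: usize) -> bool {
    messages.iter().map(|m| estimate_tokens(m)).sum::<usize>() <= max_batch_tokens
}

fn main() {
    assert_eq!(estimate_tokens(""), 0);
    assert_eq!(estimate_tokens("hi"), 1);        // rounds up
    assert_eq!(estimate_tokens("12345678"), 2);  // 8 chars / 4
    assert!(fits_batch(&["12345678", "hi"], 3)); // 2 + 1 within budget
    assert!(!fits_batch(&["12345678", "hi"], 2));
}
```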