
Change broadcast replacement strategy to queue backups instead of reannouncing #1319

Merged
kixelated merged 2 commits into main from claude/fix-originproducer-ordering-YuHdL
Apr 17, 2026
Conversation

@kixelated (Collaborator)

Summary

This PR changes how the origin node handles multiple broadcasts on the same path. Instead of immediately replacing the active broadcast and reannouncing when a new one is published, the new broadcast is now queued as a backup. The active broadcast only changes when the current one is closed, at which point the oldest queued backup is promoted and reannounced.

Key Changes

  • Modified publish behavior: When a new broadcast is published to a path that already has an active broadcast, the new one is added to a backup queue instead of replacing the active one and triggering a reannounce
  • Changed unpublish behavior: When the active broadcast closes, the oldest backup (using remove(0) instead of pop()) is promoted to active and reannounced, rather than the most recently added one
  • Updated documentation: The publish_broadcast method documentation now accurately describes the new queuing behavior and promotion strategy
  • Updated test expectations: The test now verifies that only the oldest broadcast is announced initially, and that backups are promoted in FIFO order when the active broadcast closes

Implementation Details

  • The backup queue now operates as a FIFO queue (oldest first) rather than a LIFO stack, ensuring fair ordering of backup broadcasts
  • This change avoids unnecessary reannouncements when multiple broadcasts are published in quick succession, reducing disruption to subscribers
  • The promotion logic explicitly checks if the backup queue is non-empty before attempting to promote, maintaining the same unannounce behavior when no backups remain
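The behavior described above — keep the oldest broadcast active, enqueue newcomers, and promote in FIFO order when the active one closes — can be sketched with a simplified model. The `Broadcast`, `Entry`, `publish`, and `close_active` names below are illustrative stand-ins, not the actual types in `rs/moq-lite/src/model/origin.rs`:

```rust
use std::collections::VecDeque;

// Stand-in for the real BroadcastConsumer type.
#[derive(Clone, Debug, PartialEq)]
struct Broadcast(u32);

struct Entry {
    active: Broadcast,
    backup: VecDeque<Broadcast>,
}

impl Entry {
    fn new(first: Broadcast) -> Self {
        Self { active: first, backup: VecDeque::new() }
    }

    // Publish to an occupied path: keep the older broadcast active and
    // queue the new one as a backup. No reannounce happens here.
    fn publish(&mut self, b: Broadcast) {
        self.backup.push_back(b);
    }

    // Active broadcast closed: promote the oldest backup (FIFO). Returns
    // the newly active broadcast (which would be reannounced), or None if
    // no backups remain (which would trigger an unannounce).
    fn close_active(&mut self) -> Option<&Broadcast> {
        match self.backup.pop_front() {
            Some(next) => {
                self.active = next;
                Some(&self.active)
            }
            None => None,
        }
    }
}

fn main() {
    let mut entry = Entry::new(Broadcast(1));
    entry.publish(Broadcast(2));
    entry.publish(Broadcast(3));
    assert_eq!(entry.active, Broadcast(1)); // oldest stays active
    entry.close_active();
    assert_eq!(entry.active, Broadcast(2)); // FIFO: 2 is promoted before 3
    entry.close_active();
    assert_eq!(entry.active, Broadcast(3));
    println!("fifo ok");
}
```

Note that the actual implementation in this PR stores the backups in a `Vec` and promotes via `remove(0)`; the `VecDeque` used here reflects the reviewer's later suggestion, which was adopted in the follow-up commit.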

https://claude.ai/code/session_01GmwjbStTAmsRUaNYjkapxn

When a new broadcast is published to a path that already has one, keep
the older broadcast active and queue the new one as a backup instead
of reannouncing. This avoids disrupting subscribers with reannounces
when duplicate publishers arrive (e.g. reconnects).

When the active broadcast closes, the oldest remaining backup is
promoted (and reannounced) to preserve the "prefer older" ordering.
@coderabbitai (Contributor)

coderabbitai bot commented Apr 17, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fa91ce69-03a0-4def-b9bc-fda85532e237

📥 Commits

Reviewing files that changed from the base of the PR and between a405bf0 and 405f3ea.

📒 Files selected for processing (1)
  • rs/moq-lite/src/model/origin.rs

Walkthrough

Duplicate-broadcast handling in the origin module was changed: when multiple broadcasts are published to the same path, the active broadcast is retained and subsequent publishes are enqueued as FIFO backups. On active broadcast removal, the oldest queued backup is promoted and reannounced; if no backups remain, the entry is unannounced. OriginProducer::publish_broadcast documentation and tests were updated to reflect this behavior.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The title accurately and concisely summarizes the primary change: switching from immediate reannouncement to FIFO queue-based backup handling for duplicate broadcasts.
Description check | ✅ Passed | The description is well-related to the changeset, detailing the new backup queuing strategy, FIFO promotion logic, documentation updates, and test changes.
Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%.




@coderabbitai coderabbitai bot left a comment (Contributor)

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/moq-lite/src/model/origin.rs (1)

687-738: ⚠️ Potential issue | 🟡 Minor

Test coverage gap: FIFO ordering with 3+ backups isn't actually verified.

test_duplicate only confirms that after the active closes, some backup (specifically broadcast3) is promoted — but broadcast2 was already dropped before broadcast1, so the FIFO-vs-LIFO distinction isn't exercised. If the implementation accidentally used pop() (LIFO) instead of remove(0) (FIFO), this test would still pass, because only one backup remains by the time the active closes.

Please add a case where 3 backups are queued, the active is dropped, and the 2nd-oldest (not the newest) is promoted — that's the assertion that actually pins down FIFO semantics.

Suggested additional test
#[tokio::test]
async fn test_duplicate_fifo_order() {
    let origin = Origin::produce();
    let b1 = Broadcast::produce();
    let b2 = Broadcast::produce();
    let b3 = Broadcast::produce();

    let mut consumer = origin.consume();
    origin.publish_broadcast("test", b1.consume());
    origin.publish_broadcast("test", b2.consume());
    origin.publish_broadcast("test", b3.consume());

    consumer.assert_next("test", &b1.consume());

    drop(b1);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    // FIFO: b2 should be promoted next, not b3.
    consumer.assert_next_none("test");
    consumer.assert_next("test", &b2.consume());

    drop(b2);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    consumer.assert_next_none("test");
    consumer.assert_next("test", &b3.consume());
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 687 - 738, The test doesn't
verify FIFO promotion when multiple backups exist; add a new async tokio test
(e.g., test_duplicate_fifo_order) that uses Origin::produce and
Broadcast::produce to publish three broadcasts via origin.publish_broadcast,
asserts the first (oldest) is announced via consumer.assert_next, then drop the
active and after a short sleep assert that the second-oldest (not the newest) is
promoted using consumer.assert_next_none and consumer.assert_next with the b2
consumer, and repeat to ensure b3 is promoted after dropping b2; reference
Origin::produce, Broadcast::produce, origin.publish_broadcast,
consumer.consume(), consumer.assert_next, consumer.assert_next_none and
tokio::time::sleep to locate places to add the test.
🧹 Nitpick comments (2)
rs/moq-lite/src/model/origin.rs (2)

365-371: Doc nit: clarify behavior on backup close while active is still live.

The updated docstring covers the happy path (queue as backup, promote oldest on close), but it's silent about what happens when a backup is dropped before being promoted — it's silently removed from the queue with no notification. Worth a sentence so consumers of this API don't assume every published broadcast will eventually surface.

Suggested wording
 	/// The broadcast will be unannounced when it is closed.
 	/// If there is already a broadcast with the same path, then the older broadcast remains active
 	/// and the new one is queued as a backup (no reannounce is triggered).
 	/// When the active broadcast closes, the oldest queued backup is promoted and reannounced.
+	/// If a queued backup is closed before being promoted, it is silently dropped from the queue.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 365 - 371, The docstring for
the broadcast publishing API (the comment above the publish_broadcast /
publish_broadcast_to_consumers method in origin.rs) omits behavior when a queued
backup is dropped before promotion; update the docstring to add a concise
sentence stating that backups removed from the queue before becoming active are
silently discarded with no reannounce/notification, so callers should not assume
every published broadcast will be promoted. Reference the existing "Publish a
broadcast..." docblock and append this clarification after the sentence about
promotion on close.

228-231: FIFO promotion: confirm oldest-wins is the desired policy, and consider VecDeque.

entry.backup.remove(0) is O(n) in the queue length, and Vec is the wrong data structure for FIFO. Since backup is append-on-publish / pop-front-on-promotion, VecDeque<BroadcastConsumer> with push_back / pop_front is a better fit and makes the FIFO intent explicit at the type level. In practice backups are usually tiny so perf isn't the concern — readability is.

Also worth double-checking against cluster.rs cross-origin merging: if both primary and secondary announce the same path near-simultaneously, the one that happened to publish first wins indefinitely until it closes, even if the other is "better" (e.g., local vs remote). Is oldest-wins the intended policy, or is newest-wins / a priority preferable here?

Proposed change
 struct OriginBroadcast {
 	path: PathOwned,
 	active: BroadcastConsumer,
-	backup: Vec<BroadcastConsumer>,
+	backup: std::collections::VecDeque<BroadcastConsumer>,
 }
-			existing.backup.push(broadcast.clone());
+			existing.backup.push_back(broadcast.clone());
-			if !entry.backup.is_empty() {
-				entry.active = entry.backup.remove(0);
+			if let Some(next) = entry.backup.pop_front() {
+				entry.active = next;
 				self.notify.lock().reannounce(full, &entry.active);
 			} else {

And update the position/remove lookup accordingly (VecDeque supports both).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 228 - 231, Change entry.backup
from Vec<BroadcastConsumer> to VecDeque<BroadcastConsumer> and replace the O(n)
remove(0) usage with pop_front (and use push_back when appending) in the
promotion code that assigns entry.active and calls
self.notify.lock().reannounce(full, &entry.active); update any position/remove
usages on entry.backup to VecDeque equivalents; also double-check the
cross-origin merge logic in cluster.rs (where primary/secondary announcement
ordering is resolved) to confirm that the current oldest-wins promotion policy
is intended versus newest-wins or a priority-based choice.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ac155c6a-9c63-49d3-ac5d-49975f072ae9

📥 Commits

Reviewing files that changed from the base of the PR and between d351360 and a405bf0.

📒 Files selected for processing (1)
  • rs/moq-lite/src/model/origin.rs

Comment thread on rs/moq-lite/src/model/origin.rs (Outdated)
Comment on lines +148 to +150
// Keep the older broadcast active; queue the new one as a backup.
// This avoids reannouncing and potentially disrupting subscribers.
existing.backup.push(broadcast.clone());
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find all call sites of publish_broadcast to reason about republish semantics.
rg -nP --type=rust -C3 '\bpublish_broadcast\s*\('

Repository: moq-dev/moq

Length of output: 26038


🏁 Script executed:

# Check the cluster.rs merge/remotes logic to see how broadcasts are managed
sed -n '200,230p' rs/moq-relay/src/cluster.rs


Length of output: 1122


🏁 Script executed:

# Check the Origin/BroadcastState structure to see if there's a cap on the backup queue
rg -A 20 'struct.*BroadcastState\|struct.*Entry' rs/moq-lite/src/model/origin.rs | head -60


Length of output: 37


🏁 Script executed:

# Look at the publish_broadcast implementation to see if it closes old broadcasts
sed -n '365,430p' rs/moq-lite/src/model/origin.rs


Length of output: 2324


🏁 Script executed:

# Look at the Entry struct and BroadcastState to understand the backup Vec
rg -B 5 -A 15 'struct Entry|backup:' rs/moq-lite/src/model/origin.rs | head -100


Length of output: 1307


🏁 Script executed:

# Check the publish method that handles the queuing logic
sed -n '100,200p' rs/moq-lite/src/model/origin.rs


Length of output: 3300


🏁 Script executed:

# Check the remove method that handles promotion of backups
sed -n '200,250p' rs/moq-lite/src/model/origin.rs


Length of output: 1374


🏁 Script executed:

# Check if there are any limits or caps enforced on the backup Vec anywhere
rg -n 'backup.*len\|backup.*cap\|backup.*push.*limit\|MAX.*BACKUP' rs/moq-lite/src/model/origin.rs


Length of output: 37


🏁 Script executed:

# Look at cluster.rs more carefully to see the flow of broadcasts in the merge/remotes functions
sed -n '195,275p' rs/moq-relay/src/cluster.rs


Length of output: 2405


🏁 Script executed:

# Look at the test_duplicate test to understand expected behavior
sed -n '695,730p' rs/moq-lite/src/model/origin.rs


Length of output: 1272


🏁 Script executed:

# Check if there are any concerns about async tests using sleep without pause()
sed -n '695,760p' rs/moq-lite/src/model/origin.rs


Length of output: 2226


🏁 Script executed:

# Search for tokio::time::pause usage in the test file
rg -n 'tokio::time::pause\|tokio::time::sleep' rs/moq-lite/src/model/origin.rs


Length of output: 37


Address silent backup queuing behavior and missing safeguards.

Verification confirms two significant concerns with this change:

  1. Old broadcasts may never close, leaving backups stuck forever. The relay cluster code (rs/moq-relay/src/cluster.rs, lines 207–217) publishes new broadcasts from remote origins without closing the previous one. If the active broadcast closes only when the producer intentionally shuts down (and a remote producer leaks or never disconnects), backups accumulate without ever being promoted.

  2. Unbounded backup queue growth. The backup: Vec<BroadcastConsumer> in OriginBroadcast has no cap. A hostile or misbehaving publisher can queue broadcasts without limit on a single path.

Additionally, the test_duplicate test uses tokio::time::sleep() to wait for async spawned tasks. Per coding guidelines, async tests that sleep should call tokio::time::pause() at the start to simulate time instantly.
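The bounded-queue safeguard the review asks for in point 2 could look like the following sketch, assuming a drop-oldest policy; `MAX_BACKUP` and `enqueue_backup` are hypothetical names, not part of the current code:

```rust
use std::collections::VecDeque;

// Hypothetical cap on queued backups per path; the current code has no limit.
const MAX_BACKUP: usize = 8;

#[derive(Clone, Debug, PartialEq)]
struct Broadcast(u32);

// Enqueue a backup, evicting the oldest entry once the cap is reached,
// so a misbehaving publisher cannot grow the queue without bound.
// (Rejecting the newest instead is the other policy the review mentions.)
fn enqueue_backup(backup: &mut VecDeque<Broadcast>, b: Broadcast) {
    if backup.len() >= MAX_BACKUP {
        backup.pop_front();
    }
    backup.push_back(b);
}

fn main() {
    let mut backup = VecDeque::new();
    for i in 0..20 {
        enqueue_backup(&mut backup, Broadcast(i));
    }
    assert_eq!(backup.len(), MAX_BACKUP);
    // Broadcasts 0..=11 were evicted; the queue holds 12..=19.
    assert_eq!(backup.front(), Some(&Broadcast(12)));
    println!("bounded ok");
}
```

Whether drop-oldest or reject-newest is right depends on the cluster semantics; either way, the cap should be documented alongside the promotion policy in `publish_broadcast`.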

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 148 - 150, The change
unconditionally pushes clones into existing.backup (OriginBroadcast.backup:
Vec<BroadcastConsumer>) which can never be promoted if the active
BroadcastConsumer never closes and allows unbounded growth; update the logic
that handles new remote publishes (the code path that does
existing.backup.push(broadcast.clone())) to (1) enforce a bounded backup queue
(e.g. add a MAX_BACKUP constant and either drop oldest or reject new backups)
and (2) ensure the previous active broadcast is explicitly closed or scheduled
for termination when a new remote broadcast replaces it (so backups can be
promoted); update the OriginBroadcast type and any enqueue logic in
rs/moq-relay/src/cluster.rs to implement these safeguards and to document the
policy, and change the async test_duplicate to call tokio::time::pause() at the
start instead of relying on tokio::time::sleep().

- Switch OriginBroadcast.backup to VecDeque for O(1) FIFO promotion via
  pop_front/push_back.
- Clarify publish_broadcast docstring: backups closed before promotion
  are silently discarded.
- Add test_duplicate_fifo_order to verify that when multiple backups
  are queued, promotion honors publish order (oldest-first).
- Pause tokio time in async tests that sleep so they run deterministically
  without real delays.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kixelated kixelated enabled auto-merge (squash) April 17, 2026 02:11
@kixelated kixelated merged commit 571d044 into main Apr 17, 2026
2 checks passed
@kixelated kixelated deleted the claude/fix-originproducer-ordering-YuHdL branch April 17, 2026 02:33