Conversation
When a new broadcast is published to a path that already has one, keep the older broadcast active and queue the new one as a backup instead of reannouncing. This avoids disrupting subscribers with reannounces when duplicate publishers arrive (e.g. reconnects). When the active broadcast closes, the oldest remaining backup is promoted (and reannounced) to preserve the "prefer older" ordering.
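The publish/close flow described above can be sketched with a minimal, self-contained model. This is an illustration only: the `OriginEntry` name and `u32` broadcast IDs are stand-ins, not the real `moq-lite` types.

```rust
// Hypothetical, simplified model of the "prefer older" policy.
// Names (OriginEntry, publish, close_active) are illustrative, not the real API.
struct OriginEntry {
    active: Option<u32>, // stand-in for the active BroadcastConsumer
    backup: Vec<u32>,    // FIFO queue of later publishers
}

impl OriginEntry {
    fn new() -> Self {
        Self { active: None, backup: Vec::new() }
    }

    /// A new publish keeps the older broadcast active and queues the new one.
    fn publish(&mut self, id: u32) {
        match self.active {
            None => self.active = Some(id),  // first publisher is announced
            Some(_) => self.backup.push(id), // duplicates queue as backups
        }
    }

    /// When the active broadcast closes, promote the oldest backup (FIFO).
    fn close_active(&mut self) -> Option<u32> {
        self.active = if self.backup.is_empty() {
            None // no backups left: the path is unannounced
        } else {
            Some(self.backup.remove(0))
        };
        self.active // the newly promoted broadcast, if any
    }
}

fn main() {
    let mut entry = OriginEntry::new();
    entry.publish(1);
    entry.publish(2);
    entry.publish(3);
    assert_eq!(entry.active, Some(1)); // oldest stays active
    assert_eq!(entry.close_active(), Some(2)); // FIFO: 2nd-oldest promoted
    assert_eq!(entry.close_active(), Some(3));
    assert_eq!(entry.close_active(), None); // queue drained
}
```

Note that only `close_active` triggers a reannounce in the real change; queuing a backup is deliberately silent so subscribers are not disturbed.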
No actionable comments were generated in the recent review. 🎉
Walkthrough

Duplicate-broadcast handling in the origin module was changed: when multiple broadcasts are published to the same path, the active broadcast is retained and subsequent publishes are enqueued as FIFO backups. On active broadcast removal, the oldest queued backup is promoted and reannounced; if no backups remain, the entry is unannounced.

🚥 Pre-merge checks: 3 passed.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-lite/src/model/origin.rs (1)
687-738: ⚠️ Potential issue | 🟡 Minor. Test coverage gap: FIFO ordering with 3+ backups isn't actually verified.

`test_duplicate` only confirms that after the active closes, some backup (specifically `broadcast3`) is promoted, but `broadcast2` was already dropped before `broadcast1`, so the FIFO-vs-LIFO distinction isn't exercised. If the implementation accidentally used `pop()` (LIFO) instead of `remove(0)` (FIFO), this test would still pass, because only one backup remains by the time the active closes. Please add a case where 3 backups are queued, the active is dropped, and the 2nd-oldest (not the newest) is promoted; that's the assertion that actually pins down FIFO semantics.
Suggested additional test

```rust
#[tokio::test]
async fn test_duplicate_fifo_order() {
    let origin = Origin::produce();
    let b1 = Broadcast::produce();
    let b2 = Broadcast::produce();
    let b3 = Broadcast::produce();

    let mut consumer = origin.consume();
    origin.publish_broadcast("test", b1.consume());
    origin.publish_broadcast("test", b2.consume());
    origin.publish_broadcast("test", b3.consume());
    consumer.assert_next("test", &b1.consume());

    drop(b1);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

    // FIFO: b2 should be promoted next, not b3.
    consumer.assert_next_none("test");
    consumer.assert_next("test", &b2.consume());

    drop(b2);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    consumer.assert_next_none("test");
    consumer.assert_next("test", &b3.consume());
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/origin.rs` around lines 687 - 738, The test doesn't verify FIFO promotion when multiple backups exist; add a new async tokio test (e.g., test_duplicate_fifo_order) that uses Origin::produce and Broadcast::produce to publish three broadcasts via origin.publish_broadcast, asserts the first (oldest) is announced via consumer.assert_next, then drop the active and after a short sleep assert that the second-oldest (not the newest) is promoted using consumer.assert_next_none and consumer.assert_next with the b2 consumer, and repeat to ensure b3 is promoted after dropping b2; reference Origin::produce, Broadcast::produce, origin.publish_broadcast, consumer.consume(), consumer.assert_next, consumer.assert_next_none and tokio::time::sleep to locate places to add the test.
🧹 Nitpick comments (2)
rs/moq-lite/src/model/origin.rs (2)
365-371: Doc nit: clarify behavior on backup close while active is still live.

The updated docstring covers the happy path (queue as backup, promote oldest on close), but it's silent about what happens when a backup is dropped before being promoted: it's silently removed from the queue with no notification. Worth a sentence so consumers of this API don't assume every published broadcast will eventually surface.
Suggested wording

```diff
 /// The broadcast will be unannounced when it is closed.
 /// If there is already a broadcast with the same path, then the older broadcast remains active
 /// and the new one is queued as a backup (no reannounce is triggered).
 /// When the active broadcast closes, the oldest queued backup is promoted and reannounced.
+/// If a queued backup is closed before being promoted, it is silently dropped from the queue.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/origin.rs` around lines 365 - 371, The docstring for the broadcast publishing API (the comment above the publish_broadcast / publish_broadcast_to_consumers method in origin.rs) omits behavior when a queued backup is dropped before promotion; update the docstring to add a concise sentence stating that backups removed from the queue before becoming active are silently discarded with no reannounce/notification, so callers should not assume every published broadcast will be promoted. Reference the existing "Publish a broadcast..." docblock and append this clarification after the sentence about promotion on close.
228-231: FIFO promotion: confirm oldest-wins is the desired policy, and consider `VecDeque`.

`entry.backup.remove(0)` is O(n) in the queue length, and `Vec` is the wrong data structure for FIFO. Since `backup` is append-on-publish / pop-front-on-promotion, `VecDeque<BroadcastConsumer>` with `push_back`/`pop_front` is a better fit and makes the FIFO intent explicit at the type level. In practice backups are usually tiny so perf isn't the concern; readability is.

Also worth double-checking against `cluster.rs` cross-origin merging: if both primary and secondary announce the same path near-simultaneously, the one that happened to publish first wins indefinitely until it closes, even if the other is "better" (e.g., local vs remote). Is oldest-wins the intended policy, or is newest-wins / a priority preferable here?

Proposed change

```diff
 struct OriginBroadcast {
     path: PathOwned,
     active: BroadcastConsumer,
-    backup: Vec<BroadcastConsumer>,
+    backup: std::collections::VecDeque<BroadcastConsumer>,
 }

-    existing.backup.push(broadcast.clone());
+    existing.backup.push_back(broadcast.clone());

-    if !entry.backup.is_empty() {
-        entry.active = entry.backup.remove(0);
+    if let Some(next) = entry.backup.pop_front() {
+        entry.active = next;
         self.notify.lock().reannounce(full, &entry.active);
     } else {
```

And update the `position`/`remove` lookup accordingly (`VecDeque` supports both).
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/origin.rs` around lines 228 - 231, Change entry.backup from Vec<BroadcastConsumer> to VecDeque<BroadcastConsumer> and replace the O(n) remove(0) usage with pop_front (and use push_back when appending) in the promotion code that assigns entry.active and calls self.notify.lock().reannounce(full, &entry.active); update any position/remove usages on entry.backup to VecDeque equivalents; also double-check the cross-origin merge logic in cluster.rs (where primary/secondary announcement ordering is resolved) to confirm that the current oldest-wins promotion policy is intended versus newest-wins or a priority-based choice.
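As a standalone illustration of the suggested swap: `Vec::remove(0)` and `VecDeque::pop_front` yield the same FIFO order, but the deque version is O(1) and states the intent at the type level. This sketch uses plain string placeholders rather than `BroadcastConsumer`.

```rust
use std::collections::VecDeque;

fn main() {
    // The current shape: Vec with remove(0), O(n) per promotion.
    let mut vec_backup: Vec<&str> = vec!["b1", "b2", "b3"];
    assert_eq!(vec_backup.remove(0), "b1");

    // The proposed shape: VecDeque with push_back / pop_front, O(1).
    let mut deque_backup: VecDeque<&str> = VecDeque::from(["b1", "b2", "b3"]);
    assert_eq!(deque_backup.pop_front(), Some("b1"));

    // Appending a new backup preserves FIFO order.
    deque_backup.push_back("b4");
    assert_eq!(deque_backup.pop_front(), Some("b2"));
    assert_eq!(deque_backup.pop_front(), Some("b3"));
    assert_eq!(deque_backup.pop_front(), Some("b4"));
}
```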
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ac155c6a-9c63-49d3-ac5d-49975f072ae9
📒 Files selected for processing (1)
rs/moq-lite/src/model/origin.rs
```rust
// Keep the older broadcast active; queue the new one as a backup.
// This avoids reannouncing and potentially disrupting subscribers.
existing.backup.push(broadcast.clone());
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Find all call sites of publish_broadcast to reason about republish semantics.
rg -nP --type=rust -C3 '\bpublish_broadcast\s*\('
```

Repository: moq-dev/moq
Length of output: 26038

🏁 Script executed:

```shell
# Check the cluster.rs merge/remotes logic to see how broadcasts are managed
sed -n '200,230p' rs/moq-relay/src/cluster.rs
```

Repository: moq-dev/moq
Length of output: 1122

🏁 Script executed:

```shell
# Check the Origin/BroadcastState structure to see if there's a cap on the backup queue
rg -A 20 'struct.*BroadcastState\|struct.*Entry' rs/moq-lite/src/model/origin.rs | head -60
```

Repository: moq-dev/moq
Length of output: 37

🏁 Script executed:

```shell
# Look at the publish_broadcast implementation to see if it closes old broadcasts
sed -n '365,430p' rs/moq-lite/src/model/origin.rs
```

Repository: moq-dev/moq
Length of output: 2324

🏁 Script executed:

```shell
# Look at the Entry struct and BroadcastState to understand the backup Vec
rg -B 5 -A 15 'struct Entry|backup:' rs/moq-lite/src/model/origin.rs | head -100
```

Repository: moq-dev/moq
Length of output: 1307

🏁 Script executed:

```shell
# Check the publish method that handles the queuing logic
sed -n '100,200p' rs/moq-lite/src/model/origin.rs
```

Repository: moq-dev/moq
Length of output: 3300

🏁 Script executed:

```shell
# Check the remove method that handles promotion of backups
sed -n '200,250p' rs/moq-lite/src/model/origin.rs
```

Repository: moq-dev/moq
Length of output: 1374

🏁 Script executed:

```shell
# Check if there are any limits or caps enforced on the backup Vec anywhere
rg -n 'backup.*len\|backup.*cap\|backup.*push.*limit\|MAX.*BACKUP' rs/moq-lite/src/model/origin.rs
```

Repository: moq-dev/moq
Length of output: 37

🏁 Script executed:

```shell
# Look at cluster.rs more carefully to see the flow of broadcasts in the merge/remotes functions
sed -n '195,275p' rs/moq-relay/src/cluster.rs
```

Repository: moq-dev/moq
Length of output: 2405

🏁 Script executed:

```shell
# Look at the test_duplicate test to understand expected behavior
sed -n '695,730p' rs/moq-lite/src/model/origin.rs
```

Repository: moq-dev/moq
Length of output: 1272

🏁 Script executed:

```shell
# Check if there are any concerns about async tests using sleep without pause()
sed -n '695,760p' rs/moq-lite/src/model/origin.rs
```

Repository: moq-dev/moq
Length of output: 2226

🏁 Script executed:

```shell
# Search for tokio::time::pause usage in the test file
rg -n 'tokio::time::pause\|tokio::time::sleep' rs/moq-lite/src/model/origin.rs
```

Repository: moq-dev/moq
Length of output: 37
Address silent backup queuing behavior and missing safeguards.

Verification confirms two significant concerns with this change:

- Old broadcasts may never close, leaving backups stuck forever. The relay cluster code (`rs/moq-relay/src/cluster.rs`, lines 207–217) publishes new broadcasts from remote origins without closing the previous one. If the active broadcast closes only when the producer intentionally shuts down (and a remote producer leaks or never disconnects), backups accumulate without ever being promoted.
- Unbounded backup queue growth. The `backup: Vec<BroadcastConsumer>` in `OriginBroadcast` has no cap. A hostile or misbehaving publisher can queue broadcasts without limit on a single path.

Additionally, the `test_duplicate` test uses `tokio::time::sleep()` to wait for async spawned tasks. Per coding guidelines, async tests that sleep should call `tokio::time::pause()` at the start to simulate time instantly.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/model/origin.rs` around lines 148 - 150, The change
unconditionally pushes clones into existing.backup (OriginBroadcast.backup:
Vec<BroadcastConsumer>) which can never be promoted if the active
BroadcastConsumer never closes and allows unbounded growth; update the logic
that handles new remote publishes (the code path that does
existing.backup.push(broadcast.clone())) to (1) enforce a bounded backup queue
(e.g. add a MAX_BACKUP constant and either drop oldest or reject new backups)
and (2) ensure the previous active broadcast is explicitly closed or scheduled
for termination when a new remote broadcast replaces it (so backups can be
promoted); update the OriginBroadcast type and any enqueue logic in
rs/moq-relay/src/cluster.rs to implement these safeguards and to document the
policy, and change the async test_duplicate to call tokio::time::pause() at the
start instead of relying on tokio::time::sleep().
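The bounded-queue safeguard suggested above can be sketched as follows. Both `MAX_BACKUP` and the drop-oldest policy are hypothetical choices for illustration; the review also names rejecting new backups as an alternative, and neither is in the actual codebase.

```rust
use std::collections::VecDeque;

// Hypothetical cap on queued backups per path; the real value is a policy choice.
const MAX_BACKUP: usize = 8;

/// Enqueue a backup, evicting the oldest queued entry once the cap is reached.
/// (Rejecting the new publish instead would be the alternative policy.)
fn enqueue_backup(backup: &mut VecDeque<u32>, id: u32) {
    if backup.len() >= MAX_BACKUP {
        backup.pop_front(); // bound memory by dropping the oldest queued backup
    }
    backup.push_back(id);
}

fn main() {
    let mut backup = VecDeque::new();
    for id in 0..20 {
        enqueue_backup(&mut backup, id);
    }
    assert_eq!(backup.len(), MAX_BACKUP); // growth is bounded
    assert_eq!(backup.front(), Some(&12)); // ids 0..12 were evicted
}
```

In the real type the queue would hold `BroadcastConsumer` values; `u32` IDs stand in here to keep the sketch self-contained.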
- Switch `OriginBroadcast.backup` to `VecDeque` for O(1) FIFO promotion via `pop_front`/`push_back`.
- Clarify `publish_broadcast` docstring: backups closed before promotion are silently discarded.
- Add `test_duplicate_fifo_order` to verify that when multiple backups are queued, promotion honors publish order (oldest-first).
- Pause tokio time in async tests that sleep so they run deterministically without real delays.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
This PR changes how the origin node handles multiple broadcasts on the same path. Instead of immediately replacing the active broadcast and reannouncing when a new one is published, the new broadcast is now queued as a backup. The active broadcast only changes when the current one is closed, at which point the oldest queued backup is promoted and reannounced.
Key Changes
- On close, the oldest queued backup (`remove(0)` instead of `pop()`) is promoted to active and reannounced, rather than the most recently added one
- The `publish_broadcast` method documentation now accurately describes the new queuing behavior and promotion strategy

Implementation Details
https://claude.ai/code/session_01GmwjbStTAmsRUaNYjkapxn