Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses correctness and reliability issues in the coroutine-based BatchApiExecutor by fixing batch response sizing, using a monotonic clock for deadline management, and avoiding potentially blocking result delivery back to individual requests.
Changes:
- Replace
Instant.now()-based batching deadlines withTimeSource.Monotonic(ComparableTimeMark) and useselect { onTimeout }for deadline handling. - Fix the batch response size check and send per-request results using
trySendto avoid blocking the batch worker. - Adjust
PersistentLoggerbulk-receive behavior to drain only buffered channel items viatryReceive().
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| app/src/main/java/org/thoughtcrime/securesms/api/batch/BatchApiExecutor.kt | Improves batching deadline correctness (monotonic time), response-size validation, and non-blocking callback delivery + loop resilience. |
| app/src/main/java/org/thoughtcrime/securesms/logging/PersistentLogger.kt | Changes log channel draining strategy and adds (currently unused) imports. |
Comments suppressed due to low confidence (1)
app/src/main/java/org/thoughtcrime/securesms/api/batch/BatchApiExecutor.kt:229
send()creates a one-shotcallbackchannel, but it is never closed/cancelled after receiving (or on cancellation). With the new non-suspendingtrySendinexecuteBatch, a cancelled caller can leave an undrained element sitting in the channel buffer, and the channel itself stays open longer than needed. Consider closing/cancelling thecallbackin afinallyafterreceive(), and also cancelling it in theCancellationExceptionpath, so batch execution will reliably drop results for cancelled callers and avoid unnecessary buffering.
val callback = Channel<Result<*>>(capacity = 1)
batchCommandSender.send(BatchCommand.Send(
ctx = ctx,
batchKey = batchKey,
req = req,
callback = callback
))
try {
@Suppress("UNCHECKED_CAST")
return callback.receive().getOrThrow() as Res
} catch (e: CancellationException) {
// Best effort cancellation
batchCommandSender.trySend(BatchCommand.Cancel(batchKey, req))
throw e
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| private suspend fun ReceiveChannel<LogEntry>.receiveBulkLogs(out: MutableList<LogEntry>) { | ||
| out += receive() | ||
|
|
||
| withTimeoutOrNull(500.milliseconds) { | ||
| repeat(15) { | ||
| out += receiveCatching().getOrNull() ?: return@repeat | ||
| } | ||
| // We may have many items cached in the channel, try to receive up to 15 items | ||
| repeat(15) { | ||
| out += tryReceive().getOrNull() ?: return@repeat | ||
| } |
There was a problem hiding this comment.
PR description is focused on fixing BatchApiExecutor, but this PR also changes PersistentLogger.receiveBulkLogs() behavior. If this change is intentional, it would help to call it out in the PR description (or split it into a separate PR) so reviewers can evaluate the logging behavior change independently.
app/src/main/java/org/thoughtcrime/securesms/logging/PersistentLogger.kt
Outdated
Show resolved
Hide resolved
app/src/main/java/org/thoughtcrime/securesms/logging/PersistentLogger.kt
Outdated
Show resolved
Hide resolved
app/src/main/java/org/thoughtcrime/securesms/logging/PersistentLogger.kt
Outdated
Show resolved
Hide resolved
| private suspend fun ReceiveChannel<LogEntry>.receiveBulkLogs(out: MutableList<LogEntry>) { | ||
| out += receive() | ||
|
|
||
| withTimeoutOrNull(500.milliseconds) { | ||
| repeat(15) { | ||
| out += receiveCatching().getOrNull() ?: return@repeat | ||
| } | ||
| // We may have many items cached in the channel, try to receive up to 15 items | ||
| repeat(15) { | ||
| out += tryReceive().getOrNull() ?: return@repeat | ||
| } |
There was a problem hiding this comment.
receiveBulkLogs no longer waits briefly to accumulate more log entries (previously up to 500ms). Given bulkWrite() flushes every time, this change can significantly increase flush frequency and I/O when logs are trickling in (often writing 1 entry at a time). Consider restoring a short timed window (e.g., keep draining with tryReceive() but also wait with a timeout/select for additional entries) to preserve batching behavior.
app/src/main/java/org/thoughtcrime/securesms/api/batch/BatchApiExecutor.kt
Outdated
Show resolved
Hide resolved
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There are a few issues with this class: