Skip to content

Fix batch API executor#2003

Open
SessionHero01 wants to merge 5 commits intodevfrom
batch-fix
Open

Fix batch API executor#2003
SessionHero01 wants to merge 5 commits intodevfrom
batch-fix

Conversation

@SessionHero01
Copy link
Collaborator

There are a few issues with this class:

  1. Incorrect batch size check on getting response
  2. Potentially dangerous time source used for batch management
  3. Potential issue sending batch result back risking other batch tasks getting stuck.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses correctness and reliability issues in the coroutine-based BatchApiExecutor by fixing batch response sizing, using a monotonic clock for deadline management, and avoiding potentially blocking result delivery back to individual requests.

Changes:

  • Replace Instant.now()-based batching deadlines with TimeSource.Monotonic (ComparableTimeMark) and use select { onTimeout } for deadline handling.
  • Fix the batch response size check and send per-request results using trySend to avoid blocking the batch worker.
  • Adjust PersistentLogger bulk-receive behavior to drain only buffered channel items via tryReceive().

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

File Description
app/src/main/java/org/thoughtcrime/securesms/api/batch/BatchApiExecutor.kt Improves batching deadline correctness (monotonic time), response-size validation, and non-blocking callback delivery + loop resilience.
app/src/main/java/org/thoughtcrime/securesms/logging/PersistentLogger.kt Changes log channel draining strategy and adds (currently unused) imports.
Comments suppressed due to low confidence (1)

app/src/main/java/org/thoughtcrime/securesms/api/batch/BatchApiExecutor.kt:229

  • send() creates a one-shot callback channel, but it is never closed/cancelled after receiving (or on cancellation). With the new non-suspending trySend in executeBatch, a cancelled caller can leave an undrained element sitting in the channel buffer, and the channel itself stays open longer than needed. Consider closing/cancelling the callback in a finally after receive(), and also cancelling it in the CancellationException path, so batch execution will reliably drop results for cancelled callers and avoid unnecessary buffering.
        val callback = Channel<Result<*>>(capacity = 1)
        batchCommandSender.send(BatchCommand.Send(
            ctx = ctx,
            batchKey = batchKey,
            req = req,
            callback = callback
        ))

        try {
            @Suppress("UNCHECKED_CAST")
            return callback.receive().getOrThrow() as Res
        } catch (e: CancellationException) {
            // Best effort cancellation
            batchCommandSender.trySend(BatchCommand.Cancel(batchKey, req))

            throw e
        }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 163 to 169
private suspend fun ReceiveChannel<LogEntry>.receiveBulkLogs(out: MutableList<LogEntry>) {
out += receive()

withTimeoutOrNull(500.milliseconds) {
repeat(15) {
out += receiveCatching().getOrNull() ?: return@repeat
}
// We may have many items cached in the channel, try to receive up to 15 items
repeat(15) {
out += tryReceive().getOrNull() ?: return@repeat
}
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description is focused on fixing BatchApiExecutor, but this PR also changes PersistentLogger.receiveBulkLogs() behavior. If this change is intentional, it would help to call it out in the PR description (or split it into a separate PR) so reviewers can evaluate the logging behavior change independently.

Copilot uses AI. Check for mistakes.
Comment on lines 163 to 169
private suspend fun ReceiveChannel<LogEntry>.receiveBulkLogs(out: MutableList<LogEntry>) {
out += receive()

withTimeoutOrNull(500.milliseconds) {
repeat(15) {
out += receiveCatching().getOrNull() ?: return@repeat
}
// We may have many items cached in the channel, try to receive up to 15 items
repeat(15) {
out += tryReceive().getOrNull() ?: return@repeat
}
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receiveBulkLogs no longer waits briefly to accumulate more log entries (previously up to 500ms). Given bulkWrite() flushes every time, this change can significantly increase flush frequency and I/O when logs are trickling in (often writing 1 entry at a time). Consider restoring a short timed window (e.g., keep draining with tryReceive() but also wait with a timeout/select for additional entries) to preserve batching behavior.

Copilot uses AI. Check for mistakes.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants