From 12c3bf1d8d78a760934b328db56aff9fa532c2ed Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 18 Jun 2026 17:00:11 +1200 Subject: [PATCH] refactor(transaction): own connection recovery in the adapter, drop Swoole PDOProxy dependency The Swoole PDOProxy keeps its own transaction counter that is incremented on beginTransaction but only decremented on a *successful* commit/rollback, and is never reset on reconnect or on pool checkin (utopia-php/pools does not call its reset() hook). A connection lost mid-transaction therefore leaks the counter and poisons the pooled connection: every later startTransaction trusts the stale counter, rolls back a transaction the real connection no longer holds, and fails with "There is no active transaction". This produced a sustained write outage across all projects on cloud nyc3. Make the library self-sufficient for connection-loss recovery so consumers no longer need to wrap connections in a Swoole PDOProxy: - PDOStatement wraps prepared statements and transparently re-prepares on the reconnected PDO when the connection is lost at execution time, replaying bound params/attributes. Recovery is skipped inside a transaction, where it rethrows so withTransaction can replay the whole transaction from the start. - PDO::prepare() returns the wrapper; prepareNative() re-prepares raw on the reconnected connection. ERRMODE_EXCEPTION is enforced by default. - withTransaction() reconnects on a lost connection before replaying, so the retry runs on a fresh, transaction-less connection. - Transaction state now has a single source of truth (the real PDO via Utopia\Database\PDO::inTransaction()); there is no separate counter to desync. - Replace the Swoole\Database\PDOStatementProxy type hints in the SQL adapters. Stacked on #895. Co-Authored-By: Claude Opus 4.8 --- src/Database/Adapter/Postgres.php | 6 +- src/Database/Adapter/SQL.php | 6 +- src/Database/Adapter/SQLite.php | 6 +- src/Database/PDO.php | 48 +++++++ src/Database/PDOStatement.php | 219 +++++++++++++++++++++++++++++ tests/unit/PDOStatementTest.php | 224 ++++++++++++++++++++++++++++++ tests/unit/PDOTest.php | 37 ++++- 7 files changed, 536 insertions(+), 10 deletions(-) create mode 100644 src/Database/PDOStatement.php create mode 100644 tests/unit/PDOStatementTest.php diff --git a/src/Database/Adapter/Postgres.php b/src/Database/Adapter/Postgres.php index dbe41420c..628e978ca 100644 --- a/src/Database/Adapter/Postgres.php +++ b/src/Database/Adapter/Postgres.php @@ -5,7 +5,6 @@ use Exception; use PDO; use PDOException; -use Swoole\Database\PDOStatementProxy; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Exception as DatabaseException; @@ -18,6 +17,7 @@ use Utopia\Database\Exception\Truncate as TruncateException; use Utopia\Database\Helpers\ID; use Utopia\Database\Operator; +use Utopia\Database\PDOStatement; use Utopia\Database\Query; /** @@ -2764,12 +2764,12 @@ protected function getOperatorSQL(string $column, Operator $operator, int &$bind * Bind operator parameters to statement * Override to handle PostgreSQL-specific JSON binding * - * @param \PDOStatement|PDOStatementProxy $stmt + * @param \PDOStatement|PDOStatement $stmt * @param Operator $operator * @param int &$bindIndex * @return void */ - protected function bindOperatorParams(\PDOStatement|PDOStatementProxy $stmt, Operator $operator, int &$bindIndex): void + protected function bindOperatorParams(\PDOStatement|PDOStatement $stmt, Operator $operator, int &$bindIndex): void { $method = $operator->getMethod(); $values = $operator->getValues(); diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index cf6822bc2..f10e3d700 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -4,7 +4,6 @@ use Exception; use PDOException; -use Swoole\Database\PDOStatementProxy; use Utopia\Database\Adapter; use Utopia\Database\Change; use Utopia\Database\Database; @@ -16,6 +15,7 @@ use Utopia\Database\Exception\Timeout as TimeoutException; use Utopia\Database\Exception\Transaction as TransactionException; use Utopia\Database\Operator; +use Utopia\Database\PDOStatement; use Utopia\Database\Query; abstract class SQL extends Adapter @@ -1978,12 +1978,12 @@ abstract protected function getOperatorSQL(string $column, Operator $operator, i /** * Bind operator parameters to prepared statement * - * @param \PDOStatement|PDOStatementProxy $stmt + * @param \PDOStatement|PDOStatement $stmt * @param \Utopia\Database\Operator $operator * @param int &$bindIndex * @return void */ - protected function bindOperatorParams(\PDOStatement|PDOStatementProxy $stmt, Operator $operator, int &$bindIndex): void + protected function bindOperatorParams(\PDOStatement|PDOStatement $stmt, Operator $operator, int &$bindIndex): void { $method = $operator->getMethod(); $values = $operator->getValues(); diff --git a/src/Database/Adapter/SQLite.php b/src/Database/Adapter/SQLite.php index eb7e92b5b..0f30d5ab4 100644 --- a/src/Database/Adapter/SQLite.php +++ b/src/Database/Adapter/SQLite.php @@ -5,7 +5,6 @@ use Exception; use PDO; use PDOException; -use Swoole\Database\PDOStatementProxy; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Exception as DatabaseException; @@ -18,6 +17,7 @@ use Utopia\Database\Exception\Truncate as TruncateException; use Utopia\Database\Helpers\ID; use Utopia\Database\Operator; +use Utopia\Database\PDOStatement; use Utopia\Database\Query; /** @@ -2002,12 +2002,12 @@ private function getSupportForMathFunctions(): bool * Bind operator parameters to statement * Override to handle SQLite-specific operator bindings * - * @param \PDOStatement|PDOStatementProxy $stmt + * @param \PDOStatement|PDOStatement $stmt * @param Operator $operator * @param int &$bindIndex * @return void */ - protected function bindOperatorParams(\PDOStatement|PDOStatementProxy $stmt, Operator $operator, int &$bindIndex): void + protected function bindOperatorParams(\PDOStatement|PDOStatement $stmt, Operator $operator, int &$bindIndex): void { $method = $operator->getMethod(); diff --git a/src/Database/PDO.php b/src/Database/PDO.php index df6f7a9d6..6981ca7cb 100644 --- a/src/Database/PDO.php +++ b/src/Database/PDO.php @@ -26,6 +26,8 @@ public function __construct( protected ?string $password, protected array $config = [] ) { + $this->config[\PDO::ATTR_ERRMODE] ??= \PDO::ERRMODE_EXCEPTION; + $this->pdo = new \PDO( $this->dsn, $this->username, @@ -34,6 +36,52 @@ public function __construct( ); } + /** + * Prepare a statement, returning a wrapper that transparently re-prepares + * itself on the underlying connection if that connection is lost before the + * statement is executed. + * + * @param array $options + * @throws \Throwable + */ + public function prepare(string $query, array $options = []): PDOStatement + { + return new PDOStatement($this, $this->prepareNative($query, $options), $query, $options); + } + + /** + * Prepare a raw \PDOStatement on the underlying connection, reconnecting + * once if a stale connection surfaces during prepare. Used by + * {@see PDOStatement} to re-prepare after a reconnect without re-wrapping. + * + * Under emulated prepares this never reaches the server (so the loss + * surfaces at execution time instead and is recovered by {@see PDOStatement}); + * with native prepares the server is contacted here, so a lost connection + * outside a transaction is reconnected and retried, matching __call(). + * + * @param array $options + * @throws \Throwable + */ + public function prepareNative(string $query, array $options = []): \PDOStatement + { + try { + $statement = $this->pdo->prepare($query, $options); + } catch (\Throwable $e) { + if (!Connection::hasError($e) || $this->pdo->inTransaction()) { + throw $e; + } + + $this->reconnect(); + $statement = $this->pdo->prepare($query, $options); + } + + if ($statement === false) { + throw new \PDOException("Failed to prepare statement: {$query}"); + } + + return $statement; + } + /** * @param string $method * @param array $args diff --git a/src/Database/PDOStatement.php b/src/Database/PDOStatement.php new file mode 100644 index 000000000..5dbfc4d68 --- /dev/null +++ b/src/Database/PDOStatement.php @@ -0,0 +1,219 @@ + + */ +class PDOStatement implements \IteratorAggregate +{ + /** + * @var array + */ + private array $values = []; + + /** + * @var array + */ + private array $params = []; + + /** + * The order bindValue()/bindParam() were called, so a placeholder rebound + * across methods replays with the last binding winning, as PDO applies it. + * + * @var array + */ + private array $bindOrder = []; + + /** + * @var array + */ + private array $columns = []; + + /** + * @var array + */ + private array $attributes = []; + + /** + * @var array|null + */ + private ?array $fetchMode = null; + + /** + * @param array $options + */ + public function __construct( + private readonly PDO $pdo, + private \PDOStatement $statement, + private readonly string $query, + private readonly array $options = [], + ) { + } + + public function __get(string $name): mixed + { + return $this->statement->{$name}; + } + + public function __set(string $name, mixed $value): void + { + $this->statement->{$name} = $value; + } + + public function __isset(string $name): bool + { + return isset($this->statement->{$name}); + } + + public function __unset(string $name): void + { + unset($this->statement->{$name}); + } + + public function __clone(): void + { + throw new \Error('Trying to clone an uncloneable PDOStatement'); + } + + /** + * Preserve \PDOStatement's native iterability (foreach over rows), which + * does not route through __call(). + */ + public function getIterator(): \Traversable + { + return $this->statement; + } + + /** + * @param array $args + * @throws \Throwable + */ + public function __call(string $method, array $args): mixed + { + try { + return $this->statement->{$method}(...$args); + } catch (\Throwable $e) { + if ( + \strcasecmp($method, 'execute') !== 0 + || $this->pdo->inTransaction() + || !Connection::hasError($e) + ) { + throw $e; + } + + Console::warning('[Database] ' . $e->getMessage()); + Console::warning('[Database] Lost connection detected. Re-preparing statement...'); + + $this->reprepare(); + + return $this->statement->{$method}(...$args); + } + } + + public function getStatement(): \PDOStatement + { + return $this->statement; + } + + public function setAttribute(int $attribute, mixed $value): bool + { + $this->attributes[$attribute] = $value; + + return $this->statement->setAttribute($attribute, $value); + } + + public function setFetchMode(int $mode, mixed ...$args): bool + { + $this->fetchMode = [$mode, ...$args]; + + return $this->statement->setFetchMode($mode, ...$args); + } + + public function bindValue(int|string $param, mixed $value, int $type = \PDO::PARAM_STR): bool + { + $this->values[$param] = [$value, $type]; + $this->bindOrder[] = ['value', $param]; + + return $this->statement->bindValue($param, $value, $type); + } + + public function bindParam(int|string $param, mixed &$variable, int $type = \PDO::PARAM_STR, int $maxLength = 0, mixed $driverOptions = null): bool + { + // Store the variable by reference so a value changed between bind and + // execute is the value replayed after a reconnect (PDO binds late). + $this->params[$param] = [&$variable, $type, $maxLength, $driverOptions]; + $this->bindOrder[] = ['param', $param]; + + return $this->statement->bindParam($param, $variable, $type, $maxLength, $driverOptions); + } + + public function bindColumn(int|string $column, mixed &$variable, ?int $type = null, ?int $maxLength = null, mixed $driverOptions = null): bool + { + // Record how many optional arguments were actually supplied so omitted + // ones keep PDO's real defaults instead of being replayed as explicit + // nulls (which would change the call contract / emit deprecations). + $arity = \func_num_args(); + $this->columns[$column] = [&$variable, $arity, $type, $maxLength, $driverOptions]; + + return $this->bindColumnTo($this->statement, $column, $variable, $arity, $type, $maxLength, $driverOptions); + } + + private function reprepare(): void + { + $this->pdo->reconnect(); + $this->statement = $this->pdo->prepareNative($this->query, $this->options); + + foreach ($this->attributes as $attribute => $value) { + $this->statement->setAttribute($attribute, $value); + } + + if ($this->fetchMode !== null) { + $this->statement->setFetchMode(...$this->fetchMode); + } + + // Replay value/param bindings in the original call order so a placeholder + // rebound across methods ends up with the binding the caller applied last. + foreach ($this->bindOrder as [$kind, $key]) { + if ($kind === 'value') { + [$value, $type] = $this->values[$key]; + $this->statement->bindValue($key, $value, $type); + } else { + $bind = $this->params[$key]; + $this->statement->bindParam($key, $bind[0], $bind[1], $bind[2], $bind[3]); + } + } + + foreach ($this->columns as $column => $bind) { + $this->bindColumnTo($this->statement, $column, $bind[0], $bind[1], $bind[2], $bind[3], $bind[4]); + } + } + + /** + * Forward bindColumn passing only the optional arguments the caller + * supplied ($arity counts column + variable + supplied options). + */ + private function bindColumnTo(\PDOStatement $statement, int|string $column, mixed &$variable, int $arity, ?int $type = null, ?int $maxLength = null, mixed $driverOptions = null): bool + { + return match (true) { + $arity <= 2 => $statement->bindColumn($column, $variable), + $arity === 3 => $statement->bindColumn($column, $variable, $type ?? \PDO::PARAM_STR), + $arity === 4 => $statement->bindColumn($column, $variable, $type ?? \PDO::PARAM_STR, $maxLength ?? 0), + default => $statement->bindColumn($column, $variable, $type ?? \PDO::PARAM_STR, $maxLength ?? 0, $driverOptions), + }; + } +} diff --git a/tests/unit/PDOStatementTest.php b/tests/unit/PDOStatementTest.php new file mode 100644 index 000000000..8bd8f280b --- /dev/null +++ b/tests/unit/PDOStatementTest.php @@ -0,0 +1,224 @@ +getMockBuilder(PDO::class) + ->disableOriginalConstructor() + ->onlyMethods(['reconnect', 'prepareNative']) + ->addMethods(['inTransaction']) + ->getMock(); + + $pdo->method('inTransaction')->willReturn($inTransaction); + + return $pdo; + } + + /** + * @return \PDOStatement&\PHPUnit\Framework\MockObject\MockObject + */ + private function statementMock(): \PDOStatement + { + return $this->getMockBuilder(\PDOStatement::class) + ->disableOriginalConstructor() + ->getMock(); + } + + public function testExecuteReconnectsRePreparesAndReplaysWhenNotInTransaction(): void + { + $pdo = $this->pdoMock(inTransaction: false); + + $first = $this->statementMock(); + $first->expects($this->once()) + ->method('bindValue') + ->with(':id', 'abc', \PDO::PARAM_STR) + ->willReturn(true); + $first->expects($this->once()) + ->method('execute') + ->willThrowException(new PDOException('Max connect timeout reached')); + + $second = $this->statementMock(); + $second->expects($this->once()) + ->method('bindValue') + ->with(':id', 'abc', \PDO::PARAM_STR) + ->willReturn(true); + $second->expects($this->once()) + ->method('execute') + ->willReturn(true); + + $pdo->expects($this->once())->method('reconnect'); + $pdo->expects($this->once()) + ->method('prepareNative') + ->with('SELECT :id', []) + ->willReturn($second); + + $statement = new PDOStatement($pdo, $first, 'SELECT :id'); + $statement->bindValue(':id', 'abc'); + + $this->assertTrue($statement->execute()); + } + + public function testExecuteRethrowsAndDoesNotReconnectInsideTransaction(): void + { + $pdo = $this->pdoMock(inTransaction: true); + $pdo->expects($this->never())->method('reconnect'); + $pdo->expects($this->never())->method('prepareNative'); + + $statement = $this->statementMock(); + $statement->expects($this->once()) + ->method('execute') + ->willThrowException(new PDOException('Max connect timeout reached')); + + $wrapper = new PDOStatement($pdo, $statement, 'SELECT 1'); + + $this->expectException(PDOException::class); + $this->expectExceptionMessage('Max connect timeout reached'); + + $wrapper->execute(); + } + + public function testExecuteRethrowsNonConnectionErrors(): void + { + $pdo = $this->pdoMock(inTransaction: false); + $pdo->expects($this->never())->method('reconnect'); + $pdo->expects($this->never())->method('prepareNative'); + + $statement = $this->statementMock(); + $statement->expects($this->once()) + ->method('execute') + ->willThrowException(new PDOException('SQLSTATE[42000]: Syntax error')); + + $wrapper = new PDOStatement($pdo, $statement, 'SELECT 1'); + + $this->expectException(PDOException::class); + $this->expectExceptionMessage('Syntax error'); + + $wrapper->execute(); + } + + public function testForwardsCallsAndPropertiesToUnderlyingStatement(): void + { + $pdo = $this->pdoMock(inTransaction: false); + + $statement = $this->statementMock(); + $statement->expects($this->once()) + ->method('rowCount') + ->willReturn(7); + + $wrapper = new PDOStatement($pdo, $statement, 'SELECT 1'); + + $this->assertSame($statement, $wrapper->getStatement()); + $this->assertSame(7, $wrapper->rowCount()); + } + + public function testIsIterableAndDelegatesIterationToTheStatement(): void + { + $pdo = $this->pdoMock(inTransaction: false); + $statement = $this->statementMock(); + + $wrapper = new PDOStatement($pdo, $statement, 'SELECT 1'); + + $this->assertInstanceOf(\IteratorAggregate::class, $wrapper); + $this->assertSame($statement, $wrapper->getIterator()); + } + + public function testDoesNotReconnectForNonExecuteMethods(): void + { + $pdo = $this->pdoMock(inTransaction: false); + $pdo->expects($this->never())->method('reconnect'); + $pdo->expects($this->never())->method('prepareNative'); + + $statement = $this->statementMock(); + $statement->expects($this->once()) + ->method('fetch') + ->willThrowException(new PDOException('server has gone away')); + + $wrapper = new PDOStatement($pdo, $statement, 'SELECT 1'); + + $this->expectException(PDOException::class); + $this->expectExceptionMessage('server has gone away'); + + $wrapper->fetch(); + } + + public function testBindParamReplaysCurrentValueAfterReconnect(): void + { + $pdo = $this->pdoMock(inTransaction: false); + + $first = $this->statementMock(); + $first->method('bindParam')->willReturn(true); + $first->expects($this->once()) + ->method('execute') + ->willThrowException(new PDOException('server has gone away')); + + $replayed = null; + $second = $this->statementMock(); + $second->expects($this->once()) + ->method('bindParam') + ->willReturnCallback(function (int|string $param, mixed &$variable) use (&$replayed): bool { + $replayed = $variable; + return true; + }); + $second->expects($this->once())->method('execute')->willReturn(true); + + $pdo->expects($this->once())->method('reconnect'); + $pdo->expects($this->once())->method('prepareNative')->willReturn($second); + + $wrapper = new PDOStatement($pdo, $first, 'SELECT :id'); + + $value = 'old'; + $wrapper->bindParam(':id', $value); + $value = 'new'; + + $this->assertTrue($wrapper->execute()); + $this->assertSame('new', $replayed, 'reconnect must replay the value bound by reference at execute time'); + } + + public function testReplaysMixedBindingsInOriginalCallOrder(): void + { + $pdo = $this->pdoMock(inTransaction: false); + + $first = $this->statementMock(); + $first->method('bindValue')->willReturn(true); + $first->method('bindParam')->willReturn(true); + $first->expects($this->once()) + ->method('execute') + ->willThrowException(new PDOException('server has gone away')); + + $replay = []; + $second = $this->statementMock(); + $second->method('bindValue')->willReturnCallback(function (int|string $p, mixed $v) use (&$replay): bool { + $replay[] = "value:{$v}"; + return true; + }); + $second->method('bindParam')->willReturnCallback(function (int|string $p, mixed &$v) use (&$replay): bool { + $replay[] = "param:{$v}"; + return true; + }); + $second->expects($this->once())->method('execute')->willReturn(true); + + $pdo->expects($this->once())->method('reconnect'); + $pdo->expects($this->once())->method('prepareNative')->willReturn($second); + + $wrapper = new PDOStatement($pdo, $first, 'SELECT :id'); + + // Caller rebinds the same placeholder: the later bindParam must win. + $wrapper->bindValue(':id', 'old'); + $current = 'new'; + $wrapper->bindParam(':id', $current); + + $this->assertTrue($wrapper->execute()); + $this->assertSame(['value:old', 'param:new'], $replay, 'replay must preserve original bind order so the last binding wins'); + } +} diff --git a/tests/unit/PDOTest.php b/tests/unit/PDOTest.php index 45e9a12a2..09e4ac122 100644 --- a/tests/unit/PDOTest.php +++ b/tests/unit/PDOTest.php @@ -5,6 +5,7 @@ use PHPUnit\Framework\TestCase; use ReflectionClass; use Utopia\Database\PDO; +use Utopia\Database\PDOStatement; class PDOTest extends TestCase { @@ -148,6 +149,40 @@ public function testMethodCallForPrepare(): void $result = $pdoWrapper->prepare('SELECT * FROM table', [\PDO::ATTR_CURSOR => \PDO::CURSOR_FWDONLY]); - $this->assertSame($pdoStatementMock, $result); + $this->assertInstanceOf(PDOStatement::class, $result); + $this->assertSame($pdoStatementMock, $result->getStatement()); + } + + public function testPrepareNativeReconnectsOutsideTransaction(): void + { + $pdoWrapper = $this->getMockBuilder(PDO::class) + ->setConstructorArgs(['sqlite::memory:', null, null, []]) + ->onlyMethods(['reconnect']) + ->getMock(); + + $pdoMock = $this->getMockBuilder(\PDO::class) + ->disableOriginalConstructor() + ->getMock(); + $statementMock = $this->getMockBuilder(\PDOStatement::class) + ->disableOriginalConstructor() + ->getMock(); + + $pdoMock->method('inTransaction')->willReturn(false); + $pdoMock->expects($this->exactly(2)) + ->method('prepare') + ->with('SELECT 1') + ->willReturnOnConsecutiveCalls( + $this->throwException(new \PDOException('server has gone away')), + $statementMock + ); + + $reflection = new ReflectionClass($pdoWrapper); + $pdoProperty = $reflection->getProperty('pdo'); + $pdoProperty->setAccessible(true); + $pdoProperty->setValue($pdoWrapper, $pdoMock); + + $pdoWrapper->expects($this->once())->method('reconnect'); + + $this->assertSame($statementMock, $pdoWrapper->prepareNative('SELECT 1')); } }