From ee03f1bc68157b97964c3a37580cc6417117d84e Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 18 Jun 2026 15:58:26 +1200 Subject: [PATCH 1/7] fix: destroy failed pool connections --- composer.json | 5 +- src/Pools/Connection.php | 4 +- src/Pools/Group.php | 2 +- src/Pools/Pool.php | 15 +++++- tests/Pools/Adapter/SwooleTest.php | 2 +- tests/Pools/Scopes/ConnectionTestScope.php | 6 +-- tests/Pools/Scopes/GroupTestScope.php | 18 ++++---- tests/Pools/Scopes/PoolTestScope.php | 54 ++++++++++++++++++---- 8 files changed, 78 insertions(+), 28 deletions(-) diff --git a/composer.json b/composer.json index 093bfec..c2e257c 100755 --- a/composer.json +++ b/composer.json @@ -36,7 +36,7 @@ "require-dev": { "swoole/ide-helper": "6.*" }, - "suggests": { + "suggest": { "ext-mongodb": "Needed to support MongoDB database pools", "ext-redis": "Needed to support Redis cache pools", "ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools", @@ -50,8 +50,5 @@ "php-http/discovery": false, "tbachert/spi": false } - }, - "suggest": { - "ext-swoole": "Required to use the Swoole pool adapter" } } diff --git a/src/Pools/Connection.php b/src/Pools/Connection.php index 5e49b94..0cd9688 100644 --- a/src/Pools/Connection.php +++ b/src/Pools/Connection.php @@ -19,7 +19,9 @@ class Connection /** * @param TResource $resource */ - public function __construct(protected mixed $resource) {} + public function __construct(protected mixed $resource) + { + } /** * @return string diff --git a/src/Pools/Group.php b/src/Pools/Group.php index b8997d1..1182a4a 100644 --- a/src/Pools/Group.php +++ b/src/Pools/Group.php @@ -89,7 +89,7 @@ private function useInternal(array $names, callable $callback, array $resources return $this ->get(array_shift($names)) - ->use(fn($resource) => $this->useInternal($names, $callback, array_merge($resources, [$resource]))); + ->use(fn ($resource) => $this->useInternal($names, $callback, array_merge($resources, [$resource]))); } /** diff --git a/src/Pools/Pool.php b/src/Pools/Pool.php index 0fac17a..988b745 100644 --- a/src/Pools/Pool.php +++ b/src/Pools/Pool.php @@ -224,13 +224,26 @@ public function use(callable $callback): mixed { $start = microtime(true); $connection = null; + $failed = false; + try { $connection = $this->pop(); return $callback($connection->getResource()); + } catch (\Throwable $error) { + $failed = true; + throw $error; } finally { $this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes); if ($connection !== null) { - $this->reclaim($connection); + if ($failed) { + try { + $this->destroy($connection); + } catch (\Throwable) { + // Preserve the callback exception; destroy already removed the connection. + } + } else { + $this->reclaim($connection); + } } } } diff --git a/tests/Pools/Adapter/SwooleTest.php b/tests/Pools/Adapter/SwooleTest.php index c74df13..5a1af2d 100644 --- a/tests/Pools/Adapter/SwooleTest.php +++ b/tests/Pools/Adapter/SwooleTest.php @@ -343,7 +343,7 @@ public function testSwooleCoroutineStressTest(): void } public function testInitOutsideCoroutineNotThrowAnyError(): void { - $pool = new Pool(new Swoole(), 'test', 1, fn() => 'x'); + $pool = new Pool(new Swoole(), 'test', 1, fn () => 'x'); $this->assertInstanceOf(Pool::class, $pool); } } diff --git a/tests/Pools/Scopes/ConnectionTestScope.php b/tests/Pools/Scopes/ConnectionTestScope.php index 67632d4..ebf3c62 100644 --- a/tests/Pools/Scopes/ConnectionTestScope.php +++ b/tests/Pools/Scopes/ConnectionTestScope.php @@ -69,7 +69,7 @@ public function testConnectionSetPool(): void { $this->execute(function (): void { $this->setUpConnection(); - $pool = new Pool($this->getAdapter(), 'test', 1, fn() => 'x'); + $pool = new Pool($this->getAdapter(), 'test', 1, fn () => 'x'); $this->assertNull($this->connectionObject->getPool()); $this->assertInstanceOf(Connection::class, $this->connectionObject->setPool($pool)); @@ -80,7 +80,7 @@ public function testConnectionGetPool(): void { $this->execute(function (): void { $this->setUpConnection(); - $pool = new Pool($this->getAdapter(), 'test', 1, fn() => 'x'); + $pool = new Pool($this->getAdapter(), 'test', 1, fn () => 'x'); $this->assertNull($this->connectionObject->getPool()); $this->assertInstanceOf(Connection::class, $this->connectionObject->setPool($pool)); @@ -99,7 +99,7 @@ public function testConnectionGetPool(): void public function testConnectionReclaim(): void { $this->execute(function (): void { - $pool = new Pool($this->getAdapter(), 'test', 2, fn() => 'x'); + $pool = new Pool($this->getAdapter(), 'test', 2, fn () => 'x'); $this->assertSame(2, $pool->count()); diff --git a/tests/Pools/Scopes/GroupTestScope.php b/tests/Pools/Scopes/GroupTestScope.php index be4a5a1..a8158f3 100644 --- a/tests/Pools/Scopes/GroupTestScope.php +++ b/tests/Pools/Scopes/GroupTestScope.php @@ -22,7 +22,7 @@ public function testGroupAdd(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn () => 'x')); $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); }); @@ -32,7 +32,7 @@ public function testGroupGet(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn () => 'x')); $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); @@ -46,7 +46,7 @@ public function testGroupRemove(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 1, fn () => 'x')); $this->assertInstanceOf(Pool::class, $this->groupObject->get('test')); @@ -62,7 +62,7 @@ public function testGroupReset(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn () => 'x')); $this->assertSame(5, $this->groupObject->get('test')->count()); @@ -82,7 +82,7 @@ public function testGroupReconnectAttempts(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn () => 'x')); $this->assertSame(3, $this->groupObject->get('test')->getReconnectAttempts()); @@ -96,7 +96,7 @@ public function testGroupReconnectSleep(): void { $this->execute(function (): void { $this->setUpGroup(); - $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn() => 'x')); + $this->groupObject->add(new Pool($this->getAdapter(), 'test', 5, fn () => 'x')); $this->assertSame(1, $this->groupObject->get('test')->getReconnectSleep()); @@ -110,9 +110,9 @@ public function testGroupUse(): void { $this->execute(function (): void { $this->setUpGroup(); - $pool1 = new Pool($this->getAdapter(), 'pool1', 1, fn() => '1'); - $pool2 = new Pool($this->getAdapter(), 'pool2', 1, fn() => '2'); - $pool3 = new Pool($this->getAdapter(), 'pool3', 1, fn() => '3'); + $pool1 = new Pool($this->getAdapter(), 'pool1', 1, fn () => '1'); + $pool2 = new Pool($this->getAdapter(), 'pool2', 1, fn () => '2'); + $pool3 = new Pool($this->getAdapter(), 'pool3', 1, fn () => '3'); $this->groupObject->add($pool1); $this->groupObject->add($pool2); diff --git a/tests/Pools/Scopes/PoolTestScope.php b/tests/Pools/Scopes/PoolTestScope.php index d9b3350..5ed32f3 100644 --- a/tests/Pools/Scopes/PoolTestScope.php +++ b/tests/Pools/Scopes/PoolTestScope.php @@ -19,7 +19,7 @@ abstract protected function execute(callable $callback): mixed; protected function setUpPool(): void { - $this->poolObject = new Pool($this->getAdapter(), 'test', 5, fn() => 'x'); + $this->poolObject = new Pool($this->getAdapter(), 'test', 5, fn () => 'x'); } public function testPoolGetName(): void @@ -364,23 +364,61 @@ public function testPoolEmptyErrorIncludesActiveCount(): void }); } - public function testUseReclainsConnectionOnCallbackException(): void + public function testUseDestroysConnectionOnCallbackException(): void { $this->execute(function (): void { - $this->setUpPool(); // size 5 + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-destroy-on-error', 2, function () use (&$created) { + $created++; + return 'resource-' . $created; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); - // use() should reclaim the connection even when callback throws try { - $this->poolObject->use(function ($resource): void { - $this->assertSame(4, $this->poolObject->count()); + $pool->use(function (string $resource): void { + $this->assertSame('resource-1', $resource); throw new \RuntimeException('Callback failed'); }); } catch (\RuntimeException) { // expected } - // Connection should be reclaimed, pool back to full - $this->assertSame(5, $this->poolObject->count()); + $this->assertSame(2, $pool->count()); + + $pool->use(function (string $resource): void { + $this->assertSame('resource-2', $resource); + }); + }); + } + + public function testUsePreservesCallbackExceptionWhenReplacementFails(): void + { + $this->execute(function (): void { + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-preserve-callback-error', 1, function () use (&$created) { + $created++; + if ($created > 1) { + throw new \RuntimeException('Replacement failed'); + } + + return 'resource-' . $created; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); + + $error = null; + try { + $pool->use(function (string $resource): void { + $this->assertSame('resource-1', $resource); + throw new \LogicException('Callback failed'); + }); + } catch (\LogicException $error) { + } + + $this->assertInstanceOf(\LogicException::class, $error); + $this->assertSame('Callback failed', $error->getMessage()); + $this->assertSame(1, $pool->count()); }); } From 09eedc026c64452c08d4bd16b0a9f93e5d72dc40 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 18 Jun 2026 16:00:11 +1200 Subject: [PATCH 2/7] fix: install telemetry extension in tests --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a779da5..5cdd85a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,7 +16,7 @@ jobs: - uses: shivammathur/setup-php@f3e473d116dcccaddc5834248c87452386958240 # ratchet:shivammathur/setup-php@v2 with: php-version: '8.4' - extensions: swoole + extensions: opentelemetry, swoole tools: phpunit coverage: none From 9b403e9860a1d032ca171ff7d7d563584ed429f7 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 18 Jun 2026 16:02:05 +1200 Subject: [PATCH 3/7] fix: install protobuf extension in tests --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5cdd85a..f288451 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,7 +16,7 @@ jobs: - uses: shivammathur/setup-php@f3e473d116dcccaddc5834248c87452386958240 # ratchet:shivammathur/setup-php@v2 with: php-version: '8.4' - extensions: opentelemetry, swoole + extensions: opentelemetry, protobuf, swoole tools: phpunit coverage: none From ea694fa0522d28e07ca2377f834cd605079a8531 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 18 Jun 2026 16:09:12 +1200 Subject: [PATCH 4/7] fix: recover failed pool connections --- src/Pools/Pool.php | 40 ++++++++++++++++++--- tests/Pools/Scopes/GroupTestScope.php | 29 ++++++++++++++++ tests/Pools/Scopes/PoolTestScope.php | 50 +++++++++++++++++++++------ 3 files changed, 104 insertions(+), 15 deletions(-) diff --git a/src/Pools/Pool.php b/src/Pools/Pool.php index 988b745..de96847 100644 --- a/src/Pools/Pool.php +++ b/src/Pools/Pool.php @@ -236,10 +236,14 @@ public function use(callable $callback): mixed $this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes); if ($connection !== null) { if ($failed) { - try { - $this->destroy($connection); - } catch (\Throwable) { - // Preserve the callback exception; destroy already removed the connection. + if ($this->recover($connection)) { + $this->reclaim($connection); + } else { + try { + $this->destroy($connection); + } catch (\Throwable) { + // Preserve the callback exception; destroy already removed the connection. + } } } else { $this->reclaim($connection); @@ -248,6 +252,32 @@ public function use(callable $callback): mixed } } + /** + * @param Connection $connection + */ + private function recover(Connection $connection): bool + { + $resource = $connection->getResource(); + + if (!\is_object($resource)) { + return true; + } + + try { + if (\method_exists($resource, 'reset')) { + $resource->reset(); + } + + if (\method_exists($resource, 'reconnect')) { + $resource->reconnect(); + } + } catch (\Throwable) { + return false; + } + + return true; + } + /** * Summary: * 1. Try to get a connection from the pool @@ -431,7 +461,7 @@ private function destroyConnection(?Connection $connection = null): static if ($shouldCreate) { try { $this->pool->push($this->createConnection()); - } catch (Exception $e) { + } catch (\Throwable $e) { $this->pool->synchronized(function (): void { $this->connectionsCreated--; }); diff --git a/tests/Pools/Scopes/GroupTestScope.php b/tests/Pools/Scopes/GroupTestScope.php index a8158f3..152bba3 100644 --- a/tests/Pools/Scopes/GroupTestScope.php +++ b/tests/Pools/Scopes/GroupTestScope.php @@ -137,4 +137,33 @@ public function testGroupUse(): void $this->assertSame(1, $pool3->count()); }); } + + public function testGroupUseReclaimsEarlierConnectionWhenLaterPoolIsMissing(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $created = 0; + $pool = new Pool($this->getAdapter(), 'pool1', 1, function () use (&$created) { + $created++; + return 'resource-' . $created; + }); + + $this->groupObject->add($pool); + + try { + $this->groupObject->use(['pool1', 'missing'], function (): void { + }); + $this->fail('Should have thrown'); + } catch (Exception) { + // expected + } + + $this->assertSame(1, $pool->count()); + + $pool->use(function (string $resource): void { + $this->assertSame('resource-1', $resource); + }); + $this->assertSame(1, $created); + }); + } } diff --git a/tests/Pools/Scopes/PoolTestScope.php b/tests/Pools/Scopes/PoolTestScope.php index 5ed32f3..816bace 100644 --- a/tests/Pools/Scopes/PoolTestScope.php +++ b/tests/Pools/Scopes/PoolTestScope.php @@ -364,20 +364,36 @@ public function testPoolEmptyErrorIncludesActiveCount(): void }); } - public function testUseDestroysConnectionOnCallbackException(): void + public function testUseDestroysConnectionWhenRecoveryFails(): void { $this->execute(function (): void { $created = 0; $pool = new Pool($this->getAdapter(), 'test-destroy-on-error', 2, function () use (&$created) { $created++; - return 'resource-' . $created; + return new class ('resource-' . $created, $created === 1) implements \Stringable { + public function __construct(private string $name, private bool $failRecovery) + { + } + + public function __toString(): string + { + return $this->name; + } + + public function reconnect(): void + { + if ($this->failRecovery) { + throw new \RuntimeException('Recovery failed'); + } + } + }; }); $pool->setReconnectAttempts(1); $pool->setReconnectSleep(0); try { - $pool->use(function (string $resource): void { - $this->assertSame('resource-1', $resource); + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-1', (string) $resource); throw new \RuntimeException('Callback failed'); }); } catch (\RuntimeException) { @@ -386,8 +402,8 @@ public function testUseDestroysConnectionOnCallbackException(): void $this->assertSame(2, $pool->count()); - $pool->use(function (string $resource): void { - $this->assertSame('resource-2', $resource); + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-2', (string) $resource); }); }); } @@ -399,18 +415,32 @@ public function testUsePreservesCallbackExceptionWhenReplacementFails(): void $pool = new Pool($this->getAdapter(), 'test-preserve-callback-error', 1, function () use (&$created) { $created++; if ($created > 1) { - throw new \RuntimeException('Replacement failed'); + throw new \TypeError('Replacement failed'); } - return 'resource-' . $created; + return new class ('resource-' . $created) implements \Stringable { + public function __construct(private string $name) + { + } + + public function __toString(): string + { + return $this->name; + } + + public function reconnect(): void + { + throw new \RuntimeException('Recovery failed'); + } + }; }); $pool->setReconnectAttempts(1); $pool->setReconnectSleep(0); $error = null; try { - $pool->use(function (string $resource): void { - $this->assertSame('resource-1', $resource); + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-1', (string) $resource); throw new \LogicException('Callback failed'); }); } catch (\LogicException $error) { From a0ecd0c6bda4e6b88b291928e2c7e43f0c2eaa37 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 18 Jun 2026 16:17:27 +1200 Subject: [PATCH 5/7] fix: tighten failed resource recovery --- src/Pools/Group.php | 41 +++++++------- src/Pools/Pool.php | 56 ++++++++++++++----- tests/Pools/Scopes/GroupTestScope.php | 20 +++++-- tests/Pools/Scopes/PoolTestScope.php | 79 +++++++++++++++++++++++++++ 4 files changed, 157 insertions(+), 39 deletions(-) diff --git a/src/Pools/Group.php b/src/Pools/Group.php index 1182a4a..48ddcde 100644 --- a/src/Pools/Group.php +++ b/src/Pools/Group.php @@ -68,28 +68,29 @@ public function use(array $names, callable $callback): mixed if (empty($names)) { throw new Exception('Cannot use with empty names'); } - return $this->useInternal($names, $callback); - } - /** - * Internal recursive callback for `use`. - * - * @template TReturn - * @param array $names Name of resources - * @param callable(mixed...): TReturn $callback Function that receives the connection resources - * @param array $resources - * @return TReturn - * @throws Exception - */ - private function useInternal(array $names, callable $callback, array $resources = []): mixed - { - if (empty($names)) { - return $callback(...$resources); + $connections = []; + $pools = []; + $started = false; + $failed = false; + + try { + foreach ($names as $name) { + $pool = $this->get($name); + $pools[] = $pool; + $connections[] = $pool->pop(); + } + + $started = true; + return $callback(...array_map(fn (Connection $connection) => $connection->getResource(), $connections)); + } catch (\Throwable $error) { + $failed = $started; + throw $error; + } finally { + for ($i = \count($connections) - 1; $i >= 0; $i--) { + $pools[$i]->release($connections[$i], $failed); + } } - - return $this - ->get(array_shift($names)) - ->use(fn ($resource) => $this->useInternal($names, $callback, array_merge($resources, [$resource]))); } /** diff --git a/src/Pools/Pool.php b/src/Pools/Pool.php index de96847..591d99a 100644 --- a/src/Pools/Pool.php +++ b/src/Pools/Pool.php @@ -235,21 +235,39 @@ public function use(callable $callback): mixed } finally { $this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes); if ($connection !== null) { - if ($failed) { - if ($this->recover($connection)) { - $this->reclaim($connection); - } else { - try { - $this->destroy($connection); - } catch (\Throwable) { - // Preserve the callback exception; destroy already removed the connection. - } - } - } else { - $this->reclaim($connection); + $this->release($connection, $failed); + } + } + } + + /** + * @param Connection $connection + * @return $this + * @internal + */ + public function release(Connection $connection, bool $failed = false): static + { + if (!$failed) { + return $this->reclaim($connection); + } + + if ($this->recover($connection)) { + try { + return $this->reclaim($connection); + } catch (\Throwable) { + try { + return $this->destroy($connection); + } catch (\Throwable) { + return $this; } } } + + try { + return $this->destroy($connection); + } catch (\Throwable) { + return $this; + } } /** @@ -264,18 +282,26 @@ private function recover(Connection $connection): bool } try { + $recovered = false; + if (\method_exists($resource, 'reset')) { - $resource->reset(); + $recovered = true; + if ($resource->reset() === false) { + return false; + } } if (\method_exists($resource, 'reconnect')) { - $resource->reconnect(); + $recovered = true; + if ($resource->reconnect() === false) { + return false; + } } } catch (\Throwable) { return false; } - return true; + return $recovered; } /** diff --git a/tests/Pools/Scopes/GroupTestScope.php b/tests/Pools/Scopes/GroupTestScope.php index 152bba3..fd69b22 100644 --- a/tests/Pools/Scopes/GroupTestScope.php +++ b/tests/Pools/Scopes/GroupTestScope.php @@ -143,9 +143,20 @@ public function testGroupUseReclaimsEarlierConnectionWhenLaterPoolIsMissing(): v $this->execute(function (): void { $this->setUpGroup(); $created = 0; - $pool = new Pool($this->getAdapter(), 'pool1', 1, function () use (&$created) { + $resources = []; + $pool = new Pool($this->getAdapter(), 'pool1', 1, function () use (&$created, &$resources) { $created++; - return 'resource-' . $created; + $resources[] = new class ('resource-' . $created) implements \Stringable { + public function __construct(private string $name) + { + } + + public function __toString(): string + { + return $this->name; + } + }; + return $resources[$created - 1]; }); $this->groupObject->add($pool); @@ -160,8 +171,9 @@ public function testGroupUseReclaimsEarlierConnectionWhenLaterPoolIsMissing(): v $this->assertSame(1, $pool->count()); - $pool->use(function (string $resource): void { - $this->assertSame('resource-1', $resource); + $pool->use(function (\Stringable $resource) use (&$resources): void { + $this->assertSame($resources[0], $resource); + $this->assertSame('resource-1', (string) $resource); }); $this->assertSame(1, $created); }); diff --git a/tests/Pools/Scopes/PoolTestScope.php b/tests/Pools/Scopes/PoolTestScope.php index 816bace..a7ba193 100644 --- a/tests/Pools/Scopes/PoolTestScope.php +++ b/tests/Pools/Scopes/PoolTestScope.php @@ -408,6 +408,85 @@ public function reconnect(): void }); } + public function testUseDestroysConnectionWhenRecoveryReturnsFalse(): void + { + $this->execute(function (): void { + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-destroy-on-false-recovery', 2, function () use (&$created) { + $created++; + return new class ('resource-' . $created) implements \Stringable { + public function __construct(private string $name) + { + } + + public function __toString(): string + { + return $this->name; + } + + public function reconnect(): bool + { + return false; + } + }; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); + + try { + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-1', (string) $resource); + throw new \RuntimeException('Callback failed'); + }); + } catch (\RuntimeException) { + // expected + } + + $this->assertSame(2, $pool->count()); + + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-2', (string) $resource); + }); + }); + } + + public function testUseDestroysObjectConnectionWithoutRecoveryHooks(): void + { + $this->execute(function (): void { + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-destroy-without-recovery', 2, function () use (&$created) { + $created++; + return new class ('resource-' . $created) implements \Stringable { + public function __construct(private string $name) + { + } + + public function __toString(): string + { + return $this->name; + } + }; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); + + try { + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-1', (string) $resource); + throw new \RuntimeException('Callback failed'); + }); + } catch (\RuntimeException) { + // expected + } + + $this->assertSame(2, $pool->count()); + + $pool->use(function (\Stringable $resource): void { + $this->assertSame('resource-2', (string) $resource); + }); + }); + } + public function testUsePreservesCallbackExceptionWhenReplacementFails(): void { $this->execute(function (): void { From 4d1b7c0e0b5230b7b7e12a0a3a10b4d079991b4a Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 18 Jun 2026 16:25:30 +1200 Subject: [PATCH 6/7] fix: handle native resource recovery --- src/Pools/Group.php | 4 ++- src/Pools/Pool.php | 8 ++++-- tests/Pools/Scopes/GroupTestScope.php | 24 +++++++++++++++++ tests/Pools/Scopes/PoolTestScope.php | 38 +++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 3 deletions(-) diff --git a/src/Pools/Group.php b/src/Pools/Group.php index 48ddcde..5992038 100644 --- a/src/Pools/Group.php +++ b/src/Pools/Group.php @@ -71,12 +71,14 @@ public function use(array $names, callable $callback): mixed $connections = []; $pools = []; + $starts = []; $started = false; $failed = false; try { foreach ($names as $name) { $pool = $this->get($name); + $starts[] = microtime(true); $pools[] = $pool; $connections[] = $pool->pop(); } @@ -88,7 +90,7 @@ public function use(array $names, callable $callback): mixed throw $error; } finally { for ($i = \count($connections) - 1; $i >= 0; $i--) { - $pools[$i]->release($connections[$i], $failed); + $pools[$i]->release($connections[$i], $failed, $starts[$i]); } } } diff --git a/src/Pools/Pool.php b/src/Pools/Pool.php index 591d99a..827acd0 100644 --- a/src/Pools/Pool.php +++ b/src/Pools/Pool.php @@ -245,8 +245,12 @@ public function use(callable $callback): mixed * @return $this * @internal */ - public function release(Connection $connection, bool $failed = false): static + public function release(Connection $connection, bool $failed = false, ?float $start = null): static { + if ($start !== null) { + $this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes); + } + if (!$failed) { return $this->reclaim($connection); } @@ -278,7 +282,7 @@ private function recover(Connection $connection): bool $resource = $connection->getResource(); if (!\is_object($resource)) { - return true; + return !\is_resource($resource); } try { diff --git a/tests/Pools/Scopes/GroupTestScope.php b/tests/Pools/Scopes/GroupTestScope.php index fd69b22..a45a983 100644 --- a/tests/Pools/Scopes/GroupTestScope.php +++ b/tests/Pools/Scopes/GroupTestScope.php @@ -5,6 +5,7 @@ use Exception; use Utopia\Pools\Group; use Utopia\Pools\Pool; +use Utopia\Telemetry\Adapter\Test as TestTelemetry; trait GroupTestScope { @@ -178,4 +179,27 @@ public function __toString(): string $this->assertSame(1, $created); }); } + + public function testGroupUseRecordsUseDurationTelemetry(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + $telemetry = new TestTelemetry(); + + $this->groupObject + ->add(new Pool($this->getAdapter(), 'pool1', 1, fn () => '1')) + ->setTelemetry($telemetry); + + $this->assertArrayNotHasKey('pool.connection.use_time', $telemetry->histograms); + + $this->groupObject->use(['pool1'], function (...$resources): void { + $this->assertSame(['1'], $resources); + }); + + $this->assertArrayHasKey('pool.connection.use_time', $telemetry->histograms); + /** @var object{values: array} $useHistogram */ + $useHistogram = $telemetry->histograms['pool.connection.use_time']; + $this->assertCount(1, $useHistogram->values); + }); + } } diff --git a/tests/Pools/Scopes/PoolTestScope.php b/tests/Pools/Scopes/PoolTestScope.php index a7ba193..532ffef 100644 --- a/tests/Pools/Scopes/PoolTestScope.php +++ b/tests/Pools/Scopes/PoolTestScope.php @@ -487,6 +487,44 @@ public function __toString(): string }); } + public function testUseDestroysNativeResourceConnectionAfterCallbackFailure(): void + { + $this->execute(function (): void { + $created = 0; + $pool = new Pool($this->getAdapter(), 'test-destroy-native-resource', 2, function () use (&$created) { + $created++; + $resource = fopen('php://temp', 'r+'); + if ($resource === false) { + throw new \RuntimeException('Failed to open stream'); + } + + fwrite($resource, 'resource-' . $created); + rewind($resource); + + return $resource; + }); + $pool->setReconnectAttempts(1); + $pool->setReconnectSleep(0); + + try { + $pool->use(function ($resource): void { + $this->assertIsResource($resource); + $this->assertSame('resource-1', stream_get_contents($resource)); + throw new \RuntimeException('Callback failed'); + }); + } catch (\RuntimeException) { + // expected + } + + $this->assertSame(2, $pool->count()); + + $pool->use(function ($resource): void { + $this->assertIsResource($resource); + $this->assertSame('resource-2', stream_get_contents($resource)); + }); + }); + } + public function testUsePreservesCallbackExceptionWhenReplacementFails(): void { $this->execute(function (): void { From 0782490eb5138a0113f5562aaf5550a595f96523 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 18 Jun 2026 16:33:59 +1200 Subject: [PATCH 7/7] fix: release all grouped resources --- src/Pools/Group.php | 26 +++++++++++++--- tests/Pools/Scopes/GroupTestScope.php | 45 +++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/src/Pools/Group.php b/src/Pools/Group.php index 5992038..f25450b 100644 --- a/src/Pools/Group.php +++ b/src/Pools/Group.php @@ -74,6 +74,8 @@ public function use(array $names, callable $callback): mixed $starts = []; $started = false; $failed = false; + $thrown = null; + $result = null; try { foreach ($names as $name) { @@ -84,15 +86,31 @@ public function use(array $names, callable $callback): mixed } $started = true; - return $callback(...array_map(fn (Connection $connection) => $connection->getResource(), $connections)); + $result = $callback(...array_map(fn (Connection $connection) => $connection->getResource(), $connections)); } catch (\Throwable $error) { + $thrown = $error; $failed = $started; - throw $error; - } finally { - for ($i = \count($connections) - 1; $i >= 0; $i--) { + } + + $releaseError = null; + + for ($i = \count($connections) - 1; $i >= 0; $i--) { + try { $pools[$i]->release($connections[$i], $failed, $starts[$i]); + } catch (\Throwable $error) { + $releaseError ??= $error; } } + + if ($thrown !== null) { + throw $thrown; + } + + if ($releaseError !== null) { + throw $releaseError; + } + + return $result; } /** diff --git a/tests/Pools/Scopes/GroupTestScope.php b/tests/Pools/Scopes/GroupTestScope.php index a45a983..65c5653 100644 --- a/tests/Pools/Scopes/GroupTestScope.php +++ b/tests/Pools/Scopes/GroupTestScope.php @@ -3,6 +3,7 @@ namespace Utopia\Tests\Scopes; use Exception; +use Utopia\Pools\Connection; use Utopia\Pools\Group; use Utopia\Pools\Pool; use Utopia\Telemetry\Adapter\Test as TestTelemetry; @@ -202,4 +203,48 @@ public function testGroupUseRecordsUseDurationTelemetry(): void $this->assertCount(1, $useHistogram->values); }); } + + public function testGroupUseReleasesEveryConnectionWhenCleanupThrows(): void + { + $this->execute(function (): void { + $this->setUpGroup(); + + $pool1 = new class ($this->getAdapter(), 'pool1', 1, fn () => '1') extends Pool { + public bool $released = false; + + public function release(Connection $connection, bool $failed = false, ?float $start = null): static + { + $this->released = true; + return parent::release($connection, $failed, $start); + } + }; + $pool2 = new class ($this->getAdapter(), 'pool2', 1, fn () => '2') extends Pool { + public bool $released = false; + + public function release(Connection $connection, bool $failed = false, ?float $start = null): static + { + $this->released = true; + throw new \RuntimeException('Release failed'); + } + }; + + $this->groupObject + ->add($pool1) + ->add($pool2); + + $error = null; + try { + $this->groupObject->use(['pool1', 'pool2'], function (...$resources): void { + $this->assertSame(['1', '2'], $resources); + }); + } catch (\RuntimeException $error) { + } + + $this->assertInstanceOf(\RuntimeException::class, $error); + $this->assertSame('Release failed', $error->getMessage()); + $this->assertTrue($pool1->released); + $this->assertTrue($pool2->released); + $this->assertSame(1, $pool1->count()); + }); + } }