Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: shivammathur/setup-php@f3e473d116dcccaddc5834248c87452386958240 # ratchet:shivammathur/setup-php@v2
with:
php-version: '8.4'
extensions: swoole
extensions: opentelemetry, protobuf, swoole
tools: phpunit
coverage: none

Expand Down
5 changes: 1 addition & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"require-dev": {
"swoole/ide-helper": "6.*"
},
"suggests": {
"suggest": {
"ext-mongodb": "Needed to support MongoDB database pools",
"ext-redis": "Needed to support Redis cache pools",
"ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools",
Expand All @@ -50,8 +50,5 @@
"php-http/discovery": false,
"tbachert/spi": false
}
},
"suggest": {
"ext-swoole": "Required to use the Swoole pool adapter"
}
}
4 changes: 3 additions & 1 deletion src/Pools/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ class Connection
/**
* @param TResource $resource
*/
public function __construct(protected mixed $resource) {}
public function __construct(protected mixed $resource)
{
}

/**
* @return string
Expand Down
59 changes: 40 additions & 19 deletions src/Pools/Group.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,49 @@ public function use(array $names, callable $callback): mixed
if (empty($names)) {
throw new Exception('Cannot use with empty names');
}
return $this->useInternal($names, $callback);
}

/**
* Internal recursive callback for `use`.
*
* @template TReturn
* @param array<string> $names Name of resources
* @param callable(mixed...): TReturn $callback Function that receives the connection resources
* @param array<mixed> $resources
* @return TReturn
* @throws Exception
*/
private function useInternal(array $names, callable $callback, array $resources = []): mixed
{
if (empty($names)) {
return $callback(...$resources);
$connections = [];
$pools = [];
$starts = [];
$started = false;
$failed = false;
$thrown = null;
$result = null;

try {
foreach ($names as $name) {
$pool = $this->get($name);
$starts[] = microtime(true);
$pools[] = $pool;
$connections[] = $pool->pop();
}

$started = true;
$result = $callback(...array_map(fn (Connection $connection) => $connection->getResource(), $connections));
} catch (\Throwable $error) {
$thrown = $error;
$failed = $started;
}

$releaseError = null;

for ($i = \count($connections) - 1; $i >= 0; $i--) {
try {
$pools[$i]->release($connections[$i], $failed, $starts[$i]);
} catch (\Throwable $error) {
$releaseError ??= $error;
}
}

if ($thrown !== null) {
throw $thrown;
}

if ($releaseError !== null) {
throw $releaseError;
}

return $this
->get(array_shift($names))
->use(fn($resource) => $this->useInternal($names, $callback, array_merge($resources, [$resource])));
return $result;
}

/**
Expand Down
77 changes: 75 additions & 2 deletions src/Pools/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,88 @@ public function use(callable $callback): mixed
{
$start = microtime(true);
$connection = null;
$failed = false;

try {
$connection = $this->pop();
return $callback($connection->getResource());
} catch (\Throwable $error) {
$failed = true;
throw $error;
} finally {
$this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes);
if ($connection !== null) {
$this->reclaim($connection);
$this->release($connection, $failed);
}
}
}

/**
* @param Connection<TResource> $connection
* @return $this
* @internal
*/
public function release(Connection $connection, bool $failed = false, ?float $start = null): static
{
if ($start !== null) {
$this->telemetryUseDuration->record(microtime(true) - $start, $this->telemetryAttributes);
}

if (!$failed) {
return $this->reclaim($connection);
}

if ($this->recover($connection)) {
try {
return $this->reclaim($connection);
} catch (\Throwable) {
try {
return $this->destroy($connection);
} catch (\Throwable) {
return $this;
}
}
}

try {
return $this->destroy($connection);
} catch (\Throwable) {
return $this;
}
}

/**
* @param Connection<TResource> $connection
*/
private function recover(Connection $connection): bool
{
$resource = $connection->getResource();

if (!\is_object($resource)) {
return !\is_resource($resource);
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.

try {
$recovered = false;

if (\method_exists($resource, 'reset')) {
$recovered = true;
if ($resource->reset() === false) {
return false;
}
}

if (\method_exists($resource, 'reconnect')) {
$recovered = true;
if ($resource->reconnect() === false) {
return false;
}
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
} catch (\Throwable) {
return false;
}

return $recovered;
}

/**
Expand Down Expand Up @@ -418,7 +491,7 @@ private function destroyConnection(?Connection $connection = null): static
if ($shouldCreate) {
try {
$this->pool->push($this->createConnection());
} catch (Exception $e) {
} catch (\Throwable $e) {
$this->pool->synchronized(function (): void {
$this->connectionsCreated--;
});
Expand Down
2 changes: 1 addition & 1 deletion tests/Pools/Adapter/SwooleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public function testSwooleCoroutineStressTest(): void
}
public function testInitOutsideCoroutineNotThrowAnyError(): void
{
$pool = new Pool(new Swoole(), 'test', 1, fn() => 'x');
$pool = new Pool(new Swoole(), 'test', 1, fn () => 'x');
$this->assertInstanceOf(Pool::class, $pool);
}
}
6 changes: 3 additions & 3 deletions tests/Pools/Scopes/ConnectionTestScope.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public function testConnectionSetPool(): void
{
$this->execute(function (): void {
$this->setUpConnection();
$pool = new Pool($this->getAdapter(), 'test', 1, fn() => 'x');
$pool = new Pool($this->getAdapter(), 'test', 1, fn () => 'x');

$this->assertNull($this->connectionObject->getPool());
$this->assertInstanceOf(Connection::class, $this->connectionObject->setPool($pool));
Expand All @@ -80,7 +80,7 @@ public function testConnectionGetPool(): void
{
$this->execute(function (): void {
$this->setUpConnection();
$pool = new Pool($this->getAdapter(), 'test', 1, fn() => 'x');
$pool = new Pool($this->getAdapter(), 'test', 1, fn () => 'x');

$this->assertNull($this->connectionObject->getPool());
$this->assertInstanceOf(Connection::class, $this->connectionObject->setPool($pool));
Expand All @@ -99,7 +99,7 @@ public function testConnectionGetPool(): void
public function testConnectionReclaim(): void
{
$this->execute(function (): void {
$pool = new Pool($this->getAdapter(), 'test', 2, fn() => 'x');
$pool = new Pool($this->getAdapter(), 'test', 2, fn () => 'x');

$this->assertSame(2, $pool->count());

Expand Down
Loading