FastCGI Command max requests

This commit is contained in:
Juan Pablo Vial
2025-06-30 17:09:45 -04:00
parent 829ab86770
commit e35053edce
3 changed files with 30 additions and 7 deletions

View File

@ -42,10 +42,7 @@ class Run extends Console\Command\Command
array_map(function($row) {return [$row];},$jobIds) array_map(function($row) {return [$row];},$jobIds)
]]); ]]);
$this->pushOutput('top', ['progress' => $jobCount]); $this->pushOutput('top', ['progress' => $jobCount]);
foreach ($jobIds as $jobId) { $result = $this->runJobs($jobIds);
$this->runJob($jobId);
}
$result = $this->getResponses();
$this->pushOutput('top', ['progress' => 'finish']); $this->pushOutput('top', ['progress' => 'finish']);
$this->writeOutput($input, $output); $this->writeOutput($input, $output);
@ -53,15 +50,34 @@ class Run extends Console\Command\Command
return $result; return $result;
} }
protected function runJob(int $jobId): void protected function runJobs(array $jobIds): int
{
$pendingJobs = [];
foreach ($jobIds as $jobId) {
if (!$this->runJob($jobId)) {
$pendingJobs []= $jobId;
}
}
$result = $this->getResponses();
if (count($pendingJobs) > 0) {
if ($this->runJobs($pendingJobs) === self::FAILURE) {
$result = self::FAILURE;
}
}
return $result;
}
protected function runJob(int $jobId): bool
{ {
$uri = "/api/queue/run/{$jobId}"; $uri = "/api/queue/run/{$jobId}";
$this->pushOutput('bottom', ['message' => "GET {$uri}"]); $this->pushOutput('bottom', ['message' => "GET {$uri}"]);
try { try {
$this->fastcgi->get($uri); $this->fastcgi->get($uri);
return true;
} catch (FastCGIException $exception) { } catch (FastCGIException $exception) {
$this->logger->error($exception->getMessage(), ['uri' => $uri, 'exception' =>$exception]); $this->logger->error($exception->getMessage(), ['uri' => $uri, 'exception' =>$exception]);
return false;
} }
} }
protected function getResponses(): int protected function getResponses(): int

View File

@ -7,7 +7,7 @@ use Psr\Http\Client\ClientExceptionInterface;
class FastCGI implements ClientExceptionInterface class FastCGI implements ClientExceptionInterface
{ {
public function __construct(protected ?Throwable $previous) {} public function __construct(protected ?Throwable $previous = null) {}
public function getMessage(): string public function getMessage(): string
{ {

View File

@ -10,6 +10,7 @@ class FastCGI implements LoggerAwareInterface
{ {
public function __construct(protected Login $loginService, protected string $hostname, protected int $port, public function __construct(protected Login $loginService, protected string $hostname, protected int $port,
protected string $documentRoot, protected string $documentRoot,
protected int $maxRequests = 50,
protected int $connectionTimeout = 5000, protected int $readTimeout = 5000) protected int $connectionTimeout = 5000, protected int $readTimeout = 5000)
{ {
$this->client = new FCGI\Client(); $this->client = new FCGI\Client();
@ -40,6 +41,9 @@ class FastCGI implements LoggerAwareInterface
*/ */
public function sendRequest(FCGI\Interfaces\ProvidesRequestData $request): self public function sendRequest(FCGI\Interfaces\ProvidesRequestData $request): self
{ {
if (count($this->socketIds) >= $this->maxRequests) {
throw new FastCGIException();
}
if (!isset($this->socket)) { if (!isset($this->socket)) {
$this->connect(); $this->connect();
} }
@ -60,13 +64,15 @@ class FastCGI implements LoggerAwareInterface
{ {
$responses = []; $responses = [];
$repeats = 0; $repeats = 0;
$maxRepeats = count($this->socketIds);
$maxRepeats = min(count($this->socketIds), $this->maxRequests);
while ($this->client->hasUnhandledResponses()) { while ($this->client->hasUnhandledResponses()) {
if ($repeats >= $maxRepeats) { if ($repeats >= $maxRepeats) {
break; break;
} }
try { try {
$readySocketIds = $this->client->getSocketIdsHavingResponse();
$readyResponses = $this->client->readReadyResponses(3000); $readyResponses = $this->client->readReadyResponses(3000);
} catch (FCGI\Exceptions\FastCGIClientException $exception) { } catch (FCGI\Exceptions\FastCGIClientException $exception) {
$this->logger->error($exception->getMessage()); $this->logger->error($exception->getMessage());
@ -77,6 +83,7 @@ class FastCGI implements LoggerAwareInterface
$responses []= $response; $responses []= $response;
$repeats ++; $repeats ++;
} }
$this->socketIds = array_diff($this->socketIds, $readySocketIds);
} }
if ($this->client->hasUnhandledResponses()) { if ($this->client->hasUnhandledResponses()) {
$this->logger->error("Unhandled responses"); $this->logger->error("Unhandled responses");