Job retries
This commit is contained in:
@ -7,12 +7,14 @@ class Job extends Ideal\Model
|
||||
{
|
||||
public array $configuration;
|
||||
public bool $executed = false;
|
||||
public int $retries = 0;
|
||||
|
||||
protected function jsonComplement(): array
|
||||
{
|
||||
return [
|
||||
'configuration' => $this->configuration,
|
||||
'executed' => $this->executed
|
||||
'executed' => $this->executed,
|
||||
'retries' => $this->retries
|
||||
];
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
<?php
|
||||
namespace Incoviba\Service;
|
||||
|
||||
use DateInvalidTimeZoneException;
|
||||
use DateMalformedStringException;
|
||||
use DateTimeImmutable;
|
||||
use DateTimeZone;
|
||||
use InvalidArgumentException;
|
||||
@ -10,7 +12,7 @@ use Predis\Connection\ConnectionException;
|
||||
use Incoviba\Common\Ideal;
|
||||
use Incoviba\Common\Implement\Exception\EmptyRedis;
|
||||
use Incoviba\Common\Implement\Exception\EmptyResult;
|
||||
use Incoviba\Exception\ServiceAction\{Create, Read, Update};
|
||||
use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update};
|
||||
use Incoviba\Repository;
|
||||
use Incoviba\Model;
|
||||
|
||||
@ -62,17 +64,22 @@ class Job extends Ideal\Service
|
||||
/**
|
||||
* @param array $configuration
|
||||
* @return Model\Job
|
||||
* @throws Read
|
||||
* @throws Create
|
||||
*/
|
||||
public function add(array $configuration): Model\Job
|
||||
{
|
||||
$now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago')));
|
||||
try {
|
||||
$now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago')));
|
||||
} catch (DateMalformedStringException | DateInvalidTimeZoneException $exception) {
|
||||
$now = new DateTimeImmutable();
|
||||
}
|
||||
$data = [
|
||||
'id' => $now->format('Uu'),
|
||||
'configuration' => $configuration,
|
||||
'executed' => false,
|
||||
'created_at' => $now->format('Y-m-d H:i:s'),
|
||||
'updated_at' => null
|
||||
'updated_at' => null,
|
||||
'retries' => 0
|
||||
];
|
||||
$jobs = [];
|
||||
try {
|
||||
@ -82,15 +89,69 @@ class Job extends Ideal\Service
|
||||
}
|
||||
} catch (EmptyRedis) {}
|
||||
$jobs []= $data;
|
||||
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
|
||||
try {
|
||||
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
|
||||
} catch (ConnectionException $exception) {
|
||||
throw new Create(__CLASS__, $exception);
|
||||
}
|
||||
return $this->load($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Model\Job $job
|
||||
* @return bool
|
||||
* @return Model\Job
|
||||
* @throws Update
|
||||
* @throws Read
|
||||
*/
|
||||
public function update(Model\Job $job): Model\Job
|
||||
{
|
||||
try {
|
||||
$now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago')));
|
||||
} catch (DateMalformedStringException | DateInvalidTimeZoneException) {
|
||||
$now = new DateTimeImmutable();
|
||||
}
|
||||
$jobs = $this->getJobs();
|
||||
try {
|
||||
$idx = $this->findJob($jobs, $job->id);
|
||||
} catch (EmptyResult $exception) {
|
||||
throw new Read(__CLASS__, $exception);
|
||||
}
|
||||
$jobs[$idx]['updated_at'] = $now->format('Y-m-d H:i:s');
|
||||
$jobs[$idx]['retries'] = $job->retries;
|
||||
try {
|
||||
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
|
||||
} catch (ConnectionException $exception) {
|
||||
throw new Update(__CLASS__, $exception);
|
||||
}
|
||||
return $this->load($jobs[$idx]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Model\Job $job
|
||||
* @throws Read
|
||||
* @throws Delete
|
||||
*/
|
||||
public function remove(Model\Job $job): void
|
||||
{
|
||||
$jobs = $this->getJobs();
|
||||
try {
|
||||
$idx = $this->findJob($jobs, $job->id);
|
||||
} catch (EmptyResult $exception) {
|
||||
throw new Read(__CLASS__, $exception);
|
||||
}
|
||||
unset($jobs[$idx]);
|
||||
try {
|
||||
$this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1);
|
||||
} catch (ConnectionException $exception) {
|
||||
throw new Delete(__CLASS__, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Model\Job $job
|
||||
* @return bool
|
||||
* @throws Read | Create
|
||||
*/
|
||||
public function execute(Model\Job $job): bool
|
||||
{
|
||||
$jobs = $this->getJobs();
|
||||
@ -101,7 +162,11 @@ class Job extends Ideal\Service
|
||||
throw new Read(__CLASS__, $exception);
|
||||
}
|
||||
unset($jobs[$idx]);
|
||||
$this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1);
|
||||
try {
|
||||
$this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1);
|
||||
} catch (ConnectionException $exception) {
|
||||
throw new Create(__CLASS__, $exception);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -144,6 +209,7 @@ class Job extends Ideal\Service
|
||||
$job->id = $id ?? $data['id'] ?? null;
|
||||
$job->configuration = $data['configuration'] ?? [];
|
||||
$job->executed = $data['executed'] ?? false;
|
||||
$job->retries = $data['retries'] ?? 0;
|
||||
return $job;
|
||||
}
|
||||
}
|
||||
|
@ -5,12 +5,13 @@ use Exception;
|
||||
use Psr\Http\Message\RequestInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Incoviba\Common\Ideal;
|
||||
use Incoviba\Exception\ServiceAction\{Create, Read};
|
||||
use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update};
|
||||
use Incoviba\Service;
|
||||
|
||||
class Queue extends Ideal\Service
|
||||
{
|
||||
public function __construct(LoggerInterface $logger, protected Service\Job $jobService, Worker $defaultWorker)
|
||||
public function __construct(LoggerInterface $logger, protected Service\Job $jobService, Worker $defaultWorker,
|
||||
protected int $maxRetries = 5)
|
||||
{
|
||||
parent::__construct($logger);
|
||||
$this->register('default', $defaultWorker);
|
||||
@ -94,9 +95,23 @@ class Queue extends Ideal\Service
|
||||
|
||||
$errors = [];
|
||||
foreach ($jobs as $job) {
|
||||
if ($job->retries >= $this->maxRetries) {
|
||||
try {
|
||||
$this->jobService->remove($job);
|
||||
} catch (Read | Delete $exception) {
|
||||
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
$this->runJob($job->id, $request);
|
||||
} catch (Exception) {
|
||||
$job->retries ++;
|
||||
try {
|
||||
$this->jobService->update($job);
|
||||
} catch (Read | Update $exception) {
|
||||
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
|
||||
}
|
||||
$errors []= $job->id;
|
||||
}
|
||||
}
|
||||
|
@ -12,14 +12,18 @@ class Redis
|
||||
/**
|
||||
* @param string $name
|
||||
* @return string|null
|
||||
* @throws Exception|ConnectionException
|
||||
* @throws Exception
|
||||
*/
|
||||
public function get(string $name): ?string
|
||||
{
|
||||
if (!$this->client->exists($name)) {
|
||||
throw new Exception($name);
|
||||
}
|
||||
return $this->client->get($name);
|
||||
try {
|
||||
return $this->client->get($name);
|
||||
} catch (ConnectionException $exception) {
|
||||
throw new Exception($exception->getMessage(), $exception->getCode(), $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -27,7 +31,6 @@ class Redis
|
||||
* @param mixed $value
|
||||
* @param int $expirationTTL
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function set(string $name, mixed $value, int $expirationTTL = 60 * 60 * 24): void
|
||||
{
|
||||
|
Reference in New Issue
Block a user