Merge remote-tracking branch 'origin/feature/cierres' into feature/cierres
This commit is contained in:
@ -2,10 +2,12 @@
|
||||
"name": "incoviba/cli",
|
||||
"type": "project",
|
||||
"require": {
|
||||
"ext-sockets": "*",
|
||||
"dragonmantank/cron-expression": "^3.4",
|
||||
"guzzlehttp/guzzle": "^7.8",
|
||||
"hollodotme/fast-cgi-client": "^3.1",
|
||||
"monolog/monolog": "^3.5",
|
||||
"pda/pheanstalk": "^7.0",
|
||||
"php-di/php-di": "^7.0",
|
||||
"predis/predis": "^3.0",
|
||||
"symfony/console": "^6.3"
|
||||
|
@ -6,8 +6,13 @@ then
|
||||
then
|
||||
CMD=$1
|
||||
shift
|
||||
$CMD -c "$@"
|
||||
exit
|
||||
if [[ $# -gt 0 ]]
|
||||
then
|
||||
$CMD -c "$@"
|
||||
exit 0
|
||||
fi
|
||||
$CMD
|
||||
exit 0
|
||||
fi
|
||||
fi
|
||||
|
||||
|
@ -16,4 +16,15 @@ return [
|
||||
}
|
||||
return new Predis\Client($options);
|
||||
},
|
||||
Pheanstalk\Pheanstalk::class => function(ContainerInterface $container) {
|
||||
return Pheanstalk\Pheanstalk::create(
|
||||
$container->get('BEANSTALKD_HOST'),
|
||||
$container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300
|
||||
);
|
||||
},
|
||||
Incoviba\Service\MQTT\MQTTInterface::class => function(ContainerInterface $container) {
|
||||
$service = new Incoviba\Service\MQTT($container->get(Psr\Log\LoggerInterface::class));
|
||||
$service->register('default', $container->get(Incoviba\Service\MQTT\Pheanstalk::class));
|
||||
return $service;
|
||||
}
|
||||
];
|
||||
|
@ -12,37 +12,11 @@ class Pending extends Console\Command\Command
|
||||
parent::__construct($name);
|
||||
}
|
||||
|
||||
protected function configure(): void
|
||||
{
|
||||
$this->addOption('full', 'f', Console\Input\InputOption::VALUE_NONE, 'Full output');
|
||||
}
|
||||
|
||||
protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
|
||||
{
|
||||
$jobs = $this->jobService->getPending();
|
||||
$jobCount = count($jobs);
|
||||
$jobCount = $this->jobService->getPending();
|
||||
$output->writeln("Found {$jobCount} pending jobs");
|
||||
|
||||
if ($input->getOption('full') and $jobCount > 0) {
|
||||
$io = new Console\Style\SymfonyStyle($input, $output);
|
||||
|
||||
$rows = [];
|
||||
foreach ($jobs as $job) {
|
||||
$retries = $job['retries'] ?? 0;
|
||||
$updated = $job['updated_at'] ?? '';
|
||||
|
||||
$rows[] = [
|
||||
$job['id'],
|
||||
$job['created_at'],
|
||||
$job['configuration']['type'],
|
||||
$retries,
|
||||
$updated
|
||||
];
|
||||
}
|
||||
|
||||
$io->table(['ID', 'Created', 'Type', 'Retries', 'Updated'], $rows);
|
||||
}
|
||||
|
||||
return self::SUCCESS;
|
||||
}
|
||||
}
|
||||
|
@ -9,22 +9,16 @@ use Incoviba\Service;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console;
|
||||
|
||||
#[Console\Attribute\AsCommand(name: 'jobs:run', description: 'Run jobs')]
|
||||
#[Console\Attribute\AsCommand(name: 'jobs:run', description: 'Run job')]
|
||||
class Run extends Console\Command\Command
|
||||
{
|
||||
public function __construct(protected Service\FastCGI $fastcgi, protected LoggerInterface $logger,
|
||||
protected Service\Job $jobService,
|
||||
protected DateTimeZone $timeZone, ?string $name = null)
|
||||
{
|
||||
parent::__construct($name);
|
||||
}
|
||||
|
||||
protected function configure(): void
|
||||
{
|
||||
$this->addArgument('job_ids',
|
||||
Console\Input\InputArgument::IS_ARRAY | Console\Input\InputArgument::REQUIRED, 'Job IDs');
|
||||
}
|
||||
|
||||
protected array $output = [];
|
||||
public function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
|
||||
{
|
||||
try {
|
||||
@ -33,44 +27,18 @@ class Run extends Console\Command\Command
|
||||
$now = new DateTimeImmutable();
|
||||
}
|
||||
|
||||
$jobIds = $input->getArgument('job_ids');
|
||||
$jobCount = count($jobIds);
|
||||
|
||||
$this->pushOutput('top', ['message' => "[{$now->format('Y-m-d H:i:s e')}] Running {$jobCount} jobs..."]);
|
||||
$this->pushOutput('bottom', ['table' => [
|
||||
['Job IDs'],
|
||||
array_map(function($row) {return [$row];},$jobIds)
|
||||
]]);
|
||||
$this->pushOutput('top', ['progress' => $jobCount]);
|
||||
$result = $this->runJobs($jobIds);
|
||||
$this->pushOutput('top', ['progress' => 'finish']);
|
||||
|
||||
$this->writeOutput($input, $output);
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
protected function runJobs(array $jobIds): int
|
||||
{
|
||||
$pendingJobs = [];
|
||||
foreach ($jobIds as $jobId) {
|
||||
if (!$this->runJob($jobId)) {
|
||||
$pendingJobs []= $jobId;
|
||||
}
|
||||
if ($this->jobService->getPending() === 0) {
|
||||
$output->writeln("[{$now->format('Y-m-d H:i:s e')}] No pending jobs to run.");
|
||||
return self::SUCCESS;
|
||||
}
|
||||
$result = $this->getResponses();
|
||||
|
||||
if (count($pendingJobs) > 0) {
|
||||
if ($this->runJobs($pendingJobs) === self::FAILURE) {
|
||||
$result = self::FAILURE;
|
||||
}
|
||||
}
|
||||
return $result;
|
||||
$output->writeln("[{$now->format('Y-m-d H:i:s e')}] Running Ready Job...");
|
||||
$this->runJob();
|
||||
return $this->getResponses();
|
||||
}
|
||||
protected function runJob(int $jobId): bool
|
||||
protected function runJob(): bool
|
||||
{
|
||||
$uri = "/api/queue/run/{$jobId}";
|
||||
$this->pushOutput('bottom', ['message' => "GET {$uri}"]);
|
||||
$uri = "/api/queue/run";
|
||||
|
||||
try {
|
||||
$this->fastcgi->get($uri);
|
||||
@ -85,7 +53,6 @@ class Run extends Console\Command\Command
|
||||
$result = self::SUCCESS;
|
||||
$responses = $this->fastcgi->awaitResponses();
|
||||
foreach ($responses as $response) {
|
||||
$this->pushOutput('top', ['progress' => 'advance']);
|
||||
if ($response->getError() !== '') {
|
||||
$this->logger->error("Error running job", [
|
||||
'error' => $response->getError(),
|
||||
@ -93,100 +60,8 @@ class Run extends Console\Command\Command
|
||||
'headers' => $response->getHeaders(),
|
||||
]);
|
||||
$result = self::FAILURE;
|
||||
continue;
|
||||
}
|
||||
$this->pushOutput('bottom', ['message' => $response->getBody()]);
|
||||
}
|
||||
return $result;
|
||||
}
|
||||
|
||||
protected function pushOutput(string $section, array $configuration): void
|
||||
{
|
||||
if (!isset($this->output[$section])) {
|
||||
$this->output[$section] = [];
|
||||
}
|
||||
foreach ($configuration as $key => $value) {
|
||||
if (!isset($this->output[$section][$key])) {
|
||||
$this->output[$section][$key] = [];
|
||||
}
|
||||
$this->output[$section][$key] []= $value;
|
||||
}
|
||||
if (isset($this->output[$section]['progress'])) {
|
||||
usort($this->output[$section]['progress'], function($a, $b) {
|
||||
if ($a === $b) {
|
||||
return 0;
|
||||
}
|
||||
if (is_int($a)) {
|
||||
return -1;
|
||||
}
|
||||
if (is_int($b)) {
|
||||
return 1;
|
||||
}
|
||||
if ($a === 'finish') {
|
||||
return 1;
|
||||
}
|
||||
if ($b === 'finish') {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
}
|
||||
}
|
||||
protected function writeOutput(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): void
|
||||
{
|
||||
$sectionNames = array_keys($this->output);
|
||||
$ios = [];
|
||||
foreach ($sectionNames as $sectionName) {
|
||||
$section = $output->section();
|
||||
$ios[$sectionName] = new Console\Style\SymfonyStyle($input, $section);
|
||||
}
|
||||
|
||||
foreach ($this->output as $sectionName => $configurations) {
|
||||
$io = $ios[$sectionName];
|
||||
$this->writeSection($io, $configurations);
|
||||
}
|
||||
}
|
||||
protected function writeSection(Console\Style\SymfonyStyle $io, array $configurations): void
|
||||
{
|
||||
if (array_key_exists('table', $configurations)) {
|
||||
$this->writeTables($io, $configurations['table']);
|
||||
}
|
||||
if (array_key_exists('progress', $configurations)) {
|
||||
$this->writeProgress($io, $configurations['progress']);
|
||||
}
|
||||
if (array_key_exists('message', $configurations)) {
|
||||
$this->writeMessages($io, $configurations['message']);
|
||||
}
|
||||
}
|
||||
protected function writeTables(Console\Style\SymfonyStyle $io, array $tableConfigurations): void
|
||||
{
|
||||
foreach ($tableConfigurations as $tableData) {
|
||||
$io->table(...$tableData);
|
||||
}
|
||||
}
|
||||
protected function writeMessages(Console\Style\SymfonyStyle $io, array $messages): void
|
||||
{
|
||||
foreach ($messages as $message) {
|
||||
$io->writeln($message);
|
||||
}
|
||||
}
|
||||
protected function writeProgress(Console\Style\SymfonyStyle $io, array $progresses): void
|
||||
{
|
||||
$progressBar = null;
|
||||
foreach ($progresses as $progress) {
|
||||
if ($progress === 'advance' and $progressBar !== null) {
|
||||
$progressBar->advance();
|
||||
continue;
|
||||
}
|
||||
if ($progress === 'finish' and $progressBar !== null) {
|
||||
$progressBar->finish();
|
||||
continue;
|
||||
}
|
||||
if (in_array($progress, ['finish', 'advance'])) {
|
||||
continue;
|
||||
}
|
||||
$progressBar = $io->createProgressBar($progress);
|
||||
}
|
||||
$io->newLine();
|
||||
}
|
||||
}
|
||||
|
@ -35,85 +35,22 @@ class Queue extends Command
|
||||
];
|
||||
$io = new Console\Style\SymfonyStyle($input, $this->sections['top']);
|
||||
$now = new DateTimeImmutable('now', $this->timezone);
|
||||
$io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue...");
|
||||
|
||||
$jobs = $this->getJobs();
|
||||
$jobCount = count($jobs);
|
||||
if ($jobCount === 0) {
|
||||
return Console\Command\Command::SUCCESS;
|
||||
if ($this->jobService->getPending() === 0) {
|
||||
$io->success("[{$now->format('Y-m-d H:i:s e')}] Queue is empty");
|
||||
return self::SUCCESS;
|
||||
}
|
||||
|
||||
$io->writeln("Found {$jobCount} jobs to run");
|
||||
$result = $this->runJob($jobs[0]);
|
||||
/*$result = $this->runJobs($io, $jobs);
|
||||
foreach ($this->outputs as $output) {
|
||||
$this->sections['bottom']->writeln($output);
|
||||
}*/
|
||||
return $result;
|
||||
$io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue...");
|
||||
return $this->runJob();
|
||||
}
|
||||
|
||||
protected array $sections;
|
||||
|
||||
protected function getJobs(): array
|
||||
{
|
||||
$this->logger->debug("Getting jobs");
|
||||
$jobs = $this->jobService->getPending();
|
||||
$jobCount = count($jobs);
|
||||
if ($jobCount === 0) {
|
||||
$this->logger->debug("No jobs to run");
|
||||
return [];
|
||||
}
|
||||
$this->logger->debug("Found {$jobCount} jobs");
|
||||
return array_column($jobs, 'id');
|
||||
}
|
||||
protected function runJobs(Console\Style\SymfonyStyle $io, array $jobs): int
|
||||
{
|
||||
$chunks = array_chunk($jobs, $this->batchSize);
|
||||
$chunkCount = count($chunks);
|
||||
$result = self::SUCCESS;
|
||||
$progress1 = $io->createProgressBar($chunkCount);
|
||||
$progress1->start();
|
||||
foreach ($chunks as $chunk) {
|
||||
if ($this->runJobBatch($chunk) === self::FAILURE) {
|
||||
$result = self::FAILURE;
|
||||
}
|
||||
$progress1->advance();
|
||||
}
|
||||
$progress1->finish();
|
||||
return $result;
|
||||
}
|
||||
|
||||
protected array $outputs = [];
|
||||
protected function runJobBatch(array $jobIds): int
|
||||
protected function runJob(): int
|
||||
{
|
||||
$baseCommand = "{$this->baseCommand} jobs:run";
|
||||
|
||||
$jobsLine = implode(' ', $jobIds);
|
||||
$command = "{$baseCommand} {$jobsLine}";
|
||||
|
||||
try {
|
||||
exec($command, $output, $resultCode);
|
||||
$this->outputs []= $output;
|
||||
} catch (Throwable $exception) {
|
||||
$this->logger->error("Failed to run command", [
|
||||
'command' => $command,
|
||||
'exception' => $exception
|
||||
]);
|
||||
return self::FAILURE;
|
||||
}
|
||||
if ($resultCode !== 0) {
|
||||
$this->logger->error("Failed to run command", [
|
||||
'command' => $command,
|
||||
'result_code' => $resultCode
|
||||
]);
|
||||
return self::FAILURE;
|
||||
}
|
||||
return self::SUCCESS;
|
||||
}
|
||||
protected function runJob(int $jobId): int
|
||||
{
|
||||
$baseCommand = "{$this->baseCommand} jobs:run";
|
||||
$command = "{$baseCommand} {$jobId}";
|
||||
$command = "{$baseCommand}";
|
||||
try {
|
||||
exec($command, $output, $resultCode);
|
||||
$this->outputs []= $output;
|
||||
|
18
cli/src/Exception/MQTT.php
Normal file
18
cli/src/Exception/MQTT.php
Normal file
@ -0,0 +1,18 @@
|
||||
<?php
|
||||
namespace Incoviba\Exception;
|
||||
|
||||
use Throwable;
|
||||
use Exception;
|
||||
|
||||
abstract class MQTT extends Exception
|
||||
{
|
||||
public function __construct(string $message = "", int $code = 0, ?Throwable $previous = null)
|
||||
{
|
||||
$baseCode = 700;
|
||||
$code = $baseCode + $code;
|
||||
if ($message == "") {
|
||||
$message = "MQTT Exception";
|
||||
}
|
||||
parent::__construct($message, $code, $previous);
|
||||
}
|
||||
}
|
15
cli/src/Exception/MQTT/Create.php
Normal file
15
cli/src/Exception/MQTT/Create.php
Normal file
@ -0,0 +1,15 @@
|
||||
<?php
|
||||
namespace Incoviba\Exception\MQTT;
|
||||
|
||||
use Throwable;
|
||||
use Incoviba\Exception\MQTT;
|
||||
|
||||
class Create extends MQTT
|
||||
{
|
||||
public function __construct(string $tube = '', string $payload = '', ?Throwable $previous = null)
|
||||
{
|
||||
$message = "Unable to create MQTT message: {$payload} in tube {$tube}";
|
||||
$code = 11;
|
||||
parent::__construct($message, $code, $previous);
|
||||
}
|
||||
}
|
15
cli/src/Exception/MQTT/Delete.php
Normal file
15
cli/src/Exception/MQTT/Delete.php
Normal file
@ -0,0 +1,15 @@
|
||||
<?php
|
||||
namespace Incoviba\Exception\MQTT;
|
||||
|
||||
use Throwable;
|
||||
use Incoviba\Exception\MQTT;
|
||||
|
||||
class Delete extends MQTT
|
||||
{
|
||||
public function __construct(string $tube, int $jobId, ?Throwable $previous = null)
|
||||
{
|
||||
$message = "Could not delete job {$jobId} in tube {$tube}";
|
||||
$code = 13;
|
||||
parent::__construct($message, $code, $previous);
|
||||
}
|
||||
}
|
15
cli/src/Exception/MQTT/Read.php
Normal file
15
cli/src/Exception/MQTT/Read.php
Normal file
@ -0,0 +1,15 @@
|
||||
<?php
|
||||
namespace Incoviba\Exception\MQTT;
|
||||
|
||||
use Throwable;
|
||||
use Incoviba\Exception\MQTT;
|
||||
|
||||
class Read extends MQTT
|
||||
{
|
||||
public function __construct(string $tube, ?Throwable $previous = null)
|
||||
{
|
||||
$message = "Error reading from tube {$tube}";
|
||||
$code = 10;
|
||||
parent::__construct($message, $code, $previous);
|
||||
}
|
||||
}
|
14
cli/src/Exception/MQTT/UnknownTransport.php
Normal file
14
cli/src/Exception/MQTT/UnknownTransport.php
Normal file
@ -0,0 +1,14 @@
|
||||
<?php
|
||||
namespace Incoviba\Exception\MQTT;
|
||||
|
||||
use Incoviba\Exception\MQTT;
|
||||
use Throwable;
|
||||
|
||||
class UnknownTransport extends MQTT
|
||||
{
|
||||
public function __construct(string $transportName, ?Throwable $previous = null)
|
||||
{
|
||||
$message = "Unknown transport {$transportName}";
|
||||
parent::__construct($message, 1, $previous);
|
||||
}
|
||||
}
|
16
cli/src/Exception/MQTT/Update.php
Normal file
16
cli/src/Exception/MQTT/Update.php
Normal file
@ -0,0 +1,16 @@
|
||||
<?php
|
||||
namespace Incoviba\Exception\MQTT;
|
||||
|
||||
use Throwable;
|
||||
use Incoviba\Exception\MQTT;
|
||||
|
||||
class Update extends MQTT
|
||||
{
|
||||
public function __construct(string $tube, string $payload, ?int $jobId = null, ?Throwable $previous = null)
|
||||
{
|
||||
$jobString = $jobId !== null ? " with jobId {$jobId}" : '';
|
||||
$message = "Could not update job{$jobString} with {$payload} in tube {$tube}";
|
||||
$code = 12;
|
||||
parent::__construct($message, $code, $previous);
|
||||
}
|
||||
}
|
9
cli/src/Service.php
Normal file
9
cli/src/Service.php
Normal file
@ -0,0 +1,9 @@
|
||||
<?php
|
||||
namespace Incoviba;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
abstract class Service
|
||||
{
|
||||
public function __construct(protected LoggerInterface $logger) {}
|
||||
}
|
@ -5,27 +5,22 @@ use DateInvalidTimeZoneException;
|
||||
use DateMalformedStringException;
|
||||
use DateTimeImmutable;
|
||||
use DateTimeZone;
|
||||
use Exception;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Predis\Connection\ConnectionException;
|
||||
use Incoviba\Exception\MQTT as MQTTException;
|
||||
use Incoviba\Service\MQTT\MQTTInterface;
|
||||
|
||||
class Job
|
||||
{
|
||||
public function __construct(protected LoggerInterface $logger, protected Redis $redisService)
|
||||
{
|
||||
$this->redisKey = 'jobs';
|
||||
}
|
||||
public function __construct(protected LoggerInterface $logger, protected MQTTInterface $mqttService) {}
|
||||
protected string $redisKey;
|
||||
|
||||
public function getPending(): array
|
||||
public function getPending(): int
|
||||
{
|
||||
try {
|
||||
$jobs = $this->redisService->get($this->redisKey);
|
||||
return json_decode($jobs, true);
|
||||
} catch (ConnectionException|Exception $exception) {
|
||||
$exception = new Exception("Could not read {$this->redisKey} from Redis", $exception->getCode(), $exception);
|
||||
return $this->mqttService->pending();
|
||||
} catch (MQTTException $exception) {
|
||||
$this->logger->warning($exception->getMessage(), ['exception' => $exception]);
|
||||
return [];
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,9 +39,11 @@ class Job
|
||||
'updated_at' => null,
|
||||
'retries' => 0
|
||||
];
|
||||
$jobs = $this->getPending();
|
||||
$jobs []= $data;
|
||||
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
|
||||
try {
|
||||
$this->mqttService->set(json_encode($data));
|
||||
} catch (MQTTException $exception) {
|
||||
$this->logger->warning($exception->getMessage(), ['exception' => $exception]);
|
||||
}
|
||||
return $data;
|
||||
}
|
||||
}
|
||||
|
124
cli/src/Service/MQTT.php
Normal file
124
cli/src/Service/MQTT.php
Normal file
@ -0,0 +1,124 @@
|
||||
<?php
|
||||
namespace Incoviba\Service;
|
||||
|
||||
use Incoviba\Exception\MQTT as MQTTException;
|
||||
use Incoviba\Service;
|
||||
use Incoviba\Service\MQTT\MQTTInterface;
|
||||
|
||||
class MQTT extends Service implements MQTTInterface
|
||||
{
|
||||
protected array $transports = [];
|
||||
public function register(string $name, MQTTInterface $transport): self
|
||||
{
|
||||
$this->transports[$name] = $transport;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $payload
|
||||
* @param int $delay
|
||||
* @param string|null $transportName
|
||||
* @return $this
|
||||
* @throws MQTTException\UnknownTransport
|
||||
* @throws MQTTException\Create
|
||||
*/
|
||||
public function set(string $payload, int $delay = 0, ?string $transportName = null): self
|
||||
{
|
||||
$transport = $this->getTransport($transportName);
|
||||
$transport->set($payload, $delay);
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string|null $transportName
|
||||
* @return int
|
||||
* @throws MQTTException\UnknownTransport
|
||||
* @throws MQTTException\Read
|
||||
*/
|
||||
public function pending(?string $transportName = null): int
|
||||
{
|
||||
$transport = $this->getTransport($transportName);
|
||||
return $transport->pending();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @param string|null $transportName
|
||||
* @return bool
|
||||
* @throws MQTTException\UnknownTransport
|
||||
* @throws MQTTException\Read
|
||||
*/
|
||||
public function exists(?int $jobId = null, ?string $transportName = null): bool
|
||||
{
|
||||
$transport = $this->getTransport($transportName);
|
||||
return $transport->exists($jobId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @param string|null $transportName
|
||||
* @return string
|
||||
* @throws MQTTException\UnknownTransport
|
||||
* @throws MQTTException\Read
|
||||
*/
|
||||
public function get(?int $jobId = null, ?string $transportName = null): string
|
||||
{
|
||||
$transport = $this->getTransport($transportName);
|
||||
return $transport->get($jobId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $newPayload
|
||||
* @param int|null $jobId
|
||||
* @param string|null $transportName
|
||||
* @return $this
|
||||
* @throws MQTTException\UnknownTransport
|
||||
* @throws MQTTException\Update
|
||||
*/
|
||||
public function update(string $newPayload, ?int $jobId = null, ?string $transportName = null): self
|
||||
{
|
||||
$transport = $this->getTransport($transportName);
|
||||
$transport->update($newPayload, $jobId);
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @param string|null $transportName
|
||||
* @return $this
|
||||
* @throws MQTTException\UnknownTransport
|
||||
* @throws MQTTException\Delete
|
||||
*/
|
||||
public function remove(?int $jobId = null, ?string $transportName = null): self
|
||||
{
|
||||
$transport = $this->getTransport($transportName);
|
||||
$transport->remove($jobId);
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string|null $transportName
|
||||
* @return mixed
|
||||
* @throws MQTTException\UnknownTransport
|
||||
*/
|
||||
protected function getTransport(?string $transportName): mixed
|
||||
{
|
||||
if (count($this->transports) === 0) {
|
||||
throw new MQTTException\UnknownTransport('');
|
||||
}
|
||||
if ($transportName === null) {
|
||||
if (array_key_exists('default', $this->transports)) {
|
||||
$transportName = 'default';
|
||||
} else {
|
||||
$transportName = array_keys($this->transports)[0];
|
||||
}
|
||||
}
|
||||
if (!array_key_exists($transportName, $this->transports)) {
|
||||
if ($transportName === null) {
|
||||
$transportName = '';
|
||||
}
|
||||
throw new MQTTException\UnknownTransport($transportName);
|
||||
}
|
||||
return $this->transports[$transportName];
|
||||
}
|
||||
}
|
127
cli/src/Service/MQTT/Beanstalkd.php
Normal file
127
cli/src/Service/MQTT/Beanstalkd.php
Normal file
@ -0,0 +1,127 @@
|
||||
<?php
|
||||
namespace Incoviba\Service\MQTT;
|
||||
|
||||
use Exception;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use xobotyi\beansclient;
|
||||
use Incoviba\Service;
|
||||
use Incoviba\Exception\MQTT;
|
||||
|
||||
class Beanstalkd extends Service implements MQTTInterface
|
||||
{
|
||||
const string DEFAULT_TUBE = 'default';
|
||||
const int DEFAULT_TTR = 30;
|
||||
const int DEFAULT_PRIORITY = 1_024;
|
||||
|
||||
public function __construct(LoggerInterface $logger, protected beansclient\Client $client,
|
||||
protected string $tube = self::DEFAULT_TUBE,
|
||||
protected int $ttr = self::DEFAULT_TTR,
|
||||
protected int $priority = self::DEFAULT_PRIORITY)
|
||||
{
|
||||
parent::__construct($logger);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $payload
|
||||
* @param int $delay
|
||||
* @return self
|
||||
* @throws MQTT\Create
|
||||
*/
|
||||
public function set(string $payload, int $delay = 60): self
|
||||
{
|
||||
try {
|
||||
$this->client->put($payload, $this->ttr, $this->priority, $delay);
|
||||
} catch (Exception $exception) {
|
||||
throw new MQTT\Create($this->tube, $payload, $exception);
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
* @throws MQTT\Read
|
||||
*/
|
||||
public function pending(): int
|
||||
{
|
||||
try {
|
||||
$stats = $this->client
|
||||
->statsTube($this->tube);
|
||||
} catch (Exception $exception) {
|
||||
throw new MQTT\Read($this->tube, $exception);
|
||||
}
|
||||
if (!array_key_exists('current-jobs-ready', $stats)) {
|
||||
throw new MQTT\Read($this->tube);
|
||||
}
|
||||
return $stats['current-jobs-ready'];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @return bool
|
||||
* @throws MQTT\Read
|
||||
*/
|
||||
public function exists(?int $jobId = null): bool
|
||||
{
|
||||
return $this->pending() > 0;
|
||||
}
|
||||
|
||||
protected int $currentJobId;
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @return string
|
||||
* @throws MQTT\Read
|
||||
*/
|
||||
public function get(?int $jobId = null): string
|
||||
{
|
||||
try {
|
||||
if ($jobId !== null) {
|
||||
$job = (object) $this->client
|
||||
->reserveJob($jobId);
|
||||
} else {
|
||||
$job = (object) $this->client
|
||||
->reserve();
|
||||
}
|
||||
} catch (Exception $exception) {
|
||||
throw new MQTT\Read($this->tube, $exception);
|
||||
}
|
||||
$this->currentJobId = $job->id;
|
||||
return $job->payload;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $newPayload
|
||||
* @param int|null $jobId
|
||||
* @return self
|
||||
* @throws MQTT\Update
|
||||
*/
|
||||
public function update(string $newPayload, ?int $jobId = null): self
|
||||
{
|
||||
try {
|
||||
$this->remove($jobId);
|
||||
$this->set($newPayload);
|
||||
} catch (MQTT\Delete | MQTT\Create $exception) {
|
||||
throw new MQTT\Update($this->tube, $newPayload, $jobId, $exception);
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @return self
|
||||
* @throws MQTT\Delete
|
||||
*/
|
||||
public function remove(?int $jobId = null): self
|
||||
{
|
||||
try {
|
||||
if ($jobId === null) {
|
||||
$jobId = $this->currentJobId;
|
||||
}
|
||||
$this->client
|
||||
->delete($jobId);
|
||||
} catch (Exception $exception) {
|
||||
throw new MQTT\Delete($this->tube, $jobId, $exception);
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
}
|
12
cli/src/Service/MQTT/MQTTInterface.php
Normal file
12
cli/src/Service/MQTT/MQTTInterface.php
Normal file
@ -0,0 +1,12 @@
|
||||
<?php
|
||||
namespace Incoviba\Service\MQTT;
|
||||
|
||||
interface MQTTInterface
|
||||
{
|
||||
public function set(string $payload, int $delay = 0): self;
|
||||
public function pending(): int;
|
||||
public function exists(?int $jobId = null): bool;
|
||||
public function get(?int $jobId = null): string;
|
||||
public function update(string $newPayload, ?int $jobId = null): self;
|
||||
public function remove(?int $jobId = null): self;
|
||||
}
|
65
cli/src/Service/MQTT/Pheanstalk.php
Normal file
65
cli/src/Service/MQTT/Pheanstalk.php
Normal file
@ -0,0 +1,65 @@
|
||||
<?php
|
||||
namespace Incoviba\Service\MQTT;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Pheanstalk as PBA;
|
||||
use Incoviba\Service;
|
||||
|
||||
class Pheanstalk extends Service implements MQTTInterface
|
||||
{
|
||||
public function __construct(LoggerInterface $logger, protected PBA\Pheanstalk $client, string $tubeName = 'default')
|
||||
{
|
||||
parent::__construct($logger);
|
||||
$this->tube = new PBA\Values\TubeName($tubeName);
|
||||
}
|
||||
|
||||
protected PBA\Values\TubeName $tube;
|
||||
|
||||
public function set(string $payload, int $delay = 0): self
|
||||
{
|
||||
$this->client->useTube($this->tube);
|
||||
$this->client->put($payload, $delay);
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function pending(): int
|
||||
{
|
||||
$stats = $this->client->statsTube($this->tube);
|
||||
return $stats->currentJobsReady;
|
||||
}
|
||||
|
||||
public function exists(?int $jobId = null): bool
|
||||
{
|
||||
return $this->pending() > 0;
|
||||
}
|
||||
protected int $currentJobId;
|
||||
public function get(?int $jobId = null): string
|
||||
{
|
||||
$this->client->watch($this->tube);
|
||||
if ($jobId !== null) {
|
||||
$jobId = new PBA\Values\JobId($jobId);
|
||||
$job = $this->client->reserveJob($jobId);
|
||||
} else {
|
||||
$job = $this->client->reserve();
|
||||
}
|
||||
$this->currentJobId = $job->getId();
|
||||
return $job->getData();
|
||||
}
|
||||
|
||||
public function update(string $newPayload, ?int $jobId = null): self
|
||||
{
|
||||
$this->remove($jobId);
|
||||
$this->set($newPayload);
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function remove(?int $jobId = null): self
|
||||
{
|
||||
if ($jobId === null) {
|
||||
$jobId = $this->currentJobId;
|
||||
}
|
||||
$this->client->watch($this->tube);
|
||||
$this->client->delete(new PBA\Values\JobId($jobId));
|
||||
return $this;
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user