diff --git a/app/composer.json b/app/composer.json index b9dcc81c..7f97281 100644 --- a/app/composer.json +++ b/app/composer.json @@ -8,11 +8,13 @@ "ext-gd": "*", "ext-openssl": "*", "ext-pdo": "*", + "ext-sockets": "*", "berrnd/slim-blade-view": "^1", "guzzlehttp/guzzle": "^7", "monolog/monolog": "^3", "nyholm/psr7": "^1", "nyholm/psr7-server": "^1", + "pda/pheanstalk": "^7.0", "php-di/php-di": "^7", "php-di/slim-bridge": "^3", "phpoffice/phpspreadsheet": "^3", @@ -20,8 +22,7 @@ "robmorgan/phinx": "^0.16", "slim/slim": "^4", "symfony/string": "^7.2", - "tedivm/jshrink": "^1.7", - "xobotyi/beansclient": "^1.0" + "tedivm/jshrink": "^1.7" }, "require-dev": { "fakerphp/faker": "^1", diff --git a/app/setup/setups/services.php b/app/setup/setups/services.php index 8e77dfe..9cdecfc 100644 --- a/app/setup/setups/services.php +++ b/app/setup/setups/services.php @@ -162,15 +162,15 @@ return [ ->register('subscription', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Subscription::class)) ->register('invoice', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Invoice::class)); }, - xobotyi\beansclient\Interfaces\ConnectionInterface::class => function(ContainerInterface $container) { - return new xobotyi\beansclient\Connection( + Pheanstalk\Pheanstalk::class => function(ContainerInterface $container) { + return Pheanstalk\Pheanstalk::create( $container->get('BEANSTALKD_HOST'), $container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300 ); }, Incoviba\Service\MQTT::class => function(ContainerInterface $container) { return new Incoviba\Service\MQTT() - ->register('beanstalkd', $container->get(Incoviba\Service\MQTT\Beanstalkd::class)); + ->register('default', $container->get(Incoviba\Service\MQTT\Pheanstalk::class)); }, Incoviba\Service\Queue::class => function(ContainerInterface $container) { return new Incoviba\Service\Queue( diff --git a/app/src/Service/MQTT/Pheanstalk.php b/app/src/Service/MQTT/Pheanstalk.php new file mode 100644 index 0000000..a1e0927 --- /dev/null +++ b/app/src/Service/MQTT/Pheanstalk.php @@ -0,0 +1,59 @@ +tube = new PBA\Values\TubeName($tubeName); + } + + protected PBA\Values\TubeName $tube; + + public function set(string $value, int $delay = 0): self + { + $this->client->useTube($this->tube); + $this->client->put($value, self::DEFAULT_PRIORITY, $delay, self::DEFAULT_TTR); + return $this; + } + public function exists(): bool + { + $stats = $this->client->statsTube($this->tube); + return $stats->currentJobsReady > 0; + } + protected int $currentJobId; + public function get(): string + { + $this->client->useTube($this->tube); + $job = $this->client->reserve(); + $this->currentJobId = $job->getId(); + return $job->getData(); + } + public function update(string $newPayload, ?int $jobId = null): self + { + if ($jobId === null) { + $jobId = $this->currentJobId; + } + $this->remove($jobId); + $this->set($newPayload); + return $this; + } + public function remove(?int $jobId = null): self + { + if ($jobId === null) { + $jobId = $this->currentJobId; + } + $this->client->useTube($this->tube); + $this->client->delete(new PBA\Values\JobId($jobId)); + return $this; + } +} diff --git a/cli/composer.json b/cli/composer.json index a54ff9b..c22433f 100644 --- a/cli/composer.json +++ b/cli/composer.json @@ -2,10 +2,12 @@ "name": "incoviba/cli", "type": "project", "require": { + "ext-sockets": "*", "dragonmantank/cron-expression": "^3.4", "guzzlehttp/guzzle": "^7.8", "hollodotme/fast-cgi-client": "^3.1", "monolog/monolog": "^3.5", + "pda/pheanstalk": "^7.0", "php-di/php-di": "^7.0", "predis/predis": "^3.0", "symfony/console": "^6.3" diff --git a/cli/setup/setups/services.php b/cli/setup/setups/services.php index 2a9bc57..b666a8b 100644 --- a/cli/setup/setups/services.php +++ b/cli/setup/setups/services.php @@ -16,4 +16,15 @@ return [ } return new Predis\Client($options); }, + Pheanstalk\Pheanstalk::class => function(ContainerInterface $container) { + return Pheanstalk\Pheanstalk::create( + $container->get('BEANSTALKD_HOST'), + $container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300 + ); + }, + Incoviba\Service\MQTT\MQTTInterface::class => function(ContainerInterface $container) { + $service = new Incoviba\Service\MQTT($container->get(Psr\Log\LoggerInterface::class)); + $service->register('default', $container->get(Incoviba\Service\MQTT\Pheanstalk::class)); + return $service; + } ]; diff --git a/cli/src/Command/Job/Pending.php b/cli/src/Command/Job/Pending.php index d3a79a4..718b375 100644 --- a/cli/src/Command/Job/Pending.php +++ b/cli/src/Command/Job/Pending.php @@ -12,37 +12,11 @@ class Pending extends Console\Command\Command parent::__construct($name); } - protected function configure(): void - { - $this->addOption('full', 'f', Console\Input\InputOption::VALUE_NONE, 'Full output'); - } - protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int { - $jobs = $this->jobService->getPending(); - $jobCount = count($jobs); + $jobCount = $this->jobService->getPending(); $output->writeln("Found {$jobCount} pending jobs"); - if ($input->getOption('full') and $jobCount > 0) { - $io = new Console\Style\SymfonyStyle($input, $output); - - $rows = []; - foreach ($jobs as $job) { - $retries = $job['retries'] ?? 0; - $updated = $job['updated_at'] ?? ''; - - $rows[] = [ - $job['id'], - $job['created_at'], - $job['configuration']['type'], - $retries, - $updated - ]; - } - - $io->table(['ID', 'Created', 'Type', 'Retries', 'Updated'], $rows); - } - return self::SUCCESS; } } diff --git a/cli/src/Command/Job/Run.php b/cli/src/Command/Job/Run.php index 97b033a..5222c14 100644 --- a/cli/src/Command/Job/Run.php +++ b/cli/src/Command/Job/Run.php @@ -13,6 +13,7 @@ use Symfony\Component\Console; class Run extends Console\Command\Command { public function __construct(protected Service\FastCGI $fastcgi, protected LoggerInterface $logger, + protected Service\Job $jobService, protected DateTimeZone $timeZone, ?string $name = null) { parent::__construct($name); @@ -26,6 +27,11 @@ class Run extends Console\Command\Command $now = new DateTimeImmutable(); } + if ($this->jobService->getPending() === 0) { + $output->writeln("[{$now->format('Y-m-d H:i:s e')}] No pending jobs to run."); + return self::SUCCESS; + } + $output->writeln("[{$now->format('Y-m-d H:i:s e')}] Running Ready Job..."); $this->runJob(); return $this->getResponses(); diff --git a/cli/src/Command/Queue.php b/cli/src/Command/Queue.php index e96427f..302ee0c 100644 --- a/cli/src/Command/Queue.php +++ b/cli/src/Command/Queue.php @@ -35,8 +35,13 @@ class Queue extends Command ]; $io = new Console\Style\SymfonyStyle($input, $this->sections['top']); $now = new DateTimeImmutable('now', $this->timezone); - $io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue..."); + if ($this->jobService->getPending() === 0) { + $io->success("[{$now->format('Y-m-d H:i:s e')}] Queue is empty"); + return self::SUCCESS; + } + + $io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue..."); return $this->runJob(); } diff --git a/cli/src/Exception/MQTT.php b/cli/src/Exception/MQTT.php new file mode 100644 index 0000000..0e59b39 --- /dev/null +++ b/cli/src/Exception/MQTT.php @@ -0,0 +1,18 @@ +redisKey = 'jobs'; - } + public function __construct(protected LoggerInterface $logger, protected MQTTInterface $mqttService) {} protected string $redisKey; - public function getPending(): array + public function getPending(): int { try { - $jobs = $this->redisService->get($this->redisKey); - return json_decode($jobs, true); - } catch (ConnectionException|Exception $exception) { - $exception = new Exception("Could not read {$this->redisKey} from Redis", $exception->getCode(), $exception); + return $this->mqttService->pending(); + } catch (MQTTException $exception) { $this->logger->warning($exception->getMessage(), ['exception' => $exception]); - return []; + return 0; } } @@ -44,9 +39,11 @@ class Job 'updated_at' => null, 'retries' => 0 ]; - $jobs = $this->getPending(); - $jobs []= $data; - $this->redisService->set($this->redisKey, json_encode($jobs), -1); + try { + $this->mqttService->set(json_encode($data)); + } catch (MQTTException $exception) { + $this->logger->warning($exception->getMessage(), ['exception' => $exception]); + } return $data; } } diff --git a/cli/src/Service/MQTT.php b/cli/src/Service/MQTT.php new file mode 100644 index 0000000..9b4980b --- /dev/null +++ b/cli/src/Service/MQTT.php @@ -0,0 +1,124 @@ +transports[$name] = $transport; + return $this; + } + + /** + * @param string $payload + * @param int $delay + * @param string|null $transportName + * @return $this + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Create + */ + public function set(string $payload, int $delay = 0, ?string $transportName = null): self + { + $transport = $this->getTransport($transportName); + $transport->set($payload, $delay); + return $this; + } + + /** + * @param string|null $transportName + * @return int + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Read + */ + public function pending(?string $transportName = null): int + { + $transport = $this->getTransport($transportName); + return $transport->pending(); + } + + /** + * @param int|null $jobId + * @param string|null $transportName + * @return bool + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Read + */ + public function exists(?int $jobId = null, ?string $transportName = null): bool + { + $transport = $this->getTransport($transportName); + return $transport->exists($jobId); + } + + /** + * @param int|null $jobId + * @param string|null $transportName + * @return string + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Read + */ + public function get(?int $jobId = null, ?string $transportName = null): string + { + $transport = $this->getTransport($transportName); + return $transport->get($jobId); + } + + /** + * @param string $newPayload + * @param int|null $jobId + * @param string|null $transportName + * @return $this + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Update + */ + public function update(string $newPayload, ?int $jobId = null, ?string $transportName = null): self + { + $transport = $this->getTransport($transportName); + $transport->update($newPayload, $jobId); + return $this; + } + + /** + * @param int|null $jobId + * @param string|null $transportName + * @return $this + * @throws MQTTException\UnknownTransport + * @throws MQTTException\Delete + */ + public function remove(?int $jobId = null, ?string $transportName = null): self + { + $transport = $this->getTransport($transportName); + $transport->remove($jobId); + return $this; + } + + /** + * @param string|null $transportName + * @return mixed + * @throws MQTTException\UnknownTransport + */ + protected function getTransport(?string $transportName): mixed + { + if (count($this->transports) === 0) { + throw new MQTTException\UnknownTransport(''); + } + if ($transportName === null) { + if (array_key_exists('default', $this->transports)) { + $transportName = 'default'; + } else { + $transportName = array_keys($this->transports)[0]; + } + } + if (!array_key_exists($transportName, $this->transports)) { + if ($transportName === null) { + $transportName = ''; + } + throw new MQTTException\UnknownTransport($transportName); + } + return $this->transports[$transportName]; + } +} diff --git a/cli/src/Service/MQTT/Beanstalkd.php b/cli/src/Service/MQTT/Beanstalkd.php new file mode 100644 index 0000000..6330ed6 --- /dev/null +++ b/cli/src/Service/MQTT/Beanstalkd.php @@ -0,0 +1,127 @@ +client->put($payload, $this->ttr, $this->priority, $delay); + } catch (Exception $exception) { + throw new MQTT\Create($this->tube, $payload, $exception); + } + return $this; + } + + /** + * @return int + * @throws MQTT\Read + */ + public function pending(): int + { + try { + $stats = $this->client + ->statsTube($this->tube); + } catch (Exception $exception) { + throw new MQTT\Read($this->tube, $exception); + } + if (!array_key_exists('current-jobs-ready', $stats)) { + throw new MQTT\Read($this->tube); + } + return $stats['current-jobs-ready']; + } + + /** + * @param int|null $jobId + * @return bool + * @throws MQTT\Read + */ + public function exists(?int $jobId = null): bool + { + return $this->pending() > 0; + } + + protected int $currentJobId; + + /** + * @param int|null $jobId + * @return string + * @throws MQTT\Read + */ + public function get(?int $jobId = null): string + { + try { + if ($jobId !== null) { + $job = (object) $this->client + ->reserveJob($jobId); + } else { + $job = (object) $this->client + ->reserve(); + } + } catch (Exception $exception) { + throw new MQTT\Read($this->tube, $exception); + } + $this->currentJobId = $job->id; + return $job->payload; + } + + /** + * @param string $newPayload + * @param int|null $jobId + * @return self + * @throws MQTT\Update + */ + public function update(string $newPayload, ?int $jobId = null): self + { + try { + $this->remove($jobId); + $this->set($newPayload); + } catch (MQTT\Delete | MQTT\Create $exception) { + throw new MQTT\Update($this->tube, $newPayload, $jobId, $exception); + } + return $this; + } + + /** + * @param int|null $jobId + * @return self + * @throws MQTT\Delete + */ + public function remove(?int $jobId = null): self + { + try { + if ($jobId === null) { + $jobId = $this->currentJobId; + } + $this->client + ->delete($jobId); + } catch (Exception $exception) { + throw new MQTT\Delete($this->tube, $jobId, $exception); + } + return $this; + } +} diff --git a/cli/src/Service/MQTT/MQTTInterface.php b/cli/src/Service/MQTT/MQTTInterface.php new file mode 100644 index 0000000..6e32de0 --- /dev/null +++ b/cli/src/Service/MQTT/MQTTInterface.php @@ -0,0 +1,12 @@ +tube = new PBA\Values\TubeName($tubeName); + } + + protected PBA\Values\TubeName $tube; + + public function set(string $payload, int $delay = 0): self + { + $this->client->useTube($this->tube); + $this->client->put($payload, $delay); + return $this; + } + + public function pending(): int + { + $stats = $this->client->statsTube($this->tube); + return $stats->currentJobsReady; + } + + public function exists(?int $jobId = null): bool + { + return $this->pending() > 0; + } + protected int $currentJobId; + public function get(?int $jobId = null): string + { + $this->client->watch($this->tube); + if ($jobId !== null) { + $jobId = new PBA\Values\JobId($jobId); + $job = $this->client->reserveJob($jobId); + } else { + $job = $this->client->reserve(); + } + $this->currentJobId = $job->getId(); + return $job->getData(); + } + + public function update(string $newPayload, ?int $jobId = null): self + { + $this->remove($jobId); + $this->set($newPayload); + return $this; + } + + public function remove(?int $jobId = null): self + { + if ($jobId === null) { + $jobId = $this->currentJobId; + } + $this->client->watch($this->tube); + $this->client->delete(new PBA\Values\JobId($jobId)); + return $this; + } +}