Beanstalkd -> Pheanstalk

This commit is contained in:
Juan Pablo Vial
2025-07-15 19:04:19 -04:00
parent 501151a90e
commit 79073ebeb7
20 changed files with 533 additions and 48 deletions

View File

@ -8,11 +8,13 @@
"ext-gd": "*", "ext-gd": "*",
"ext-openssl": "*", "ext-openssl": "*",
"ext-pdo": "*", "ext-pdo": "*",
"ext-sockets": "*",
"berrnd/slim-blade-view": "^1", "berrnd/slim-blade-view": "^1",
"guzzlehttp/guzzle": "^7", "guzzlehttp/guzzle": "^7",
"monolog/monolog": "^3", "monolog/monolog": "^3",
"nyholm/psr7": "^1", "nyholm/psr7": "^1",
"nyholm/psr7-server": "^1", "nyholm/psr7-server": "^1",
"pda/pheanstalk": "^7.0",
"php-di/php-di": "^7", "php-di/php-di": "^7",
"php-di/slim-bridge": "^3", "php-di/slim-bridge": "^3",
"phpoffice/phpspreadsheet": "^3", "phpoffice/phpspreadsheet": "^3",
@ -20,8 +22,7 @@
"robmorgan/phinx": "^0.16", "robmorgan/phinx": "^0.16",
"slim/slim": "^4", "slim/slim": "^4",
"symfony/string": "^7.2", "symfony/string": "^7.2",
"tedivm/jshrink": "^1.7", "tedivm/jshrink": "^1.7"
"xobotyi/beansclient": "^1.0"
}, },
"require-dev": { "require-dev": {
"fakerphp/faker": "^1", "fakerphp/faker": "^1",

View File

@ -162,15 +162,15 @@ return [
->register('subscription', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Subscription::class)) ->register('subscription', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Subscription::class))
->register('invoice', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Invoice::class)); ->register('invoice', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Invoice::class));
}, },
xobotyi\beansclient\Interfaces\ConnectionInterface::class => function(ContainerInterface $container) { Pheanstalk\Pheanstalk::class => function(ContainerInterface $container) {
return new xobotyi\beansclient\Connection( return Pheanstalk\Pheanstalk::create(
$container->get('BEANSTALKD_HOST'), $container->get('BEANSTALKD_HOST'),
$container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300 $container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300
); );
}, },
Incoviba\Service\MQTT::class => function(ContainerInterface $container) { Incoviba\Service\MQTT::class => function(ContainerInterface $container) {
return new Incoviba\Service\MQTT() return new Incoviba\Service\MQTT()
->register('beanstalkd', $container->get(Incoviba\Service\MQTT\Beanstalkd::class)); ->register('default', $container->get(Incoviba\Service\MQTT\Pheanstalk::class));
}, },
Incoviba\Service\Queue::class => function(ContainerInterface $container) { Incoviba\Service\Queue::class => function(ContainerInterface $container) {
return new Incoviba\Service\Queue( return new Incoviba\Service\Queue(

View File

@ -0,0 +1,59 @@
<?php
namespace Incoviba\Service\MQTT;
use Incoviba\Common\Ideal\Service;
use Psr\Log\LoggerInterface;
use Pheanstalk as PBA;
class Pheanstalk extends Service implements MQTTInterface
{
const string DEFAULT_TUBE = 'default';
const int DEFAULT_TTR = 60;
const int DEFAULT_PRIORITY = 1_024;
public function __construct(LoggerInterface $logger, protected PBA\Pheanstalk $client, string $tubeName = self::DEFAULT_TUBE)
{
parent::__construct($logger);
$this->tube = new PBA\Values\TubeName($tubeName);
}
protected PBA\Values\TubeName $tube;
public function set(string $value, int $delay = 0): self
{
$this->client->useTube($this->tube);
$this->client->put($value, self::DEFAULT_PRIORITY, $delay, self::DEFAULT_TTR);
return $this;
}
public function exists(): bool
{
$stats = $this->client->statsTube($this->tube);
return $stats->currentJobsReady > 0;
}
protected int $currentJobId;
public function get(): string
{
$this->client->useTube($this->tube);
$job = $this->client->reserve();
$this->currentJobId = $job->getId();
return $job->getData();
}
public function update(string $newPayload, ?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJobId;
}
$this->remove($jobId);
$this->set($newPayload);
return $this;
}
public function remove(?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJobId;
}
$this->client->useTube($this->tube);
$this->client->delete(new PBA\Values\JobId($jobId));
return $this;
}
}

View File

@ -2,10 +2,12 @@
"name": "incoviba/cli", "name": "incoviba/cli",
"type": "project", "type": "project",
"require": { "require": {
"ext-sockets": "*",
"dragonmantank/cron-expression": "^3.4", "dragonmantank/cron-expression": "^3.4",
"guzzlehttp/guzzle": "^7.8", "guzzlehttp/guzzle": "^7.8",
"hollodotme/fast-cgi-client": "^3.1", "hollodotme/fast-cgi-client": "^3.1",
"monolog/monolog": "^3.5", "monolog/monolog": "^3.5",
"pda/pheanstalk": "^7.0",
"php-di/php-di": "^7.0", "php-di/php-di": "^7.0",
"predis/predis": "^3.0", "predis/predis": "^3.0",
"symfony/console": "^6.3" "symfony/console": "^6.3"

View File

@ -16,4 +16,15 @@ return [
} }
return new Predis\Client($options); return new Predis\Client($options);
}, },
Pheanstalk\Pheanstalk::class => function(ContainerInterface $container) {
return Pheanstalk\Pheanstalk::create(
$container->get('BEANSTALKD_HOST'),
$container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300
);
},
Incoviba\Service\MQTT\MQTTInterface::class => function(ContainerInterface $container) {
$service = new Incoviba\Service\MQTT($container->get(Psr\Log\LoggerInterface::class));
$service->register('default', $container->get(Incoviba\Service\MQTT\Pheanstalk::class));
return $service;
}
]; ];

View File

@ -12,37 +12,11 @@ class Pending extends Console\Command\Command
parent::__construct($name); parent::__construct($name);
} }
protected function configure(): void
{
$this->addOption('full', 'f', Console\Input\InputOption::VALUE_NONE, 'Full output');
}
protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
{ {
$jobs = $this->jobService->getPending(); $jobCount = $this->jobService->getPending();
$jobCount = count($jobs);
$output->writeln("Found {$jobCount} pending jobs"); $output->writeln("Found {$jobCount} pending jobs");
if ($input->getOption('full') and $jobCount > 0) {
$io = new Console\Style\SymfonyStyle($input, $output);
$rows = [];
foreach ($jobs as $job) {
$retries = $job['retries'] ?? 0;
$updated = $job['updated_at'] ?? '';
$rows[] = [
$job['id'],
$job['created_at'],
$job['configuration']['type'],
$retries,
$updated
];
}
$io->table(['ID', 'Created', 'Type', 'Retries', 'Updated'], $rows);
}
return self::SUCCESS; return self::SUCCESS;
} }
} }

View File

@ -13,6 +13,7 @@ use Symfony\Component\Console;
class Run extends Console\Command\Command class Run extends Console\Command\Command
{ {
public function __construct(protected Service\FastCGI $fastcgi, protected LoggerInterface $logger, public function __construct(protected Service\FastCGI $fastcgi, protected LoggerInterface $logger,
protected Service\Job $jobService,
protected DateTimeZone $timeZone, ?string $name = null) protected DateTimeZone $timeZone, ?string $name = null)
{ {
parent::__construct($name); parent::__construct($name);
@ -26,6 +27,11 @@ class Run extends Console\Command\Command
$now = new DateTimeImmutable(); $now = new DateTimeImmutable();
} }
if ($this->jobService->getPending() === 0) {
$output->writeln("[{$now->format('Y-m-d H:i:s e')}] No pending jobs to run.");
return self::SUCCESS;
}
$output->writeln("[{$now->format('Y-m-d H:i:s e')}] Running Ready Job..."); $output->writeln("[{$now->format('Y-m-d H:i:s e')}] Running Ready Job...");
$this->runJob(); $this->runJob();
return $this->getResponses(); return $this->getResponses();

View File

@ -35,8 +35,13 @@ class Queue extends Command
]; ];
$io = new Console\Style\SymfonyStyle($input, $this->sections['top']); $io = new Console\Style\SymfonyStyle($input, $this->sections['top']);
$now = new DateTimeImmutable('now', $this->timezone); $now = new DateTimeImmutable('now', $this->timezone);
$io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue...");
if ($this->jobService->getPending() === 0) {
$io->success("[{$now->format('Y-m-d H:i:s e')}] Queue is empty");
return self::SUCCESS;
}
$io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue...");
return $this->runJob(); return $this->runJob();
} }

View File

@ -0,0 +1,18 @@
<?php
namespace Incoviba\Exception;
use Throwable;
use Exception;
abstract class MQTT extends Exception
{
public function __construct(string $message = "", int $code = 0, ?Throwable $previous = null)
{
$baseCode = 700;
$code = $baseCode + $code;
if ($message == "") {
$message = "MQTT Exception";
}
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class Create extends MQTT
{
public function __construct(string $tube = '', string $payload = '', ?Throwable $previous = null)
{
$message = "Unable to create MQTT message: {$payload} in tube {$tube}";
$code = 11;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class Delete extends MQTT
{
public function __construct(string $tube, int $jobId, ?Throwable $previous = null)
{
$message = "Could not delete job {$jobId} in tube {$tube}";
$code = 13;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class Read extends MQTT
{
public function __construct(string $tube, ?Throwable $previous = null)
{
$message = "Error reading from tube {$tube}";
$code = 10;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,14 @@
<?php
namespace Incoviba\Exception\MQTT;
use Incoviba\Exception\MQTT;
use Throwable;
class UnknownTransport extends MQTT
{
public function __construct(string $transportName, ?Throwable $previous = null)
{
$message = "Unknown transport {$transportName}";
parent::__construct($message, 1, $previous);
}
}

View File

@ -0,0 +1,16 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class Update extends MQTT
{
public function __construct(string $tube, string $payload, ?int $jobId = null, ?Throwable $previous = null)
{
$jobString = $jobId !== null ? " with jobId {$jobId}" : '';
$message = "Could not update job{$jobString} with {$payload} in tube {$tube}";
$code = 12;
parent::__construct($message, $code, $previous);
}
}

9
cli/src/Service.php Normal file
View File

@ -0,0 +1,9 @@
<?php
namespace Incoviba;
use Psr\Log\LoggerInterface;
abstract class Service
{
public function __construct(protected LoggerInterface $logger) {}
}

View File

@ -5,27 +5,22 @@ use DateInvalidTimeZoneException;
use DateMalformedStringException; use DateMalformedStringException;
use DateTimeImmutable; use DateTimeImmutable;
use DateTimeZone; use DateTimeZone;
use Exception;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Predis\Connection\ConnectionException; use Incoviba\Exception\MQTT as MQTTException;
use Incoviba\Service\MQTT\MQTTInterface;
class Job class Job
{ {
public function __construct(protected LoggerInterface $logger) public function __construct(protected LoggerInterface $logger, protected MQTTInterface $mqttService) {}
{
$this->redisKey = 'jobs';
}
protected string $redisKey; protected string $redisKey;
public function getPending(): array public function getPending(): int
{ {
try { try {
$jobs = $this->redisService->get($this->redisKey); return $this->mqttService->pending();
return json_decode($jobs, true); } catch (MQTTException $exception) {
} catch (ConnectionException|Exception $exception) {
$exception = new Exception("Could not read {$this->redisKey} from Redis", $exception->getCode(), $exception);
$this->logger->warning($exception->getMessage(), ['exception' => $exception]); $this->logger->warning($exception->getMessage(), ['exception' => $exception]);
return []; return 0;
} }
} }
@ -44,9 +39,11 @@ class Job
'updated_at' => null, 'updated_at' => null,
'retries' => 0 'retries' => 0
]; ];
$jobs = $this->getPending(); try {
$jobs []= $data; $this->mqttService->set(json_encode($data));
$this->redisService->set($this->redisKey, json_encode($jobs), -1); } catch (MQTTException $exception) {
$this->logger->warning($exception->getMessage(), ['exception' => $exception]);
}
return $data; return $data;
} }
} }

124
cli/src/Service/MQTT.php Normal file
View File

@ -0,0 +1,124 @@
<?php
namespace Incoviba\Service;
use Incoviba\Exception\MQTT as MQTTException;
use Incoviba\Service;
use Incoviba\Service\MQTT\MQTTInterface;
class MQTT extends Service implements MQTTInterface
{
protected array $transports = [];
public function register(string $name, MQTTInterface $transport): self
{
$this->transports[$name] = $transport;
return $this;
}
/**
* @param string $payload
* @param int $delay
* @param string|null $transportName
* @return $this
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Create
*/
public function set(string $payload, int $delay = 0, ?string $transportName = null): self
{
$transport = $this->getTransport($transportName);
$transport->set($payload, $delay);
return $this;
}
/**
* @param string|null $transportName
* @return int
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Read
*/
public function pending(?string $transportName = null): int
{
$transport = $this->getTransport($transportName);
return $transport->pending();
}
/**
* @param int|null $jobId
* @param string|null $transportName
* @return bool
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Read
*/
public function exists(?int $jobId = null, ?string $transportName = null): bool
{
$transport = $this->getTransport($transportName);
return $transport->exists($jobId);
}
/**
* @param int|null $jobId
* @param string|null $transportName
* @return string
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Read
*/
public function get(?int $jobId = null, ?string $transportName = null): string
{
$transport = $this->getTransport($transportName);
return $transport->get($jobId);
}
/**
* @param string $newPayload
* @param int|null $jobId
* @param string|null $transportName
* @return $this
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Update
*/
public function update(string $newPayload, ?int $jobId = null, ?string $transportName = null): self
{
$transport = $this->getTransport($transportName);
$transport->update($newPayload, $jobId);
return $this;
}
/**
* @param int|null $jobId
* @param string|null $transportName
* @return $this
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Delete
*/
public function remove(?int $jobId = null, ?string $transportName = null): self
{
$transport = $this->getTransport($transportName);
$transport->remove($jobId);
return $this;
}
/**
* @param string|null $transportName
* @return mixed
* @throws MQTTException\UnknownTransport
*/
protected function getTransport(?string $transportName): mixed
{
if (count($this->transports) === 0) {
throw new MQTTException\UnknownTransport('');
}
if ($transportName === null) {
if (array_key_exists('default', $this->transports)) {
$transportName = 'default';
} else {
$transportName = array_keys($this->transports)[0];
}
}
if (!array_key_exists($transportName, $this->transports)) {
if ($transportName === null) {
$transportName = '';
}
throw new MQTTException\UnknownTransport($transportName);
}
return $this->transports[$transportName];
}
}

View File

@ -0,0 +1,127 @@
<?php
namespace Incoviba\Service\MQTT;
use Exception;
use Psr\Log\LoggerInterface;
use xobotyi\beansclient;
use Incoviba\Service;
use Incoviba\Exception\MQTT;
class Beanstalkd extends Service implements MQTTInterface
{
const string DEFAULT_TUBE = 'default';
const int DEFAULT_TTR = 30;
const int DEFAULT_PRIORITY = 1_024;
public function __construct(LoggerInterface $logger, protected beansclient\Client $client,
protected string $tube = self::DEFAULT_TUBE,
protected int $ttr = self::DEFAULT_TTR,
protected int $priority = self::DEFAULT_PRIORITY)
{
parent::__construct($logger);
}
/**
* @param string $payload
* @param int $delay
* @return self
* @throws MQTT\Create
*/
public function set(string $payload, int $delay = 60): self
{
try {
$this->client->put($payload, $this->ttr, $this->priority, $delay);
} catch (Exception $exception) {
throw new MQTT\Create($this->tube, $payload, $exception);
}
return $this;
}
/**
* @return int
* @throws MQTT\Read
*/
public function pending(): int
{
try {
$stats = $this->client
->statsTube($this->tube);
} catch (Exception $exception) {
throw new MQTT\Read($this->tube, $exception);
}
if (!array_key_exists('current-jobs-ready', $stats)) {
throw new MQTT\Read($this->tube);
}
return $stats['current-jobs-ready'];
}
/**
* @param int|null $jobId
* @return bool
* @throws MQTT\Read
*/
public function exists(?int $jobId = null): bool
{
return $this->pending() > 0;
}
protected int $currentJobId;
/**
* @param int|null $jobId
* @return string
* @throws MQTT\Read
*/
public function get(?int $jobId = null): string
{
try {
if ($jobId !== null) {
$job = (object) $this->client
->reserveJob($jobId);
} else {
$job = (object) $this->client
->reserve();
}
} catch (Exception $exception) {
throw new MQTT\Read($this->tube, $exception);
}
$this->currentJobId = $job->id;
return $job->payload;
}
/**
* @param string $newPayload
* @param int|null $jobId
* @return self
* @throws MQTT\Update
*/
public function update(string $newPayload, ?int $jobId = null): self
{
try {
$this->remove($jobId);
$this->set($newPayload);
} catch (MQTT\Delete | MQTT\Create $exception) {
throw new MQTT\Update($this->tube, $newPayload, $jobId, $exception);
}
return $this;
}
/**
* @param int|null $jobId
* @return self
* @throws MQTT\Delete
*/
public function remove(?int $jobId = null): self
{
try {
if ($jobId === null) {
$jobId = $this->currentJobId;
}
$this->client
->delete($jobId);
} catch (Exception $exception) {
throw new MQTT\Delete($this->tube, $jobId, $exception);
}
return $this;
}
}

View File

@ -0,0 +1,12 @@
<?php
namespace Incoviba\Service\MQTT;
interface MQTTInterface
{
public function set(string $payload, int $delay = 0): self;
public function pending(): int;
public function exists(?int $jobId = null): bool;
public function get(?int $jobId = null): string;
public function update(string $newPayload, ?int $jobId = null): self;
public function remove(?int $jobId = null): self;
}

View File

@ -0,0 +1,65 @@
<?php
namespace Incoviba\Service\MQTT;
use Psr\Log\LoggerInterface;
use Pheanstalk as PBA;
use Incoviba\Service;
class Pheanstalk extends Service implements MQTTInterface
{
public function __construct(LoggerInterface $logger, protected PBA\Pheanstalk $client, string $tubeName = 'default')
{
parent::__construct($logger);
$this->tube = new PBA\Values\TubeName($tubeName);
}
protected PBA\Values\TubeName $tube;
public function set(string $payload, int $delay = 0): self
{
$this->client->useTube($this->tube);
$this->client->put($payload, $delay);
return $this;
}
public function pending(): int
{
$stats = $this->client->statsTube($this->tube);
return $stats->currentJobsReady;
}
public function exists(?int $jobId = null): bool
{
return $this->pending() > 0;
}
protected int $currentJobId;
public function get(?int $jobId = null): string
{
$this->client->watch($this->tube);
if ($jobId !== null) {
$jobId = new PBA\Values\JobId($jobId);
$job = $this->client->reserveJob($jobId);
} else {
$job = $this->client->reserve();
}
$this->currentJobId = $job->getId();
return $job->getData();
}
public function update(string $newPayload, ?int $jobId = null): self
{
$this->remove($jobId);
$this->set($newPayload);
return $this;
}
public function remove(?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJobId;
}
$this->client->watch($this->tube);
$this->client->delete(new PBA\Values\JobId($jobId));
return $this;
}
}