feature/cierres (#25)
Varios cambios Co-authored-by: Juan Pablo Vial <jpvialb@incoviba.cl> Reviewed-on: #25
This commit is contained in:
127
cli/src/Service/MQTT/Beanstalkd.php
Normal file
127
cli/src/Service/MQTT/Beanstalkd.php
Normal file
@ -0,0 +1,127 @@
|
||||
<?php
|
||||
namespace Incoviba\Service\MQTT;
|
||||
|
||||
use Exception;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use xobotyi\beansclient;
|
||||
use Incoviba\Service;
|
||||
use Incoviba\Exception\MQTT;
|
||||
|
||||
class Beanstalkd extends Service implements MQTTInterface
|
||||
{
|
||||
const string DEFAULT_TUBE = 'default';
|
||||
const int DEFAULT_TTR = 30;
|
||||
const int DEFAULT_PRIORITY = 1_024;
|
||||
|
||||
public function __construct(LoggerInterface $logger, protected beansclient\Client $client,
|
||||
protected string $tube = self::DEFAULT_TUBE,
|
||||
protected int $ttr = self::DEFAULT_TTR,
|
||||
protected int $priority = self::DEFAULT_PRIORITY)
|
||||
{
|
||||
parent::__construct($logger);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $payload
|
||||
* @param int $delay
|
||||
* @return self
|
||||
* @throws MQTT\Create
|
||||
*/
|
||||
public function set(string $payload, int $delay = 60): self
|
||||
{
|
||||
try {
|
||||
$this->client->put($payload, $this->ttr, $this->priority, $delay);
|
||||
} catch (Exception $exception) {
|
||||
throw new MQTT\Create($this->tube, $payload, $exception);
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
* @throws MQTT\Read
|
||||
*/
|
||||
public function pending(): int
|
||||
{
|
||||
try {
|
||||
$stats = $this->client
|
||||
->statsTube($this->tube);
|
||||
} catch (Exception $exception) {
|
||||
throw new MQTT\Read($this->tube, $exception);
|
||||
}
|
||||
if (!array_key_exists('current-jobs-ready', $stats)) {
|
||||
throw new MQTT\Read($this->tube);
|
||||
}
|
||||
return $stats['current-jobs-ready'];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @return bool
|
||||
* @throws MQTT\Read
|
||||
*/
|
||||
public function exists(?int $jobId = null): bool
|
||||
{
|
||||
return $this->pending() > 0;
|
||||
}
|
||||
|
||||
protected int $currentJobId;
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @return string
|
||||
* @throws MQTT\Read
|
||||
*/
|
||||
public function get(?int $jobId = null): string
|
||||
{
|
||||
try {
|
||||
if ($jobId !== null) {
|
||||
$job = (object) $this->client
|
||||
->reserveJob($jobId);
|
||||
} else {
|
||||
$job = (object) $this->client
|
||||
->reserve();
|
||||
}
|
||||
} catch (Exception $exception) {
|
||||
throw new MQTT\Read($this->tube, $exception);
|
||||
}
|
||||
$this->currentJobId = $job->id;
|
||||
return $job->payload;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $newPayload
|
||||
* @param int|null $jobId
|
||||
* @return self
|
||||
* @throws MQTT\Update
|
||||
*/
|
||||
public function update(string $newPayload, ?int $jobId = null): self
|
||||
{
|
||||
try {
|
||||
$this->remove($jobId);
|
||||
$this->set($newPayload);
|
||||
} catch (MQTT\Delete | MQTT\Create $exception) {
|
||||
throw new MQTT\Update($this->tube, $newPayload, $jobId, $exception);
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @return self
|
||||
* @throws MQTT\Delete
|
||||
*/
|
||||
public function remove(?int $jobId = null): self
|
||||
{
|
||||
try {
|
||||
if ($jobId === null) {
|
||||
$jobId = $this->currentJobId;
|
||||
}
|
||||
$this->client
|
||||
->delete($jobId);
|
||||
} catch (Exception $exception) {
|
||||
throw new MQTT\Delete($this->tube, $jobId, $exception);
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user