client->put($payload, $this->ttr, $this->priority, $delay);
        } catch (Exception $exception) {
            throw new MQTT\Create($this->tube, $payload, $exception);
        }

        return $this;
    }

    /**
     * Returns the number of ready jobs reported for this tube.
     *
     * The value is read from the `current-jobs-ready` entry of the client's
     * tube statistics.
     *
     * @return int
     * @throws MQTT\Read when the statistics call fails, or when the result
     *                   carries no usable `current-jobs-ready` entry
     */
    public function pending(): int
    {
        try {
            $stats = $this->client->statsTube($this->tube);
        } catch (Exception $exception) {
            throw new MQTT\Read($this->tube, $exception);
        }

        // Guard the shape before indexing: array_key_exists() raises a
        // TypeError on a non-array result, and a null entry would violate the
        // int return type. Both cases are reported as a read failure instead.
        if (!is_array($stats) || !isset($stats['current-jobs-ready'])) {
            throw new MQTT\Read($this->tube);
        }

        return $stats['current-jobs-ready'];
    }

    /**
     * Tells whether the tube currently has at least one ready job.
     *
     * NOTE(review): $jobId is accepted but not used; the answer is always
     * derived from pending(), i.e. it is about the tube as a whole rather
     * than about one specific job.
     *
     * @param int|null $jobId Currently ignored.
     * @return bool
     * @throws MQTT\Read
     */
    public function exists(?int $jobId = null): bool
    {
        return $this->pending() > 0;
    }

    /**
     * Id of the job most recently returned by get().
     *
     * Stays uninitialized until get() succeeds for the first time, so it must
     * be checked with isset() before being read.
     */
    protected int $currentJobId;

    /**
     * Reserves a job and returns its payload.
     *
     * With a job id, that job is requested through the client's reserveJob();
     * without one, the client's reserve() is used. The id of the reserved job
     * is remembered so that remove()/update() can default to it.
     *
     * @param int|null $jobId Specific job to reserve, or null for the
     *                        client's plain reserve().
     * @return string
     * @throws MQTT\Read when the client call fails or yields no job
     */
    public function get(?int $jobId = null): string
    {
        try {
            $reserved = $jobId !== null
                ? $this->client->reserveJob($jobId)
                : $this->client->reserve();
        } catch (Exception $exception) {
            throw new MQTT\Read($this->tube, $exception);
        }

        // An empty result (null/false/scalar) cannot describe a job. Casting
        // it to an object and reading ->id would end in a TypeError on the
        // typed property, so report it as a read failure instead.
        if (!is_array($reserved) && !is_object($reserved)) {
            throw new MQTT\Read($this->tube);
        }

        $job = (object) $reserved;
        $this->currentJobId = $job->id;

        return $job->payload;
    }

    /**
     * Replaces a job's payload by deleting the job and enqueueing a new one.
     *
     * The removal happens first. If the subsequent set() fails, the original
     * job has already been deleted and the failure is surfaced to the caller
     * as MQTT\Update.
     *
     * @param string $newPayload
     * @param int|null $jobId Job to replace; null targets the job last
     *                        returned by get().
     * @return self
     * @throws MQTT\Update
     */
    public function update(string $newPayload, ?int $jobId = null): self
    {
        try {
            $this->remove($jobId);
            $this->set($newPayload);
        } catch (MQTT\Delete | MQTT\Create $exception) {
            throw new MQTT\Update($this->tube, $newPayload, $jobId, $exception);
        }

        return $this;
    }

    /**
     * Deletes a job from the tube.
     *
     * @param int|null $jobId Job to delete; null targets the job last
     *                        returned by get().
     * @return self
     * @throws MQTT\Delete when the client call fails, or when no id was
     *                     given and get() has not reserved a job yet
     */
    public function remove(?int $jobId = null): self
    {
        try {
            if ($jobId === null) {
                // Reading an uninitialized typed property throws an Error,
                // which the catch below would not intercept. Turn that state
                // into a regular exception so callers get MQTT\Delete.
                if (!isset($this->currentJobId)) {
                    throw new \LogicException(
                        'No job id given and no job has been reserved yet.'
                    );
                }

                $jobId = $this->currentJobId;
            }

            $this->client->delete($jobId);
        } catch (Exception $exception) {
            throw new MQTT\Delete($this->tube, $jobId, $exception);
        }

        return $this;
    }
}