4 Commits

Author SHA1 Message Date
47679cd4e4 Se registra todo envio a Toku 2025-08-12 19:18:18 -04:00
0f8db5a3f8 Merge pull request 'FIX: Telefono sobre rango maximo de integer en MySQL' (#26) from hotfix/nueva-venta-fallando into develop
Reviewed-on: #26
2025-08-05 15:44:54 -04:00
c38e89d3f1 FIX: Telefono sobre rango maximo de integer en MySQL 2025-08-05 15:41:23 -04:00
307f2ac7d7 feature/cierres (#25)
Varios cambios

Co-authored-by: Juan Pablo Vial <jpvialb@incoviba.cl>
Reviewed-on: #25
2025-07-22 13:18:00 +00:00
73 changed files with 1933 additions and 530 deletions

View File

@ -3,7 +3,8 @@ FROM php:8.4-cli
ENV TZ "${TZ}" ENV TZ "${TZ}"
ENV APP_NAME "${APP_NAME}" ENV APP_NAME "${APP_NAME}"
RUN apt-get update && apt-get install -y --no-install-recommends cron rsyslog nano && rm -r /var/lib/apt/lists/* RUN apt-get update && apt-get install -y --no-install-recommends cron rsyslog nano beanstalkd \
&& rm -r /var/lib/apt/lists/*
RUN pecl install xdebug-3.4.2 \ RUN pecl install xdebug-3.4.2 \
&& docker-php-ext-enable xdebug \ && docker-php-ext-enable xdebug \

View File

@ -3,7 +3,7 @@ FROM php:8.4-fpm
ENV TZ=America/Santiago ENV TZ=America/Santiago
RUN apt-get update && apt-get install -y --no-install-recommends libzip-dev libicu-dev git \ RUN apt-get update && apt-get install -y --no-install-recommends libzip-dev libicu-dev git \
libpng-dev unzip tzdata libxml2-dev \ libpng-dev unzip tzdata libxml2-dev beanstalkd \
&& rm -r /var/lib/apt/lists/* \ && rm -r /var/lib/apt/lists/* \
&& docker-php-ext-install pdo pdo_mysql zip intl gd bcmath dom \ && docker-php-ext-install pdo pdo_mysql zip intl gd bcmath dom \
&& pecl install xdebug-3.4.2 \ && pecl install xdebug-3.4.2 \

0
app/bin/console Normal file → Executable file
View File

0
app/bin/integration_tests Normal file → Executable file
View File

0
app/bin/performance_tests Normal file → Executable file
View File

0
app/bin/unit_tests Normal file → Executable file
View File

View File

@ -0,0 +1,10 @@
<?php
namespace Incoviba\Common\Ideal\Service;
use Incoviba\Common\Define;
use Incoviba\Common\Ideal;
abstract class Repository extends Ideal\Service
{
abstract public function getRepository(): Define\Repository;
}

View File

@ -64,10 +64,10 @@ class Select extends Ideal\Query implements Define\Query\Select
public function having(array|string $conditions): Select public function having(array|string $conditions): Select
{ {
if (is_string($conditions)) { if (is_string($conditions)) {
return $this->addCondition($conditions); return $this->addHaving($conditions);
} }
foreach ($conditions as $condition) { foreach ($conditions as $condition) {
$this->addCondition($condition); $this->addHaving($condition);
} }
return $this; return $this;
} }

View File

@ -8,11 +8,13 @@
"ext-gd": "*", "ext-gd": "*",
"ext-openssl": "*", "ext-openssl": "*",
"ext-pdo": "*", "ext-pdo": "*",
"ext-sockets": "*",
"berrnd/slim-blade-view": "^1", "berrnd/slim-blade-view": "^1",
"guzzlehttp/guzzle": "^7", "guzzlehttp/guzzle": "^7",
"monolog/monolog": "^3", "monolog/monolog": "^3",
"nyholm/psr7": "^1", "nyholm/psr7": "^1",
"nyholm/psr7-server": "^1", "nyholm/psr7-server": "^1",
"pda/pheanstalk": "^7.0",
"php-di/php-di": "^7", "php-di/php-di": "^7",
"php-di/slim-bridge": "^3", "php-di/slim-bridge": "^3",
"phpoffice/phpspreadsheet": "^3", "phpoffice/phpspreadsheet": "^3",

View File

@ -125,7 +125,7 @@ pm = ondemand
; forget to tweak pm.* to fit your needs. ; forget to tweak pm.* to fit your needs.
; Note: Used when pm is set to 'static', 'dynamic' or 'ondemand' ; Note: Used when pm is set to 'static', 'dynamic' or 'ondemand'
; Note: This value is mandatory. ; Note: This value is mandatory.
pm.max_children = 5 pm.max_children = 2
; The number of child processes created on startup. ; The number of child processes created on startup.
; Note: Used only when pm is set to 'dynamic' ; Note: Used only when pm is set to 'dynamic'
@ -152,6 +152,7 @@ pm.max_children = 5
; Note: Used only when pm is set to 'ondemand' ; Note: Used only when pm is set to 'ondemand'
; Default Value: 10s ; Default Value: 10s
;pm.process_idle_timeout = 10s; ;pm.process_idle_timeout = 10s;
pm.process_idle_timeout = 10s
; The number of requests each child process should execute before respawning. ; The number of requests each child process should execute before respawning.
; This can be useful to work around memory leaks in 3rd party libraries. For ; This can be useful to work around memory leaks in 3rd party libraries. For

View File

@ -19,14 +19,18 @@ final class CreateTokuAccounts extends AbstractMigration
*/ */
public function change(): void public function change(): void
{ {
$this->execute('SET unique_checks=0; SET foreign_key_checks=0;');
$this->table('toku_accounts') $this->table('toku_accounts')
->addColumn('toku_id', 'integer') ->addColumn('sociedad_rut', 'integer', ['limit' => 8, 'signed' => false, 'null' => false])
->addColumn('sociedad_rut', 'integer', ['limit' => 8]) ->addColumn('toku_id', 'string', ['length' => 255, 'null' => false])
->addColumn('account_key', 'string', ['length' => 255]) ->addColumn('account_key', 'string', ['length' => 255, 'null' => false])
->addIndex(['toku_id'], ['unique' => true]) ->addColumn('enabled', 'boolean', ['default' => true])
->addIndex(['account_id'], ['unique' => true])
->addForeignKey('sociedad_rut', 'sociedades', 'rut', ['delete' => 'CASCADE', 'update' => 'CASCADE'])
->addTimestamps() ->addTimestamps()
#->addForeignKey('sociedad_rut', 'inmobiliaria', 'rut', ['delete' => 'CASCADE', 'update' => 'CASCADE'])
->addIndex(['toku_id'], ['unique' => true])
->create(); ->create();
$this->execute('SET unique_checks=1; SET foreign_key_checks=1;');
} }
} }

View File

@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
use Phinx\Migration\AbstractMigration;
final class ChangeTelefonoSizeInPropietario extends AbstractMigration
{
/**
* Change Method.
*
* Write your reversible migrations using this method.
*
* More information on writing migrations is available here:
* https://book.cakephp.org/phinx/0/en/migrations.html#the-change-method
*
* Remember to call "create()" or "update()" and NOT "save()" when working
* with the Table class.
*/
public function change(): void
{
$this->table('propietario')
->changeColumn('telefono', 'biginteger', ['null' => true, 'signed' => false, 'default' => null])
->update();
}
}

View File

@ -7,4 +7,5 @@ $app->group('/toku', function($app) {
$app->get('/test[/]', [Toku::class, 'test']); $app->get('/test[/]', [Toku::class, 'test']);
$app->delete('/reset[/]', [Toku::class, 'reset']); $app->delete('/reset[/]', [Toku::class, 'reset']);
$app->post('/enqueue[/]', [Toku::class, 'enqueue']); $app->post('/enqueue[/]', [Toku::class, 'enqueue']);
$app->post('/update[/{type}[/]]', [Toku::class, 'update']);
}); });

View File

@ -2,9 +2,9 @@
use Incoviba\Controller\API\Queues; use Incoviba\Controller\API\Queues;
$app->group('/queue', function($app) { $app->group('/queue', function($app) {
$app->get('/jobs[/]', [Queues::class, 'jobs']); #$app->get('/jobs[/]', [Queues::class, 'jobs']);
$app->group('/run', function($app) { $app->group('/run', function($app) {
$app->get('/{job_id:[0-9]+}[/]', [Queues::class, 'run']); #$app->get('/{job_id:[0-9]+}[/]', [Queues::class, 'run']);
$app->get('[/]', Queues::class); $app->get('[/]', Queues::class);
}); });
}); });

View File

@ -156,7 +156,7 @@
<script> <script>
const regiones = [ const regiones = [
@foreach ($regiones as $region) @foreach ($regiones as $region)
'<div class="item" data-value="{{$region->id}}">{{$region->descripcion}}</div>', '<div class="item" data-value="{{$region->id}}">{{$region->numeral}} - {{$region->descripcion}}</div>',
@endforeach @endforeach
] ]

View File

@ -54,22 +54,23 @@
} }
$(document).ready(() => { $(document).ready(() => {
const url = '{{$urls->api}}/ventas/pago/{{$venta->resciliacion()->id}}' const url = '{{$urls->api}}/ventas/pago/{{$venta->resciliacion()->id}}'
let old = new Date({{$venta->resciliacion()?->fecha->format('Y') ?? date('Y')}}, let old = new Date(Date.parse('{{$venta->resciliacion()?->fecha->format('Y-m-d') ?? $venta->currentEstado()->fecha->format('Y-m-d') ?? $venta->fecha->format('Y-m-d')}}') + 24 * 60 * 60 * 1000)
{{$venta->resciliacion()?->fecha->format('n') ?? date('n')}}-1, {{$venta->resciliacion()?->fecha->format('j') ?? date('j')}})
calendar_date_options['initialDate'] = old calendar_date_options['initialDate'] = old
calendar_date_options['onChange'] = function(date, text, mode) { calendar_date_options['onChange'] = function(date, text, mode) {
if (date.getTime() === old.getTime()) { if (date.getTime() === old.getTime()) {
return return
} }
const body = new FormData() const body = new FormData()
body.set('fecha', date.toISOString()) const fecha = new Date(date.getTime())
fecha.setDate(fecha.getDate() - 1)
body.set('fecha', fecha.toISOString())
$('#loading-spinner-fecha').show() $('#loading-spinner-fecha').show()
APIClient.fetch(url, {method: 'post', body}).then(response => { APIClient.fetch(url, {method: 'post', body}).then(response => {
$('#loading-spinner-fecha').hide() $('#loading-spinner-fecha').hide()
if (!response) { if (!response) {
return return
} }
old = date old = new Date(date.getTime())
alertResponse('Fecha cambiada correctamente.') alertResponse('Fecha cambiada correctamente.')
}) })
} }

View File

@ -6,6 +6,7 @@
@section('venta_content') @section('venta_content')
<div class="ui list"> <div class="ui list">
@if (isset($venta->formaPago()->pie))
<div class="item"> <div class="item">
<div class="header">Valor Pagado</div> <div class="header">Valor Pagado</div>
<div class="content"> <div class="content">
@ -15,6 +16,16 @@
</div> </div>
</div> </div>
</div> </div>
@else
<div class="item">
<div class="ui compact warning message">
<div class="content">
<i class="exclamation triangle icon"></i>
No tiene valor pagado
</div>
</div>
</div>
@endif
<div class="item"> <div class="item">
<div class="header"> <div class="header">
Multa Estandar Multa Estandar

View File

@ -116,7 +116,18 @@ return [
->registerSub($container->get(Incoviba\Service\Contabilidad\Cartola\BCI\Mes::class)); ->registerSub($container->get(Incoviba\Service\Contabilidad\Cartola\BCI\Mes::class));
}, },
'TokuClient' => function(ContainerInterface $container) { 'TokuClient' => function(ContainerInterface $container) {
$logger = $container->get('externalLogger');
$stack = GuzzleHttp\HandlerStack::create();
$stack->push(GuzzleHttp\Middleware::mapRequest(function(Psr\Http\Message\RequestInterface $request) use ($logger) {
$logger->info('Toku Request', [
'method' => $request->getMethod(),
'uri' => (string) $request->getUri(),
'headers' => $request->getHeaders(),
'body' => $request->getBody()->getContents(),
]);
}));
return new GuzzleHttp\Client([ return new GuzzleHttp\Client([
'handler' => $stack,
'base_uri' => $container->get('TOKU_URL'), 'base_uri' => $container->get('TOKU_URL'),
'headers' => [ 'headers' => [
'x-api-key' => $container->get('TOKU_TOKEN'), 'x-api-key' => $container->get('TOKU_TOKEN'),
@ -162,6 +173,16 @@ return [
->register('subscription', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Subscription::class)) ->register('subscription', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Subscription::class))
->register('invoice', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Invoice::class)); ->register('invoice', $container->get(Incoviba\Service\Venta\MediosPago\Toku\Invoice::class));
}, },
Pheanstalk\Pheanstalk::class => function(ContainerInterface $container) {
return Pheanstalk\Pheanstalk::create(
$container->get('BEANSTALKD_HOST'),
$container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300
);
},
Incoviba\Service\MQTT::class => function(ContainerInterface $container) {
return new Incoviba\Service\MQTT()
->register('default', $container->get(Incoviba\Service\MQTT\Pheanstalk::class));
},
Incoviba\Service\Queue::class => function(ContainerInterface $container) { Incoviba\Service\Queue::class => function(ContainerInterface $container) {
return new Incoviba\Service\Queue( return new Incoviba\Service\Queue(
$container->get(Psr\Log\LoggerInterface::class), $container->get(Psr\Log\LoggerInterface::class),

View File

@ -22,20 +22,4 @@ class Queues extends Ideal\Controller
} }
return $this->withJson($response, $output); return $this->withJson($response, $output);
} }
public function jobs(ServerRequestInterface $request, ResponseInterface $response,
Service\Queue $queueService): ResponseInterface
{
$output = [
'jobs' => array_column($queueService->getPendingJobs(), 'id')
];
return $this->withJson($response, $output);
}
public function run(ServerRequestInterface $request, ResponseInterface $response, Service\Queue $queueService,
int $job_id): ResponseInterface
{
if ($queueService->runJob($job_id, $request)) {
return $response->withStatus(200);
}
return $response->withStatus(422);
}
} }

View File

@ -146,4 +146,25 @@ class Toku extends Controller
} }
return $this->withJson($response, $output); return $this->withJson($response, $output);
} }
public function update(ServerRequestInterface $request, ResponseInterface $response,
Service\Venta\MediosPago\Toku $tokuService, ?string $type = null): ResponseInterface
{
$body = $request->getBody()->getContents();
$input = json_decode($body, true);
$output = [
'type' => $type,
'input' => $input,
'output' => [],
'success' => false
];
try {
$output['output'] = $tokuService->update($input, $type);
$output['success'] = true;
} catch (Exception $exception) {
$this->logger->error($exception);
}
return $this->withJson($response, $output);
}
} }

View File

@ -4,6 +4,9 @@ namespace Incoviba\Controller;
use Incoviba\Common\Alias\View; use Incoviba\Common\Alias\View;
use Incoviba\Common\Implement\Exception\EmptyRedis; use Incoviba\Common\Implement\Exception\EmptyRedis;
use Incoviba\Common\Implement\Exception\EmptyResult; use Incoviba\Common\Implement\Exception\EmptyResult;
use Incoviba\Exception\ServiceAction\Create;
use Incoviba\Exception\ServiceAction\Read;
use Incoviba\Exception\ServiceAction\Update;
use Incoviba\Model; use Incoviba\Model;
use Incoviba\Repository; use Incoviba\Repository;
use Incoviba\Service; use Incoviba\Service;
@ -142,9 +145,24 @@ class Ventas
return $view->render($response, 'ventas.desistir', compact('venta')); return $view->render($response, 'ventas.desistir', compact('venta'));
} }
public function desistida(ServerRequestInterface $request, ResponseInterface $response, Service\Venta $ventaService, public function desistida(ServerRequestInterface $request, ResponseInterface $response, Service\Venta $ventaService,
Service\Venta\Pago $pagoService,
View $view, int $venta_id): ResponseInterface View $view, int $venta_id): ResponseInterface
{ {
try {
$venta = $ventaService->getById($venta_id); $venta = $ventaService->getById($venta_id);
} catch (Read) {
return $view->render($response->withStatus(404), 'not_found');
}
if ($venta->resciliacion() === null) {
$pagoData = [
'fecha' => $venta->currentEstado()->fecha->format('Y-m-d'),
'valor' => 0
];
try {
$pago = $pagoService->add($pagoData);
$venta = $ventaService->edit($venta, ['resciliacion' => $pago->id]);
} catch (Create | Update) {}
}
return $view->render($response, 'ventas.desistida', compact('venta')); return $view->render($response, 'ventas.desistida', compact('venta'));
} }
} }

View File

@ -0,0 +1,18 @@
<?php
namespace Incoviba\Exception;
use Throwable;
use Exception;
abstract class MQTT extends Exception
{
public function __construct($message = "", $code = 0, ?Throwable $previous = null)
{
$baseCode = 700;
$code = $baseCode + $code;
if ($message == "") {
$message = "MQTT Exception";
}
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,18 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class MissingClient extends MQTT
{
public function __construct(string $host = '', ?Throwable $previous = null)
{
$message = 'Missing MQTT client';
if ($host !== '') {
$message = "{$message} for host {$host}";
}
$code = 1;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class MissingJob extends MQTT
{
public function __construct(?Throwable $previous = null)
{
$message = 'Missing MQTT job';
$code = 10;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class RemoveJob extends MQTT
{
public function __construct(int $jobId, ?Throwable $previous = null)
{
$message = "Could not remove job {$jobId}";
$code = 13;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class SetJob extends MQTT
{
public function __construct(string $payload, ?Throwable $previous = null)
{
$message = "Could not set job with {$payload}";
$code = 11;
parent::__construct($message, $code, $previous);
}
}

View File

@ -5,60 +5,43 @@ use DateInvalidTimeZoneException;
use DateMalformedStringException; use DateMalformedStringException;
use DateTimeImmutable; use DateTimeImmutable;
use DateTimeZone; use DateTimeZone;
use InvalidArgumentException;
use OutOfRangeException;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Predis\Connection\ConnectionException;
use Incoviba\Common\Ideal; use Incoviba\Common\Ideal;
use Incoviba\Common\Implement\Exception\EmptyRedis; use Incoviba\Exception\MQTT as MQTTException;
use Incoviba\Common\Implement\Exception\EmptyResult;
use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update}; use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update};
use Incoviba\Repository;
use Incoviba\Model; use Incoviba\Model;
use Incoviba\Repository;
class Job extends Ideal\Service class Job extends Ideal\Service
{ {
public function __construct(LoggerInterface $logger, protected Redis $redisService, public function __construct(LoggerInterface $logger, protected MQTT $mqttService,
protected Repository\Job $jobRepository) protected Repository\Job $jobRepository)
{ {
parent::__construct($logger); parent::__construct($logger);
} }
protected string $redisKey = 'jobs';
public function getPending(null|string|array $orderBy = null): array public function isPending(): bool
{ {
try { try {
$jobs = $this->redisService->get($this->redisKey); return $this->mqttService->exists();
if ($jobs === null) { } catch (MQTTException $exception) {
return []; $this->logger->error($exception->getMessage(), ['exception' => $exception]);
} return false;
$jobs = json_decode($jobs, true);
if ($orderBy !== null) {
uksort($jobs, function($a, $b) use ($orderBy) {
return $a[$orderBy] <=> $b[$orderBy];
});
}
return array_map([$this, 'load'], $jobs);
} catch (ConnectionException | EmptyRedis) {
return [];
} }
} }
/** /**
* @param int $id
* @return Model\Job * @return Model\Job
* @throws Read * @throws Read
*/ */
public function getPendingById(int $id): Model\Job public function get(): Model\Job
{ {
$jobs = $this->getJobs();
try { try {
$idx = $this->findJob($jobs, $id); return $this->load(json_decode($this->mqttService->get(), true));
} catch (EmptyResult $exception) { } catch (MQTTException $exception) {
$exception = new OutOfRangeException('Job not found', count($jobs), $exception); $this->logger->error($exception->getMessage(), ['exception' => $exception]);
throw new Read(__CLASS__, $exception); throw new Read(__CLASS__, $exception);
} }
return $this->load($jobs[$idx]);
} }
/** /**
@ -71,6 +54,7 @@ class Job extends Ideal\Service
try { try {
$now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago'))); $now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago')));
} catch (DateMalformedStringException | DateInvalidTimeZoneException $exception) { } catch (DateMalformedStringException | DateInvalidTimeZoneException $exception) {
$this->logger->warning($exception->getMessage(), ['exception' => $exception]);
$now = new DateTimeImmutable(); $now = new DateTimeImmutable();
} }
$data = [ $data = [
@ -81,17 +65,9 @@ class Job extends Ideal\Service
'updated_at' => null, 'updated_at' => null,
'retries' => 0 'retries' => 0
]; ];
$jobs = [];
try { try {
$jobs = $this->redisService->get($this->redisKey); $this->mqttService->set(json_encode($data));
if ($jobs !== null) { } catch (MQTTException $exception) {
$jobs = json_decode($jobs, true);
}
} catch (EmptyRedis) {}
$jobs []= $data;
try {
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
} catch (ConnectionException $exception) {
throw new Create(__CLASS__, $exception); throw new Create(__CLASS__, $exception);
} }
return $this->load($data); return $this->load($data);
@ -99,50 +75,35 @@ class Job extends Ideal\Service
/** /**
* @param Model\Job $job * @param Model\Job $job
* @return Model\Job * @return void
* @throws Update * @throws Update
* @throws Read
*/ */
public function update(Model\Job $job): Model\Job public function update(Model\Job $job): void
{ {
try { try {
$now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago'))); $now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago')));
} catch (DateMalformedStringException | DateInvalidTimeZoneException) { } catch (DateMalformedStringException | DateInvalidTimeZoneException) {
$now = new DateTimeImmutable(); $now = new DateTimeImmutable();
} }
$jobs = $this->getJobs(); $data = json_decode(json_encode($job), true);
$data['updated_at'] = $now->format('Y-m-d H:i:s');
try { try {
$idx = $this->findJob($jobs, $job->id); $this->mqttService->update(json_encode($data));
} catch (EmptyResult $exception) { } catch (MQTTException $exception) {
throw new Read(__CLASS__, $exception);
}
$jobs[$idx]['updated_at'] = $now->format('Y-m-d H:i:s');
$jobs[$idx]['retries'] = $job->retries;
try {
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
} catch (ConnectionException $exception) {
throw new Update(__CLASS__, $exception); throw new Update(__CLASS__, $exception);
} }
return $this->load($jobs[$idx]);
} }
/** /**
* @param Model\Job $job * @param Model\Job $job
* @throws Read
* @throws Delete * @throws Delete
*/ */
public function remove(Model\Job $job): void public function remove(Model\Job $job): void
{ {
$jobs = $this->getJobs();
try { try {
$idx = $this->findJob($jobs, $job->id); $this->mqttService->remove();
} catch (EmptyResult $exception) { } catch (MQTTException $exception) {
throw new Read(__CLASS__, $exception);
}
unset($jobs[$idx]);
try {
$this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1);
} catch (ConnectionException $exception) {
throw new Delete(__CLASS__, $exception); throw new Delete(__CLASS__, $exception);
} }
} }
@ -150,59 +111,18 @@ class Job extends Ideal\Service
/** /**
* @param Model\Job $job * @param Model\Job $job
* @return bool * @return bool
* @throws Read | Create
*/ */
public function execute(Model\Job $job): bool public function execute(Model\Job $job): bool
{ {
$jobs = $this->getJobs();
try { try {
$idx = $this->findJob($jobs, $job->id); $this->mqttService->remove();
} catch (EmptyResult $exception) {
$exception = new OutOfRangeException('Job not found', count($jobs), $exception);
throw new Read(__CLASS__, $exception);
}
unset($jobs[$idx]);
try {
$this->redisService->set($this->redisKey, json_encode(array_values($jobs)), -1);
} catch (ConnectionException $exception) {
throw new Create(__CLASS__, $exception);
}
return true; return true;
} catch (MQTTException $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
return false;
}
} }
/**
* @return array
* @throws Read
*/
protected function getJobs(): array
{
try {
$jobs = $this->redisService->get($this->redisKey);
} catch (EmptyRedis $exception) {
throw new Read(__CLASS__, $exception);
}
if ($jobs === null) {
$exception = new InvalidArgumentException("Redis Key {$this->redisKey} not found");
throw new Read(__CLASS__, $exception);
}
return json_decode($jobs, true);
}
/**
* @param array $jobs
* @param int $id
* @return int
* @throws EmptyResult
*/
protected function findJob(array $jobs, int $id): int
{
$idx = array_find_key($jobs, function($job) use ($id) {
return (int) $job['id'] === $id;
});
if ($idx === null) {
throw new EmptyResult("SELECT * FROM jobs WHERE id = ?");
}
return $idx;
}
protected function load(array $data, ?int $id = null): Model\Job protected function load(array $data, ?int $id = null): Model\Job
{ {
$job = new Model\Job(); $job = new Model\Job();

102
app/src/Service/MQTT.php Normal file
View File

@ -0,0 +1,102 @@
<?php
namespace Incoviba\Service;
use Incoviba\Exception\MQTT as MQTTException;
use Incoviba\Service\MQTT\MQTTInterface;
class MQTT implements MQTTInterface
{
protected array $clients = [];
public function register(string $name, MQTTInterface $client): self
{
$this->clients[$name] = $client;
return $this;
}
public function clientExists(string $name): bool
{
return isset($this->clients[$name]);
}
/**
* @param string|null $name
* @return MQTTInterface
* @throws MQTTException/MissingClient
*/
public function getClient(?string $name = null): MQTTInterface
{
if ($name === null) {
$name = array_keys($this->clients)[0];
}
if (!$this->clientExists($name)) {
throw new MQTTException\MissingClient($name);
}
return $this->clients[$name];
}
/**
* @param string|null $host
* @return bool
* @throws MQTTException/MissingClient
* @throws MQTTException/MissingJob
*/
public function exists(?string $host = null): bool
{
$client = $this->getClient($host);
return $client->exists();
}
/**
* @param string|null $host
* @return string
* @throws MQTTException/MissingClient
* @throws MQTTException/MissingJob
*/
public function get(?string $host = null): string
{
$client = $this->getClient($host);
return $client->get();
}
/**
* @param string $value
* @param int $delay
* @param string|null $host
* @return $this
* @throws MQTTException/MissingClient
* @throws MQTTException/SetJob
*/
public function set(string $value, int $delay = 0, ?string $host = null): self
{
$client = $this->getClient($host);
$client->set($value, $delay);
return $this;
}
/**
* @param int|null $jobId
* @param string|null $host
* @return $this
* @throws MQTTException/MissingJob
* @throws MQTTException/RemoveJob
*/
public function remove(?int $jobId = null, ?string $host = null): self
{
$this->getClient($host)->remove($jobId);
return $this;
}
/**
* @param string $newPayload
* @param int|null $jobId
* @param string|null $host
* @return $this
* @throws MQTTException/MissingJob
* @throws MQTTException/RemoveJob
* @throws MQTTException/SetJob
*/
public function update(string $newPayload, ?int $jobId = null, ?string $host = null): self
{
$this->getClient($host)->update($newPayload, $jobId);
return $this;
}
}

View File

@ -0,0 +1,111 @@
<?php
namespace Incoviba\Service\MQTT;
use Exception;
use Incoviba\Exception\MQTT\MissingJob;
use Incoviba\Exception\MQTT\RemoveJob;
use Incoviba\Exception\MQTT\SetJob;
use Psr\Log\LoggerInterface;
use xobotyi\beansclient;
use xobotyi\beansclient\Exception\ClientException;
use xobotyi\beansclient\Exception\CommandException;
use xobotyi\beansclient\Exception\JobException;
class Beanstalkd implements MQTTInterface
{
/**
* @throws JobException
* @throws ClientException
* @throws CommandException
*/
public function __construct(protected LoggerInterface $logger, protected beansclient\BeansClient $client,
protected string $tube = 'default', protected int $ttr = beansclient\BeansClient::DEFAULT_TTR,
protected int $priority = 1)
{
$this->client->watchTube($this->tube);
}
public function exists(): bool
{
try {
$stats = $this->client->statsTube($this->tube);
return $stats['current-jobs-ready'] > 0;
} catch (Exception $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
return false;
}
}
protected ?beansclient\Job $currentJob = null;
public function get(): string
{
if (!$this->exists()) {
throw new MissingJob();
}
try {
$job = $this->client->watchTube($this->tube)->reserve();
$this->currentJob = $job;
return $job->payload;
} catch (Exception $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
throw new MissingJob($exception);
}
}
/**
* @param string $value
* @param int $delay
* @return $this
* @throws SetJob
*/
public function set(string $value, int $delay = 0): self
{
try {
$this->client->useTube($this->tube)->put($value, $this->priority, $delay, $this->ttr ?? 0);
} catch (Exception $exception) {
$this->logger->error($exception->getMessage(), ['payload' => $value, 'delay' => $delay, 'exception' => $exception]);
throw new SetJob($value, $exception);
}
return $this;
}
/**
* @param string $newPayload
* @param int|null $jobId
* @return self
* @throws RemoveJob
* @throws SetJob
*/
public function update(string $newPayload, ?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJob->id;
}
return $this->remove($jobId)
->set($newPayload);
}
/**
* @param int|null $jobId
* @return $this
* @throws RemoveJob
*/
public function remove(?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJob->id;
}
try {
if (!$this->client->useTube($this->tube)->delete($jobId)) {
throw new JobException("Failed to delete job {$jobId}");
}
if ($this->currentJob !== null && $this->currentJob->id === $jobId) {
$this->currentJob = null;
}
} catch (Exception $exception) {
$this->logger->error($exception->getMessage(), ['jobId' => $jobId, 'exception' => $exception]);
throw new RemoveJob($jobId, $exception);
}
return $this;
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace Incoviba\Service\MQTT;
use Incoviba\Exception\MQTT\MissingJob;
interface MQTTInterface {
/**
* @return bool
*/
public function exists(): bool;
/**
* @return string
* @throws MissingJob
*/
public function get(): string;
/**
* @param string $value
* @param int $delay
* @return self
*/
public function set(string $value, int $delay = 0): self;
public function remove(?int $jobId = null): self;
}

View File

@ -0,0 +1,59 @@
<?php
namespace Incoviba\Service\MQTT;
use Incoviba\Common\Ideal\Service;
use Psr\Log\LoggerInterface;
use Pheanstalk as PBA;
class Pheanstalk extends Service implements MQTTInterface
{
const string DEFAULT_TUBE = 'default';
const int DEFAULT_TTR = 60;
const int DEFAULT_PRIORITY = 1_024;
public function __construct(LoggerInterface $logger, protected PBA\Pheanstalk $client, string $tubeName = self::DEFAULT_TUBE)
{
parent::__construct($logger);
$this->tube = new PBA\Values\TubeName($tubeName);
}
protected PBA\Values\TubeName $tube;
public function set(string $value, int $delay = 0): self
{
$this->client->useTube($this->tube);
$this->client->put($value, self::DEFAULT_PRIORITY, $delay, self::DEFAULT_TTR);
return $this;
}
public function exists(): bool
{
$stats = $this->client->statsTube($this->tube);
return $stats->currentJobsReady > 0;
}
protected int $currentJobId;
public function get(): string
{
$this->client->useTube($this->tube);
$job = $this->client->reserve();
$this->currentJobId = $job->getId();
return $job->getData();
}
public function update(string $newPayload, ?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJobId;
}
$this->remove($jobId);
$this->set($newPayload);
return $this;
}
public function remove(?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJobId;
}
$this->client->useTube($this->tube);
$this->client->delete(new PBA\Values\JobId($jobId));
return $this;
}
}

View File

@ -7,6 +7,7 @@ use Psr\Log\LoggerInterface;
use Incoviba\Common\Ideal; use Incoviba\Common\Ideal;
use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update}; use Incoviba\Exception\ServiceAction\{Create, Delete, Read, Update};
use Incoviba\Service; use Incoviba\Service;
use Incoviba\Model;
class Queue extends Ideal\Service class Queue extends Ideal\Service
{ {
@ -29,7 +30,7 @@ class Queue extends Ideal\Service
try { try {
$this->jobService->add($configuration); $this->jobService->add($configuration);
return true; return true;
} catch (Read $exception) { } catch (Create $exception) {
$final = new Exception("Could not enqueue job", 0, $exception); $final = new Exception("Could not enqueue job", 0, $exception);
$this->logger->warning($final); $this->logger->warning($final);
return false; return false;
@ -40,22 +41,8 @@ class Queue extends Ideal\Service
return $this->enqueue($configuration); return $this->enqueue($configuration);
} }
/** public function runJob(Model\Job $job, ?RequestInterface $request = null): bool
* @return array
*/
public function getPendingJobs(): array
{ {
return $this->jobService->getPending();
}
public function runJob(int $job_id, ?RequestInterface $request = null): bool
{
try {
$job = $this->jobService->getPendingById($job_id);
} catch (Read $exception) {
$this->logger->debug($exception);
return false;
}
$type = 'default'; $type = 'default';
if (isset($job->configuration['type'])) { if (isset($job->configuration['type'])) {
$type = strtolower($job->configuration['type']); $type = strtolower($job->configuration['type']);
@ -71,50 +58,57 @@ class Queue extends Ideal\Service
try { try {
if (!$worker->execute($job)) { if (!$worker->execute($job)) {
$this->logger->debug("Could not execute job {$job_id}"); $this->logger->debug("Could not execute job {$job->id}");
$job->retries++;
$this->jobService->update($job);
return false; return false;
} }
if (!$this->jobService->execute($job)) { if (!$this->jobService->execute($job)) {
$this->logger->debug("Could not remove job {$job_id}"); $this->logger->debug("Could not remove job {$job->id}");
return false; return false;
} }
} catch (Exception $exception) { } catch (Exception $exception) {
$final = new Exception("Could not run job", 0, $exception); $this->logger->warning("Could not run job {$job->id}", ['exception' => $exception]);
$this->logger->warning($final); $job->retries++;
try {
$this->jobService->update($job);
} catch (Update $exception) {
$this->logger->error($exception->getMessage(), ['job' => $job, 'exception' => $exception]);
}
return false; return false;
} }
return true; return true;
} }
public function run(?RequestInterface $request = null): bool public function run(?RequestInterface $request = null): bool
{ {
$jobs = $this->jobService->getPending(); if (!$this->jobService->isPending()) {
if (count($jobs) === 0) {
$this->logger->debug("No pending jobs");
return true; return true;
} }
try {
$errors = []; $job = $this->jobService->get();
foreach ($jobs as $job) { } catch (Read $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
return false;
}
if ($job->retries >= $this->maxRetries) { if ($job->retries >= $this->maxRetries) {
try { try {
$this->jobService->remove($job); $this->jobService->remove($job);
} catch (Read | Delete $exception) { } catch (Delete $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]); $this->logger->error($exception->getMessage(), ['job' => $job, 'exception' => $exception]);
} }
continue; return true;
} }
try { try {
$this->runJob($job->id, $request); $this->runJob($job, $request);
} catch (Exception) { } catch (Exception) {
$job->retries ++; $job->retries ++;
try { try {
$this->jobService->update($job); $this->jobService->update($job);
} catch (Read | Update $exception) { } catch (Update $exception) {
$this->logger->error($exception->getMessage(), ['exception' => $exception]); $this->logger->error($exception->getMessage(), ['job' => $job, 'exception' => $exception]);
} }
$errors []= $job->id; return false;
} }
} return true;
return count($errors) === 0;
} }
} }

View File

@ -4,7 +4,7 @@ namespace Incoviba\Service;
use DateTimeInterface; use DateTimeInterface;
use DateTimeImmutable; use DateTimeImmutable;
use DateMalformedStringException; use DateMalformedStringException;
use function PHPUnit\Framework\countOf; use Incoviba\Service\Valor\Phone;
class Valor class Valor
{ {
@ -40,6 +40,14 @@ class Valor
} }
return $value / $this->ufService->get($date); return $value / $this->ufService->get($date);
} }
public function phone(): Phone
{
return new Phone();
}
public function telefono(): Phone
{
return $this->phone();
}
protected function getDateTime(null|string|DateTimeInterface $date): DateTimeInterface protected function getDateTime(null|string|DateTimeInterface $date): DateTimeInterface
{ {

View File

@ -0,0 +1,28 @@
<?php
namespace Incoviba\Service\Valor;
class Phone
{
public function toDatabase(?string $phone): ?int
{
if ($phone === null) {
return null;
}
return (int) str_replace([' ', '+'], '', $phone) ?? null;
}
public function toDisplay(?int $phone): ?string
{
if ($phone === null) {
return null;
}
$parts = preg_split('/(?=<country>\d{2})?(?=<area>\d)(?=<first>\d{4})(?=<last>\d{4})/', $phone);
$output = [];
if (array_key_exists('country', $parts)) {
$output [] = "+{$parts[0]}";
}
$output [] = $parts[1] ?? '';
$output [] = $parts[2] ?? '';
$output [] = $parts[3] ?? '';
return implode(' ', $output);
}
}

View File

@ -1,10 +1,10 @@
<?php <?php
namespace Incoviba\Service; namespace Incoviba\Service;
use Exception;
use DateTimeImmutable; use DateTimeImmutable;
use DateMalformedStringException; use DateMalformedStringException;
use Incoviba\Exception\ServiceAction\{Create, Read, Update}; use Incoviba\Exception\ServiceAction\{Create, Read, Update};
use Incoviba\Common\Define;
use PDOException; use PDOException;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Incoviba\Common\Ideal\Service; use Incoviba\Common\Ideal\Service;
@ -12,7 +12,7 @@ use Incoviba\Common\Implement;
use Incoviba\Repository; use Incoviba\Repository;
use Incoviba\Model; use Incoviba\Model;
class Venta extends Service class Venta extends Service\Repository
{ {
public function __construct( public function __construct(
LoggerInterface $logger, LoggerInterface $logger,
@ -189,6 +189,11 @@ class Venta extends Service
} }
} }
public function getRepository(): Define\Repository
{
return $this->ventaRepository;
}
protected function process(Model\Venta $venta): Model\Venta protected function process(Model\Venta $venta): Model\Venta
{ {
if ($venta->uf === 0.0) { if ($venta->uf === 0.0) {

View File

@ -76,15 +76,24 @@ abstract class AbstractEndPoint extends LoggerEnabled implements EndPoint
* @param array $data * @param array $data
* @param array $validStatus * @param array $validStatus
* @param array $invalidStatus * @param array $invalidStatus
* @param string|null $accountKey
* @return bool * @return bool
* @throws EmptyResponse * @throws EmptyResponse
*/ */
protected function sendAdd(string $request_uri, array $data, array $validStatus, array $invalidStatus): bool protected function sendAdd(string $request_uri, array $data, array $validStatus, array $invalidStatus, ?string $accountKey = null): bool
{ {
$params = $this->mapParams($data); $params = $this->mapParams($data);
$this->logger->info('Send Add', ['uri' => $request_uri, 'params' => $params]); $this->logger->info('Send Add', ['uri' => $request_uri, 'params' => $params]);
try { try {
$response = $this->client->post($request_uri, ['json' => $params]); $options = [
'json' => $params
];
if ($accountKey !== null) {
$options['headers'] = [
'X-Account-Key' => $accountKey
];
}
$response = $this->client->post($request_uri, $options);
} catch (ClientExceptionInterface $exception) { } catch (ClientExceptionInterface $exception) {
throw new EmptyResponse($request_uri, $exception); throw new EmptyResponse($request_uri, $exception);
} }
@ -111,14 +120,23 @@ abstract class AbstractEndPoint extends LoggerEnabled implements EndPoint
* @param array $data * @param array $data
* @param array $validStatus * @param array $validStatus
* @param array $invalidStatus * @param array $invalidStatus
* @param string|null $accountKey
* @return bool * @return bool
* @throws EmptyResponse * @throws EmptyResponse
*/ */
protected function sendEdit(string $request_uri, array $data, array $validStatus, array $invalidStatus): bool protected function sendEdit(string $request_uri, array $data, array $validStatus, array $invalidStatus, ?string $accountKey = null): bool
{ {
$params = $this->mapParams($data); $params = $this->mapParams($data);
try { try {
$response = $this->client->put($request_uri, ['json' => $params]); $options = [
'json' => $params
];
if ($accountKey !== null) {
$options['headers'] = [
'X-Account-Key' => $accountKey
];
}
$response = $this->client->put($request_uri, $options);
} catch (ClientExceptionInterface $exception) { } catch (ClientExceptionInterface $exception) {
throw new EmptyResponse($request_uri, $exception); throw new EmptyResponse($request_uri, $exception);
} }

View File

@ -28,18 +28,20 @@ interface EndPoint
/** /**
* @param array $data * @param array $data
* @param string|null $accountKey
* @return bool * @return bool
* @throws EmptyResponse * @throws EmptyResponse
*/ */
public function add(array $data): bool; public function add(array $data, ?string $accountKey = null): bool;
/** /**
* @param string $id * @param string $id
* @param array $data * @param array $data
* @param string|null $accountKey
* @return bool * @return bool
* @throws EmptyResponse * @throws EmptyResponse
*/ */
public function edit(string $id, array $data): bool; public function edit(string $id, array $data, ?string $accountKey = null): bool;
/** /**
* @param string $id * @param string $id

View File

@ -1,8 +1,10 @@
<?php <?php
namespace Incoviba\Service\Venta\MediosPago; namespace Incoviba\Service\Venta\MediosPago;
use Incoviba\Common\Implement\Exception\EmptyResult;
use InvalidArgumentException; use InvalidArgumentException;
use PDO; use PDO;
use PDOException;
use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Message\ServerRequestInterface;
use Incoviba\Common\Define\Connection; use Incoviba\Common\Define\Connection;
use Incoviba\Common\Ideal; use Incoviba\Common\Ideal;
@ -80,13 +82,18 @@ class Toku extends Ideal\Service
try { try {
return $this->subscription->getById($venta->id); return $this->subscription->getById($venta->id);
} catch (InvalidResult $exception) { } catch (InvalidResult $exception) {
$inmobiliaria = $venta->proyecto()->inmobiliaria();
$accountKey = null;
try {
$accountKey = $this->getAccountKey($inmobiliaria->rut);
} catch (EmptyResult) {}
$subscriptionData = [ $subscriptionData = [
'customer' => $customer['toku_id'], 'customer' => $customer['toku_id'],
'product_id' => $venta->id, 'product_id' => $venta->id,
'venta' => $venta 'venta' => $venta
]; ];
try { try {
if (!$this->subscription->add($subscriptionData)) { if (!$this->subscription->add($subscriptionData, $accountKey)) {
throw new InvalidResult("Could not save Subscription for Venta {$venta->id}", 409, $exception); throw new InvalidResult("Could not save Subscription for Venta {$venta->id}", 409, $exception);
} }
} catch (EmptyResponse $exception) { } catch (EmptyResponse $exception) {
@ -95,7 +102,6 @@ class Toku extends Ideal\Service
return $this->subscription->getById($venta->id); return $this->subscription->getById($venta->id);
} }
} }
/** /**
* @param Model\Venta $venta * @param Model\Venta $venta
* @param array $cuotas_ids * @param array $cuotas_ids
@ -115,6 +121,12 @@ class Toku extends Ideal\Service
}); });
} catch (EmptyResponse) {} } catch (EmptyResponse) {}
$inmobiliaria = $venta->proyecto()->inmobiliaria();
$accountKey = null;
try {
$accountKey = $this->getAccountKey($inmobiliaria->rut);
} catch (EmptyResult) {}
$invoices = []; $invoices = [];
$errors = []; $errors = [];
foreach ($venta->formaPago()->pie->cuotas() as $cuota) { foreach ($venta->formaPago()->pie->cuotas() as $cuota) {
@ -142,7 +154,7 @@ class Toku extends Ideal\Service
'cuota' => $cuota, 'cuota' => $cuota,
'venta' => $venta 'venta' => $venta
]; ];
if (!$this->invoice->add($invoiceData)) { if (!$this->invoice->add($invoiceData, $accountKey)) {
throw new EmptyResponse("Could not add Invoice for Cuota {$cuota->id}", $exception); throw new EmptyResponse("Could not add Invoice for Cuota {$cuota->id}", $exception);
} }
$invoices []= $this->invoice->getById($cuota->id); $invoices []= $this->invoice->getById($cuota->id);
@ -290,6 +302,95 @@ class Toku extends Ideal\Service
return $queues; return $queues;
} }
public function update(array $ids, ?string $type = null): array
{
if ($type === null) {
$types = [
'customers',
'subscriptions',
'invoices'
];
$results = [];
foreach ($types as $type) {
$results[$type] = $this->update($ids[$type], $type);
}
return $results;
}
$results = [];
switch ($type) {
case 'subscriptions':
try {
$results['subscription'] = $this->subscription->update($ids);
} catch (EmptyResult | EmptyResponse $exception) {
$this->logger->error($exception);
}
break;
case 'invoices':
try {
$results['invoice'] = $this->invoice->updateAll($ids);
} catch (EmptyResult $exception) {
$this->logger->error($exception);
}
break;
}
return $results;
}
/**
* @param ServerRequestInterface $request
* @param array $tokenConfig
* @return bool
*/
public function validateToken(ServerRequestInterface $request, array $tokenConfig): bool
{
if (!$request->hasHeader('User-Agent') or !str_starts_with($request->getHeaderLine('User-Agent'), 'Toku-Webhooks')) {
return false;
}
if (!$request->hasHeader('X-Datadog-Tags') or !$request->hasHeader('Tracestate')) {
return false;
}
if (!$request->hasHeader('Toku-Signature')) {
return false;
}
$tokuSignature = $request->getHeaderLine('Toku-Signature');
try {
list($timestamp, $signature) = array_map(function($elem) {
return explode('=', $elem)[1];
}, explode(',', $tokuSignature));
$body = $request->getBody()->getContents();
$json = json_decode($body, true);
if (!is_array($json)) {
return false;
}
if (!array_key_exists('id', $json)) {
return false;
}
$eventId = $json['id'];
$eventType = $json['event_type'];
$query = $this->connection->getQueryBuilder()
->select('secret')
->from('toku_webhooks')
->where('enabled = ? AND JSON_SEARCH(events, "one", ?) IS NOT NULL');
$params = [true, $eventType];
$statement = $this->connection->prepare($query);
$statement->execute($params);
$results = $statement->fetchAll(PDO::FETCH_COLUMN);
if (count($results) === 0) {
return false;
}
if (array_any($results, fn($secret) => $this->hmac->validate($timestamp, $signature, $eventId, $secret))) {
return true;
}
} catch (Throwable $throwable) {
$this->logger->error($throwable);
}
return false;
}
/** /**
* @param array $request * @param array $request
* @return bool * @return bool
@ -406,54 +507,20 @@ class Toku extends Ideal\Service
$data['date'] = $data['transaction_date']; $data['date'] = $data['transaction_date'];
return $data; return $data;
} }
protected function getAccountKey(int $sociedad_rut): string
public function validateToken(ServerRequestInterface $request, array $tokenConfig): bool
{ {
if (!$request->hasHeader('User-Agent') or !str_starts_with($request->getHeaderLine('User-Agent'), 'Toku-Webhooks')) {
return false;
}
if (!$request->hasHeader('X-Datadog-Tags') or !$request->hasHeader('Tracestate')) {
return false;
}
if (!$request->hasHeader('Toku-Signature')) {
return false;
}
$tokuSignature = $request->getHeaderLine('Toku-Signature');
try {
list($timestamp, $signature) = array_map(function($elem) {
return explode('=', $elem)[1];
}, explode(',', $tokuSignature));
$body = $request->getBody()->getContents();
$json = json_decode($body, true);
if (!is_array($json)) {
return false;
}
if (!array_key_exists('id', $json)) {
return false;
}
$eventId = $json['id'];
$eventType = $json['event_type'];
$query = $this->connection->getQueryBuilder() $query = $this->connection->getQueryBuilder()
->select('secret') ->select('account_key')
->from('toku_webhooks') ->from('toku_accounts')
->where('enabled = ? AND JSON_SEARCH(events, "one", ?) IS NOT NULL'); ->where('enabled = ? AND sociedad_rut = ?');
$params = [true, $eventType]; $params = [true, $sociedad_rut];
try {
$statement = $this->connection->prepare($query); $statement = $this->connection->prepare($query);
$statement->execute($params); $statement->execute($params);
$results = $statement->fetchAll(PDO::FETCH_COLUMN); return $statement->fetchColumn();
if (count($results) === 0) { } catch (PDOException $exception) {
return false; $this->logger->error($exception);
} throw new EmptyResult($query, $exception);
}
if (array_any($results, fn($secret) => $this->hmac->validate($timestamp, $signature, $eventId, $secret))) {
return true;
}
} catch (Throwable $throwable) {
$this->logger->error($throwable);
}
return false;
} }
} }

View File

@ -29,15 +29,15 @@ class Customer extends AbstractEndPoint
$request_uri = "/customers/{$id}"; $request_uri = "/customers/{$id}";
return $this->sendGet($request_uri, [200], [404, 422]); return $this->sendGet($request_uri, [200], [404, 422]);
} }
public function add(array $data): bool public function add(array $data, ?string $accountKey = null): bool
{ {
$request_uri = "/customers"; $request_uri = "/customers";
return $this->sendAdd($request_uri, $data, [200, 201], [400, 422]); return $this->sendAdd($request_uri, $data, [200, 201], [400, 422], $accountKey);
} }
public function edit(string $id, array $data): bool public function edit(string $id, array $data, ?string $accountKey = null): bool
{ {
$request_uri = "customers/{$id}"; $request_uri = "customers/{$id}";
return $this->sendEdit($request_uri, $data, [200], [400, 404, 422]); return $this->sendEdit($request_uri, $data, [200], [400, 404, 422], $accountKey);
} }
public function delete(string $id): void public function delete(string $id): void
{ {

View File

@ -4,6 +4,7 @@ namespace Incoviba\Service\Venta\MediosPago\Toku;
use DateMalformedStringException; use DateMalformedStringException;
use DateTimeImmutable; use DateTimeImmutable;
use DateTimeZone; use DateTimeZone;
use PDO;
use PDOException; use PDOException;
use Psr\Http\Client\ClientInterface; use Psr\Http\Client\ClientInterface;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
@ -39,15 +40,15 @@ class Invoice extends AbstractEndPoint
$request_uri = "/invoices/{$id}"; $request_uri = "/invoices/{$id}";
return $this->sendGet($request_uri, [200], [404]); return $this->sendGet($request_uri, [200], [404]);
} }
public function add(array $data): bool public function add(array $data, ?string $accountKey = null): bool
{ {
$request_uri = "/invoices"; $request_uri = "/invoices";
return $this->sendAdd($request_uri, $data, [200, 201], [400, 409, 422]); return $this->sendAdd($request_uri, $data, [200, 201], [400, 409, 422], $accountKey);
} }
public function edit(string $id, array $data): bool public function edit(string $id, array $data, ?string $accountKey = null): bool
{ {
$request_uri = "/invoices/{$id}"; $request_uri = "/invoices/{$id}";
return $this->sendEdit($request_uri, $data, [200], [400, 404, 409, 422]); return $this->sendEdit($request_uri, $data, [200], [400, 404, 409, 422], $accountKey);
} }
public function delete(string $id): void public function delete(string $id): void
{ {
@ -199,6 +200,41 @@ class Invoice extends AbstractEndPoint
return $this->pagoService->depositar($invoice->cuota->pago, $date); return $this->pagoService->depositar($invoice->cuota->pago, $date);
} }
/**
* @param array $idsData
* @return array
* @throws EmptyResult
*/
public function updateAll(array $idsData): array
{
$tokuIds = array_column($idsData, 'toku_id');
$oldIds = array_column($idsData, 'product_id');
$placeholders = array_map(fn($id) => "id{$id}", array_keys($oldIds));
$placeholdersString = implode(', ', array_map(fn($id) => ":{$id}", $placeholders));
$query = $this->pagoService->getRepository()->getConnection()->getQueryBuilder()
->select('pago.id, CONCAT_WS("-", unidad.descripcion, CONCAT_WS("-", propietario.rut, propietario.dv)) AS old_pid')
->from('pago')
->joined('JOIN cuota ON cuota.pago = pago.id')
->joined('JOIN venta ON venta.pie = cuota.pie')
->joined('JOIN propietario ON propietario.rut = venta.propietario')
->joined('JOIN propiedad_unidad pu ON pu.propiedad = venta.propiedad')
->joined('JOIN unidad ON pu.unidad = unidad.id')
->having("old_pid IN ({$placeholdersString})");
$values = array_combine($placeholders, $oldIds);
try {
$statement = $this->pagoService->getRepository()->getConnection()->execute($query, $values);
$results = $statement->fetchAll(PDO::FETCH_ASSOC);
} catch (PDOException $exception) {
$this->logger->error($exception);
throw new EmptyResult($query, $exception);
}
$ids = array_column($results, 'pago.id');
$newIds = array_combine($ids, $tokuIds);
return array_map(fn($id) => ['product_id' => $id, 'toku_id' => $newIds[$id]], $ids);
}
public function save(array $data): bool public function save(array $data): bool
{ {
return $this->doSave($this->invoiceRepository, $data); return $this->doSave($this->invoiceRepository, $data);
@ -215,7 +251,7 @@ class Invoice extends AbstractEndPoint
{ {
$paramsMap = [ $paramsMap = [
'customer' => 'customer', 'customer' => 'customer',
'product_id' => 'product_id', 'product_id' => 'cuota_id',
'due_date' => 'fecha', 'due_date' => 'fecha',
'subscription' => 'subscription', 'subscription' => 'subscription',
'amount' => 'valor', 'amount' => 'valor',

View File

@ -1,6 +1,7 @@
<?php <?php
namespace Incoviba\Service\Venta\MediosPago\Toku; namespace Incoviba\Service\Venta\MediosPago\Toku;
use PDO;
use PDOException; use PDOException;
use Psr\Http\Client\ClientInterface; use Psr\Http\Client\ClientInterface;
use Incoviba\Common\Implement\Exception\EmptyResponse; use Incoviba\Common\Implement\Exception\EmptyResponse;
@ -34,15 +35,15 @@ class Subscription extends AbstractEndPoint
$request_uri = "/subscriptions/{$id}"; $request_uri = "/subscriptions/{$id}";
return $this->sendGet($request_uri, [200], [401, 404, 422]); return $this->sendGet($request_uri, [200], [401, 404, 422]);
} }
public function add(array $data): bool public function add(array $data, ?string $accountKey = null): bool
{ {
$request_uri = '/subscriptions'; $request_uri = '/subscriptions';
return $this->sendAdd($request_uri, $data, [200, 201], [401, 404, 409, 422]); return $this->sendAdd($request_uri, $data, [200, 201], [401, 404, 409, 422], $accountKey);
} }
public function edit(string $id, array $data): bool public function edit(string $id, array $data, ?string $accountKey = null): bool
{ {
$request_uri = "/subscriptions/{$id}"; $request_uri = "/subscriptions/{$id}";
return $this->sendEdit($request_uri, $data, [200], [401, 404, 409, 422]); return $this->sendEdit($request_uri, $data, [200], [401, 404, 409, 422], $accountKey);
} }
public function delete(string $id): void public function delete(string $id): void
{ {
@ -111,6 +112,101 @@ class Subscription extends AbstractEndPoint
} }
} }
/**
* @param array $idsData
* @return array
* @throws EmptyResult
* @throws EmptyResponse
*/
public function update(array $idsData): array
{
$tokuIds = array_column($idsData, 'toku_id');
$oldPids = array_column($idsData, 'product_id');
$placeholders = array_map(fn($id) => "id{$id}", array_keys($oldPids));
$placeholdersString = implode(', ', array_map(fn($id) => ":{$id}", $placeholders));
$query = $this->ventaService->getRepository()->getConnection()->getQueryBuilder()
->select('venta.id, CONCAT_WS("-", unidad.descripcion, CONCAT_WS("-", propietario.rut, propietario.dv)) AS old_pid')
->from('venta')
->joined('JOIN propietario ON propietario.rut = venta.propietario')
->joined('JOIN propiedad_unidad pu ON pu.propiedad = venta.propiedad')
->joined('JOIN unidad ON pu.unidad = unidad.id')
->having("old_pid IN ({$placeholdersString})");
$values = array_combine($placeholders, $oldPids);
try {
$statement = $this->ventaService->getRepository()->getConnection()->execute($query, $values);
$results = $statement->fetchAll(PDO::FETCH_ASSOC);
} catch (PDOException $exception) {
$this->logger->error($exception->getMessage(), [
'query' => $query,
'values' => $values,
'ids' => $idsData,
'exception' => $exception]);
throw new EmptyResult($query, $exception);
}
$accountKeys = $this->getAccountKey(array_column($results, 'id'));
$newPids = [];
$keys = [];
foreach ($results as $result) {
$idx = array_search($result['old_pid'], $oldPids);
$newPids[$idx] = $result['id'];
if (array_key_exists($result['id'], $accountKeys)) {
$keys[$idx] = $accountKeys[$result['id']];
}
}
$output = [];
foreach ($tokuIds as $idx => $tokuId) {
if (!isset($newPids[$idx])) {
continue;
}
$data = [
'product_id' => $newPids[$idx],
];
try {
if (!$this->edit($tokuId, $data, array_key_exists($idx, $keys) ? $keys[$idx] : null)) {
$this->logger->error('Error while updating Toku', [
'toku_id' => $tokuId,
'old_pid' => $oldPids[$idx],
'product_id' => $newPids[$idx],
'account_key' => array_key_exists($idx, $keys) ? $keys[$idx] : null]);
$output[] = [
'toku_id' => $tokuId,
'old_pid' => $oldPids[$idx],
'product_id' => $newPids[$idx],
'account_key' => array_key_exists($idx, $keys) ? $keys[$idx] : null,
'error' => 'Error while updating Toku'
];
continue;
}
} catch (EmptyResponse $exception) {
$this->logger->error($exception->getMessage(), [
'toku_id' => $tokuId,
'old_pid' => $oldPids[$idx],
'product_id' => $newPids[$idx],
'account_key' => array_key_exists($idx, $keys) ? $keys[$idx] : null,
'exception' => $exception]);
$output[] = [
'toku_id' => $tokuId,
'old_pid' => $oldPids[$idx],
'product_id' => $newPids[$idx],
'account_key' => array_key_exists($idx, $keys) ? $keys[$idx] : null,
'error' => $exception->getMessage()
];
continue;
}
$output[] = [
'toku_id' => $tokuId,
'old_pid' => $oldPids[$idx],
'product_id' => $newPids[$idx],
'account_key' => array_key_exists($idx, $keys) ? $keys[$idx] : null
];
}
return $output;
}
public function save(array $data): bool public function save(array $data): bool
{ {
return $this->doSave($this->subscriptionRepsitory, $data); return $this->doSave($this->subscriptionRepsitory, $data);
@ -133,11 +229,11 @@ class Subscription extends AbstractEndPoint
if ($ref === null) { if ($ref === null) {
continue; continue;
} }
if ($ref === 'pieValor') { if ($ref === 'pieValor' and array_key_exists('venta', $data)) {
$params[$key] = $data['venta']->formaPago()?->pie?->valor ?? 0; $params[$key] = $data['venta']?->formaPago()?->pie?->valor ?? 0;
continue; continue;
} }
if ($ref === 'datosVenta') { if ($ref === 'datosVenta' and array_key_exists('venta', $data)) {
$params[$key] = $this->datosVenta($data['venta']); $params[$key] = $this->datosVenta($data['venta']);
continue; continue;
} }
@ -169,4 +265,38 @@ class Subscription extends AbstractEndPoint
'Unidades' => $venta->propiedad()->summary() 'Unidades' => $venta->propiedad()->summary()
]; ];
} }
/**
* @param array $ventaIds
* @return array
* @throws EmptyResult
*/
protected function getAccountKey(array $ventaIds): array
{
$placeholders = array_map(fn($id) => "id{$id}", array_keys($ventaIds));
$placeholdersString = implode(', ', array_map(fn($id) => ":{$id}", $placeholders));
$query = $this->ventaService->getRepository()->getConnection()->getQueryBuilder()
->select('account_key, venta.id AS venta_id')
->from('toku_accounts')
->joined('JOIN proyecto ON proyecto.inmobiliaria = toku_accounts.sociedad_rut')
->joined('JOIN proyecto_tipo_unidad ptu ON ptu.proyecto = proyecto.id')
->joined('JOIN unidad ON unidad.pt = ptu.id')
->joined('JOIN propiedad_unidad pu ON pu.unidad = unidad.id')
->joined('JOIN venta ON venta.propiedad = pu.propiedad')
->where("venta.id IN ({$placeholdersString}) AND toku_accounts.enabled = 1");
$values = array_combine($placeholders, $ventaIds);
try {
$statement = $this->ventaService->getRepository()->getConnection()->execute($query, $values);
$results = $statement->fetchAll(PDO::FETCH_ASSOC);
} catch (PDOException $exception) {
$this->logger->error($exception->getMessage(), [
'query' => $query,
'values' => $values,
'exception' => $exception]);
throw new EmptyResult($query, $exception);
}
$keys = array_column($results, 'account_key');
$ids = array_column($results, 'venta_id');
return array_combine($ids, $keys);
}
} }

View File

@ -4,25 +4,37 @@ namespace Incoviba\Service\Venta;
use DateTimeInterface; use DateTimeInterface;
use DateTimeImmutable; use DateTimeImmutable;
use DateMalformedStringException; use DateMalformedStringException;
use Incoviba\Common\Define;
use Incoviba\Exception\ServiceAction\Create; use Incoviba\Exception\ServiceAction\Create;
use Incoviba\Exception\ServiceAction\Read; use Incoviba\Exception\ServiceAction\Read;
use Incoviba\Exception\ServiceAction\Update; use Incoviba\Exception\ServiceAction\Update;
use PDOException; use PDOException;
use Incoviba\Common\Ideal;
use Incoviba\Common\Implement\Exception\EmptyResult; use Incoviba\Common\Implement\Exception\EmptyResult;
use Incoviba\Repository; use Incoviba\Repository;
use Incoviba\Model; use Incoviba\Model;
use Incoviba\Service; use Incoviba\Service;
use Psr\Log\LoggerInterface;
class Pago class Pago extends Ideal\Service\Repository
{ {
public function __construct( public function __construct(
LoggerInterface $logger,
protected Repository\Venta\Pago $pagoRepository, protected Repository\Venta\Pago $pagoRepository,
protected Repository\Venta\EstadoPago $estadoPagoRepository, protected Repository\Venta\EstadoPago $estadoPagoRepository,
protected Repository\Venta\TipoEstadoPago $tipoEstadoPagoRepository, protected Repository\Venta\TipoEstadoPago $tipoEstadoPagoRepository,
protected Service\UF $ufService, protected Service\UF $ufService,
protected Service\Valor $valorService, protected Service\Valor $valorService,
protected Service\Queue $queueService protected Service\Queue $queueService
) {} )
{
parent::__construct($logger);
}
public function getRepository(): Define\Repository
{
return $this->pagoRepository;
}
public function depositar(Model\Venta\Pago $pago, DateTimeInterface $fecha): bool public function depositar(Model\Venta\Pago $pago, DateTimeInterface $fecha): bool
{ {

View File

@ -1,22 +1,24 @@
<?php <?php
namespace Incoviba\Service\Venta; namespace Incoviba\Service\Venta;
use PDOException;
use Psr\Log\LoggerInterface;
use Incoviba\Common\Ideal\Service; use Incoviba\Common\Ideal\Service;
use Incoviba\Common\Implement\Exception\EmptyResult; use Incoviba\Common\Implement\Exception\EmptyResult;
use Incoviba\Exception\ServiceAction\Create; use Incoviba\Exception\ServiceAction\Create;
use Incoviba\Exception\ServiceAction\Read; use Incoviba\Exception\ServiceAction\Read;
use Incoviba\Exception\ServiceAction\Update; use Incoviba\Exception\ServiceAction\Update;
use Incoviba\Repository;
use Incoviba\Model; use Incoviba\Model;
use PDOException; use Incoviba\Repository;
use Psr\Log\LoggerInterface; use Incoviba\Service\Valor;
class Propietario extends Service class Propietario extends Service
{ {
public function __construct( public function __construct(
LoggerInterface $logger, LoggerInterface $logger,
protected Repository\Venta\Propietario $propietarioRepository, protected Repository\Venta\Propietario $propietarioRepository,
protected Repository\Direccion $direccionRepository protected Repository\Direccion $direccionRepository,
protected Valor $valorService
) { ) {
parent::__construct($logger); parent::__construct($logger);
} }
@ -49,6 +51,9 @@ class Propietario extends Service
$data['direccion'] = $direccion->id; $data['direccion'] = $direccion->id;
} }
$filteredData = $this->propietarioRepository->filterData($data); $filteredData = $this->propietarioRepository->filterData($data);
if (array_key_exists('telefono', $filteredData)) {
$filteredData['telefono'] = $this->valorService->telefono()->toDatabase($filteredData['telefono']);
}
try { try {
return $this->propietarioRepository->edit($propietario, $filteredData); return $this->propietarioRepository->edit($propietario, $filteredData);
} catch (PDOException | EmptyResult $exception) { } catch (PDOException | EmptyResult $exception) {
@ -85,6 +90,10 @@ class Propietario extends Service
]); ]);
$filtered_data = array_intersect_key($data, $fields); $filtered_data = array_intersect_key($data, $fields);
if (array_key_exists('telefono', $filtered_data)) {
$filtered_data['telefono'] = $this->valorService->telefono()->toDatabase($filtered_data['telefono']);
}
try { try {
$propietario = $this->propietarioRepository->fetchById($data['rut']); $propietario = $this->propietarioRepository->fetchById($data['rut']);
$edits = []; $edits = [];
@ -95,6 +104,7 @@ class Propietario extends Service
} catch (EmptyResult) { } catch (EmptyResult) {
try { try {
$propietario = $this->propietarioRepository->create($filtered_data); $propietario = $this->propietarioRepository->create($filtered_data);
$this->logger->info('Propietario', ['propietario' => $propietario]);
$propietario = $this->propietarioRepository->save($propietario); $propietario = $this->propietarioRepository->save($propietario);
} catch (PDOException $exception) { } catch (PDOException $exception) {
throw new Create(__CLASS__, $exception); throw new Create(__CLASS__, $exception);

View File

@ -0,0 +1,101 @@
<?php
namespace Incoviba\Test\Service\MQTT;
use Psr\Log\LoggerInterface;
use PHPUnit\Framework\TestCase;
use xobotyi\beansclient\BeansClient;
use xobotyi\beansclient\Connection;
use xobotyi\beansclient\Exception\JobException;
use xobotyi\beansclient\Job;
use Incoviba\Exception\MQTT\MissingJob;
use Incoviba\Service\MQTT\Beanstalkd;
class BeanstalkdTest extends TestCase
{
protected LoggerInterface $logger;
protected BeansClient $client;
protected function setUp(): void
{
$this->logger = $this->getMockBuilder(LoggerInterface::class)->getMock();
$this->client = $this->getMockBuilder(BeansClient::class)->disableOriginalConstructor()->getMock();
}
public function testExists(): void
{
$stats = ['current-jobs-ready' => 1];
$this->client->method('watchTube')->willReturn($this->client);
$this->client->method('statsTube')->willReturn($stats);
$service = new Beanstalkd($this->logger, $this->client);
$this->assertTrue($service->exists());
}
public function testNotExists(): void
{
$stats = ['current-jobs-ready' => 0];
$this->client->method('watchTube')->willReturn($this->client);
$this->client->method('statsTube')->willReturn($stats);
$service = new Beanstalkd($this->logger, $this->client);
$this->assertFalse($service->exists());
}
public function testGet(): void
{
$jobData = [
'id' => 1,
'configuration' => [
'type' => 'service',
],
'created_at' => '2020-01-01 00:00:00',
];
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('isActive')->willReturn(true);
$this->client->method('getConnection')->willReturn($connection);
$this->client->method('watchTube')->willReturn($this->client);
$this->client->method('statsTube')->willReturn(['current-jobs-ready' => 1]);
$job = new Job($this->client, 1, 'ready', json_encode($jobData));
$this->client->method('reserve')->willReturn($job);
$service = new Beanstalkd($this->logger, $this->client);
$this->assertEquals(json_encode($jobData), $service->get());
}
public function testGetException(): void
{
$this->client->method('watchTube')->willReturn($this->client);
$this->client->method('statsTube')->willReturn(['current-jobs-ready' => 0]);
$service = new Beanstalkd($this->logger, $this->client);
$this->expectException(MissingJob::class);
$service->get();
$this->client->method('statsTube')->willReturn(['current-jobs-ready' => 1]);
$exception = new JobException();
$this->client->method('reserve')->willThrowException($exception);
$this->expectException(MissingJob::class);
$service->get();
}
public function testSet(): void
{
$this->client->method('useTube')->willReturn($this->client);
$this->client->method('put');
$service = new Beanstalkd($this->logger, $this->client);
$service->set('test');
$this->assertTrue(true);
}
public function testSetException(): void
{
$this->client->method('useTube')->willReturn($this->client);
$exception = new JobException();
$this->client->method('put')->willThrowException($exception);
$service = new Beanstalkd($this->logger, $this->client);
$service->set('test');
$this->assertTrue(true);
}
public function testRemove(): void
{
$this->client->method('useTube')->willReturn($this->client);
$this->client->method('delete')->willReturn(true);
$service = new Beanstalkd($this->logger, $this->client);
$service->remove(1);
$this->assertTrue(true);
$this->client->method('delete')->willReturn(false);
$service->remove(1);
$this->assertTrue(true);
}
}

View File

@ -0,0 +1,69 @@
<?php
namespace Incoviba\Test\Service;
use Psr\Log\LoggerInterface;
use PHPUnit\Framework\TestCase;
use xobotyi\beansclient\BeansClient;
use Incoviba\Exception\MQTT\MissingClient;
use Incoviba\Service\MQTT;
use Incoviba\Service\MQTT\Beanstalkd;
class MQTTTest extends TestCase
{
public function testRegisterAndClientExistsAndGet(): void
{
$mqtt = new MQTT();
$beanstalkd = $this->getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock();
$mqtt->register('beanstalkd', $beanstalkd);
$this->assertTrue($mqtt->clientExists('beanstalkd'));
}
public function testGetClient(): void
{
$mqtt = new MQTT();
$logger = $this->getMockBuilder(LoggerInterface::class)->disableOriginalConstructor()->getMock();
$client = $this->getMockBuilder(BeansClient::class)->disableOriginalConstructor()->getMock();
$beanstalkd = new Beanstalkd($logger, $client);
$mqtt->register('beanstalkd', $beanstalkd);
$this->assertEquals($beanstalkd, $mqtt->getClient('beanstalkd'));
}
public function testGetClientException(): void
{
$mqtt = new MQTT();
$this->expectException(MissingClient::class);
$mqtt->getClient('test');
}
public function testExists(): void
{
$mqtt = new MQTT();
$beanstalkd = $this->getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock();
$beanstalkd->method('exists')->willReturn(true);
$mqtt->register('beanstalkd', $beanstalkd);
$this->assertTrue($mqtt->exists('beanstalkd'));
}
public function testGet(): void
{
$mqtt = new MQTT();
$beanstalkd = $this->getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock();
$beanstalkd->method('get')->willReturn('test');
$mqtt->register('beanstalkd', $beanstalkd);
$this->assertEquals('test', $mqtt->get('beanstalkd'));
}
public function testSet(): void
{
$mqtt = new MQTT();
$beanstalkd = $this->getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock();
$beanstalkd->method('set');
$mqtt->register('beanstalkd', $beanstalkd);
$mqtt->set('test', 0, 'beanstalkd');
$this->assertTrue(true);
}
public function testRemove(): void
{
$mqtt = new MQTT();
$beanstalkd = $this->getMockBuilder(Beanstalkd::class)->disableOriginalConstructor()->getMock();
$beanstalkd->method('remove');
$mqtt->register('beanstalkd', $beanstalkd);
$mqtt->remove(0, 'beanstalkd');
$this->assertTrue(true);
}
}

View File

@ -0,0 +1,66 @@
<?php
namespace Incoviba\Test\Service;
use Psr\Log\LoggerInterface;
use PHPUnit\Framework\TestCase;
use Incoviba\Service\Job;
use Incoviba\Service\Queue;
use Incoviba\Service\Worker;
use Incoviba\Model;
class QueueTest extends TestCase
{
protected LoggerInterface $logger;
protected Job $jobService;
protected Worker $defaultWorker;
protected function setUp(): void
{
$this->logger = $this->getMockBuilder(LoggerInterface::class)
->disableOriginalConstructor()
->getMock();
$this->jobService = $this->getMockBuilder(Job::class)
->disableOriginalConstructor()
->getMock();
$this->defaultWorker = $this->getMockBuilder(Worker::class)
->disableOriginalConstructor()
->getMock();
}
public function testRegister(): void
{
$queue = new Queue($this->logger, $this->jobService, $this->defaultWorker);
$worker = $this->getMockBuilder(Worker::class)
->disableOriginalConstructor()
->getMock();
$queue->register('test', $worker);
$this->assertTrue(true);
}
public function testEnqueue(): void
{
$queue = new Queue($this->logger, $this->jobService, $this->defaultWorker);
$jobData = ['test' => 'test'];
$result = $queue->enqueue($jobData);
$this->assertTrue($result);
$result = $queue->push($jobData);
$this->assertTrue($result);
}
public function testRun(): void
{
$queue = new Queue($this->logger, $this->jobService, $this->defaultWorker);
$result = $queue->run();
$this->assertTrue($result);
$jobData = [
'type' => 'test',
];
$job = new Model\Job();
$job->id = 1;
$job->configuration = $jobData;
$this->jobService->method('isPending')->willReturn(true);
$this->jobService->method('get')->willReturn($job);
$result = $queue->run();
$this->assertTrue($result);
}
}

View File

@ -1,2 +1,2 @@
[www] [www]
pm.max_children = 15 pm.max_children = 8

View File

@ -2,10 +2,12 @@
"name": "incoviba/cli", "name": "incoviba/cli",
"type": "project", "type": "project",
"require": { "require": {
"ext-sockets": "*",
"dragonmantank/cron-expression": "^3.4", "dragonmantank/cron-expression": "^3.4",
"guzzlehttp/guzzle": "^7.8", "guzzlehttp/guzzle": "^7.8",
"hollodotme/fast-cgi-client": "^3.1", "hollodotme/fast-cgi-client": "^3.1",
"monolog/monolog": "^3.5", "monolog/monolog": "^3.5",
"pda/pheanstalk": "^7.0",
"php-di/php-di": "^7.0", "php-di/php-di": "^7.0",
"predis/predis": "^3.0", "predis/predis": "^3.0",
"symfony/console": "^6.3" "symfony/console": "^6.3"

View File

@ -7,5 +7,5 @@
0 2 * * * /code/bin/incoviba money:uf >> /logs/commands 2>&1 0 2 * * * /code/bin/incoviba money:uf >> /logs/commands 2>&1
0 2 * * * /code/bin/incoviba money:uf:update >> /logs/commands 2>&1 0 2 * * * /code/bin/incoviba money:uf:update >> /logs/commands 2>&1
0 2 1 * * /code/bin/incoviba money:ipc >> /logs/commands 2>&1 0 2 1 * * /code/bin/incoviba money:ipc >> /logs/commands 2>&1
*/2 * * * * /code/bin/incoviba queue >> /logs/commands 2>&1 */1 * * * * /code/bin/incoviba queue >> /logs/commands 2>&1
#0 3 * * * /code/bin/incoviba external:services >> /logs/commands 2>&1 0 3 * * * /code/bin/incoviba external:services >> /logs/commands 2>&1

7
cli/entrypoint Normal file → Executable file
View File

@ -6,8 +6,13 @@ then
then then
CMD=$1 CMD=$1
shift shift
if [[ $# -gt 0 ]]
then
$CMD -c "$@" $CMD -c "$@"
exit exit 0
fi
$CMD
exit 0
fi fi
fi fi

View File

@ -16,4 +16,15 @@ return [
} }
return new Predis\Client($options); return new Predis\Client($options);
}, },
Pheanstalk\Pheanstalk::class => function(ContainerInterface $container) {
return Pheanstalk\Pheanstalk::create(
$container->get('BEANSTALKD_HOST'),
$container->has('BEANSTALKD_PORT') ? $container->get('BEANSTALKD_PORT') : 11300
);
},
Incoviba\Service\MQTT\MQTTInterface::class => function(ContainerInterface $container) {
$service = new Incoviba\Service\MQTT($container->get(Psr\Log\LoggerInterface::class));
$service->register('default', $container->get(Incoviba\Service\MQTT\Pheanstalk::class));
return $service;
}
]; ];

View File

@ -37,6 +37,8 @@ class BaseLoop extends Console\Command\Command
foreach ($commands as $command) { foreach ($commands as $command) {
$this->runCommand($input, $output, $command); $this->runCommand($input, $output, $command);
} }
unset($commands);
memory_reset_peak_usage();
$this->waitNextTimeout($output, $start); $this->waitNextTimeout($output, $start);
} }
return self::SUCCESS; return self::SUCCESS;

View File

@ -1,48 +0,0 @@
<?php
namespace Incoviba\Command\Job;
use Symfony\Component\Console;
use Incoviba\Service;
#[Console\Attribute\AsCommand(name: 'jobs:pending', description: 'List pending jobs')]
class Pending extends Console\Command\Command
{
public function __construct(protected Service\Job $jobService, ?string $name = null)
{
parent::__construct($name);
}
protected function configure(): void
{
$this->addOption('full', 'f', Console\Input\InputOption::VALUE_NONE, 'Full output');
}
protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
{
$jobs = $this->jobService->getPending();
$jobCount = count($jobs);
$output->writeln("Found {$jobCount} pending jobs");
if ($input->getOption('full') and $jobCount > 0) {
$io = new Console\Style\SymfonyStyle($input, $output);
$rows = [];
foreach ($jobs as $job) {
$retries = $job['retries'] ?? 0;
$updated = $job['updated_at'] ?? '';
$rows[] = [
$job['id'],
$job['created_at'],
$job['configuration']['type'],
$retries,
$updated
];
}
$io->table(['ID', 'Created', 'Type', 'Retries', 'Updated'], $rows);
}
return self::SUCCESS;
}
}

View File

@ -9,22 +9,16 @@ use Incoviba\Service;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Symfony\Component\Console; use Symfony\Component\Console;
#[Console\Attribute\AsCommand(name: 'jobs:run', description: 'Run jobs')] #[Console\Attribute\AsCommand(name: 'jobs:run', description: 'Run job')]
class Run extends Console\Command\Command class Run extends Console\Command\Command
{ {
public function __construct(protected Service\FastCGI $fastcgi, protected LoggerInterface $logger, public function __construct(protected Service\FastCGI $fastcgi, protected LoggerInterface $logger,
protected Service\Job $jobService,
protected DateTimeZone $timeZone, ?string $name = null) protected DateTimeZone $timeZone, ?string $name = null)
{ {
parent::__construct($name); parent::__construct($name);
} }
protected function configure(): void
{
$this->addArgument('job_ids',
Console\Input\InputArgument::IS_ARRAY | Console\Input\InputArgument::REQUIRED, 'Job IDs');
}
protected array $output = [];
public function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int public function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
{ {
try { try {
@ -33,44 +27,18 @@ class Run extends Console\Command\Command
$now = new DateTimeImmutable(); $now = new DateTimeImmutable();
} }
$jobIds = $input->getArgument('job_ids'); if ($this->jobService->getPending() === 0) {
$jobCount = count($jobIds); $output->writeln("[{$now->format('Y-m-d H:i:s e')}] No pending jobs to run.");
return self::SUCCESS;
$this->pushOutput('top', ['message' => "[{$now->format('Y-m-d H:i:s e')}] Running {$jobCount} jobs..."]);
$this->pushOutput('bottom', ['table' => [
['Job IDs'],
array_map(function($row) {return [$row];},$jobIds)
]]);
$this->pushOutput('top', ['progress' => $jobCount]);
$result = $this->runJobs($jobIds);
$this->pushOutput('top', ['progress' => 'finish']);
$this->writeOutput($input, $output);
return $result;
} }
protected function runJobs(array $jobIds): int $output->writeln("[{$now->format('Y-m-d H:i:s e')}] Running Ready Job...");
$this->runJob();
return $this->getResponses();
}
protected function runJob(): bool
{ {
$pendingJobs = []; $uri = "/api/queue/run";
foreach ($jobIds as $jobId) {
if (!$this->runJob($jobId)) {
$pendingJobs []= $jobId;
}
}
$result = $this->getResponses();
if (count($pendingJobs) > 0) {
if ($this->runJobs($pendingJobs) === self::FAILURE) {
$result = self::FAILURE;
}
}
return $result;
}
protected function runJob(int $jobId): bool
{
$uri = "/api/queue/run/{$jobId}";
$this->pushOutput('bottom', ['message' => "GET {$uri}"]);
try { try {
$this->fastcgi->get($uri); $this->fastcgi->get($uri);
@ -85,7 +53,6 @@ class Run extends Console\Command\Command
$result = self::SUCCESS; $result = self::SUCCESS;
$responses = $this->fastcgi->awaitResponses(); $responses = $this->fastcgi->awaitResponses();
foreach ($responses as $response) { foreach ($responses as $response) {
$this->pushOutput('top', ['progress' => 'advance']);
if ($response->getError() !== '') { if ($response->getError() !== '') {
$this->logger->error("Error running job", [ $this->logger->error("Error running job", [
'error' => $response->getError(), 'error' => $response->getError(),
@ -93,100 +60,8 @@ class Run extends Console\Command\Command
'headers' => $response->getHeaders(), 'headers' => $response->getHeaders(),
]); ]);
$result = self::FAILURE; $result = self::FAILURE;
continue;
} }
$this->pushOutput('bottom', ['message' => $response->getBody()]);
} }
return $result; return $result;
} }
protected function pushOutput(string $section, array $configuration): void
{
if (!isset($this->output[$section])) {
$this->output[$section] = [];
}
foreach ($configuration as $key => $value) {
if (!isset($this->output[$section][$key])) {
$this->output[$section][$key] = [];
}
$this->output[$section][$key] []= $value;
}
if (isset($this->output[$section]['progress'])) {
usort($this->output[$section]['progress'], function($a, $b) {
if ($a === $b) {
return 0;
}
if (is_int($a)) {
return -1;
}
if (is_int($b)) {
return 1;
}
if ($a === 'finish') {
return 1;
}
if ($b === 'finish') {
return -1;
}
return 0;
});
}
}
protected function writeOutput(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): void
{
$sectionNames = array_keys($this->output);
$ios = [];
foreach ($sectionNames as $sectionName) {
$section = $output->section();
$ios[$sectionName] = new Console\Style\SymfonyStyle($input, $section);
}
foreach ($this->output as $sectionName => $configurations) {
$io = $ios[$sectionName];
$this->writeSection($io, $configurations);
}
}
protected function writeSection(Console\Style\SymfonyStyle $io, array $configurations): void
{
if (array_key_exists('table', $configurations)) {
$this->writeTables($io, $configurations['table']);
}
if (array_key_exists('progress', $configurations)) {
$this->writeProgress($io, $configurations['progress']);
}
if (array_key_exists('message', $configurations)) {
$this->writeMessages($io, $configurations['message']);
}
}
protected function writeTables(Console\Style\SymfonyStyle $io, array $tableConfigurations): void
{
foreach ($tableConfigurations as $tableData) {
$io->table(...$tableData);
}
}
protected function writeMessages(Console\Style\SymfonyStyle $io, array $messages): void
{
foreach ($messages as $message) {
$io->writeln($message);
}
}
protected function writeProgress(Console\Style\SymfonyStyle $io, array $progresses): void
{
$progressBar = null;
foreach ($progresses as $progress) {
if ($progress === 'advance' and $progressBar !== null) {
$progressBar->advance();
continue;
}
if ($progress === 'finish' and $progressBar !== null) {
$progressBar->finish();
continue;
}
if (in_array($progress, ['finish', 'advance'])) {
continue;
}
$progressBar = $io->createProgressBar($progress);
}
$io->newLine();
}
} }

View File

@ -35,61 +35,29 @@ class Queue extends Command
]; ];
$io = new Console\Style\SymfonyStyle($input, $this->sections['top']); $io = new Console\Style\SymfonyStyle($input, $this->sections['top']);
$now = new DateTimeImmutable('now', $this->timezone); $now = new DateTimeImmutable('now', $this->timezone);
if ($this->jobService->getPending() === 0) {
$io->success("[{$now->format('Y-m-d H:i:s e')}] Queue is empty");
return self::SUCCESS;
}
$io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue..."); $io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue...");
$results = [];
$jobs = $this->getJobs(); for ($i = 0; $i < $this->batchSize; $i++) {
$jobCount = count($jobs); if ($this->jobService->getPending() === 0) {
if ($jobCount === 0) { break;
return Console\Command\Command::SUCCESS;
} }
$results []= $this->runJob();
$io->writeln("Found {$jobCount} jobs to run");
$result = $this->runJobs($io, $jobs);
foreach ($this->outputs as $output) {
$this->sections['bottom']->writeln($output);
} }
return $result; return count(array_filter($results, fn ($result) => $result === self::FAILURE)) === 0 ? self::SUCCESS : self::FAILURE;
} }
protected array $sections; protected array $sections;
protected function getJobs(): array
{
$this->logger->debug("Getting jobs");
$jobs = $this->jobService->getPending();
$jobCount = count($jobs);
if ($jobCount === 0) {
$this->logger->debug("No jobs to run");
return [];
}
$this->logger->debug("Found {$jobCount} jobs");
return array_column($jobs, 'id');
}
protected function runJobs(Console\Style\SymfonyStyle $io, array $jobs): int
{
$chunks = array_chunk($jobs, $this->batchSize);
$chunkCount = count($chunks);
$result = self::SUCCESS;
$progress1 = $io->createProgressBar($chunkCount);
$progress1->start();
foreach ($chunks as $chunk) {
if ($this->runJobBatch($chunk) === self::FAILURE) {
$result = self::FAILURE;
}
$progress1->advance();
}
$progress1->finish();
return $result;
}
protected array $outputs = []; protected array $outputs = [];
protected function runJobBatch(array $jobIds): int protected function runJob(): int
{ {
$baseCommand = "{$this->baseCommand} jobs:run"; $baseCommand = "{$this->baseCommand} jobs:run";
$command = "{$baseCommand}";
$jobsLine = implode(' ', $jobIds);
$command = "{$baseCommand} {$jobsLine}";
try { try {
exec($command, $output, $resultCode); exec($command, $output, $resultCode);
$this->outputs []= $output; $this->outputs []= $output;
@ -106,7 +74,8 @@ class Queue extends Command
'result_code' => $resultCode 'result_code' => $resultCode
]); ]);
return self::FAILURE; return self::FAILURE;
} } else {
return self::SUCCESS; return self::SUCCESS;
} }
} }
}

View File

@ -0,0 +1,22 @@
<?php
namespace Incoviba\Command\Queue;
use Incoviba\Service;
use Symfony\Component\Console;
#[Console\Attribute\AsCommand(name: 'queue:pending', description: 'List pending jobs in queue')]
class Pending extends Console\Command\Command
{
public function __construct(protected Service\Job $jobService, ?string $name = null)
{
parent::__construct($name);
}
protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
{
$jobCount = $this->jobService->getPending();
$output->writeln("Found {$jobCount} pending jobs");
return self::SUCCESS;
}
}

View File

@ -2,20 +2,22 @@
namespace Incoviba\Command\Queue; namespace Incoviba\Command\Queue;
use Throwable; use Throwable;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console; use Symfony\Component\Console;
use Incoviba\Service; use Incoviba\Service;
#[Console\Attribute\AsCommand(name: 'queue:push', description: 'Push a job to the queue')] #[Console\Attribute\AsCommand(name: 'queue:push', description: 'Push a job to the queue')]
class Push extends Console\Command\Command class Push extends Console\Command\Command
{ {
public function __construct(protected Service\Job $jobService, ?string $name = null) public function __construct(protected LoggerInterface $logger, protected Service\Job $jobService, ?string $name = null)
{ {
parent::__construct($name); parent::__construct($name);
} }
protected function configure(): void protected function configure(): void
{ {
$this->addOption('configurations', 'c', Console\Input\InputOption::VALUE_REQUIRED | Console\Input\InputOption::VALUE_IS_ARRAY, 'Job configuration, must be in valid JSON format'); $this->addOption('configurations', 'c', Console\Input\InputOption::VALUE_REQUIRED | Console\Input\InputOption::VALUE_IS_ARRAY, 'Job configuration options array, each job configuration must be in valid JSON format');
$this->addOption('files', 'f', Console\Input\InputOption::VALUE_REQUIRED | Console\Input\InputOption::VALUE_IS_ARRAY, 'Paths to jobs configurations files with JSON array content');
} }
protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
@ -23,8 +25,8 @@ class Push extends Console\Command\Command
$io = new Console\Style\SymfonyStyle($input, $output); $io = new Console\Style\SymfonyStyle($input, $output);
$io->title("Pushing job"); $io->title("Pushing job");
$configurations = $input->getOption('configurations'); $configurations = $this->getConfigurations($input);
if ($configurations === null) { if (count($configurations) === 0) {
$io->error('Missing configurations'); $io->error('Missing configurations');
return self::FAILURE; return self::FAILURE;
} }
@ -46,4 +48,74 @@ class Push extends Console\Command\Command
} }
return $result; return $result;
} }
protected function getConfigurations(Console\Input\InputInterface $input): array
{
return [
...$this->getFilesConfigurations($input),
...$this->getOptionConfigurations($input),
];
}
protected function getFilesConfigurations(Console\Input\InputInterface $input): array
{
$configurations = [];
$files = $input->getOption('files');
if ($files === null) {
return $configurations;
}
foreach ($files as $filePath) {
if (!file_exists($filePath)) {
continue;
}
$configurations = array_merge($configurations, $this->getFileConfigurations($filePath));
}
return $configurations;
}
protected function getFileConfigurations(string $filePath): array
{
$configurations = [];
if (!file_exists($filePath)) {
return $configurations;
}
$json = file_get_contents($filePath);
if (!json_validate($json)) {
return $configurations;
}
$tmp = json_decode($json, true);
foreach ($tmp as $config) {
try {
$configurations []= $this->processConfiguration(json_encode($config));
} catch (Throwable $exception) {
$this->logger->warning($exception->getMessage(), ['exception' => $exception, 'config' => $config]);
}
}
return $configurations;
}
protected function getOptionConfigurations(Console\Input\InputInterface $input): array
{
$configurations = [];
$configOptions = $input->getOption('configurations');
if ($configOptions === null) {
return $configurations;
}
foreach ($configOptions as $config) {
try {
$configurations []= $this->processConfiguration($config);
} catch (Throwable $exception) {
$this->logger->warning($exception->getMessage(), ['exception' => $exception, 'config' => $config]);
}
}
return $configurations;
}
protected function processConfiguration(string $configuration): string
{
$json = json_decode($configuration, true);
if (!array_key_exists('type', $json) and !array_key_exists('configuration', $json)) {
throw new Console\Exception\InvalidArgumentException('Missing type or configuration key in JSON');
}
if (array_key_exists('type', $json)) {
return json_encode($json);
}
return json_encode($json['configuration']);
}
} }

View File

@ -0,0 +1,18 @@
<?php
namespace Incoviba\Exception;
use Throwable;
use Exception;
abstract class MQTT extends Exception
{
public function __construct(string $message = "", int $code = 0, ?Throwable $previous = null)
{
$baseCode = 700;
$code = $baseCode + $code;
if ($message == "") {
$message = "MQTT Exception";
}
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class Create extends MQTT
{
public function __construct(string $tube = '', string $payload = '', ?Throwable $previous = null)
{
$message = "Unable to create MQTT message: {$payload} in tube {$tube}";
$code = 11;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class Delete extends MQTT
{
public function __construct(string $tube, int $jobId, ?Throwable $previous = null)
{
$message = "Could not delete job {$jobId} in tube {$tube}";
$code = 13;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,15 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class Read extends MQTT
{
public function __construct(string $tube, ?Throwable $previous = null)
{
$message = "Error reading from tube {$tube}";
$code = 10;
parent::__construct($message, $code, $previous);
}
}

View File

@ -0,0 +1,14 @@
<?php
namespace Incoviba\Exception\MQTT;
use Incoviba\Exception\MQTT;
use Throwable;
class UnknownTransport extends MQTT
{
public function __construct(string $transportName, ?Throwable $previous = null)
{
$message = "Unknown transport {$transportName}";
parent::__construct($message, 1, $previous);
}
}

View File

@ -0,0 +1,16 @@
<?php
namespace Incoviba\Exception\MQTT;
use Throwable;
use Incoviba\Exception\MQTT;
class Update extends MQTT
{
public function __construct(string $tube, string $payload, ?int $jobId = null, ?Throwable $previous = null)
{
$jobString = $jobId !== null ? " with jobId {$jobId}" : '';
$message = "Could not update job{$jobString} with {$payload} in tube {$tube}";
$code = 12;
parent::__construct($message, $code, $previous);
}
}

9
cli/src/Service.php Normal file
View File

@ -0,0 +1,9 @@
<?php
namespace Incoviba;
use Psr\Log\LoggerInterface;
abstract class Service
{
public function __construct(protected LoggerInterface $logger) {}
}

View File

@ -5,27 +5,22 @@ use DateInvalidTimeZoneException;
use DateMalformedStringException; use DateMalformedStringException;
use DateTimeImmutable; use DateTimeImmutable;
use DateTimeZone; use DateTimeZone;
use Exception;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Predis\Connection\ConnectionException; use Incoviba\Exception\MQTT as MQTTException;
use Incoviba\Service\MQTT\MQTTInterface;
class Job class Job
{ {
public function __construct(protected LoggerInterface $logger, protected Redis $redisService) public function __construct(protected LoggerInterface $logger, protected MQTTInterface $mqttService) {}
{
$this->redisKey = 'jobs';
}
protected string $redisKey; protected string $redisKey;
public function getPending(): array public function getPending(): int
{ {
try { try {
$jobs = $this->redisService->get($this->redisKey); return $this->mqttService->pending();
return json_decode($jobs, true); } catch (MQTTException $exception) {
} catch (ConnectionException|Exception $exception) {
$exception = new Exception("Could not read {$this->redisKey} from Redis", $exception->getCode(), $exception);
$this->logger->warning($exception->getMessage(), ['exception' => $exception]); $this->logger->warning($exception->getMessage(), ['exception' => $exception]);
return []; return 0;
} }
} }
@ -44,9 +39,11 @@ class Job
'updated_at' => null, 'updated_at' => null,
'retries' => 0 'retries' => 0
]; ];
$jobs = $this->getPending(); try {
$jobs []= $data; $this->mqttService->set(json_encode($data));
$this->redisService->set($this->redisKey, json_encode($jobs), -1); } catch (MQTTException $exception) {
$this->logger->warning($exception->getMessage(), ['exception' => $exception]);
}
return $data; return $data;
} }
} }

124
cli/src/Service/MQTT.php Normal file
View File

@ -0,0 +1,124 @@
<?php
namespace Incoviba\Service;
use Incoviba\Exception\MQTT as MQTTException;
use Incoviba\Service;
use Incoviba\Service\MQTT\MQTTInterface;
class MQTT extends Service implements MQTTInterface
{
protected array $transports = [];
public function register(string $name, MQTTInterface $transport): self
{
$this->transports[$name] = $transport;
return $this;
}
/**
* @param string $payload
* @param int $delay
* @param string|null $transportName
* @return $this
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Create
*/
public function set(string $payload, int $delay = 0, ?string $transportName = null): self
{
$transport = $this->getTransport($transportName);
$transport->set($payload, $delay);
return $this;
}
/**
* @param string|null $transportName
* @return int
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Read
*/
public function pending(?string $transportName = null): int
{
$transport = $this->getTransport($transportName);
return $transport->pending();
}
/**
* @param int|null $jobId
* @param string|null $transportName
* @return bool
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Read
*/
public function exists(?int $jobId = null, ?string $transportName = null): bool
{
$transport = $this->getTransport($transportName);
return $transport->exists($jobId);
}
/**
* @param int|null $jobId
* @param string|null $transportName
* @return string
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Read
*/
public function get(?int $jobId = null, ?string $transportName = null): string
{
$transport = $this->getTransport($transportName);
return $transport->get($jobId);
}
/**
* @param string $newPayload
* @param int|null $jobId
* @param string|null $transportName
* @return $this
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Update
*/
public function update(string $newPayload, ?int $jobId = null, ?string $transportName = null): self
{
$transport = $this->getTransport($transportName);
$transport->update($newPayload, $jobId);
return $this;
}
/**
* @param int|null $jobId
* @param string|null $transportName
* @return $this
* @throws MQTTException\UnknownTransport
* @throws MQTTException\Delete
*/
public function remove(?int $jobId = null, ?string $transportName = null): self
{
$transport = $this->getTransport($transportName);
$transport->remove($jobId);
return $this;
}
/**
* @param string|null $transportName
* @return mixed
* @throws MQTTException\UnknownTransport
*/
protected function getTransport(?string $transportName): mixed
{
if (count($this->transports) === 0) {
throw new MQTTException\UnknownTransport('');
}
if ($transportName === null) {
if (array_key_exists('default', $this->transports)) {
$transportName = 'default';
} else {
$transportName = array_keys($this->transports)[0];
}
}
if (!array_key_exists($transportName, $this->transports)) {
if ($transportName === null) {
$transportName = '';
}
throw new MQTTException\UnknownTransport($transportName);
}
return $this->transports[$transportName];
}
}

View File

@ -0,0 +1,127 @@
<?php
namespace Incoviba\Service\MQTT;
use Exception;
use Psr\Log\LoggerInterface;
use xobotyi\beansclient;
use Incoviba\Service;
use Incoviba\Exception\MQTT;
class Beanstalkd extends Service implements MQTTInterface
{
const string DEFAULT_TUBE = 'default';
const int DEFAULT_TTR = 30;
const int DEFAULT_PRIORITY = 1_024;
public function __construct(LoggerInterface $logger, protected beansclient\Client $client,
protected string $tube = self::DEFAULT_TUBE,
protected int $ttr = self::DEFAULT_TTR,
protected int $priority = self::DEFAULT_PRIORITY)
{
parent::__construct($logger);
}
/**
* @param string $payload
* @param int $delay
* @return self
* @throws MQTT\Create
*/
public function set(string $payload, int $delay = 60): self
{
try {
$this->client->put($payload, $this->ttr, $this->priority, $delay);
} catch (Exception $exception) {
throw new MQTT\Create($this->tube, $payload, $exception);
}
return $this;
}
/**
* @return int
* @throws MQTT\Read
*/
public function pending(): int
{
try {
$stats = $this->client
->statsTube($this->tube);
} catch (Exception $exception) {
throw new MQTT\Read($this->tube, $exception);
}
if (!array_key_exists('current-jobs-ready', $stats)) {
throw new MQTT\Read($this->tube);
}
return $stats['current-jobs-ready'];
}
/**
* @param int|null $jobId
* @return bool
* @throws MQTT\Read
*/
public function exists(?int $jobId = null): bool
{
return $this->pending() > 0;
}
protected int $currentJobId;
/**
* @param int|null $jobId
* @return string
* @throws MQTT\Read
*/
public function get(?int $jobId = null): string
{
try {
if ($jobId !== null) {
$job = (object) $this->client
->reserveJob($jobId);
} else {
$job = (object) $this->client
->reserve();
}
} catch (Exception $exception) {
throw new MQTT\Read($this->tube, $exception);
}
$this->currentJobId = $job->id;
return $job->payload;
}
/**
* @param string $newPayload
* @param int|null $jobId
* @return self
* @throws MQTT\Update
*/
public function update(string $newPayload, ?int $jobId = null): self
{
try {
$this->remove($jobId);
$this->set($newPayload);
} catch (MQTT\Delete | MQTT\Create $exception) {
throw new MQTT\Update($this->tube, $newPayload, $jobId, $exception);
}
return $this;
}
/**
* @param int|null $jobId
* @return self
* @throws MQTT\Delete
*/
public function remove(?int $jobId = null): self
{
try {
if ($jobId === null) {
$jobId = $this->currentJobId;
}
$this->client
->delete($jobId);
} catch (Exception $exception) {
throw new MQTT\Delete($this->tube, $jobId, $exception);
}
return $this;
}
}

View File

@ -0,0 +1,12 @@
<?php
namespace Incoviba\Service\MQTT;
interface MQTTInterface
{
public function set(string $payload, int $delay = 0): self;
public function pending(): int;
public function exists(?int $jobId = null): bool;
public function get(?int $jobId = null): string;
public function update(string $newPayload, ?int $jobId = null): self;
public function remove(?int $jobId = null): self;
}

View File

@ -0,0 +1,65 @@
<?php
namespace Incoviba\Service\MQTT;
use Psr\Log\LoggerInterface;
use Pheanstalk as PBA;
use Incoviba\Service;
class Pheanstalk extends Service implements MQTTInterface
{
public function __construct(LoggerInterface $logger, protected PBA\Pheanstalk $client, string $tubeName = 'default')
{
parent::__construct($logger);
$this->tube = new PBA\Values\TubeName($tubeName);
}
protected PBA\Values\TubeName $tube;
public function set(string $payload, int $delay = 0): self
{
$this->client->useTube($this->tube);
$this->client->put($payload, $delay);
return $this;
}
public function pending(): int
{
$stats = $this->client->statsTube($this->tube);
return $stats->currentJobsReady;
}
public function exists(?int $jobId = null): bool
{
return $this->pending() > 0;
}
protected int $currentJobId;
public function get(?int $jobId = null): string
{
$this->client->watch($this->tube);
if ($jobId !== null) {
$jobId = new PBA\Values\JobId($jobId);
$job = $this->client->reserveJob($jobId);
} else {
$job = $this->client->reserve();
}
$this->currentJobId = $job->getId();
return $job->getData();
}
public function update(string $newPayload, ?int $jobId = null): self
{
$this->remove($jobId);
$this->set($newPayload);
return $this;
}
public function remove(?int $jobId = null): self
{
if ($jobId === null) {
$jobId = $this->currentJobId;
}
$this->client->watch($this->tube);
$this->client->delete(new PBA\Values\JobId($jobId));
return $this;
}
}

View File

@ -0,0 +1,85 @@
<?php
namespace Incoviba\Service;
class SystemInfo
{
public function getAllInfo(): array
{
return [
'memory' => [
'usage' => $this->getMemoryUsage(),
'peak' => $this->getPeakMemoryUsage()
],
'cpu' => [
'usage' => $this->getCpuUsage(),
'last_15minutes' => $this->getCpuUsageLast15minutes(),
'cores' => $this->getCpuCores()
]
];
}
public function get(string $name): int|null|float
{
return match ($name) {
'memory' => $this->getMemoryUsage(),
'peak_memory' => $this->getPeakMemoryUsage(),
'cpu' => $this->getCpuUsage(),
'cpu_last_15minutes' => $this->getCpuUsageLast15minutes(),
'cpu_cores' => $this->getCpuCores(),
default => null
};
}
public function getMemoryUsage(): float
{
return memory_get_usage(true);
}
public function getPeakMemoryUsage(): float
{
return memory_get_peak_usage(true);
}
public function getCpuUsage(): float
{
return $this->getCpuLoad()[0];
}
public function getCpuUsageLast15minutes(): float
{
return $this->getCpuLoad()[1];
}
protected array $cpuLoad;
protected function getCpuLoad(): array
{
if (isset($this->cpuLoad)) {
$load = sys_getloadavg();
$cores = $this->getCpuCores();
array_walk($load, function (&$value) use ($cores) {
$value = $value / $cores;
});
$this->cpuLoad = $load;
unset($load);
}
return $this->cpuLoad;
}
protected function getCpuCores(): int
{
$cpu_cores = 1;
if (is_file('/proc/cpuinfo')) {
$cpuinfo = file('/proc/cpuinfo');
preg_match_all('/^processor/m', $cpuinfo, $matches);
$cpu_cores = count($matches[0]);
}
return $cpu_cores;
}
public function formatMemoryUsage(float $usage, string $unit = 'MB'): string
{
$sizeFactor = match ($unit) {
'MB' => 1024 * 1024,
'GB' => 1024 * 1024 * 1024,
default => 1
};
return number_format($usage / $sizeFactor, 2) . " {$unit}";
}
public function formatCpuLoad(float $load): string
{
return number_format($load * 100, 2) . '%';
}
}

0
cli/start_command Normal file → Executable file
View File

21
mqtt.compose.yml Normal file
View File

@ -0,0 +1,21 @@
services:
mqtt:
profiles:
- mqtt
container_name: incoviba_mqtt
image: maateen/docker-beanstalkd
restart: unless-stopped
volumes:
- incoviba_mqtt:/var/lib/beanstalkd
mqtt-admin:
profiles:
- mqtt
container_name: incoviba_mqtt_admin
image: mitulislam/beanstalkd-aurora:latest
restart: unless-stopped
ports:
- "8093:3000"
volumes:
incoviba_mqtt: {}

View File

@ -18,6 +18,8 @@ services:
condition: service_healthy condition: service_healthy
test-redis: test-redis:
condition: service_healthy condition: service_healthy
test-mqtt:
condition: service_started
test-db: test-db:
profiles: profiles:
@ -48,6 +50,19 @@ services:
networks: networks:
- testing - testing
test-mqtt:
profiles:
- testing
image: maateen/docker-beanstalkd
container_name: incoviba_test_mqtt
healthcheck:
test: [ "CMD", "nc", "-z", "localhost", "11300" ]
interval: 5s
timeout: 5s
retries: 5
networks:
- testing
volumes: volumes:
test-db: {} test-db: {}