Batch queue
This commit is contained in:
@ -2,7 +2,6 @@ FROM php:8.4-cli
|
||||
|
||||
ENV TZ "${TZ}"
|
||||
ENV APP_NAME "${APP_NAME}"
|
||||
ENV API_URL "${API_URL}"
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends cron rsyslog nano && rm -r /var/lib/apt/lists/*
|
||||
|
||||
@ -10,7 +9,7 @@ RUN pecl install xdebug-3.4.2 \
|
||||
&& docker-php-ext-enable xdebug \
|
||||
&& echo $TZ > /etc/timezone
|
||||
|
||||
COPY --chmod=550 ./cli/entrypoint /root/entrypoint
|
||||
COPY --chmod=550 ./cli/start_command /root/start_command
|
||||
|
||||
COPY ./php-errors.ini /usr/local/etc/php/conf.d/docker-php-errors.ini
|
||||
COPY ./php-timezone.ini /usr/local/etc/php/conf.d/docker-php-timezone.ini
|
||||
@ -19,4 +18,4 @@ WORKDIR /code/bin
|
||||
|
||||
COPY --chmod=644 ./cli/crontab /var/spool/cron/crontabs/root
|
||||
|
||||
CMD [ "/root/entrypoint" ]
|
||||
CMD [ "/root/start_command" ]
|
||||
|
@ -1,6 +1,14 @@
|
||||
#/usr/bin/bash
|
||||
#!/bin/bash
|
||||
|
||||
printenv >> /etc/environment
|
||||
if [[ $# -gt 0 ]]
|
||||
then
|
||||
if [[ "$1" = "bash" || "$1" = "sh" || "$1" = "zsh" || "$1" = "/bin/bash" ]]
|
||||
then
|
||||
CMD=$1
|
||||
shift
|
||||
$CMD -c "$@"
|
||||
exit
|
||||
fi
|
||||
fi
|
||||
|
||||
#cron -f -L 11
|
||||
/code/bin/incoviba loop
|
||||
/code/bin/incoviba "$@"
|
||||
|
@ -1,2 +0,0 @@
|
||||
<?php
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Full::class));
|
@ -1,2 +0,0 @@
|
||||
<?php
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Comunas::class));
|
@ -1,2 +0,0 @@
|
||||
<?php
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\BaseLoop::class));
|
@ -1,4 +0,0 @@
|
||||
<?php
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Money\UF::class));
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Money\IPC::class));
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Money\UF\Update::class));
|
@ -1,9 +0,0 @@
|
||||
<?php
|
||||
$folder = implode(DIRECTORY_SEPARATOR, [__DIR__, 'proyectos']);
|
||||
$files = new FilesystemIterator($folder);
|
||||
foreach ($files as $file) {
|
||||
if ($file->isDir()) {
|
||||
continue;
|
||||
}
|
||||
include_once $file->getRealPath();
|
||||
}
|
@ -1,2 +0,0 @@
|
||||
<?php
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Proyectos\Activos::class));
|
@ -1,2 +0,0 @@
|
||||
<?php
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Queue::class));
|
@ -1,9 +0,0 @@
|
||||
<?php
|
||||
$folder = implode(DIRECTORY_SEPARATOR, [__DIR__, 'ventas']);
|
||||
$files = new FilesystemIterator($folder);
|
||||
foreach ($files as $file) {
|
||||
if ($file->isDir()) {
|
||||
continue;
|
||||
}
|
||||
include_once $file->getRealPath();
|
||||
}
|
@ -1,2 +0,0 @@
|
||||
<?php
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Ventas\Cierres\Vigentes::class));
|
@ -1,4 +0,0 @@
|
||||
<?php
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Ventas\Cuotas\Hoy::class));
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Ventas\Cuotas\Pendientes::class));
|
||||
$app->add($app->getContainer()->get(Incoviba\Command\Ventas\Cuotas\PorVencer::class));
|
@ -1,13 +1,3 @@
|
||||
<?php
|
||||
/*function loadCommands(&$app): void {
|
||||
$files = new FilesystemIterator($app->getContainer()->get('folders')->commands);
|
||||
foreach ($files as $file) {
|
||||
if ($file->isDir()) {
|
||||
continue;
|
||||
}
|
||||
include_once $file->getRealPath();
|
||||
}
|
||||
}
|
||||
loadCommands($app);*/
|
||||
$app->setCommandLoader($app->getContainer()->get(Symfony\Component\Console\CommandLoader\CommandLoaderInterface::class));
|
||||
$app->setDefaultCommand('run:full');
|
||||
$app->setDefaultCommand('loop');
|
||||
|
@ -1,23 +1,26 @@
|
||||
<?php
|
||||
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
return [
|
||||
'commands' => function() {
|
||||
return [
|
||||
'loop' => Incoviba\Command\BaseLoop::class,
|
||||
'comunas' => Incoviba\Command\Comunas::class,
|
||||
'contabilidad:cartolas:update' => Incoviba\Command\Contabilidad\Cartolas\Update::class,
|
||||
'money:ipc' => Incoviba\Command\Money\IPC::class,
|
||||
'money:uf' => Incoviba\Command\Money\UF::class,
|
||||
'money:uf:update' => Incoviba\Command\Money\UF\Update::class,
|
||||
'proyectos:activos' => Incoviba\Command\Proyectos\Activos::class,
|
||||
'run:full' => Incoviba\Command\Full::class,
|
||||
'ventas:cierres:vigentes' => Incoviba\Command\Ventas\Cierres\Vigentes::class,
|
||||
'ventas:cuotas:hoy' => Incoviba\Command\Ventas\Cuotas\Hoy::class,
|
||||
'ventas:cuotas:pendientes' => Incoviba\Command\Ventas\Cuotas\Pendientes::class,
|
||||
'ventas:cuotas:vencer' => Incoviba\Command\Ventas\Cuotas\PorVencer::class,
|
||||
'queue' => Incoviba\Command\Queue::class,
|
||||
'external:services' => Incoviba\Command\ExternalServices::class,
|
||||
'external:toku:reset' => Incoviba\Command\Ventas\MedioPagos\Toku\Reset::class,
|
||||
'external:toku:enqueue' => Incoviba\Command\Ventas\MedioPagos\Toku\Enqueue::class
|
||||
];
|
||||
'commands' => function(ContainerInterface $container) {
|
||||
$service = $container->get(Incoviba\Service\Commands::class);
|
||||
if ($container->has('folders')) {
|
||||
$folders = $container->get('folders');
|
||||
if (is_array($folders)) {
|
||||
if (array_key_exists('commands', $folders)) {
|
||||
$service->baseCommandsPath = $folders['commands'];
|
||||
}
|
||||
} elseif (isset($folders->commands)) {
|
||||
$service->baseCommandsPath = $folders->commands;
|
||||
}
|
||||
}
|
||||
if ($container->has('skip_commands')) {
|
||||
$service->skipCommands = $container->get('skip_commands');
|
||||
}
|
||||
if ($container->has('skipCommands')) {
|
||||
$service->skipCommands = $container->get('skipCommands');
|
||||
}
|
||||
return $service->getCommandsList();
|
||||
}
|
||||
];
|
||||
|
@ -6,10 +6,6 @@ return [
|
||||
$arr['base'],
|
||||
'resources'
|
||||
]);
|
||||
$arr['commands'] = implode(DIRECTORY_SEPARATOR, [
|
||||
$arr['resources'],
|
||||
'commands'
|
||||
]);
|
||||
$arr['cache'] = implode(DIRECTORY_SEPARATOR, [
|
||||
$arr['base'],
|
||||
'cache'
|
||||
|
@ -2,7 +2,7 @@
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
return [
|
||||
DateTimeZone::class => function (ContainerInterface $container) {
|
||||
DateTimeZone::class => function(ContainerInterface $container) {
|
||||
return new DateTimeZone($container->get('TZ') ?? 'America/Santiago');
|
||||
},
|
||||
'loopFrequency' => 60
|
||||
|
@ -3,8 +3,9 @@ use Psr\Container\ContainerInterface;
|
||||
|
||||
return [
|
||||
Incoviba\Service\Login::class => function(ContainerInterface $container) {
|
||||
$uri = $container->has('API_URL') ? $container->get('API_URL') : 'http://proxy/api';
|
||||
$client = new GuzzleHttp\Client([
|
||||
'base_uri' => $container->has('API_URL') ? $container->get('API_URL') : 'http://proxy/api',
|
||||
'base_uri' => $uri,
|
||||
'headers' => [
|
||||
'Authorization' => [
|
||||
'Bearer ' . md5($container->get('API_KEY'))
|
||||
|
@ -19,5 +19,13 @@ return [
|
||||
$container->get(DateTimeZone::class),
|
||||
$container->get('loopFrequency'),
|
||||
);
|
||||
},
|
||||
Incoviba\Command\Queue::class => function(ContainerInterface $container) {
|
||||
return new Incoviba\Command\Queue(
|
||||
$container->get(Psr\Http\Client\ClientInterface::class),
|
||||
$container->get('QueueLogger'),
|
||||
$container->get(Incoviba\Service\Job::class),
|
||||
$container->get(DateTimeZone::class)
|
||||
);
|
||||
}
|
||||
];
|
||||
|
@ -64,27 +64,63 @@ return [
|
||||
], $container->get(DateTimeZone::class));
|
||||
},
|
||||
'LoopLogger' => function(ContainerInterface $container) {
|
||||
return new Monolog\Logger('loop', [
|
||||
new Monolog\Handler\FilterHandler(
|
||||
($container->has('ENVIRONMENT') and $container->get('ENVIRONMENT') === 'development')
|
||||
? new Monolog\Handler\StreamHandler('/logs/loop-error.log')
|
||||
: new Monolog\Handler\RotatingFileHandler('/logs/loop-error.log', 10),
|
||||
$handlers = [
|
||||
'warning' => new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\RotatingFileHandler('/logs/loop-error.log', 10),
|
||||
Monolog\Level::Warning
|
||||
),
|
||||
new Monolog\Handler\FilterHandler(
|
||||
($container->has('ENVIRONMENT') and $container->get('ENVIRONMENT') === 'development')
|
||||
? new Monolog\Handler\StreamHandler('/logs/loop.log')
|
||||
: new Monolog\Handler\RotatingFileHandler('/logs/loop.log', 10),
|
||||
'notice' => new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\RotatingFileHandler('/logs/loop.log', 10),
|
||||
Monolog\Level::Notice,
|
||||
Monolog\Level::Notice
|
||||
),
|
||||
new Monolog\Handler\FilterHandler(
|
||||
($container->has('ENVIRONMENT') and $container->get('ENVIRONMENT') === 'development')
|
||||
? new Monolog\Handler\StreamHandler('/logs/loop-debug.log')
|
||||
: new Monolog\Handler\RotatingFileHandler('/logs/loop-debug.log', 10),
|
||||
'debug' => new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\RotatingFileHandler('/logs/loop-debug.log', 10),
|
||||
Monolog\Level::Debug,
|
||||
Monolog\Level::Debug
|
||||
)
|
||||
], [], $container->get(DateTimeZone::class));
|
||||
];
|
||||
if ($container->has('ENVIRONMENT') and $container->get('ENVIRONMENT') === 'development') {
|
||||
$handlers['warning'] = new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\StreamHandler('/logs/loop-error.log'),
|
||||
Monolog\Level::Warning);
|
||||
$handlers['notice'] = new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\StreamHandler('/logs/loop.log'),
|
||||
Monolog\Level::Notice, Monolog\Level::Notice);
|
||||
$handlers['debug'] = new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\StreamHandler('/logs/loop-debug.log'),
|
||||
Monolog\Level::Debug, Monolog\Level::Debug);
|
||||
}
|
||||
return new Monolog\Logger('loop', $handlers, [], $container->get(DateTimeZone::class));
|
||||
},
|
||||
'QueueLogger' => function(ContainerInterface $container) {
|
||||
$handlers = [
|
||||
'warning' => new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\RotatingFileHandler('/logs/queue-error.log', 10),
|
||||
Monolog\Level::Warning
|
||||
),
|
||||
'notice' => new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\RotatingFileHandler('/logs/queue.log', 10),
|
||||
Monolog\Level::Notice,
|
||||
Monolog\Level::Notice
|
||||
),
|
||||
'debug' => new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\RotatingFileHandler('/logs/queue-debug.log', 10),
|
||||
Monolog\Level::Debug,
|
||||
Monolog\Level::Debug
|
||||
)
|
||||
];
|
||||
if ($container->has('ENVIRONMENT') and $container->get('ENVIRONMENT') === 'development') {
|
||||
$handlers['warning'] = new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\StreamHandler('/logs/queue-error.log'),
|
||||
Monolog\Level::Warning);
|
||||
$handlers['notice'] = new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\StreamHandler('/logs/queue.log'),
|
||||
Monolog\Level::Notice, Monolog\Level::Notice);
|
||||
$handlers['debug'] = new Monolog\Handler\FilterHandler(
|
||||
new Monolog\Handler\StreamHandler('/logs/queue-debug.log'),
|
||||
Monolog\Level::Debug, Monolog\Level::Debug);
|
||||
}
|
||||
return new Monolog\Logger('queue', $handlers, [], $container->get(DateTimeZone::class));
|
||||
}
|
||||
];
|
||||
|
@ -10,7 +10,7 @@ use Incoviba\Common\Alias;
|
||||
)]
|
||||
class ExternalServices extends Alias\Command
|
||||
{
|
||||
protected function configure()
|
||||
protected function configure(): void
|
||||
{
|
||||
$this->addOption('update', 'u', Console\Input\InputOption::VALUE_NONE, 'Update');
|
||||
}
|
||||
|
48
cli/src/Command/Job/Pending.php
Normal file
48
cli/src/Command/Job/Pending.php
Normal file
@ -0,0 +1,48 @@
|
||||
<?php
|
||||
namespace Incoviba\Command\Job;
|
||||
|
||||
use Symfony\Component\Console;
|
||||
use Incoviba\Service;
|
||||
|
||||
#[Console\Attribute\AsCommand(name: 'jobs:pending', description: 'List pending jobs')]
|
||||
class Pending extends Console\Command\Command
|
||||
{
|
||||
public function __construct(protected Service\Job $jobService, ?string $name = null)
|
||||
{
|
||||
parent::__construct($name);
|
||||
}
|
||||
|
||||
protected function configure(): void
|
||||
{
|
||||
$this->addOption('full', 'f', Console\Input\InputOption::VALUE_NONE, 'Full output');
|
||||
}
|
||||
|
||||
protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
|
||||
{
|
||||
$jobs = $this->jobService->getPending();
|
||||
$jobCount = count($jobs);
|
||||
$output->writeln("Found {$jobCount} pending jobs");
|
||||
|
||||
if ($input->getOption('full') and $jobCount > 0) {
|
||||
$io = new Console\Style\SymfonyStyle($input, $output);
|
||||
|
||||
$rows = [];
|
||||
foreach ($jobs as $job) {
|
||||
$retries = $job['retries'] ?? 0;
|
||||
$updated = $job['updated_at'] ?? '';
|
||||
|
||||
$rows[] = [
|
||||
$job['id'],
|
||||
$job['created_at'],
|
||||
$job['configuration']['type'],
|
||||
$retries,
|
||||
$updated
|
||||
];
|
||||
}
|
||||
|
||||
$io->table(['ID', 'Created', 'Type', 'Retries', 'Updated'], $rows);
|
||||
}
|
||||
|
||||
return self::SUCCESS;
|
||||
}
|
||||
}
|
176
cli/src/Command/Job/Run.php
Normal file
176
cli/src/Command/Job/Run.php
Normal file
@ -0,0 +1,176 @@
|
||||
<?php
|
||||
namespace Incoviba\Command\Job;
|
||||
|
||||
use DateMalformedStringException;
|
||||
use DateTimeImmutable;
|
||||
use DateTimeZone;
|
||||
use Incoviba\Exception\Client\FastCGI as FastCGIException;
|
||||
use Incoviba\Service;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console;
|
||||
|
||||
#[Console\Attribute\AsCommand(name: 'jobs:run', description: 'Run jobs')]
|
||||
class Run extends Console\Command\Command
|
||||
{
|
||||
public function __construct(protected Service\FastCGI $fastcgi, protected LoggerInterface $logger,
|
||||
protected DateTimeZone $timeZone, ?string $name = null)
|
||||
{
|
||||
parent::__construct($name);
|
||||
}
|
||||
|
||||
protected function configure(): void
|
||||
{
|
||||
$this->addArgument('job_ids',
|
||||
Console\Input\InputArgument::IS_ARRAY | Console\Input\InputArgument::REQUIRED, 'Job IDs');
|
||||
}
|
||||
|
||||
protected array $output = [];
|
||||
public function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
|
||||
{
|
||||
try {
|
||||
$now = new DateTimeImmutable('now', $this->timeZone);
|
||||
} catch (DateMalformedStringException) {
|
||||
$now = new DateTimeImmutable();
|
||||
}
|
||||
|
||||
$jobIds = $input->getArgument('job_ids');
|
||||
$jobCount = count($jobIds);
|
||||
|
||||
$this->pushOutput('top', ['message' => "[{$now->format('Y-m-d H:i:s e')}] Running {$jobCount} jobs..."]);
|
||||
$this->pushOutput('bottom', ['table' => [
|
||||
['Job IDs'],
|
||||
array_map(function($row) {return [$row];},$jobIds)
|
||||
]]);
|
||||
$this->pushOutput('top', ['progress' => $jobCount]);
|
||||
foreach ($jobIds as $jobId) {
|
||||
$this->runJob($jobId);
|
||||
}
|
||||
$result = $this->getResponses();
|
||||
$this->pushOutput('top', ['progress' => 'finish']);
|
||||
|
||||
$this->writeOutput($input, $output);
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
protected function runJob(int $jobId): void
|
||||
{
|
||||
$uri = "/api/queue/run/{$jobId}";
|
||||
$this->pushOutput('bottom', ['message' => "GET {$uri}"]);
|
||||
|
||||
try {
|
||||
$this->fastcgi->get($uri);
|
||||
} catch (FastCGIException $exception) {
|
||||
$this->logger->error($exception->getMessage(), ['uri' => $uri, 'exception' =>$exception]);
|
||||
}
|
||||
}
|
||||
protected function getResponses(): int
|
||||
{
|
||||
$result = self::SUCCESS;
|
||||
$responses = $this->fastcgi->awaitResponses();
|
||||
foreach ($responses as $response) {
|
||||
$this->pushOutput('top', ['progress' => 'advance']);
|
||||
if ($response->getError() !== '') {
|
||||
$this->logger->error("Error running job", [
|
||||
'error' => $response->getError(),
|
||||
'body' => $response->getBody(),
|
||||
'headers' => $response->getHeaders(),
|
||||
]);
|
||||
$result = self::FAILURE;
|
||||
continue;
|
||||
}
|
||||
$this->pushOutput('bottom', ['message' => $response->getBody()]);
|
||||
}
|
||||
return $result;
|
||||
}
|
||||
|
||||
protected function pushOutput(string $section, array $configuration): void
|
||||
{
|
||||
if (!isset($this->output[$section])) {
|
||||
$this->output[$section] = [];
|
||||
}
|
||||
foreach ($configuration as $key => $value) {
|
||||
if (!isset($this->output[$section][$key])) {
|
||||
$this->output[$section][$key] = [];
|
||||
}
|
||||
$this->output[$section][$key] []= $value;
|
||||
}
|
||||
if (isset($this->output[$section]['progress'])) {
|
||||
usort($this->output[$section]['progress'], function($a, $b) {
|
||||
if ($a === $b) {
|
||||
return 0;
|
||||
}
|
||||
if (is_int($a)) {
|
||||
return -1;
|
||||
}
|
||||
if (is_int($b)) {
|
||||
return 1;
|
||||
}
|
||||
if ($a === 'finish') {
|
||||
return 1;
|
||||
}
|
||||
if ($b === 'finish') {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
}
|
||||
}
|
||||
protected function writeOutput(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): void
|
||||
{
|
||||
$sectionNames = array_keys($this->output);
|
||||
$ios = [];
|
||||
foreach ($sectionNames as $sectionName) {
|
||||
$section = $output->section();
|
||||
$ios[$sectionName] = new Console\Style\SymfonyStyle($input, $section);
|
||||
}
|
||||
|
||||
foreach ($this->output as $sectionName => $configurations) {
|
||||
$io = $ios[$sectionName];
|
||||
$this->writeSection($io, $configurations);
|
||||
}
|
||||
}
|
||||
protected function writeSection(Console\Style\SymfonyStyle $io, array $configurations): void
|
||||
{
|
||||
if (array_key_exists('table', $configurations)) {
|
||||
$this->writeTables($io, $configurations['table']);
|
||||
}
|
||||
if (array_key_exists('progress', $configurations)) {
|
||||
$this->writeProgress($io, $configurations['progress']);
|
||||
}
|
||||
if (array_key_exists('message', $configurations)) {
|
||||
$this->writeMessages($io, $configurations['message']);
|
||||
}
|
||||
}
|
||||
protected function writeTables(Console\Style\SymfonyStyle $io, array $tableConfigurations): void
|
||||
{
|
||||
foreach ($tableConfigurations as $tableData) {
|
||||
$io->table(...$tableData);
|
||||
}
|
||||
}
|
||||
protected function writeMessages(Console\Style\SymfonyStyle $io, array $messages): void
|
||||
{
|
||||
foreach ($messages as $message) {
|
||||
$io->writeln($message);
|
||||
}
|
||||
}
|
||||
protected function writeProgress(Console\Style\SymfonyStyle $io, array $progresses): void
|
||||
{
|
||||
$progressBar = null;
|
||||
foreach ($progresses as $progress) {
|
||||
if ($progress === 'advance' and $progressBar !== null) {
|
||||
$progressBar->advance();
|
||||
continue;
|
||||
}
|
||||
if ($progress === 'finish' and $progressBar !== null) {
|
||||
$progressBar->finish();
|
||||
continue;
|
||||
}
|
||||
if (in_array($progress, ['finish', 'advance'])) {
|
||||
continue;
|
||||
}
|
||||
$progressBar = $io->createProgressBar($progress);
|
||||
}
|
||||
$io->newLine();
|
||||
}
|
||||
}
|
@ -3,14 +3,12 @@ namespace Incoviba\Command;
|
||||
|
||||
use DateTimeImmutable;
|
||||
use DateTimeZone;
|
||||
use Psr\Http\Client\ClientExceptionInterface;
|
||||
use Psr\Http\Client\ClientInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console;
|
||||
use Incoviba\Service\FastCGI;
|
||||
use Incoviba\Service\Job;
|
||||
use Incoviba\Common\Alias\Command;
|
||||
use Incoviba\Exception\Client\FastCGI as FastCGIException;
|
||||
use Throwable;
|
||||
|
||||
#[Console\Attribute\AsCommand(
|
||||
name: 'queue',
|
||||
@ -21,7 +19,8 @@ class Queue extends Command
|
||||
public function __construct(ClientInterface $client, LoggerInterface $logger,
|
||||
protected Job $jobService,
|
||||
protected DateTimeZone $timezone,
|
||||
protected FastCGI $fastcgi,
|
||||
protected string $baseCommand = '/code/bin/incoviba',
|
||||
protected int $batchSize = 10,
|
||||
?string $name = null)
|
||||
{
|
||||
parent::__construct($client, $logger, $name);
|
||||
@ -30,59 +29,84 @@ class Queue extends Command
|
||||
public function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
|
||||
{
|
||||
$this->logger->debug("Running {$this->getName()}");
|
||||
$io = new Console\Style\SymfonyStyle($input, $output);
|
||||
$this->sections = [
|
||||
'top' => $output->section(),
|
||||
'bottom' => $output->section(),
|
||||
];
|
||||
$io = new Console\Style\SymfonyStyle($input, $this->sections['top']);
|
||||
$now = new DateTimeImmutable('now', $this->timezone);
|
||||
$io->title("[{$now->format('Y-m-d H:i:s e')}] Running Queue...");
|
||||
|
||||
$jobs = $this->getJobs($output);
|
||||
if (count($jobs) === 0) {
|
||||
$this->logger->debug("No jobs to run");
|
||||
$jobs = $this->getJobs();
|
||||
$jobCount = count($jobs);
|
||||
if ($jobCount === 0) {
|
||||
return Console\Command\Command::SUCCESS;
|
||||
}
|
||||
|
||||
return $this->runJobs($output, $jobs);
|
||||
$io->writeln("Found {$jobCount} jobs to run");
|
||||
$result = $this->runJobs($io, $jobs);
|
||||
foreach ($this->outputs as $output) {
|
||||
$this->sections['bottom']->writeln($output);
|
||||
}
|
||||
return $result;
|
||||
}
|
||||
|
||||
protected function getJobs(Console\Output\OutputInterface $output): array
|
||||
protected array $sections;
|
||||
|
||||
protected function getJobs(): array
|
||||
{
|
||||
$this->logger->debug("Getting jobs");
|
||||
$jobs = $this->jobService->getPending();
|
||||
$this->logger->debug("Found " . count($jobs) . " jobs");
|
||||
$jobCount = count($jobs);
|
||||
if ($jobCount === 0) {
|
||||
$this->logger->debug("No jobs to run");
|
||||
return [];
|
||||
}
|
||||
$this->logger->debug("Found {$jobCount} jobs");
|
||||
return array_column($jobs, 'id');
|
||||
}
|
||||
protected function runJobs(Console\Output\OutputInterface $output, array $jobs): int
|
||||
protected function runJobs(Console\Style\SymfonyStyle $io, array $jobs): int
|
||||
{
|
||||
$errors = 0;
|
||||
foreach ($jobs as $job) {
|
||||
if ($this->runJob($output, $job) === Console\Command\Command::FAILURE) {
|
||||
$this->logger->error("Error running job: {$job}");
|
||||
$errors ++;
|
||||
$chunks = array_chunk($jobs, $this->batchSize);
|
||||
$chunkCount = count($chunks);
|
||||
$result = self::SUCCESS;
|
||||
$progress1 = $io->createProgressBar($chunkCount);
|
||||
$progress1->start();
|
||||
foreach ($chunks as $chunk) {
|
||||
if ($this->runJobBatch($chunk) === self::FAILURE) {
|
||||
$result = self::FAILURE;
|
||||
}
|
||||
$progress1->advance();
|
||||
}
|
||||
$responses = $this->fastcgi->awaitResponses();
|
||||
foreach ($responses as $response) {
|
||||
if ($response->getError() !== '') {
|
||||
$this->logger->error("Error running job", [
|
||||
'error' => $response->getError(),
|
||||
'body' => $response->getBody(),
|
||||
'headers' => $response->getHeaders(),
|
||||
]);
|
||||
$errors ++;
|
||||
}
|
||||
}
|
||||
return $errors === 0 ? Console\Command\Command::SUCCESS : Console\Command\Command::FAILURE;
|
||||
$progress1->finish();
|
||||
return $result;
|
||||
}
|
||||
protected function runJob(Console\Output\OutputInterface $output, int $job_id): int
|
||||
|
||||
protected array $outputs = [];
|
||||
protected function runJobBatch(array $jobIds): int
|
||||
{
|
||||
$uri = "/api/queue/run/{$job_id}";
|
||||
$output->writeln("GET {$uri}");
|
||||
$baseCommand = "{$this->baseCommand} jobs:run";
|
||||
|
||||
$jobsLine = implode(' ', $jobIds);
|
||||
$command = "{$baseCommand} {$jobsLine}";
|
||||
|
||||
try {
|
||||
$this->fastcgi->get($uri);
|
||||
return self::SUCCESS;
|
||||
} catch (FastCGIException $exception) {
|
||||
$this->logger->error($exception->getMessage(), ['uri' => $uri, 'exception' =>$exception]);
|
||||
exec($command, $output, $resultCode);
|
||||
$this->outputs []= $output;
|
||||
} catch (Throwable $exception) {
|
||||
$this->logger->error("Failed to run command", [
|
||||
'command' => $command,
|
||||
'exception' => $exception
|
||||
]);
|
||||
return self::FAILURE;
|
||||
}
|
||||
if ($resultCode !== 0) {
|
||||
$this->logger->error("Failed to run command", [
|
||||
'command' => $command,
|
||||
'result_code' => $resultCode
|
||||
]);
|
||||
return self::FAILURE;
|
||||
}
|
||||
return self::SUCCESS;
|
||||
}
|
||||
}
|
||||
|
49
cli/src/Command/Queue/Push.php
Normal file
49
cli/src/Command/Queue/Push.php
Normal file
@ -0,0 +1,49 @@
|
||||
<?php
|
||||
namespace Incoviba\Command\Queue;
|
||||
|
||||
use Throwable;
|
||||
use Symfony\Component\Console;
|
||||
use Incoviba\Service;
|
||||
|
||||
#[Console\Attribute\AsCommand(name: 'queue:push', description: 'Push a job to the queue')]
|
||||
class Push extends Console\Command\Command
|
||||
{
|
||||
public function __construct(protected Service\Job $jobService, ?string $name = null)
|
||||
{
|
||||
parent::__construct($name);
|
||||
}
|
||||
|
||||
protected function configure(): void
|
||||
{
|
||||
$this->addOption('configurations', 'c', Console\Input\InputOption::VALUE_REQUIRED | Console\Input\InputOption::VALUE_IS_ARRAY, 'Job configuration, must be in valid JSON format');
|
||||
}
|
||||
|
||||
protected function execute(Console\Input\InputInterface $input, Console\Output\OutputInterface $output): int
|
||||
{
|
||||
$io = new Console\Style\SymfonyStyle($input, $output);
|
||||
$io->title("Pushing job");
|
||||
|
||||
$configurations = $input->getOption('configurations');
|
||||
if ($configurations === null) {
|
||||
$io->error('Missing configurations');
|
||||
return self::FAILURE;
|
||||
}
|
||||
$result = self::SUCCESS;
|
||||
foreach ($configurations as $configuration) {
|
||||
if (!json_validate($configuration)) {
|
||||
$io->error("Invalid JSON: {$configuration}");
|
||||
continue;
|
||||
}
|
||||
$configuration = json_decode($configuration, true);
|
||||
|
||||
try {
|
||||
$job = $this->jobService->push($configuration);
|
||||
$io->success("Job pushed with ID {$job['id']}");
|
||||
} catch (Throwable $exception) {
|
||||
$io->error($exception->getMessage());
|
||||
$result = self::FAILURE;
|
||||
}
|
||||
}
|
||||
return $result;
|
||||
}
|
||||
}
|
51
cli/src/Service/Commands.php
Normal file
51
cli/src/Service/Commands.php
Normal file
@ -0,0 +1,51 @@
|
||||
<?php
|
||||
namespace Incoviba\Service;
|
||||
|
||||
use RecursiveDirectoryIterator;
|
||||
use RecursiveIteratorIterator;
|
||||
use ReflectionClass;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console\Attribute\AsCommand;
|
||||
|
||||
class Commands
|
||||
{
|
||||
public function __construct(protected LoggerInterface $logger, public ?string $baseCommandsPath = null, public array $skipCommands = [])
|
||||
{
|
||||
if ($this->baseCommandsPath === null) {
|
||||
$this->baseCommandsPath = implode(DIRECTORY_SEPARATOR, [
|
||||
dirname(__DIR__, 1),
|
||||
'Command'
|
||||
]);
|
||||
}
|
||||
$this->baseCommandsPath = realpath($this->baseCommandsPath);
|
||||
}
|
||||
|
||||
public function getCommandsList(): array
|
||||
{
|
||||
$commands = [];
|
||||
$files = new RecursiveIteratorIterator((new RecursiveDirectoryIterator($this->baseCommandsPath)));
|
||||
foreach ($files as $file) {
|
||||
if ($file->isDir()) {
|
||||
continue;
|
||||
}
|
||||
$basename = ltrim(str_replace(DIRECTORY_SEPARATOR, "\\",
|
||||
str_replace([$this->baseCommandsPath, '.php'], '', $file->getRealPath())), "\\");
|
||||
$namespace = "Incoviba\\Command";
|
||||
$class = "{$namespace}\\{$basename}";
|
||||
if (!class_exists($class)) {
|
||||
$this->logger->error("Class {$class} not found");
|
||||
continue;
|
||||
}
|
||||
$ref = new ReflectionClass($class);
|
||||
$commandData = $ref->getAttributes(AsCommand::class)[0];
|
||||
$commandName = $commandData->getArguments()['name'];
|
||||
|
||||
if (in_array($commandName, $this->skipCommands) or in_array($class, $this->skipCommands)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$commands[$commandName] = $class;
|
||||
}
|
||||
return $commands;
|
||||
}
|
||||
}
|
@ -59,17 +59,29 @@ class FastCGI implements LoggerAwareInterface
|
||||
public function awaitResponses(): array
|
||||
{
|
||||
$responses = [];
|
||||
$repeats = 0;
|
||||
$maxRepeats = count($this->socketIds);
|
||||
|
||||
while ($this->client->hasUnhandledResponses()) {
|
||||
if ($repeats >= $maxRepeats) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
$readyResponses = $this->client->readReadyResponses(3000);
|
||||
} catch (FCGI\Exceptions\FastCGIClientException $exception) {
|
||||
$this->logger->error($exception->getMessage());
|
||||
$repeats ++;
|
||||
continue;
|
||||
}
|
||||
foreach ($readyResponses as $response) {
|
||||
$responses []= $response;
|
||||
$repeats ++;
|
||||
}
|
||||
}
|
||||
if ($this->client->hasUnhandledResponses()) {
|
||||
$this->logger->error("Unhandled responses");
|
||||
return array_merge($responses, $this->awaitResponses());
|
||||
}
|
||||
return $responses;
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,10 @@
|
||||
<?php
|
||||
namespace Incoviba\Service;
|
||||
|
||||
use DateInvalidTimeZoneException;
|
||||
use DateMalformedStringException;
|
||||
use DateTimeImmutable;
|
||||
use DateTimeZone;
|
||||
use Exception;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Predis\Connection\ConnectionException;
|
||||
@ -19,8 +23,30 @@ class Job
|
||||
$jobs = $this->redisService->get($this->redisKey);
|
||||
return json_decode($jobs, true);
|
||||
} catch (ConnectionException|Exception $exception) {
|
||||
$this->logger->warning($exception);
|
||||
$exception = new Exception("Could not read {$this->redisKey} from Redis", $exception->getCode(), $exception);
|
||||
$this->logger->warning($exception->getMessage(), ['exception' => $exception]);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
public function push(array $jobConfiguration): array
|
||||
{
|
||||
try {
|
||||
$now = (new DateTimeImmutable('now', new DateTimeZone($_ENV['TZ'] ?? 'America/Santiago')));
|
||||
} catch (DateMalformedStringException | DateInvalidTimeZoneException) {
|
||||
$now = new DateTimeImmutable();
|
||||
}
|
||||
$data = [
|
||||
'id' => $now->format('Uu'),
|
||||
'configuration' => $jobConfiguration,
|
||||
'executed' => false,
|
||||
'created_at' => $now->format('Y-m-d H:i:s'),
|
||||
'updated_at' => null,
|
||||
'retries' => 0
|
||||
];
|
||||
$jobs = $this->getPending();
|
||||
$jobs []= $data;
|
||||
$this->redisService->set($this->redisKey, json_encode($jobs), -1);
|
||||
return $data;
|
||||
}
|
||||
}
|
||||
|
@ -69,6 +69,7 @@ services:
|
||||
- ./logs:/logs
|
||||
ports:
|
||||
- "8084:80"
|
||||
|
||||
cli:
|
||||
profiles:
|
||||
- cli
|
||||
@ -78,6 +79,7 @@ services:
|
||||
image: php:incoviba-cli
|
||||
container_name: incoviba_cli
|
||||
restart: unless-stopped
|
||||
entrypoint: [ "/code/entrypoint" ]
|
||||
env_file:
|
||||
- ${CLI_PATH:-.}/.env
|
||||
- ./.key.env
|
||||
|
Reference in New Issue
Block a user