ParallelRunner::run()   A
last analyzed

Complexity

Conditions 3
Paths 1

Size

Total Lines 26
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 3
Bugs 0 Features 0
Metric Value
eloc 17
c 3
b 0
f 0
dl 0
loc 26
ccs 0
cts 21
cp 0
rs 9.7
cc 3
nc 1
nop 1
crap 12
1
<?php
2
3
namespace Tarantool\JobQueue\Runner\Amp;
4
5
use Amp\Loop;
6
use Amp\Parallel\Worker\DefaultPool;
7
use Psr\Log\LoggerInterface as Logger;
8
use Tarantool\JobQueue\Handler\Handler;
9
use Tarantool\JobQueue\Runner\Runner;
10
use Tarantool\Queue\Queue;
11
12
/**
 * Runner that takes tasks from a queue and executes them on an Amp worker pool.
 *
 * Each taken task is wrapped into a GenericTask and enqueued on a DefaultPool;
 * depending on the outcome, the success or the failure handler is invoked and
 * the result is logged.
 */
class ParallelRunner implements Runner
{
    /**
     * Interval handed to Loop::repeat() between two polls of the queue.
     * Amp's Loop::repeat() takes this value in milliseconds.
     */
    private const POLL_INTERVAL_MS = 100;

    private $queue;
    private $successHandler;
    private $failureHandler;
    private $logger;
    private $executorsConfigFile;

    /**
     * @param Queue       $queue               Queue the tasks are taken from; it is also handed to the
     *                                         handlers and to every GenericTask.
     * @param Handler     $successHandler      Invoked after the worker task completed without throwing.
     * @param Handler     $failureHandler      Invoked when executing or post-processing a task throws.
     * @param Logger      $logger              Receives debug/info/error/critical messages of the runner.
     * @param string|null $executorsConfigFile Forwarded unchanged to GenericTask; its meaning is defined
     *                                         there (presumably a path to an executors config — confirm).
     */
    public function __construct(Queue $queue, Handler $successHandler, Handler $failureHandler, Logger $logger, ?string $executorsConfigFile = null)
    {
        $this->queue = $queue;
        $this->successHandler = $successHandler;
        $this->failureHandler = $failureHandler;
        $this->logger = $logger;
        $this->executorsConfigFile = $executorsConfigFile;
    }

    /**
     * Starts the event loop and keeps polling the queue for tasks.
     *
     * The call blocks inside Loop::run(). The repeat watcher registered here is
     * never cancelled by this method.
     *
     * @param int $idleTimeout Timeout passed to Queue::take() on every poll
     *                         (presumably seconds — confirm against the Queue API).
     */
    public function run(int $idleTimeout = 1): void
    {
        Loop::setErrorHandler(function (\Throwable $e) {
            // Pass the throwable under the PSR-3 "exception" key so that the
            // exception class and stack trace are not lost, only the message
            // was logged before.
            $this->logger->critical($e->getMessage(), ['exception' => $e]);

            throw $e;
        });

        Loop::run(function () use ($idleTimeout) {
            $pool = new DefaultPool();

            // The callback contains `yield`, so it is a generator function;
            // Amp runs generator-returning watcher callbacks as coroutines.
            Loop::repeat(self::POLL_INTERVAL_MS, function () use ($pool, $idleTimeout) {
                $queueTask = $this->queue->take($idleTimeout);

                if (!$queueTask) {
                    $this->logger->debug('Idling...');

                    return;
                }

                try {
                    $workerTask = new GenericTask($queueTask, $this->queue, $this->executorsConfigFile);
                    yield $pool->enqueue($workerTask);

                    $this->successHandler->handle($queueTask, $this->queue);
                    $this->logger->info(sprintf('Task #%d was successfully executed.', $queueTask->getId()), $queueTask->getData());
                } catch (\Throwable $e) {
                    // NOTE(review): this also catches exceptions thrown by the
                    // success handler or the info log call above, in which case
                    // the failure handler runs for a task whose execution itself
                    // succeeded — confirm this is intended.
                    $this->failureHandler->handle($queueTask, $this->queue);
                    $this->logger->error(sprintf('Failed to execute task #%d: %s', $queueTask->getId(), $e->getMessage()), $queueTask->getData());
                }
            });
        });
    }
}
60