GenericTask   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 51
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 8
eloc 25
c 2
b 0
f 0
dl 0
loc 51
ccs 0
cts 35
cp 0
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
A getExecutor() 0 10 2
A __construct() 0 5 1
A run() 0 6 1
A createExecutor() 0 16 4
1
<?php
2
3
namespace Tarantool\JobQueue\Runner\Amp;
4
5
use Amp\Parallel\Worker\Environment;
6
use Amp\Parallel\Worker\Task;
7
use Tarantool\JobQueue\Executor\Executor;
8
use Tarantool\JobQueue\Executor\ExecutorChain;
9
use Tarantool\JobQueue\Executor\ProcessExecutor;
10
use Tarantool\JobQueue\JobBuilder\JobOptions;
11
use Tarantool\Queue\Queue;
12
use Tarantool\Queue\Task as QueueTask;
13
14
/**
 * Worker task that passes the payload of a single queue task to an executor.
 *
 * The executor is created on first use and stored in the worker Environment
 * under EXECUTOR_KEY, so subsequent tasks that receive the same Environment
 * reuse the stored instance instead of building a new one.
 */
class GenericTask implements Task
{
    /** Environment key under which the executor instance is stored. */
    private const EXECUTOR_KEY = self::class.'-executor';

    /** @var QueueTask Queue task whose data carries the job payload. */
    private $task;

    /** @var Queue Queue handed to the executor together with the payload. */
    private $queue;

    /** @var string|null Path of a PHP file returning an Executor or an array of Executors. */
    private $executorsConfigFile;

    /**
     * @param QueueTask   $task                Queue task to process.
     * @param Queue       $queue               Queue passed through to the executor.
     * @param string|null $executorsConfigFile Optional PHP file that returns an Executor or an
     *                                         array of Executor objects; when null or empty a
     *                                         ProcessExecutor is used.
     */
    public function __construct(QueueTask $task, Queue $queue, ?string $executorsConfigFile = null)
    {
        $this->task = $task;
        $this->queue = $queue;
        $this->executorsConfigFile = $executorsConfigFile;
    }

    /**
     * Extracts the payload from the task data and hands it to the executor.
     *
     * A missing payload entry is forwarded as null.
     */
    public function run(Environment $environment): void
    {
        $data = $this->task->getData();
        $payload = $data[JobOptions::PAYLOAD] ?? null;

        $this->getExecutor($environment)->execute($payload, $this->queue);
    }

    /**
     * Returns the executor stored in the environment, creating and storing it when absent.
     */
    private function getExecutor(Environment $environment): Executor
    {
        $executor = $environment->get(self::EXECUTOR_KEY);

        // Checking the type (rather than truthiness) also rebuilds the executor
        // when the key holds something that cannot satisfy the return type.
        if (!$executor instanceof Executor) {
            $executor = $this->createExecutor();
            $environment->set(self::EXECUTOR_KEY, $executor);
        }

        return $executor;
    }

    /**
     * Builds the executor described by the config file, or a ProcessExecutor when none is set.
     *
     * @throws \RuntimeException When the config file returns neither an Executor nor an array
     *                           consisting solely of Executor objects.
     */
    private function createExecutor(): Executor
    {
        // Compare explicitly: a plain truthiness test would also discard the path "0".
        if (null === $this->executorsConfigFile || '' === $this->executorsConfigFile) {
            return new ProcessExecutor();
        }

        $executors = require $this->executorsConfigFile;

        if ($executors instanceof Executor) {
            return $executors;
        }

        if (!is_array($executors)) {
            throw $this->createInvalidConfigException();
        }

        // Fail early with a descriptive message instead of letting a
        // non-Executor element surface later, when the chain is executed.
        foreach ($executors as $executor) {
            if (!$executor instanceof Executor) {
                throw $this->createInvalidConfigException();
            }
        }

        return new ExecutorChain($executors);
    }

    /**
     * Creates the exception reported when the config file returns an unsupported value.
     */
    private function createInvalidConfigException(): \RuntimeException
    {
        return new \RuntimeException(sprintf('%s must return an Executor object or an array of Executor objects.', $this->executorsConfigFile));
    }
}
67