DaemonAction::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
eloc 1
nc 1
nop 7
dl 0
loc 10
ccs 0
cts 2
cp 0
crap 2
rs 10
c 2
b 0
f 0
1
<?php
2
declare(strict_types=1);
3
4
namespace JCIT\jobqueue\actions;
5
6
use JCIT\jobqueue\events\JobQueueEvent;
7
use JCIT\jobqueue\exceptions\PermanentException;
8
use JCIT\jobqueue\interfaces\JobFactoryInterface;
9
use League\Tactician\CommandBus;
10
use Pheanstalk\Contract\PheanstalkInterface;
11
use yii\base\Action;
12
use yii\base\InvalidConfigException;
13
use yii\console\Application;
14
use yii\db\Connection;
15
use yii\helpers\Console;
16
17
/**
 * Console action that runs an endless worker loop: it reserves jobs from
 * beanstalk, rebuilds the job command from the job payload and hands it to
 * the command bus.
 *
 * Outcome per reserved job:
 *  - handled without exception        -> job is deleted
 *  - PermanentException is thrown     -> error is logged, job is deleted
 *  - any other Throwable is thrown    -> error is logged, job is buried
 *
 * After every iteration the database connection is closed and buffered log
 * messages are flushed to their targets.
 */
class DaemonAction extends Action
{
    /**
     * Default number of seconds a single reserve call waits for a job;
     * used when run() is not given an explicit value.
     */
    public int $reserveWithTimeout = 120;

    /**
     * @param mixed $id Action id, passed through to the parent constructor.
     * @param mixed $controller Owning controller, passed through to the parent constructor.
     * @param PheanstalkInterface $beanstalk Queue client used to reserve, delete and bury jobs.
     * @param CommandBus $commandBus Bus that handles the reconstructed job commands.
     * @param Connection $db Connection that is closed after every loop iteration.
     * @param JobFactoryInterface $jobFactory Factory that turns a job payload into a job command.
     * @param array $config Configuration passed through to the parent constructor.
     */
    public function __construct(
        $id,
        $controller,
        private PheanstalkInterface $beanstalk,
        private CommandBus $commandBus,
        private Connection $db,
        private JobFactoryInterface $jobFactory,
        $config = []
    ) {
        parent::__construct($id, $controller, $config);
    }

    /**
     * Ensures the action is attached to a controller of a console application.
     *
     * @throws InvalidConfigException when the controller's module is not a console Application.
     */
    public function init(): void
    {
        if (!$this->controller->module instanceof Application) {
            throw new InvalidConfigException('This action can only be used in a console application.');
        }

        parent::init();
    }

    /**
     * Runs the worker loop. This method never returns normally.
     *
     * @param int|string|null $reserveWithTimeout Seconds each reserve call waits for a job;
     *                                            null falls back to $this->reserveWithTimeout.
     *
     * @throws \InvalidArgumentException when the given timeout is not numeric.
     */
    public function run(
        $reserveWithTimeout = null
    ): void {
        $timeout = $reserveWithTimeout ?? $this->reserveWithTimeout;
        if (!is_numeric($timeout)) {
            throw new \InvalidArgumentException('reserveWithTimeout must be a number of seconds.');
        }

        // The parameter is untyped, so a value given on the command line may be a
        // string. This file uses strict_types, so it has to be converted explicitly
        // before it is handed to the int-typed reserve call.
        $timeout = (int) $timeout;

        $this->controller->stdout("Waiting for jobs" . PHP_EOL, Console::FG_CYAN);

        while (true) {
            $this->controller->stdout('.', Console::FG_CYAN);
            $job = $this->beanstalk->reserveWithTimeout($timeout);
            if ($job !== null) {
                $this->processJob($job);
            }

            $this->db->close();

            // A final flush makes every enabled target export its buffered messages
            // and then empty its buffer. Calling export() on the targets directly
            // leaves the buffer in place, so the same messages would be exported
            // again on every following iteration.
            \Yii::getLogger()->flush(true);
        }
    }

    /**
     * Handles one reserved job and settles it on the queue (delete or bury).
     *
     * @param mixed $job The job returned by the reserve call.
     */
    private function processJob($job): void
    {
        try {
            $jobCommand = $this->jobFactory->createFromJson($job->getData());
            $event = new JobQueueEvent($jobCommand);
            \Yii::$app->trigger($event::EVENT_JOB_QUEUE_HANDLE, $event);

            $jobClass = get_class($jobCommand);
            $this->controller->stdout(PHP_EOL . "Starting job: {$jobClass}({$job->getId()})" . PHP_EOL, Console::FG_CYAN);
            $this->commandBus->handle($jobCommand);
            $this->controller->stdout("Deleting job: {$job->getId()}" . PHP_EOL, Console::FG_GREEN);
            $this->beanstalk->delete($job);
        } catch (PermanentException $e) {
            // Permanent failures are removed from the queue instead of being kept around.
            \Yii::error($e, self::class);
            $this->controller->stdout(PHP_EOL . "Deleting job({$job->getId()}) with permanent exception: {$e->getMessage()}" . PHP_EOL, Console::FG_RED);
            $this->beanstalk->delete($job);
        } catch (\Throwable $t) {
            // Any other failure buries the job so it stays on the queue server.
            \Yii::error($t, self::class);
            $this->controller->stdout(PHP_EOL . "Burying job({$job->getId()}) with message: {$t->getMessage()}" . PHP_EOL, Console::FG_YELLOW);
            $this->beanstalk->bury($job);
        }
    }
}
83