Worker::shouldContinue()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 0
1
<?php
2
namespace Workana\AsyncJobs;
3
4
use Throwable;
5
use Exception;
6
use Bernard\Envelope;
7
use Bernard\Queue;
8
use Bernard\Router;
9
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
10
use Workana\AsyncJobs\Event\SuccessfulExecutionEvent;
11
use Workana\AsyncJobs\Process\ProcessManager;
12
use Workana\AsyncJobs\Event\AfterExecutionEvent;
13
use Workana\AsyncJobs\Event\BeforeExecutionEvent;
14
use Workana\AsyncJobs\Event\RejectedExecutionEvent;
15
use Workana\AsyncJobs\Event\WorkerShutdownEvent;
16
use Workana\AsyncJobs\Util\Sleeper;
17
use Workana\AsyncJobs\Retry\RetryStrategy;
18
19
/**
20
 * Worker class
21
 *
22
 * @author Carlos Frutos <[email protected]>
23
 */
24
class Worker
25
{
26
    /**
27
     * @var ProcessManager
28
     */
29
    protected $pm;
30
31
    /**
32
     * @var Stopwatch
33
     */
34
    protected $stopwatch;
35
36
    /**
37
     * @var Sleeper
38
     */
39
    protected $sleeper;
40
41
    /**
42
     * @var Router
43
     */
44
    protected $router;
45
46
    /**
47
     * @var Queue
48
     */
49
    protected $queue;
50
51
    /**
52
     * @var RetryStrategy
53
     */
54
    protected $retryStrategy;
55
56
    /**
57
     * @var int
58
     */
59
    protected $shutdownSignal = null;
60
61
    /**
62
     * Create a new Worker
63
     *
64
     * @param Queue $queue
65
     * @param Router $router
66
     * @param EventDispatcherInterface $eventDispatcher
67
     * @param ProcessManager $processManager
68
     * @param RetryStrategy $retryStrategy
69
     */
70
    public function __construct(
71
        Queue $queue,
72
        Router $router,
73
        EventDispatcherInterface $eventDispatcher,
74
        ProcessManager $processManager,
75
        RetryStrategy $retryStrategy
76
    ) {
77
        $this->queue = $queue;
78
        $this->router = $router;
79
        $this->eventDispatcher = $eventDispatcher;
0 ignored issues
show
Bug introduced by
The property eventDispatcher does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
80
        $this->pm = $processManager;
81
82
        $this->stopwatch = new Stopwatch();
83
        $this->sleeper = new Sleeper();
84
85
        $this->retryStrategy = $retryStrategy;
86
87
        $this->bindSignals();
88
    }
89
90
    /**
91
     * Bind signal handlers
92
     *
93
     * @return void
94
     */
95
    protected function bindSignals()
96
    {
97
        $this->pm->signals()->registerHandler(SIGTERM, [$this, 'shutdown']);
98
        $this->pm->signals()->registerHandler(SIGINT, [$this, 'shutdown']);
99
        $this->pm->signals()->registerHandler(SIGQUIT, [$this, 'shutdown']);
100
    }
101
102
    /**
103
     * Shutdown worker
104
     *
105
     * @return void
106
     */
107
    public function shutdown($signal = SIGTERM)
108
    {
109
        $this->shutdownSignal = $signal;
110
111
        $this->eventDispatcher->dispatch(AsyncJobsEvents::WORKER_SHUTDOWN, new WorkerShutdownEvent());
112
    }
113
114
    /**
115
     * Run worker
116
     *
117
     * @return void
118
     */
119
    public function run()
120
    {
121
        while ($this->shouldContinue()) {
122
            $envelope = $this->queue->dequeue();
123
124
            if ($envelope) {
125
                $this->invoke($envelope);
126
            } else {
127
                $this->sleeper->sleep(1);
128
            }
129
        }
130
    }
131
132
    /**
133
     * @return bool
134
     */
135
    protected function shouldContinue()
136
    {
137
        $this->pm->dispatch();
138
        return empty($this->shutdownSignal);
139
    }
140
141
    /**
142
     * Invoke job execution
143
     *
144
     * @param Envelope $envelope
145
     *
146
     * @return void
147
     */
148
    public function invoke(Envelope $envelope)
149
    {
150
        $this->eventDispatcher->dispatch(AsyncJobsEvents::BEFORE_EXECUTION, new BeforeExecutionEvent());
151
152
        try {
153
            $this->stopwatch->start();
154
155
            call_user_func($this->router->map($envelope), $envelope->getMessage());
156
157
            $this->queue->acknowledge($envelope);
158
159
            $info = new ExecutionInfo($envelope->getMessage(), $this->queue, $this->stopwatch);
0 ignored issues
show
Compatibility introduced by
$envelope->getMessage() of type object<Bernard\Message> is not a sub-type of object<Workana\AsyncJobs\Job>. It seems like you assume a concrete implementation of the interface Bernard\Message to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
160
            $this->eventDispatcher->dispatch(AsyncJobsEvents::SUCCESSFUL_EXECUTION, new SuccessfulExecutionEvent($envelope, $this, $info));
161
        } catch (Throwable $t) {
0 ignored issues
show
Bug introduced by
The class Throwable does not exist. Is this class maybe located in a folder that is not analyzed, or in a newer version of your dependencies than listed in your composer.lock/composer.json?
Loading history...
162
            $this->handleRejected($envelope, $t);
163
        } catch (Exception $e) {
164
            $this->handleRejected($envelope, $e);
165
        }
166
167
        $this->eventDispatcher->dispatch(AsyncJobsEvents::AFTER_EXECUTION, new AfterExecutionEvent());
168
    }
169
170
    /**
171
     * Get Process Managerç
172
     *
173
     * @return ProcessManager
174
     */
175
    public function getProcessManager()
176
    {
177
        return $this->pm;
178
    }
179
180
    /**
181
     * Handle rejected job
182
     *
183
     * @param Envelope $envelope Envelope
184
     * @param Throwable|Exception $error Error
185
     */
186
    protected function handleRejected(Envelope $envelope, $error)
187
    {
188
        $info = new ExecutionInfo($envelope->getMessage(), $this->queue, $this->stopwatch);
0 ignored issues
show
Compatibility introduced by
$envelope->getMessage() of type object<Bernard\Message> is not a sub-type of object<Workana\AsyncJobs\Job>. It seems like you assume a concrete implementation of the interface Bernard\Message to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
189
        $this->eventDispatcher->dispatch(AsyncJobsEvents::REJECTED_EXECUTION, new RejectedExecutionEvent(
190
            $envelope,
191
            $error,
0 ignored issues
show
Bug introduced by
It seems like $error defined by parameter $error on line 186 can also be of type object<Exception>; however, Workana\AsyncJobs\Event\...ionEvent::__construct() does only seem to accept object<Workana\AsyncJobs...ncJobs\Event\Throwable>, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
192
            $this,
193
            $info
194
        ));
195
196
        $this->queue->acknowledge($envelope);
197
        $this->retryStrategy->handleRetry($envelope, $error);
0 ignored issues
show
Bug introduced by
It seems like $error defined by parameter $error on line 186 can also be of type object<Exception>; however, Workana\AsyncJobs\Retry\...Strategy::handleRetry() does only seem to accept object<Workana\AsyncJobs...ncJobs\Retry\Exception>, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
198
    }
199
}