Worker::handleJobException()   A
last analyzed

Complexity

Conditions 5
Paths 8

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 26
rs 9.1928
c 0
b 0
f 0
cc 5
nc 8
nop 4
1
<?php
2
3
namespace SfCod\QueueBundle\Worker;
4
5
use Exception;
6
use SfCod\QueueBundle\Event\JobExceptionOccurredEvent;
7
use SfCod\QueueBundle\Event\JobFailedEvent;
8
use SfCod\QueueBundle\Event\JobProcessedEvent;
9
use SfCod\QueueBundle\Event\JobProcessingEvent;
10
use SfCod\QueueBundle\Event\WorkerStoppingEvent;
11
use SfCod\QueueBundle\Exception\FatalThrowableException;
12
use SfCod\QueueBundle\Exception\MaxAttemptsExceededException;
13
use SfCod\QueueBundle\Failer\FailedJobProviderInterface;
14
use SfCod\QueueBundle\Handler\ExceptionHandlerInterface;
15
use SfCod\QueueBundle\Job\JobContractInterface;
16
use SfCod\QueueBundle\Queue\QueueInterface;
17
use SfCod\QueueBundle\Service\JobProcess;
18
use SfCod\QueueBundle\Service\QueueManager;
19
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
20
use Symfony\Component\Process\Process;
21
use Throwable;
22
23
/**
24
 * Thread worker for job queues
25
 *
26
 * @author Virchenko Maksim <[email protected]>
27
 */
28
class Worker
29
{
30
    /**
     * Names of the events dispatched by the worker.
     */
    public const EVENT_RAISE_BEFORE_JOB = 'job_queue_worker.raise_before_job';
    public const EVENT_RAISE_AFTER_JOB = 'job_queue_worker.raise_after_job';
    public const EVENT_RAISE_EXCEPTION_OCCURED_JOB = 'job_queue_worker.raise_exception_occurred_job';
    public const EVENT_RAISE_FAILED_JOB = 'job_queue_worker.raise_failed_job';
    public const EVENT_STOP = 'job_queue_worker.stop';

    /**
     * Resolves queue connections by their name.
     *
     * @var QueueManager
     */
    private $queueManager;

    /**
     * Exception handler that caught exceptions are reported to.
     *
     * @var ExceptionHandlerInterface
     */
    private $exceptions;

    /**
     * Provider that failed jobs are logged to.
     *
     * @var FailedJobProviderInterface
     */
    private $failer;

    /**
     * Dispatcher used for all worker events.
     *
     * @var EventDispatcherInterface
     */
    private $dispatcher;

    /**
     * Provides the process a job is run with in the background.
     *
     * @var JobProcess
     */
    private $process;
69
70
    /**
     * Wire the worker with its collaborators.
     *
     * @param QueueManager $queueManager Resolves queue connections by name
     * @param JobProcess $process Provides the background process for a job
     * @param FailedJobProviderInterface $failer Provider the failed jobs are logged to
     * @param ExceptionHandlerInterface $exceptions Handler the caught exceptions are reported to
     * @param EventDispatcherInterface $dispatcher Dispatcher for the worker events
     */
    public function __construct(
        QueueManager $queueManager,
        JobProcess $process,
        FailedJobProviderInterface $failer,
        ExceptionHandlerInterface $exceptions,
        EventDispatcherInterface $dispatcher
    ) {
        $this->dispatcher = $dispatcher;
        $this->exceptions = $exceptions;
        $this->failer = $failer;
        $this->process = $process;
        $this->queueManager = $queueManager;
    }
91
92
    /**
     * Run jobs from the given queue in an endless loop.
     *
     * The loop pauses for $options->sleep seconds whenever no job was run and
     * stops the worker as soon as the memory check against $options->memory
     * reports that the limit has been reached.
     *
     * @param string $connectionName
     * @param string $queue
     * @param Options $options
     */
    public function daemon(string $connectionName, string $queue, Options $options)
    {
        for (;;) {
            $jobWasRun = $this->runNextJob($connectionName, $queue, $options);

            if (false === $jobWasRun) {
                $this->sleep($options->sleep);
            }

            $limitReached = $this->memoryExceeded($options->memory);

            if ($limitReached) {
                $this->stop();
            }
        }
    }
111
112
    /**
     * Pop the next job from the queue and run it in the background.
     *
     * @param string $connectionName
     * @param string $queue Queue name, or several names separated by commas
     * @param Options $options
     *
     * @return bool True when a job was reserved and run, false otherwise
     */
    public function runNextJob(string $connectionName, string $queue, Options $options)
    {
        $connection = $this->queueManager->connection($connectionName);
        $job = $this->getNextJob($connection, $queue);

        // Nothing to run when the queue gave us no job or the connection refuses
        // to run it; the caller decides whether to sleep before the next attempt.
        if (!($job instanceof JobContractInterface)) {
            return false;
        }

        if (!$connection->canRunJob($job)) {
            return false;
        }

        $connection->markJobAsReserved($job);
        $this->runInBackground($job, $options);

        return true;
    }
138
139
    /**
     * Process a single job identified by its id.
     *
     * The job is marked as reserved first unless it already is. Anything thrown
     * on the way is reported to the exception handler, after which the worker
     * sleeps for $options->sleep seconds. The same pause happens when no job
     * is found for the given id.
     *
     * @param string $connectionName
     * @param string $queue
     * @param string $id
     * @param Options $options
     */
    public function runJobById(string $connectionName, string $queue, string $id, Options $options)
    {
        try {
            $connection = $this->queueManager->connection($connectionName);
            $job = $connection->getJobById($queue, $id);

            $found = $job instanceof JobContractInterface;

            if ($found) {
                if (false === $job->reserved()) {
                    $connection->markJobAsReserved($job);
                }

                $this->process($connectionName, $job, $options);

                // Processed without errors: leave without the pause below.
                return;
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);
        } catch (Throwable $e) {
            // Non-exception throwables are wrapped before being reported.
            $this->exceptions->report(new FatalThrowableException($e));
        }

        $this->sleep($options->sleep);
    }
173
174
    /**
     * Obtain the process for the given job and run it.
     *
     * @param JobContractInterface $job
     * @param Options $options
     */
    public function runInBackground(JobContractInterface $job, Options $options)
    {
        $this->process->getProcess($job, $options)->run();
    }
186
187
    /**
     * Fire the given job, surrounded by the before and after job events.
     *
     * Whatever is thrown while doing so is passed on to handleJobException();
     * throwables that are not exceptions are wrapped in a FatalThrowableException first.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Options $options
     *
     * @return void
     *
     * @throws \Throwable
     */
    public function process(string $connectionName, JobContractInterface $job, Options $options)
    {
        try {
            $this->raiseBeforeJobEvent($connectionName, $job);

            // A job that keeps timing out never throws by itself, so the attempt
            // limit is checked up front. The check throws when the limit is hit.
            $maxTries = (int)$options->maxTries;
            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries);

            $job->fire();

            // Reached only when the job finished without throwing.
            $this->raiseAfterJobEvent($connectionName, $job);
        } catch (Exception $e) {
            $this->handleJobException($connectionName, $job, $options, $e);
        } catch (Throwable $e) {
            $wrapped = new FatalThrowableException($e);

            $this->handleJobException($connectionName, $job, $options, $wrapped);
        }
    }
223
224
    /**
     * Pause the script.
     *
     * @param int $seconds Number of seconds to pause for
     *
     * @return void
     */
    public function sleep(int $seconds)
    {
        // Thin wrapper around the built-in function of the same name.
        sleep($seconds);
    }
235
236
    /**
     * Check whether the current memory usage has reached the given limit.
     *
     * @param int $memoryLimit Limit in megabytes
     *
     * @return bool True when the usage is equal to or above the limit
     */
    public function memoryExceeded(int $memoryLimit)
    {
        // memory_get_usage() reports bytes; convert to megabytes before comparing.
        $usedMegabytes = memory_get_usage() / 1024 / 1024;

        return $usedMegabytes >= $memoryLimit;
    }
247
248
    /**
     * Announce that the worker is stopping and terminate the script.
     */
    public function stop()
    {
        $event = new WorkerStoppingEvent();
        $this->dispatcher->dispatch($event, self::EVENT_STOP);

        exit(0);
    }
257
258
    /**
     * Fail the job and throw if it is already past its allowed attempts or deadline.
     *
     * A job-level maxTries() value, when not null, takes precedence over $maxTries.
     * When the job has a (non-zero) timeoutAt() value only that deadline is checked;
     * otherwise the attempts are compared against the limit, where 0 means unlimited.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param int $maxTries
     *
     * @return void
     *
     * @throws MaxAttemptsExceededException When the job has been marked as failed
     */
    protected function markJobAsFailedIfAlreadyExceedsMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries)
    {
        if (null !== $job->maxTries()) {
            $maxTries = $job->maxTries();
        }

        $timeoutAt = $job->timeoutAt();

        if ($timeoutAt) {
            // Deadline based job: fine as long as the deadline has not passed.
            if (time() <= $timeoutAt) {
                return;
            }
        } elseif (0 === $maxTries || $job->attempts() <= $maxTries) {
            // Attempt based job: fine while within the limit (or with no limit).
            return;
        }

        $exception = new MaxAttemptsExceededException(
            'A queued job has been attempted too many times or run too long. The job may have previously timed out.'
        );

        $this->failJob($connectionName, $job, $exception);

        throw $exception;
    }
289
290
    /**
     * Delete the job, notify it about the failure, log it and raise the failed event.
     *
     * Does nothing when the job is already deleted. Logging and the event happen
     * even if deleting or notifying the job throws.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Exception $e
     */
    protected function failJob(string $connectionName, JobContractInterface $job, Exception $e)
    {
        $alreadyDeleted = $job->isDeleted();

        if ($alreadyDeleted) {
            return;
        }

        try {
            $job->delete();
            $job->failed($e);
        } finally {
            $queue = $job->getQueue();
            $payload = $job->getRawBody();

            $this->failer->log($connectionName, $queue, $payload, $e);
            $this->raiseFailedJobEvent($connectionName, $job, $e);
        }
    }
315
316
    /**
     * Deal with an exception thrown while a job was being processed.
     *
     * The exception is always re-thrown once the handling is done.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Options $options
     * @param Exception $e
     *
     * @return void
     *
     * @throws Exception
     */
    protected function handleJobException(string $connectionName, JobContractInterface $job, Options $options, Exception $e)
    {
        try {
            // Fail the job right away if its next run would be over the limit,
            // so that it does not get released onto the queue once more.
            $notFailedYet = !$job->hasFailed();

            if ($notFailedYet) {
                $maxTries = (int)$options->maxTries;
                $this->markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e);
            }

            $this->raiseExceptionOccurredJobEvent($connectionName, $job, $e);
        } finally {
            // Put the job back onto the queue, unless it is already deleted,
            // released or failed, so that it can be retried later on.
            $settled = $job->isDeleted() || $job->isReleased() || $job->hasFailed();

            if (!$settled) {
                $job->release($options->delay);
            }
        }

        throw $e;
    }
354
355
    /**
     * Fail the job if its deadline has passed or its attempts have reached the limit.
     *
     * A job-level maxTries() value, when not null, takes precedence over $maxTries.
     * A limit of 0 or less disables the attempts check.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param int $maxTries
     * @param Exception $e
     *
     * @return void
     */
    protected function markJobAsFailedIfWillExceedMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries, Exception $e)
    {
        if (null !== $job->maxTries()) {
            $maxTries = $job->maxTries();
        }

        $deadlinePassed = $job->timeoutAt() && $job->timeoutAt() <= time();

        if ($deadlinePassed) {
            $this->failJob($connectionName, $job, $e);
        }

        // Evaluated after the deadline check on purpose: both checks always run.
        $attemptsUsedUp = $maxTries > 0 && $job->attempts() >= $maxTries;

        if ($attemptsUsedUp) {
            $this->failJob($connectionName, $job, $e);
        }
    }
377
378
    /**
     * Pop the first available job from the given queue(s).
     *
     * Anything thrown while popping is reported to the exception handler and
     * results in null being returned.
     *
     * @param QueueInterface $connection
     * @param string $queue Queue name, or several names separated by commas
     *
     * @return JobContractInterface|null
     */
    protected function getNextJob(QueueInterface $connection, string $queue): ?JobContractInterface
    {
        try {
            // Queues are tried in the order they are listed; the first job wins.
            foreach (explode(',', $queue) as $queueName) {
                $job = $connection->pop($queueName);

                if (null !== $job) {
                    return $job;
                }
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);
        } catch (Throwable $e) {
            $this->exceptions->report(new FatalThrowableException($e));
        }

        return null;
    }
402
403
    /**
     * Raise the before queue job event.
     *
     * Dispatches a JobProcessingEvent under the EVENT_RAISE_BEFORE_JOB name.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     */
    protected function raiseBeforeJobEvent(string $connectionName, JobContractInterface $job)
    {
        // Must use the "before" name: dispatching under EVENT_RAISE_AFTER_JOB would
        // leave "before" listeners uncalled and trigger "after" listeners twice per job.
        $this->dispatcher->dispatch(new JobProcessingEvent($connectionName, $job), self::EVENT_RAISE_BEFORE_JOB);
    }
413
414
    /**
     * Raise the after queue job event.
     *
     * Dispatches a JobProcessedEvent under the EVENT_RAISE_AFTER_JOB name.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     */
    protected function raiseAfterJobEvent(string $connectionName, JobContractInterface $job)
    {
        $event = new JobProcessedEvent($connectionName, $job);

        $this->dispatcher->dispatch($event, self::EVENT_RAISE_AFTER_JOB);
    }
424
425
    /**
     * Raise the exception occurred queue job event.
     *
     * Dispatches a JobExceptionOccurredEvent under the
     * EVENT_RAISE_EXCEPTION_OCCURED_JOB name.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Exception $e
     */
    protected function raiseExceptionOccurredJobEvent(string $connectionName, JobContractInterface $job, Exception $e)
    {
        $event = new JobExceptionOccurredEvent($connectionName, $job, $e);

        $this->dispatcher->dispatch($event, self::EVENT_RAISE_EXCEPTION_OCCURED_JOB);
    }
436
437
    /**
     * Raise the failed queue job event.
     *
     * Dispatches a JobFailedEvent under the EVENT_RAISE_FAILED_JOB name.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Exception $e
     */
    protected function raiseFailedJobEvent(string $connectionName, JobContractInterface $job, Exception $e)
    {
        $event = new JobFailedEvent($connectionName, $job, $e);

        $this->dispatcher->dispatch($event, self::EVENT_RAISE_FAILED_JOB);
    }
448
}
449