Completed
Pull Request — master (#2)
by
unknown
38:19
created

Worker   B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 422
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 12

Importance

Changes 0
Metric Value
wmc 49
lcom 1
cbo 12
dl 0
loc 422
rs 8.5454
c 0
b 0
f 0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 12 1
A daemon() 0 12 4
A runNextJob() 0 17 3
B runJobById() 0 26 5
A runInBackground() 0 6 1
B process() 0 26 3
A sleep() 0 4 1
A memoryExceeded() 0 4 1
A stop() 0 6 1
B markJobAsFailedIfAlreadyExceedsMaxAttempts() 0 20 7
A failJob() 0 18 2
B handleJobException() 0 26 5
B markJobAsFailedIfWillExceedMaxAttempts() 0 12 6
B getNextJob() 0 16 5
A raiseBeforeJobEvent() 0 4 1
A raiseAfterJobEvent() 0 4 1
A raiseExceptionOccurredJobEvent() 0 4 1
A raiseFailedJobEvent() 0 4 1

How to fix   Complexity   

Complex Class

Complex classes like Worker often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Worker, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace SfCod\QueueBundle\Worker;
4
5
use Exception;
6
use SfCod\QueueBundle\Event\JobExceptionOccurredEvent;
7
use SfCod\QueueBundle\Event\JobFailedEvent;
8
use SfCod\QueueBundle\Event\JobProcessedEvent;
9
use SfCod\QueueBundle\Event\JobProcessingEvent;
10
use SfCod\QueueBundle\Event\WorkerStoppingEvent;
11
use SfCod\QueueBundle\Exception\FatalThrowableException;
12
use SfCod\QueueBundle\Failer\FailedJobProviderInterface;
13
use SfCod\QueueBundle\Handler\ExceptionHandlerInterface;
14
use SfCod\QueueBundle\Job\JobContractInterface;
15
use SfCod\QueueBundle\MaxAttemptsExceededException;
16
use SfCod\QueueBundle\Queue\QueueInterface;
17
use SfCod\QueueBundle\Service\JobProcess;
18
use SfCod\QueueBundle\Service\QueueManager;
19
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
20
use Symfony\Component\Process\Process;
21
use Throwable;
22
23
/**
24
 * Thread worker for job queues
25
 *
26
 * @author Virchenko Maksim <[email protected]>
27
 */
28
class Worker
29
{
30
    /**
31
     * Events
32
     */
33
    const EVENT_RAISE_BEFORE_JOB = 'job_queue_worker.raise_before_job';
34
    const EVENT_RAISE_AFTER_JOB = 'job_queue_worker.raise_after_job';
35
    const EVENT_RAISE_EXCEPTION_OCCURED_JOB = 'job_queue_worker.raise_exception_occurred_job';
36
    const EVENT_RAISE_FAILED_JOB = 'job_queue_worker.raise_failed_job';
37
    const EVENT_STOP = 'job_queue_worker.stop';
38
39
    /**
40
     * QueueManager instance
41
     *
42
     * @var QueueManager
43
     */
44
    private $queueManager;
45
46
    /**
47
     * Logger instance
48
     *
49
     * @var ExceptionHandler
50
     */
51
    private $exceptions;
52
53
    /**
54
     * Failer instance
55
     *
56
     * @var FailedJobProviderInterface
57
     */
58
    private $failer;
59
60
    /**
61
     * @var EventDispatcherInterface
62
     */
63
    private $dispatcher;
64
65
    /**
66
     * @var JobProcess
67
     */
68
    private $jobProcess;
69
70
    /**
71
     * Worker constructor.
72
     *
73
     * @param QueueInterface $queue
0 ignored issues
show
Bug introduced by
There is no parameter named $queue. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
74
     * @param JobProcess $process
75
     * @param FailedJobProviderInterface $failer
76
     * @param ExceptionHandlerInterface $exceptions
77
     * @param EventDispatcherInterface $dispatcher
78
     */
79
    public function __construct(QueueManager $queueManager,
80
                                JobProcess $process,
81
                                FailedJobProviderInterface $failer,
82
                                ExceptionHandlerInterface $exceptions,
83
                                EventDispatcherInterface $dispatcher)
84
    {
85
        $this->queueManager = $queueManager;
86
        $this->process = $process;
0 ignored issues
show
Bug introduced by
The property process does not seem to exist. Did you mean jobProcess?

An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.

If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.

Loading history...
87
        $this->failer = $failer;
88
        $this->exceptions = $exceptions;
0 ignored issues
show
Documentation Bug introduced by
It seems like $exceptions of type object<SfCod\QueueBundle...eptionHandlerInterface> is incompatible with the declared type object<SfCod\QueueBundle\Worker\ExceptionHandler> of property $exceptions.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
89
        $this->dispatcher = $dispatcher;
90
    }
91
92
    /**
93
     * Listen to the given queue in a loop.
94
     *
95
     * @param string $connectionName
96
     * @param string $queue
97
     * @param Options $options
98
     */
99
    public function daemon(string $connectionName, string $queue, Options $options)
100
    {
101
        while (true) {
102
            if (false === $this->runNextJob($connectionName, $queue, $options)) {
103
                $this->sleep($options->sleep);
104
            }
105
106
            if ($this->memoryExceeded($options->memory)) {
107
                $this->stop();
108
            }
109
        }
110
    }
111
112
    /**
113
     * Process the next job on the queue.
114
     *
115
     * @param string $connectionName
116
     * @param string $queue
117
     * @param Options $options
118
     *
119
     * @return bool
120
     */
121
    public function runNextJob(string $connectionName, string $queue, Options $options)
0 ignored issues
show
Unused Code introduced by
The parameter $options is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
122
    {
123
        $connection = $this->queueManager->connection($connectionName);
124
        $job = $this->getNextJob($connection, $queue);
125
126
        // If we're able to pull a job off of the stack, we will process it and then return
127
        // from this method. If there is no job on the queue, we will "sleep" the worker
128
        // for the specified number of seconds, then keep processing jobs after sleep.
129
        if ($job instanceof JobContractInterface && $connection->canRunJob($job)) {
130
            $connection->markJobAsReserved($job);
131
            $this->runInBackground($job, $connectionName);
132
133
            return true;
134
        }
135
136
        return false;
137
    }
138
139
    /**
140
     * Process the next job on the queue.
141
     *
142
     * @param string $connectionName
143
     * @param $id
144
     * @param Options $options
145
     */
146
    public function runJobById(string $connectionName, $id, Options $options)
147
    {
148
        try {
149
            $connection = $this->queueManager->connection($connectionName);
150
            $job = $connection->getJobById($id);
151
152
            // If we're able to pull a job off of the stack, we will process it and then return
153
            // from this method. If there is no job on the queue, we will "sleep" the worker
154
            // for the specified number of seconds, then keep processing jobs after sleep.
155
            if ($job instanceof JobContractInterface) {
156
                if (false === $job->reserved()) {
157
                    $connection->markJobAsReserved($job);
158
                }
159
160
                $this->process($connectionName, $job, $options);
161
162
                return;
163
            }
164
        } catch (Exception $e) {
165
            $this->exceptions->report($e);
166
        } catch (Throwable $e) {
167
            $this->exceptions->report(new FatalThrowableException($e));
0 ignored issues
show
Compatibility introduced by
$e of type object<Throwable> is not a sub-type of object<Exception>. It seems like you assume a concrete implementation of the interface Throwable to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
168
        }
169
170
        $this->sleep($options->sleep);
171
    }
172
173
    /**
174
     * Make a Process for the Artisan command for the job id.
175
     *
176
     * @param JobContractInterface $job
177
     * @param string $connectionName
178
     */
179
    public function runInBackground(JobContractInterface $job, string $connectionName)
180
    {
181
        $process = $this->process->getProcess($job, $connectionName);
0 ignored issues
show
Bug introduced by
The property process does not seem to exist. Did you mean jobProcess?

An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.

If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.

Loading history...
182
183
        $process->run();
184
    }
185
186
    /** Process the given job from the queue.
187
     *
188
     * @param string $connectionName
189
     * @param JobContractInterface $job
190
     * @param Options $options
191
     *
192
     * @return void
193
     *
194
     * @throws \Throwable
195
     */
196
    public function process(string $connectionName, JobContractInterface $job, Options $options)
197
    {
198
        try {
199
            // First we will raise the before job event and determine if the job has already ran
200
            // over the its maximum attempt limit, which could primarily happen if the job is
201
            // continually timing out and not actually throwing any exceptions from itself.
202
            $this->raiseBeforeJobEvent($connectionName, $job);
203
204
            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
205
                $connectionName, $job, (int)$options->maxTries
206
            );
207
208
            // Here we will fire off the job and let it process. We will catch any exceptions so
209
            // they can be reported to the developers logs, etc. Once the job is finished the
210
            // proper events will be fired to let any listeners know this job has finished.
211
            $job->fire();
212
213
            $this->raiseAfterJobEvent($connectionName, $job);
214
        } catch (Exception $e) {
215
            $this->handleJobException($connectionName, $job, $options, $e);
216
        } catch (Throwable $e) {
217
            $this->handleJobException(
218
                $connectionName, $job, $options, new FatalThrowableException($e)
0 ignored issues
show
Compatibility introduced by
$e of type object<Throwable> is not a sub-type of object<Exception>. It seems like you assume a concrete implementation of the interface Throwable to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
219
            );
220
        }
221
    }
222
223
    /**
224
     * Sleep the script for a given number of seconds.
225
     *
226
     * @param int $seconds
227
     *
228
     * @return void
229
     */
230
    public function sleep(int $seconds)
231
    {
232
        sleep($seconds);
233
    }
234
235
    /**
236
     * Determine if the memory limit has been exceeded.
237
     *
238
     * @param int $memoryLimit
239
     *
240
     * @return bool
241
     */
242
    public function memoryExceeded(int $memoryLimit)
243
    {
244
        return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
245
    }
246
247
    /**
248
     * Stop listening and bail out of the script.
249
     *
250
     * @param int $status
251
     */
252
    public function stop(int $status = 0)
0 ignored issues
show
Unused Code introduced by
The parameter $status is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
253
    {
254
        $this->dispatcher->dispatch(self::EVENT_STOP, new WorkerStoppingEvent());
255
256
        exit(0);
257
    }
258
259
    /**
260
     * Mark the given job as failed if it has exceeded the maximum allowed attempts.
261
     *
262
     * This will likely be because the job previously exceeded a timeout.
263
     *
264
     * @param string $connectionName
265
     * @param JobContractInterface $job
266
     * @param int $maxTries
267
     *
268
     * @return void
269
     */
270
    protected function markJobAsFailedIfAlreadyExceedsMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries)
271
    {
272
        $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
273
274
        $timeoutAt = $job->timeoutAt();
275
276
        if ($timeoutAt && time() <= $timeoutAt) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $timeoutAt of type integer|null is loosely compared to true; this is ambiguous if the integer can be zero. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
277
            return;
278
        }
279
280
        if (!$timeoutAt && (0 === $maxTries || $job->attempts() <= $maxTries)) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $timeoutAt of type integer|null is loosely compared to false; this is ambiguous if the integer can be zero. You might want to explicitly use === null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
281
            return;
282
        }
283
284
        $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
285
            'A queued job has been attempted too many times or run too long. The job may have previously timed out.'
286
        ));
287
288
        throw $e;
289
    }
290
291
    /**
292
     * Mark the given job as failed and raise the relevant event.
293
     *
294
     * @param string $connectionName
295
     * @param JobContractInterface $job
296
     * @param Exception $e
297
     */
298
    protected function failJob(string $connectionName, JobContractInterface $job, Exception $e)
299
    {
300
        if ($job->isDeleted()) {
301
            return;
302
        }
303
304
        try {
305
            // If the job has failed, we will delete it, call the "failed" method and then call
306
            // an event indicating the job has failed so it can be logged if needed. This is
307
            // to allow every developer to better keep monitor of their failed queue jobs.
308
            $job->delete();
309
310
            $job->failed($e);
311
        } finally {
312
            $this->failer->log($connectionName, $job->getQueue(), $job->getRawBody(), $e);
313
            $this->raiseFailedJobEvent($connectionName, $job, $e);
314
        }
315
    }
316
317
    /**
318
     * Handle an exception that occurred while the job was running.
319
     *
320
     * @param string $connectionName
321
     * @param JobContractInterface $job
322
     * @param Options $options
323
     * @param Exception $e
324
     *
325
     * @return void
326
     *
327
     * @throws Exception
328
     */
329
    protected function handleJobException(string $connectionName, JobContractInterface $job, Options $options, Exception $e)
330
    {
331
        try {
332
            // First, we will go ahead and mark the job as failed if it will exceed the maximum
333
            // attempts it is allowed to run the next time we process it. If so we will just
334
            // go ahead and mark it as failed now so we do not have to release this again.
335
            if (!$job->hasFailed()) {
0 ignored issues
show
Bug introduced by
The method hasFailed() does not exist on SfCod\QueueBundle\Job\JobContractInterface. Did you maybe mean failed()?

This check marks calls to methods that do not seem to exist on an object.

This is most likely the result of a method being renamed without all references to it being renamed likewise.

Loading history...
336
                $this->markJobAsFailedIfWillExceedMaxAttempts(
337
                    $connectionName, $job, (int)$options->maxTries, $e
338
                );
339
            }
340
341
            $this->raiseExceptionOccurredJobEvent(
342
                $connectionName, $job, $e
343
            );
344
        } finally {
345
            // If we catch an exception, we will attempt to release the job back onto the queue
346
            // so it is not lost entirely. This'll let the job be retried at a later time by
347
            // another listener (or this same one). We will re-throw this exception after.
348
            if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) {
0 ignored issues
show
Bug introduced by
The method isReleased() does not exist on SfCod\QueueBundle\Job\JobContractInterface. Did you maybe mean release()?

This check marks calls to methods that do not seem to exist on an object.

This is most likely the result of a method being renamed without all references to it being renamed likewise.

Loading history...
Bug introduced by
The method hasFailed() does not exist on SfCod\QueueBundle\Job\JobContractInterface. Did you maybe mean failed()?

This check marks calls to methods that do not seem to exist on an object.

This is most likely the result of a method being renamed without all references to it being renamed likewise.

Loading history...
349
                $job->release($options->delay);
350
            }
351
        }
352
353
        throw $e;
354
    }
355
356
    /**
357
     * Mark the given job as failed if it has exceeded the maximum allowed attempts.
358
     *
359
     * @param string $connectionName
360
     * @param JobContractInterface $job
361
     * @param int $maxTries
362
     * @param Exception $e
363
     *
364
     * @return void
365
     */
366
    protected function markJobAsFailedIfWillExceedMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries, Exception $e)
367
    {
368
        $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
369
370
        if ($job->timeoutAt() && $job->timeoutAt() <= time()) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $job->timeoutAt() of type integer|null is loosely compared to true; this is ambiguous if the integer can be zero. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
371
            $this->failJob($connectionName, $job, $e);
372
        }
373
374
        if ($maxTries > 0 && $job->attempts() >= $maxTries) {
375
            $this->failJob($connectionName, $job, $e);
376
        }
377
    }
378
379
    /**
380
     * Get the next job from the queue connection.
381
     *
382
     * @param QueueInterface $connection
383
     * @param string $queue
384
     *
385
     * @return JobContractInterface|null
386
     */
387
    protected function getNextJob(QueueInterface $connection, string $queue): ?JobContractInterface
388
    {
389
        try {
390
            foreach (explode(',', $queue) as $queue) {
391
                if (!is_null($job = $connection->pop($queue))) {
392
                    return $job;
393
                }
394
            }
395
        } catch (Exception $e) {
396
            $this->exceptions->report($e);
397
        } catch (Throwable $e) {
398
            $this->exceptions->report($e = new FatalThrowableException($e));
0 ignored issues
show
Compatibility introduced by
$e of type object<Throwable> is not a sub-type of object<Exception>. It seems like you assume a concrete implementation of the interface Throwable to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
399
        }
400
401
        return null;
402
    }
403
404
    /**
405
     * Raise the before queue job event.
406
     *
407
     * @param string $connectionName
408
     * @param JobContractInterface $job
409
     */
410
    protected function raiseBeforeJobEvent(string $connectionName, JobContractInterface $job)
411
    {
412
        $this->dispatcher->dispatch(self::EVENT_RAISE_AFTER_JOB, new JobProcessingEvent($connectionName, $job));
413
    }
414
415
    /**
416
     * Raise the after queue job event.
417
     *
418
     * @param string $connectionName
419
     * @param JobContractInterface $job
420
     */
421
    protected function raiseAfterJobEvent(string $connectionName, JobContractInterface $job)
422
    {
423
        $this->dispatcher->dispatch(self::EVENT_RAISE_AFTER_JOB, new JobProcessedEvent($connectionName, $job));
424
    }
425
426
    /**
427
     * Raise the exception occurred queue job event.
428
     *
429
     * @param string $connectionName
430
     * @param JobContractInterface $job
431
     * @param Exception $e
432
     */
433
    protected function raiseExceptionOccurredJobEvent(string $connectionName, JobContractInterface $job, Exception $e)
434
    {
435
        $this->dispatcher->dispatch(self::EVENT_RAISE_EXCEPTION_OCCURED_JOB, new JobExceptionOccurredEvent($connectionName, $job, $e));
436
    }
437
438
    /**
439
     * Raise the failed queue job event.
440
     *
441
     * @param string $connectionName
442
     * @param JobContractInterface $job
443
     * @param Exception $e
444
     */
445
    protected function raiseFailedJobEvent(string $connectionName, JobContractInterface $job, Exception $e)
446
    {
447
        $this->dispatcher->dispatch(self::EVENT_RAISE_FAILED_JOB, new JobFailedEvent($connectionName, $job, $e));
448
    }
449
}
450