Completed
Push — master ( b00a6a...d29062 )
by Alexey
13s
created

Worker.php (5 issues)

<?php

namespace SfCod\QueueBundle;

use Carbon\Carbon;
use Exception;
use Illuminate\Contracts\Queue\Queue;
use Illuminate\Queue\Jobs\Job;
use Illuminate\Queue\MaxAttemptsExceededException;
use Illuminate\Queue\QueueManager;
use SfCod\QueueBundle\Base\FatalThrowableError;
use SfCod\QueueBundle\Event\JobExceptionOccurredEvent;
use SfCod\QueueBundle\Event\JobFailedEvent;
use SfCod\QueueBundle\Event\JobProcessedEvent;
use SfCod\QueueBundle\Event\JobProcessingEvent;
use SfCod\QueueBundle\Event\WorkerStoppingEvent;
use SfCod\QueueBundle\Failer\MongoFailedJobProvider;
use SfCod\QueueBundle\Handler\ExceptionHandlerInterface;
use SfCod\QueueBundle\Queue\MongoQueue;
use SfCod\QueueBundle\Service\JobQueue;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Process\Process;
use Throwable;

/**
 * Thread worker for job queues
 *
 * @author Virchenko Maksim <[email protected]>
 */
class Worker
{
    /**
     * Events
     */
    const EVENT_RAISE_BEFORE_JOB = 'job_queue_worker.raise_before_job';
    const EVENT_RAISE_AFTER_JOB = 'job_queue_worker.raise_after_job';
    const EVENT_RAISE_EXCEPTION_OCCURED_JOB = 'job_queue_worker.raise_exception_occurred_job';
    const EVENT_RAISE_FAILED_JOB = 'job_queue_worker.raise_failed_job';
    const EVENT_STOP = 'job_queue_worker.stop';

    /**
     * @var QueueManager
     */
    private $manager;

    /**
     * Logger instance
     *
     * @var ExceptionHandler
     */
    private $exceptions;

    /**
     * Failer instance
     *
     * @var MongoFailedJobProvider
     */
    private $failer;

    /**
     * @var EventDispatcherInterface
     */
    private $dispatcher;

    /**
     * @var JobProcess
     */
    private $jobProcess;

    /**
     * Create a new queue worker.
     *
     * @param QueueManager $manager
There is no parameter named $manager. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.
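
The idea behind this check can be sketched in a few lines of PHP. This is only an illustration built on reflection and a simple regular expression, not the analysis engine's actual implementation; the helper name staleParams is hypothetical, and the pattern assumes every @param tag carries a type.

```php
<?php

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

/**
 * Hypothetical helper: returns the names documented with @param
 * that the function signature does not declare.
 */
function staleParams(string $functionName): array
{
    $ref = new ReflectionFunction($functionName);

    // Names that actually appear in the signature.
    $declared = array_map(function (ReflectionParameter $p) {
        return $p->getName();
    }, $ref->getParameters());

    // Names mentioned in "@param <type> $name" tags of the doc comment.
    preg_match_all('/@param\s+\S+\s+\$(\w+)/', (string) $ref->getDocComment(), $matches);

    return array_values(array_diff($matches[1], $declared));
}

$stale = staleParams('finale');
```

For the example above, the helper reports italy as documented but not declared, which matches the situation described for $manager in the constructor.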

     * @param MongoFailedJobProvider $failer
     * @param ExceptionHandler $exceptions
     */
    public function __construct(JobQueue $queue,
                                JobProcess $process,
                                MongoFailedJobProvider $failer,
                                ExceptionHandlerInterface $exceptions,
                                EventDispatcherInterface $dispatcher)
    {
        $this->manager = $queue->getQueueManager();
        $this->process = $process;
The property process does not seem to exist. Did you mean jobProcess?

An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.

If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.
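
The overloading option mentioned above could look roughly like the following. It is a minimal sketch with a hypothetical PropertyBag class that is not part of this bundle; for the Worker class itself, the message suggests the simpler explanation that the property was renamed to jobProcess.

```php
<?php

// Hypothetical class for illustration only.
final class PropertyBag
{
    /** @var array<string, mixed> Values for properties that are not declared. */
    private $values = [];

    // Called when writing to an undeclared (or inaccessible) property.
    public function __set(string $name, $value): void
    {
        $this->values[$name] = $value;
    }

    // Called when reading an undeclared (or inaccessible) property.
    public function __get(string $name)
    {
        if (!array_key_exists($name, $this->values)) {
            throw new OutOfBoundsException("Undefined property: {$name}");
        }

        return $this->values[$name];
    }

    // Called by isset() on an undeclared (or inaccessible) property.
    public function __isset(string $name): bool
    {
        return isset($this->values[$name]);
    }
}

$bag = new PropertyBag();
$bag->process = 'worker'; // routed through __set()
```

Magic methods hide typos of exactly the kind this check reports, so declaring the property explicitly is usually the safer choice.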

        $this->failer = $failer;
        $this->exceptions = $exceptions;
Documentation Bug introduced by
It seems like $exceptions of type object<SfCod\QueueBundle...eptionHandlerInterface> is incompatible with the declared type object<SfCod\QueueBundle\ExceptionHandler> of property $exceptions.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.
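
One way to resolve this kind of mismatch is to document the property with the same interface the constructor accepts. The sketch below uses hypothetical names (HandlerInterface, CollectingHandler, Consumer) rather than the bundle's real classes, and assumes the interface is the intended type.

```php
<?php

// Hypothetical stand-in for an exception handler contract.
interface HandlerInterface
{
    public function report(Throwable $e): void;
}

// Hypothetical implementation that just records messages.
final class CollectingHandler implements HandlerInterface
{
    /** @var string[] */
    public $reported = [];

    public function report(Throwable $e): void
    {
        $this->reported[] = $e->getMessage();
    }
}

final class Consumer
{
    /**
     * The documented type matches the constructor type hint,
     * so the assignment below is consistent with the declaration.
     *
     * @var HandlerInterface
     */
    private $exceptions;

    public function __construct(HandlerInterface $exceptions)
    {
        $this->exceptions = $exceptions;
    }

    public function fail(string $message): void
    {
        $this->exceptions->report(new RuntimeException($message));
    }
}

$handler = new CollectingHandler();
(new Consumer($handler))->fail('boom');
```

Documenting the interface rather than a concrete class keeps the property compatible with any implementation that is injected.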

        $this->dispatcher = $dispatcher;
    }

    /**
     * Listen to the given queue in a loop.
     *
     * @param string $connectionName
     * @param string $queue
     * @param Options $options
     */
    public function daemon($connectionName, $queue, Options $options)
    {
        while (true) {
            if (false === $this->runNextJob($connectionName, $queue, $options)) {
                $this->sleep($options->sleep);
            }

            if ($this->memoryExceeded($options->memory)) {
                $this->stop();
            }
        }
    }

    /**
     * Process the next job on the queue.
     *
     * @param string $connectionName
     * @param string $queue
     * @param Options $options
     *
     * @return bool
     */
    public function runNextJob($connectionName, $queue, Options $options)
    {
        /** @var MongoQueue|Queue $connection */
        $connection = $this->manager->connection($connectionName);

        $job = $this->getNextJob($connection, $queue);

        // If we're able to pull a job off of the stack, we will process it and then return
        // from this method. If there is no job on the queue, we will "sleep" the worker
        // for the specified number of seconds, then keep processing jobs after sleep.
        if ($job instanceof Job && $connection->canRunJob($job)) {
            $connection->markJobAsReserved($job);
            $this->runInBackground($job, $connectionName);

            return true;
        }

        return false;
    }

    /**
     * Process the next job on the queue.
     *
     * @param string $connectionName
     * @param $id
     * @param Options $options
     */
    public function runJobById($connectionName, $id, Options $options)
    {
        /** @var MongoQueue|Queue $connection */
        $connection = $this->manager->connection($connectionName);

        try {
            $job = $connection->getJobById($id);

            // If we're able to pull a job off of the stack, we will process it and then return
            // from this method. If there is no job on the queue, we will "sleep" the worker
            // for the specified number of seconds, then keep processing jobs after sleep.
            if ($job instanceof Job) {
                if (false === $job->reserved()) {
                    $connection->markJobAsReserved($job);
                }

                $this->process($connectionName, $job, $options);

                return;
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);
        } catch (Throwable $e) {
            $this->exceptions->report(new FatalThrowableError($e));
        }

        $this->sleep($options->sleep);
    }

    /**
     * Make a Process for the Artisan command for the job id.
     *
     * @param Job $job
     * @param string $connectionName
     */
    public function runInBackground(Job $job, string $connectionName)
    {
        $process = $this->process->getProcess($job, $connectionName);
The property process does not seem to exist. Did you mean jobProcess?

An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.

If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.


        $process->run();
    }

    /** Process the given job from the queue.
     *
     * @param string $connectionName
     * @param \Illuminate\Contracts\Queue\Job $job
     * @param Options $options
     *
     * @return void
     *
     * @throws \Throwable
     */
    public function process($connectionName, $job, Options $options)
    {
        try {
            // First we will raise the before job event and determine if the job has already ran
            // over the its maximum attempt limit, which could primarily happen if the job is
            // continually timing out and not actually throwing any exceptions from itself.
            $this->raiseBeforeJobEvent($connectionName, $job);

            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
                $connectionName, $job, (int)$options->maxTries
            );

            // Here we will fire off the job and let it process. We will catch any exceptions so
            // they can be reported to the developers logs, etc. Once the job is finished the
            // proper events will be fired to let any listeners know this job has finished.
            $job->fire();

            $this->raiseAfterJobEvent($connectionName, $job);
        } catch (Exception $e) {
            $this->handleJobException($connectionName, $job, $options, $e);
        } catch (Throwable $e) {
            $this->handleJobException(
                $connectionName, $job, $options, new FatalThrowableError($e)
            );
        }
    }

    /**
     * Sleep the script for a given number of seconds.
     *
     * @param int $seconds
     *
     * @return void
     */
    public function sleep($seconds)
    {
        sleep($seconds);
    }

    /**
     * Determine if the memory limit has been exceeded.
     *
     * @param int $memoryLimit
     *
     * @return bool
     */
    public function memoryExceeded($memoryLimit)
    {
        return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
    }

    /**
     * Stop listening and bail out of the script.
     *
     * @param int $status
     */
    public function stop($status = 0)
The parameter $status is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.
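
Instead of removing the parameter, it could also be put to use by forwarding it as the exit status. The sketch below is an assumption about the intended behavior, not the bundle's code; the Stopper class and its injected terminator callable are hypothetical and exist only so the example can run without ending the script.

```php
<?php

// Hypothetical class for illustration only.
final class Stopper
{
    /** @var callable Receives the exit status; production code would call exit(). */
    private $terminate;

    public function __construct(callable $terminate)
    {
        $this->terminate = $terminate;
    }

    /**
     * Stop and forward the given status instead of a hard-coded 0.
     *
     * @param int $status
     */
    public function stop(int $status = 0): void
    {
        ($this->terminate)($status);
    }
}

// Record the status rather than exiting, so the behavior can be observed.
$seen = null;
$stopper = new Stopper(function (int $status) use (&$seen): void {
    $seen = $status;
});
$stopper->stop(3);
```

In a real worker the callable would be something like function (int $status): void { exit($status); }, so that the caller-supplied status reaches the operating system.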

    {
        $this->dispatcher->dispatch(self::EVENT_STOP, new WorkerStoppingEvent());

        exit(0);
    }

    /**
     * Mark the given job as failed if it has exceeded the maximum allowed attempts.
     *
     * This will likely be because the job previously exceeded a timeout.
     *
     * @param string $connectionName
     * @param \Illuminate\Contracts\Queue\Job $job
     * @param int $maxTries
     *
     * @return void
     */
    protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
    {
        $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

        $timeoutAt = $job->timeoutAt();

        if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) {
            return;
        }

        if (!$timeoutAt && (0 === $maxTries || $job->attempts() <= $maxTries)) {
            return;
        }

        $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
            'A queued job has been attempted too many times or run too long. The job may have previously timed out.'
        ));

        throw $e;
    }

    /**
     * Mark the given job as failed and raise the relevant event.
     *
     * @param string $connectionName
     * @param \Illuminate\Contracts\Queue\Job $job
     * @param \Exception $e
     */
    protected function failJob($connectionName, $job, $e)
    {
        if ($job->isDeleted()) {
            return;
        }

        try {
            // If the job has failed, we will delete it, call the "failed" method and then call
            // an event indicating the job has failed so it can be logged if needed. This is
            // to allow every developer to better keep monitor of their failed queue jobs.
            $job->delete();

            $job->failed($e);
        } finally {
            $this->failer->log($connectionName, $job->getQueue(), $job->getRawBody(), $e);
            $this->raiseFailedJobEvent($connectionName, $job, $e);
        }
    }

    /**
     * Handle an exception that occurred while the job was running.
     *
     * @param string $connectionName
     * @param \Illuminate\Contracts\Queue\Job $job
     * @param Options $options
     * @param \Exception $e
     *
     * @return void
     *
     * @throws \Exception
     */
    protected function handleJobException($connectionName, $job, Options $options, $e)
    {
        try {
            // First, we will go ahead and mark the job as failed if it will exceed the maximum
            // attempts it is allowed to run the next time we process it. If so we will just
            // go ahead and mark it as failed now so we do not have to release this again.
            if (!$job->hasFailed()) {
                $this->markJobAsFailedIfWillExceedMaxAttempts(
                    $connectionName, $job, (int)$options->maxTries, $e
                );
            }

            $this->raiseExceptionOccurredJobEvent(
                $connectionName, $job, $e
            );
        } finally {
            // If we catch an exception, we will attempt to release the job back onto the queue
            // so it is not lost entirely. This'll let the job be retried at a later time by
            // another listener (or this same one). We will re-throw this exception after.
            if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) {
                $job->release($options->delay);
            }
        }

        throw $e;
    }

    /**
     * Mark the given job as failed if it has exceeded the maximum allowed attempts.
     *
     * @param string $connectionName
     * @param \Illuminate\Contracts\Queue\Job $job
     * @param int $maxTries
     * @param \Exception $e
     *
     * @return void
     */
    protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
    {
        $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

        if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
            $this->failJob($connectionName, $job, $e);
        }

        if ($maxTries > 0 && $job->attempts() >= $maxTries) {
            $this->failJob($connectionName, $job, $e);
        }
    }

    /**
     * Get the next job from the queue connection.
     *
     * @param \Illuminate\Contracts\Queue\Queue $connection
     * @param string $queue
     *
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    protected function getNextJob($connection, $queue)
    {
        try {
            foreach (explode(',', $queue) as $queue) {
                if (!is_null($job = $connection->pop($queue))) {
                    return $job;
                }
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableError($e));
        }
    }

    /**
     * Raise the before queue job event.
     *
     * @param string $connectionName
     * @param \Illuminate\Contracts\Queue\Job $job
     */
    protected function raiseBeforeJobEvent($connectionName, $job)
    {
        $this->dispatcher->dispatch(self::EVENT_RAISE_AFTER_JOB, new JobProcessingEvent($connectionName, $job));
    }

    /**
     * Raise the after queue job event.
     *
     * @param string $connectionName
     * @param \Illuminate\Contracts\Queue\Job $job
     */
    protected function raiseAfterJobEvent($connectionName, $job)
    {
        $this->dispatcher->dispatch(self::EVENT_RAISE_AFTER_JOB, new JobProcessedEvent($connectionName, $job));
    }

    /**
     * Raise the exception occurred queue job event.
     *
     * @param string $connectionName
     * @param \Illuminate\Contracts\Queue\Job $job
     * @param \Exception $e
     */
    protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e)
    {
        $this->dispatcher->dispatch(self::EVENT_RAISE_EXCEPTION_OCCURED_JOB, new JobExceptionOccurredEvent($connectionName, $job, $e));
    }

    /**
     * Raise the failed queue job event.
     *
     * @param string $connectionName
     * @param \Illuminate\Contracts\Queue\Job $job
     * @param \Exception $e
     */
    protected function raiseFailedJobEvent($connectionName, $job, $e)
    {
        $this->dispatcher->dispatch(self::EVENT_RAISE_FAILED_JOB, new JobFailedEvent($connectionName, $job, $e));
    }
}