GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#247)
by
unknown
01:27
created

QueuedJobService::onShutdown()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
3
namespace Symbiote\QueuedJobs\Services;
4
5
use Exception;
6
use Monolog\Handler\BufferHandler;
7
use Monolog\Logger;
8
use Psr\Log\LoggerInterface;
9
use SilverStripe\Control\Controller;
10
use SilverStripe\Control\Director;
11
use SilverStripe\Control\Email\Email;
12
use SilverStripe\Core\Config\Config;
13
use SilverStripe\Core\Config\Configurable;
14
use SilverStripe\Core\Extensible;
15
use SilverStripe\Core\Injector\Injectable;
16
use SilverStripe\Core\Injector\Injector;
17
use SilverStripe\ORM\DataList;
18
use SilverStripe\ORM\DataObject;
19
use SilverStripe\ORM\DB;
20
use SilverStripe\ORM\FieldType\DBDatetime;
21
use SilverStripe\ORM\ValidationException;
22
use SilverStripe\Security\Member;
23
use SilverStripe\Security\Security;
24
use SilverStripe\Subsites\Model\Subsite;
25
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor;
26
use Symbiote\QueuedJobs\QJUtils;
27
use Symbiote\QueuedJobs\Tasks\Engines\TaskRunnerEngine;
28
29
/**
30
 * A service that can be used for starting, stopping and listing queued jobs.
31
 *
32
 * When a job is first added, it is initialised, its job type determined, then persisted to the database
33
 *
34
 * When the queues are scanned, a job is reloaded and processed. Ignoring the persistence and reloading, it looks
35
 * something like
36
 *
37
 * job->getJobType();
38
 * job->getJobData();
39
 * data->write();
40
 * job->setup();
41
 * while !job->isComplete
42
 *  job->process();
43
 *  job->getJobData();
44
 *  data->write();
45
 *
46
 *
47
 * @author Marcus Nyeholt <[email protected]>
48
 * @license BSD http://silverstripe.org/bsd-license/
49
 */
50
class QueuedJobService
51
{
52
    use Configurable;
53
    use Injectable;
54
    use Extensible;
55
56
    /**
57
     * @config
58
     * @var int
59
     */
60
    private static $stall_threshold = 3;
61
62
    /**
63
     * How much ram will we allow before pausing and releasing the memory?
64
     *
65
     * For instance, set to 268435456 (256MB) to pause this process if used memory exceeds
66
     * this value. This needs to be set to a value lower than the php_ini max_memory as
67
     * the system will otherwise crash before shutdown can be handled gracefully.
68
     *
69
     * This was increased to 256MB for SilverStripe 4.x as framework uses more memory than 3.x
70
     *
71
     * @var int
72
     * @config
73
     */
74
    private static $memory_limit = 268435456;
75
76
    /**
77
     * Optional time limit (in seconds) to run the service before restarting to release resources.
78
     *
79
     * Defaults to no limit.
80
     *
81
     * @var int
82
     * @config
83
     */
84
    private static $time_limit = 0;
85
86
    /**
87
     * Disable health checks that usually occur when a runner first picks up a queue. Note that completely disabling
88
     * health checks could result in many jobs that are always marked as running - that will never be restarted. If
89
     * this option is disabled you may alternatively use the build task
90
     *
91
     * @see \Symbiote\QueuedJobs\Tasks\CheckJobHealthTask
92
     *
93
     * @var bool
94
     * @config
95
     */
96
    private static $disable_health_check = false;
97
98
    /**
99
     * Maximum number of jobs that can be initialised at any one time
100
     * prevents too many jobs getting into this state in case something goes wrong with the child processes
101
     * we shouldn't have too many jobs in the initialising state anyway
102
     *
103
     * valid values:
104
     * 0 - unlimited (default)
105
     * greater than 0 - maximum number of jobs in initialised state
106
     *
107
     * @var int
108
     * @config
109
     */
110
    private static $max_init_jobs = 0;
111
112
    /**
113
     * Timestamp (in seconds) when the queue was started
114
     *
115
     * @var int
116
     */
117
    protected $startedAt = 0;
118
119
    /**
120
     * Should "immediate" jobs be managed using the shutdown function?
121
     *
122
     * It is recommended you set up an inotify watch and use that for
123
     * triggering immediate jobs. See the wiki for more information
124
     *
125
     * @var boolean
126
     * @config
127
     */
128
    private static $use_shutdown_function = true;
129
130
    /**
131
     * The location for immediate jobs to be stored in
132
     *
133
     * @var string
134
     * @config
135
     */
136
    private static $cache_dir = 'queuedjobs';
137
138
    /**
139
     * @var DefaultQueueHandler
140
     */
141
    public $queueHandler;
142
143
    /**
144
     *
145
     * @var TaskRunnerEngine
146
     */
147
    public $queueRunner;
148
149
    /**
150
     * Config controlled list of default/required jobs
151
     *
152
     * @var array
153
     */
154
    public $defaultJobs = [];
155
156
    /**
157
     * Register our shutdown handler
158
     */
159
    public function __construct()
160
    {
161
        // bind a shutdown function to process all 'immediate' queued jobs if needed, but only in CLI mode
162
        if (static::config()->get('use_shutdown_function') && Director::is_cli()) {
163
            register_shutdown_function([$this, 'onShutdown']);
164
        }
165
        if (Config::inst()->get(Email::class, 'queued_job_admin_email') == '') {
166
            Config::modify()->set(
167
                Email::class,
168
                'queued_job_admin_email',
169
                Config::inst()->get(Email::class, 'admin_email')
170
            );
171
        }
172
    }
173
174
    /**
175
     * Adds a job to the queue to be started
176
     *
177
     * @param QueuedJob $job The job to start.
178
     * @param null|string $startAfter The date (in Y-m-d H:i:s format) to start execution after
179
     * @param null|int $userId The ID of a user to execute the job as. Defaults to the current user
180
     * @param null|int $queueName
181
     * @param array $options options which are passed to the updateJobDescriptorBeforeQueued extension point
182
     * @return int
183
     * @throws ValidationException
184
     */
185
    public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $queueName = null, array $options = [])
186
    {
187
        $signature = $job->getSignature();
188
189
        // see if we already have this job in a queue
190
        $filter = [
191
            'Signature' => $signature,
192
            'JobStatus' => [
193
                QueuedJob::STATUS_NEW,
194
                QueuedJob::STATUS_INIT,
195
            ],
196
        ];
197
198
        $existing = QueuedJobDescriptor::get()
199
            ->filter($filter)
200
            ->first();
201
202
        if ($existing && $existing->ID) {
203
            return $existing->ID;
204
        }
205
206
        $jobDescriptor = new QueuedJobDescriptor();
207
        $jobDescriptor->JobTitle = $job->getTitle();
208
        $jobDescriptor->JobType = $queueName ? $queueName : $job->getJobType();
209
        $jobDescriptor->Signature = $signature;
210
        $jobDescriptor->Implementation = get_class($job);
211
        $jobDescriptor->StartAfter = $startAfter;
212
213
        // no user provided - fallback to job user default
214
        if ($userId === null) {
215
            $userId = $job->getRunAsMember();
216
        }
217
218
        // still no user - fallback to current user
219
        if ($userId === null) {
220
            if (Security::getCurrentUser() && Security::getCurrentUser()->exists()) {
221
                // current user available
222
                $runAsID = Security::getCurrentUser()->ID;
223
            } else {
224
                // current user unavailable
225
                $runAsID = 0;
226
            }
227
        } else {
228
            $runAsID = $userId;
229
        }
230
231
        $jobDescriptor->RunAsID = $runAsID;
0 ignored issues
show
Documentation introduced by
The property RunAsID does not exist on object<Symbiote\QueuedJo...ts\QueuedJobDescriptor>. Since you implemented __set, maybe consider adding a @property annotation.

Since your code implements the magic setter _set, this function will be called for any write access on an undefined variable. You can add the @property annotation to your class or interface to document the existence of this variable.

<?php

/**
 * @property int $x
 * @property int $y
 * @property string $text
 */
class MyLabel
{
    private $properties;

    private $allowedProperties = array('x', 'y', 'text');

    public function __get($name)
    {
        if (isset($properties[$name]) && in_array($name, $this->allowedProperties)) {
            return $properties[$name];
        } else {
            return null;
        }
    }

    public function __set($name, $value)
    {
        if (in_array($name, $this->allowedProperties)) {
            $properties[$name] = $value;
        } else {
            throw new \LogicException("Property $name is not defined.");
        }
    }

}

Since the property has write access only, you can use the @property-write annotation instead.

Of course, you may also just have mistyped another name, in which case you should fix the error.

See also the PhpDoc documentation for @property.

Loading history...
232
233
        // copy data
234
        $this->copyJobToDescriptor($job, $jobDescriptor);
235
236
        // use this to populate custom data columns before job is queued
237
        $this->extend('updateJobDescriptorBeforeQueued', $jobDescriptor, $job, $options);
238
239
        $jobDescriptor->write();
240
241
        $this->startJob($jobDescriptor, $startAfter);
242
243
        return $jobDescriptor->ID;
244
    }
245
246
    /**
247
     * Start a job (or however the queue handler determines it should be started)
248
     *
249
     * @param QueuedJobDescriptor $jobDescriptor
250
     * @param string $startAfter
251
     */
252
    public function startJob($jobDescriptor, $startAfter = null)
253
    {
254
        if ($startAfter && strtotime($startAfter) > DBDatetime::now()->getTimestamp()) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $startAfter of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
255
            $this->queueHandler->scheduleJob($jobDescriptor, $startAfter);
256
        } else {
257
            // immediately start it on the queue, however that works
258
            $this->queueHandler->startJobOnQueue($jobDescriptor);
259
        }
260
    }
261
262
    /**
263
     * Check if maximum number of jobs are currently initialised.
264
     *
265
     * @codeCoverageIgnore
266
     * @return bool
267
     */
268
    public function isAtMaxJobs()
269
    {
270
        $initJobsMax = static::config()->get('max_init_jobs');
271
        if (!$initJobsMax) {
272
            return false;
273
        }
274
275
        $initJobsCount = QueuedJobDescriptor::get()
276
            ->filter(['JobStatus' => QueuedJob::STATUS_INIT])
277
            ->count();
278
279
        if ($initJobsCount >= $initJobsMax) {
280
            return true;
281
        }
282
283
        return false;
284
    }
285
286
    /**
287
     * Copies data from a job into a descriptor for persisting
288
     *
289
     * @param QueuedJob $job
290
     * @param QueuedJobDescriptor $jobDescriptor
291
     */
292
    protected function copyJobToDescriptor($job, $jobDescriptor)
293
    {
294
        $data = $job->getJobData();
295
296
        $jobDescriptor->TotalSteps = $data->totalSteps;
297
        $jobDescriptor->StepsProcessed = $data->currentStep;
298
        if ($data->isComplete) {
299
            $jobDescriptor->JobStatus = QueuedJob::STATUS_COMPLETE;
300
            $jobDescriptor->JobFinished = DBDatetime::now()->Rfc2822();
301
        }
302
303
        $jobDescriptor->SavedJobData = serialize($data->jobData);
304
        $jobDescriptor->SavedJobMessages = serialize($data->messages);
305
    }
306
307
    /**
308
     * @param QueuedJobDescriptor $jobDescriptor
309
     * @param QueuedJob $job
310
     */
311
    protected function copyDescriptorToJob($jobDescriptor, $job)
312
    {
313
        $jobData = null;
0 ignored issues
show
Unused Code introduced by
$jobData is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
314
        $messages = null;
0 ignored issues
show
Unused Code introduced by
$messages is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
315
316
        // switching to php's serialize methods... not sure why this wasn't done from the start!
317
        $jobData = @unserialize($jobDescriptor->SavedJobData);
318
        $messages = @unserialize($jobDescriptor->SavedJobMessages);
319
320
        // try decoding as json if null
321
        $jobData = $jobData ?: json_decode($jobDescriptor->SavedJobData);
322
        $messages = $messages ?: json_decode($jobDescriptor->SavedJobMessages);
323
324
        $job->setJobData(
325
            $jobDescriptor->TotalSteps,
326
            $jobDescriptor->StepsProcessed,
327
            $jobDescriptor->JobStatus == QueuedJob::STATUS_COMPLETE,
328
            $jobData,
329
            $messages
330
        );
331
    }
332
333
    /**
334
     * Check the current job queues and see if any of the jobs currently in there should be started. If so,
335
     * return the next job that should be executed
336
     *
337
     * @param string $type Job type
338
     *
339
     * @return QueuedJobDescriptor|false
340
     */
341
    public function getNextPendingJob($type = null)
342
    {
343
        // Filter jobs by type
344
        $type = $type ?: QueuedJob::QUEUED;
345
        $list = QueuedJobDescriptor::get()
346
            ->filter('JobType', $type)
347
            ->sort('ID', 'ASC');
348
349
        // see if there's any blocked jobs that need to be resumed
350
        /** @var QueuedJobDescriptor $waitingJob */
351
        $waitingJob = $list
352
            ->filter('JobStatus', QueuedJob::STATUS_WAIT)
353
            ->first();
354
        if ($waitingJob) {
355
            return $waitingJob;
356
        }
357
358
        // If there's an existing job either running or pending, the lets just return false to indicate
359
        // that we're still executing
360
        $runningJob = $list
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $runningJob is correct as $list->filter('JobStatus...::STATUS_RUN))->first() (which targets SilverStripe\ORM\DataList::first()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
361
            ->filter('JobStatus', [QueuedJob::STATUS_INIT, QueuedJob::STATUS_RUN])
362
            ->first();
363
        if ($runningJob) {
364
            return false;
365
        }
366
367
        // Otherwise, lets find any 'new' jobs that are waiting to execute
368
        /** @var QueuedJobDescriptor $newJob */
369
        $newJob = $list
370
            ->filter('JobStatus', QueuedJob::STATUS_NEW)
371
            ->where(sprintf(
372
                '"StartAfter" < \'%s\' OR "StartAfter" IS NULL',
373
                DBDatetime::now()->getValue()
374
            ))
375
            ->first();
376
377
        return $newJob;
378
    }
379
380
    /**
381
     * Runs an explicit check on all currently running jobs to make sure their "processed" count is incrementing
382
     * between each run. If it's not, then we need to flag it as paused due to an error.
383
     *
384
     * This typically happens when a PHP fatal error is thrown, which can't be picked up by the error
385
     * handler or exception checker; in this case, we detect these stalled jobs later and fix (try) to
386
     * fix them
387
     *
388
     * @param int $queue The queue to check against
389
     */
390
    public function checkJobHealth($queue = null)
391
    {
392
        $queue = $queue ?: QueuedJob::QUEUED;
393
        // Select all jobs currently marked as running
394
        $runningJobs = QueuedJobDescriptor::get()
395
            ->filter([
396
                'JobStatus' => [
397
                    QueuedJob::STATUS_RUN,
398
                    QueuedJob::STATUS_INIT,
399
                ],
400
                'JobType' => $queue,
401
            ]);
402
403
        // If no steps have been processed since the last run, consider it a broken job
404
        // Only check jobs that have been viewed before. LastProcessedCount defaults to -1 on new jobs.
405
        $stalledJobs = $runningJobs
406
            ->filter('LastProcessedCount:GreaterThanOrEqual', 0)
407
            ->where('"StepsProcessed" = "LastProcessedCount"');
408
        foreach ($stalledJobs as $stalledJob) {
409
            $this->restartStalledJob($stalledJob);
410
        }
411
412
        // now, find those that need to be marked before the next check
413
        // foreach job, mark it as having been incremented
414
        foreach ($runningJobs as $job) {
415
            $job->LastProcessedCount = $job->StepsProcessed;
416
            $job->write();
417
        }
418
419
        // finally, find the list of broken jobs and send an email if there's some found
420
        $brokenJobs = QueuedJobDescriptor::get()->filter('JobStatus', QueuedJob::STATUS_BROKEN);
421 View Code Duplication
        if ($brokenJobs && $brokenJobs->count()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
422
            $this->getLogger()->error(
423
                print_r(
424
                    [
425
                        'errno' => 0,
426
                        'errstr' => 'Broken jobs were found in the job queue',
427
                        'errfile' => __FILE__,
428
                        'errline' => __LINE__,
429
                        'errcontext' => [],
430
                    ],
431
                    true
432
                ),
433
                [
434
                    'file' => __FILE__,
435
                    'line' => __LINE__,
436
                ]
437
            );
438
        }
439
440
        return $stalledJobs->count();
441
    }
442
443
    /**
444
     * Checks through ll the scheduled jobs that are expected to exist
445
     */
446
    public function checkDefaultJobs($queue = null)
447
    {
448
        $queue = $queue ?: QueuedJob::QUEUED;
0 ignored issues
show
Unused Code introduced by
$queue is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
449
        if (count($this->defaultJobs)) {
450
            $activeJobs = QueuedJobDescriptor::get()->filter(
451
                'JobStatus',
452
                [
453
                    QueuedJob::STATUS_NEW,
454
                    QueuedJob::STATUS_INIT,
455
                    QueuedJob::STATUS_RUN,
456
                    QueuedJob::STATUS_WAIT,
457
                    QueuedJob::STATUS_PAUSED,
458
                ]
459
            );
460
            foreach ($this->defaultJobs as $title => $jobConfig) {
461
                if (!isset($jobConfig['filter']) || !isset($jobConfig['type'])) {
462
                    $this->getLogger()->error(
463
                        "Default Job config: $title incorrectly set up. Please check the readme for examples",
464
                        [
465
                            'file' => __FILE__,
466
                            'line' => __LINE__,
467
                        ]
468
                    );
469
                    continue;
470
                }
471
                $job = $activeJobs->filter(array_merge(
472
                    ['Implementation' => $jobConfig['type']],
473
                    $jobConfig['filter']
474
                ));
475
                if (!$job->count()) {
476
                    $this->getLogger()->info(
477
                        "Default Job config: $title was missing from Queue",
478
                        [
479
                            'file' => __FILE__,
480
                            'line' => __LINE__,
481
                        ]
482
                    );
483
                    Email::create()
484
                        ->setTo(
485
                            isset($jobConfig['email'])
486
                                ? $jobConfig['email']
487
                                : Config::inst()->get(Email::class, 'queued_job_admin_email')
488
                        )
489
                        ->setFrom(Config::inst()->get(Email::class, 'admin_email'))
490
                        ->setSubject('Default Job "' . $title . '" missing')
491
                        ->setData($jobConfig)
492
                        ->addData('Title', $title)
493
                        ->addData('Site', Director::absoluteBaseURL())
0 ignored issues
show
Security Bug introduced by
It seems like \SilverStripe\Control\Director::absoluteBaseURL() targeting SilverStripe\Control\Director::absoluteBaseURL() can also be of type false; however, SilverStripe\Control\Email\Email::addData() does only seem to accept string|null, did you maybe forget to handle an error condition?
Loading history...
494
                        ->setHTMLTemplate('QueuedJobsDefaultJob')
495
                        ->send();
496
                    if (isset($jobConfig['recreate']) && $jobConfig['recreate']) {
497
                        if (!array_key_exists('construct', $jobConfig)
498
                            || !isset($jobConfig['startDateFormat'])
499
                            || !isset($jobConfig['startTimeString'])
500
                        ) {
501
                            $this->getLogger()->error(
502
                                "Default Job config: $title incorrectly set up. Please check the readme for examples",
503
                                [
504
                                    'file' => __FILE__,
505
                                    'line' => __LINE__,
506
                                ]
507
                            );
508
                            continue;
509
                        }
510
                        QueuedJobService::singleton()->queueJob(
511
                            Injector::inst()->createWithArgs($jobConfig['type'], $jobConfig['construct']),
512
                            date($jobConfig['startDateFormat'], strtotime($jobConfig['startTimeString']))
513
                        );
514
                        $this->getLogger()->info(
515
                            "Default Job config: $title has been re-added to the Queue",
516
                            [
517
                                'file' => __FILE__,
518
                                'line' => __LINE__,
519
                            ]
520
                        );
521
                    }
522
                }
523
            }
524
        }
525
    }
526
527
    /**
528
     * Attempt to restart a stalled job
529
     *
530
     * @param QueuedJobDescriptor $stalledJob
531
     *
532
     * @return bool True if the job was successfully restarted
533
     */
534
    protected function restartStalledJob($stalledJob)
535
    {
536
        if ($stalledJob->ResumeCounts < static::config()->get('stall_threshold')) {
537
            $stalledJob->restart();
538
            $logLevel = 'warning';
539
            $message = _t(
540
                __CLASS__ . '.STALLED_JOB_RESTART_MSG',
541
                'A job named {name} (#{id}) appears to have stalled. It will be stopped and restarted, please '
542
                . 'login to make sure it has continued',
543
                ['name' => $stalledJob->JobTitle, 'id' => $stalledJob->ID]
544
            );
545
        } else {
546
            $stalledJob->pause();
547
            $logLevel = 'error';
548
            $message = _t(
549
                __CLASS__ . '.STALLED_JOB_MSG',
550
                'A job named {name} (#{id}) appears to have stalled. It has been paused, please login to check it',
551
                ['name' => $stalledJob->JobTitle, 'id' => $stalledJob->ID]
552
            );
553
        }
554
555
        $this->getLogger()->log(
556
            $logLevel,
557
            $message,
558
            [
559
                'file' => __FILE__,
560
                'line' => __LINE__,
561
            ]
562
        );
563
        $from = Config::inst()->get(Email::class, 'admin_email');
564
        $to = Config::inst()->get(Email::class, 'queued_job_admin_email');
565
        $subject = _t(__CLASS__ . '.STALLED_JOB', 'Stalled job');
566
567
        if ($to) {
568
            $mail = Email::create($from, $to, $subject)
569
                ->setData([
570
                    'JobID' => $stalledJob->ID,
571
                    'Message' => $message,
572
                    'Site' => Director::absoluteBaseURL(),
573
                ])
574
                ->setHTMLTemplate('QueuedJobsStalledJob');
575
            $mail->send();
576
        }
577
    }
578
579
    /**
580
     * Prepares the given jobDescriptor for execution. Returns the job that
581
     * will actually be run in a state ready for executing.
582
     *
583
     * Note that this is called each time a job is picked up to be executed from the cron
584
     * job - meaning that jobs that are paused and restarted will have 'setup()' called on them again,
585
     * so your job MUST detect that and act accordingly.
586
     *
587
     * @param QueuedJobDescriptor $jobDescriptor
588
     *          The Job descriptor of a job to prepare for execution
589
     *
590
     * @return QueuedJob|boolean
591
     * @throws Exception
592
     */
593
    protected function initialiseJob(QueuedJobDescriptor $jobDescriptor)
594
    {
595
        // create the job class
596
        $impl = $jobDescriptor->Implementation;
597
        /** @var QueuedJob $job */
598
        $job = Injector::inst()->create($impl);
599
        /* @var $job QueuedJob */
600
        if (!$job) {
601
            throw new Exception("Implementation $impl no longer exists");
602
        }
603
604
        $jobDescriptor->JobStatus = QueuedJob::STATUS_INIT;
605
        $jobDescriptor->write();
606
607
        // make sure the data is there
608
        $this->copyDescriptorToJob($jobDescriptor, $job);
609
610
        // see if it needs 'setup' or 'restart' called
611
        if ($jobDescriptor->StepsProcessed <= 0) {
612
            $job->setup();
613
        } else {
614
            $job->prepareForRestart();
615
        }
616
617
        // make sure the descriptor is up to date with anything changed
618
        $this->copyJobToDescriptor($job, $jobDescriptor);
619
        $jobDescriptor->write();
620
621
        return $job;
622
    }
623
624
    /**
625
     * Given a {@link QueuedJobDescriptor} mark the job as initialised. Works sort of like a mutex.
626
     * Currently a database lock isn't entirely achievable, due to database adapters not supporting locks.
627
     * This may still have a race condition, but this should minimise the possibility.
628
     * Side effect is the job status will be changed to "Initialised".
629
     *
630
     * Assumption is the job has a status of "Queued" or "Wait".
631
     *
632
     * @param QueuedJobDescriptor $jobDescriptor
633
     *
634
     * @return boolean
635
     */
636
    protected function grabMutex(QueuedJobDescriptor $jobDescriptor)
637
    {
638
        // write the status and determine if any rows were affected, for protection against a
639
        // potential race condition where two or more processes init the same job at once.
640
        // This deliberately does not use write() as that would always update LastEdited
641
        // and thus the row would always be affected.
642
        try {
643
            DB::query(sprintf(
644
                'UPDATE "QueuedJobDescriptor" SET "JobStatus" = \'%s\' WHERE "ID" = %s',
645
                QueuedJob::STATUS_INIT,
646
                $jobDescriptor->ID
647
            ));
648
        } catch (Exception $e) {
649
            return false;
650
        }
651
652
        if (DB::get_conn()->affectedRows() === 0 && $jobDescriptor->JobStatus !== QueuedJob::STATUS_INIT) {
653
            return false;
654
        }
655
656
        return true;
657
    }
658
659
    /**
660
     * Start the actual execution of a job.
661
     * The assumption is the jobID refers to a {@link QueuedJobDescriptor} that is status set as "Queued".
662
     *
663
     * This method will continue executing until the job says it's completed
664
     *
665
     * @param int $jobId
666
     *          The ID of the job to start executing
667
     *
668
     * @return boolean
669
     * @throws Exception
670
     */
671
    public function runJob($jobId)
672
    {
673
        // first retrieve the descriptor
674
        /** @var QueuedJobDescriptor $jobDescriptor */
675
        $jobDescriptor = DataObject::get_by_id(
676
            QueuedJobDescriptor::class,
677
            (int)$jobId
678
        );
679
        if (!$jobDescriptor) {
680
            throw new Exception("$jobId is invalid");
681
        }
682
683
        // now lets see whether we have a current user to run as. Typically, if the job is executing via the CLI,
684
        // we want it to actually execute as the RunAs user - however, if running via the web (which is rare...), we
685
        // want to ensure that the current user has admin privileges before switching. Otherwise, we just run it
686
        // as the currently logged in user and hope for the best
687
688
        // We need to use $_SESSION directly because SS ties the session to a controller that no longer exists at
689
        // this point of execution in some circumstances
690
        $originalUserID = isset($_SESSION['loggedInAs']) ? $_SESSION['loggedInAs'] : 0;
691
        /** @var Member|null $originalUser */
692
        $originalUser = $originalUserID
693
            ? DataObject::get_by_id(Member::class, $originalUserID)
694
            : null;
695
        $runAsUser = null;
696
697
        // If the Job has requested that we run it as a particular user, then we should try and do that.
698
        if ($jobDescriptor->RunAs() !== null) {
699
            $runAsUser = $this->setRunAsUser($jobDescriptor->RunAs(), $originalUser);
700
        }
701
702
        // set up a custom error handler for this processing
703
        $errorHandler = new JobErrorHandler();
704
705
        $job = null;
706
707
        $broken = false;
708
709
        // Push a config context onto the stack for the duration of this job run.
710
        Config::nest();
711
712
        if ($this->grabMutex($jobDescriptor)) {
713
            try {
714
                $job = $this->initialiseJob($jobDescriptor);
715
716
                // get the job ready to begin.
717
                if (!$jobDescriptor->JobStarted) {
718
                    $jobDescriptor->JobStarted = DBDatetime::now()->Rfc2822();
719
                } else {
720
                    $jobDescriptor->JobRestarted = DBDatetime::now()->Rfc2822();
0 ignored issues
show
Documentation introduced by
The property JobRestarted does not exist on object<Symbiote\QueuedJo...ts\QueuedJobDescriptor>. Since you implemented __set, maybe consider adding a @property annotation.

Since your code implements the magic setter _set, this function will be called for any write access on an undefined variable. You can add the @property annotation to your class or interface to document the existence of this variable.

<?php

/**
 * @property int $x
 * @property int $y
 * @property string $text
 */
class MyLabel
{
    private $properties;

    private $allowedProperties = array('x', 'y', 'text');

    public function __get($name)
    {
        if (isset($properties[$name]) && in_array($name, $this->allowedProperties)) {
            return $properties[$name];
        } else {
            return null;
        }
    }

    public function __set($name, $value)
    {
        if (in_array($name, $this->allowedProperties)) {
            $properties[$name] = $value;
        } else {
            throw new \LogicException("Property $name is not defined.");
        }
    }

}

Since the property has write access only, you can use the @property-write annotation instead.

Of course, you may also just have mistyped another name, in which case you should fix the error.

See also the PhpDoc documentation for @property.

Loading history...
721
                }
722
723
                // Only write to job as "Running" if 'isComplete' was NOT set to true
724
                // during setup() or prepareForRestart()
725
                if (!$job->jobFinished()) {
726
                    $jobDescriptor->JobStatus = QueuedJob::STATUS_RUN;
727
                    $jobDescriptor->write();
728
                }
729
730
                $lastStepProcessed = 0;
731
                // have we stalled at all?
732
                $stallCount = 0;
733
734
                if (class_exists(Subsite::class) && !empty($job->SubsiteID)) {
735
                    Subsite::changeSubsite($job->SubsiteID);
736
737
                    // lets set the base URL as far as Director is concerned so that our URLs are correct
738
                    /** @var Subsite $subsite */
739
                    $subsite = DataObject::get_by_id(Subsite::class, $job->SubsiteID);
740
                    if ($subsite && $subsite->exists()) {
741
                        $domain = $subsite->domain();
742
                        $base = rtrim(Director::protocol() . $domain, '/') . '/';
743
744
                        Config::modify()->set(Director::class, 'alternate_base_url', $base);
745
                    }
746
                }
747
748
                // while not finished
749
                while (!$job->jobFinished() && !$broken) {
750
                    // see that we haven't been set to 'paused' or otherwise by another process
751
                    /** @var QueuedJobDescriptor $jobDescriptor */
752
                    $jobDescriptor = DataObject::get_by_id(
753
                        QueuedJobDescriptor::class,
754
                        (int)$jobId
755
                    );
756
                    if (!$jobDescriptor || !$jobDescriptor->exists()) {
757
                        $broken = true;
758
                        $this->getLogger()->error(
759
                            print_r(
760
                                [
761
                                    'errno' => 0,
762
                                    'errstr' => 'Job descriptor ' . $jobId . ' could not be found',
763
                                    'errfile' => __FILE__,
764
                                    'errline' => __LINE__,
765
                                    'errcontext' => [],
766
                                ],
767
                                true
768
                            ),
769
                            [
770
                                'file' => __FILE__,
771
                                'line' => __LINE__,
772
                            ]
773
                        );
774
                        break;
775
                    }
776 View Code Duplication
                    if ($jobDescriptor->JobStatus != QueuedJob::STATUS_RUN) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
777
                        // we've been paused by something, so we'll just exit
778
                        $job->addMessage(_t(
779
                            __CLASS__ . '.JOB_PAUSED',
780
                            'Job paused at {time}',
781
                            ['time' => DBDatetime::now()->Rfc2822()]
782
                        ));
783
                        $broken = true;
784
                    }
785
786
                    if (!$broken) {
787
                        // Inject real-time log handler
788
                        $logger = Injector::inst()->get(LoggerInterface::class);
789
                        if ($logger instanceof Logger) {
790
                            // Check if there is already a handler
791
                            $exists = false;
792
                            foreach ($logger->getHandlers() as $handler) {
793
                                if ($handler instanceof QueuedJobHandler) {
794
                                    $exists = true;
795
                                    break;
796
                                }
797
                            }
798
799
                            if (!$exists) {
800
                                // Add the handler
801
                                /** @var QueuedJobHandler $queuedJobHandler */
802
                                $queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor);
803
804
                                // We only write for every 100 file
805
                                $bufferHandler = new BufferHandler(
806
                                    $queuedJobHandler,
807
                                    100,
808
                                    Logger::DEBUG,
809
                                    true,
810
                                    true
811
                                );
812
813
                                $logger->pushHandler($bufferHandler);
814
                            }
815
                        } else {
816
                            if ($logger instanceof LoggerInterface) {
817
                                $logger->warning(
818
                                    'Monolog not found, messages will not output while the job is running'
819
                                );
820
                            }
821
                        }
822
823
                        // Collect output as job messages as well as sending it to the screen after processing
824
                        $obLogger = function ($buffer, $phase) use ($job, $jobDescriptor) {
0 ignored issues
show
Unused Code introduced by
The parameter $phase is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
825
                            $job->addMessage($buffer);
826
                            if ($jobDescriptor) {
827
                                $this->copyJobToDescriptor($job, $jobDescriptor);
828
                                $jobDescriptor->write();
829
                            }
830
                            return $buffer;
831
                        };
832
                        ob_start($obLogger, 256);
833
834
                        try {
835
                            $job->process();
836
                        } catch (Exception $e) {
837
                            // okay, we'll just catch this exception for now
838
                            $job->addMessage(
839
                                _t(
840
                                    __CLASS__ . '.JOB_EXCEPT',
841
                                    'Job caused exception {message} in {file} at line {line}',
842
                                    [
843
                                        'message' => $e->getMessage(),
844
                                        'file' => $e->getFile(),
845
                                        'line' => $e->getLine(),
846
                                    ]
847
                                )
848
                            );
849
                            $this->getLogger()->error(
850
                                $e->getMessage(),
851
                                [
852
                                    'exception' => $e,
853
                                ]
854
                            );
855
                            $jobDescriptor->JobStatus =  QueuedJob::STATUS_BROKEN;
856
                            $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e);
857
                        }
858
859
                        // Write any remaining batched messages at the end
860
                        if (isset($bufferHandler)) {
861
                            $bufferHandler->flush();
862
                        }
863
864
                        ob_end_flush();
865
866
                        // now check the job state
867
                        $data = $job->getJobData();
868
                        if ($data->currentStep == $lastStepProcessed) {
869
                            $stallCount++;
870
                        }
871
872 View Code Duplication
                        if ($stallCount > static::config()->get('stall_threshold')) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
873
                            $broken = true;
874
                            $job->addMessage(
875
                                _t(
876
                                    __CLASS__ . '.JOB_STALLED',
877
                                    'Job stalled after {attempts} attempts - please check',
878
                                    ['attempts' => $stallCount]
879
                                )
880
                            );
881
                            $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN;
882
                        }
883
884
                        // now we'll be good and check our memory usage. If it is too high, we'll set the job to
885
                        // a 'Waiting' state, and let the next processing run pick up the job.
886
                        if ($this->isMemoryTooHigh()) {
887
                            $job->addMessage(
888
                                _t(
889
                                    __CLASS__ . '.MEMORY_RELEASE',
890
                                    'Job releasing memory and waiting ({used} used)',
891
                                    ['used' => $this->humanReadable($this->getMemoryUsage())]
892
                                )
893
                            );
894
                            if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) {
895
                                $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT;
896
                            }
897
                            $broken = true;
898
                        }
899
900
                        // Also check if we are running too long
901
                        if ($this->hasPassedTimeLimit()) {
902
                            $job->addMessage(_t(
903
                                __CLASS__ . '.TIME_LIMIT',
904
                                'Queue has passed time limit and will restart before continuing'
905
                            ));
906
                            if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) {
907
                                $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT;
908
                            }
909
                            $broken = true;
910
                        }
911
                    }
912
913
                    if ($jobDescriptor) {
914
                        $this->copyJobToDescriptor($job, $jobDescriptor);
915
                        $jobDescriptor->write();
916 View Code Duplication
                    } else {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
917
                        $this->getLogger()->error(
918
                            print_r(
919
                                [
920
                                    'errno' => 0,
921
                                    'errstr' => 'Job descriptor has been set to null',
922
                                    'errfile' => __FILE__,
923
                                    'errline' => __LINE__,
924
                                    'errcontext' => [],
925
                                ],
926
                                true
927
                            ),
928
                            [
929
                                'file' => __FILE__,
930
                                'line' => __LINE__,
931
                            ]
932
                        );
933
                        $broken = true;
934
                    }
935
                }
936
937
                // a last final save. The job is complete by now
938
                if ($jobDescriptor) {
939
                    $jobDescriptor->write();
940
                }
941
942
                if ($job->jobFinished()) {
943
                    /** @var AbstractQueuedJob|QueuedJob $job */
944
                    $job->afterComplete();
945
                    $jobDescriptor->cleanupJob();
946
947
                    $this->extend('updateJobDescriptorAndJobOnCompletion', $jobDescriptor, $job);
948
                }
949
            } catch (Exception $e) {
950
                // PHP 5.6 exception handling
951
                $this->handleBrokenJobException($jobDescriptor, $job, $e);
952
                $broken = true;
953
            } catch (\Throwable $e) {
0 ignored issues
show
Bug introduced by
The class Throwable does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

Loading history...
954
                // PHP 7 exception handling
955
                $this->handleBrokenJobException($jobDescriptor, $job, $e);
956
                $broken = true;
957
            }
958
        }
959
960
        $errorHandler->clear();
961
962
        Config::unnest();
963
964
        $this->unsetRunAsUser($runAsUser, $originalUser);
965
966
        return !$broken;
967
    }
968
969
    /**
970
     * @param QueuedJobDescriptor $jobDescriptor
971
     * @param QueuedJob $job
972
     * @param $e
973
     * @throws ValidationException
974
     */
975
    protected function handleBrokenJobException(QueuedJobDescriptor $jobDescriptor, QueuedJob $job, $e)
976
    {
977
        // okay, we'll just catch this exception for now
978
        $this->getLogger()->error(
979
            $e->getMessage(),
980
            [
981
                'exception' => $e,
982
            ]
983
        );
984
        $jobDescriptor->JobStatus =  QueuedJob::STATUS_BROKEN;
985
        $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e);
986
        $jobDescriptor->write();
987
    }
988
989
    /**
990
     * @param Member $runAsUser
991
     * @param Member|null $originalUser
992
     * @return null|Member
993
     */
994
    protected function setRunAsUser(Member $runAsUser, Member $originalUser = null)
995
    {
996
        // Sanity check. Can't set the user if they don't exist.
997
        if ($runAsUser === null || !$runAsUser->exists()) {
998
            return null;
999
        }
1000
1001
        // Don't need to set Security user if we're already logged in as that same user.
1002
        if ($originalUser && $originalUser->ID === $runAsUser->ID) {
1003
            return null;
1004
        }
1005
1006
        // We are currently either not logged in at all, or we're logged in as a different user. We should switch users
1007
        // so that the context within the Job is correct.
1008
        if (Controller::has_curr()) {
1009
            Security::setCurrentUser($runAsUser);
1010
        } else {
1011
            $_SESSION['loggedInAs'] = $runAsUser->ID;
1012
        }
1013
1014
        // This is an explicit coupling brought about by SS not having a nice way of mocking a user, as it requires
1015
        // session nastiness
1016
        if (class_exists('SecurityContext')) {
1017
            singleton('SecurityContext')->setMember($runAsUser);
1018
        }
1019
1020
        return $runAsUser;
1021
    }
1022
1023
    /**
1024
     * @param Member|null $runAsUser
1025
     * @param Member|null $originalUser
1026
     */
1027
    protected function unsetRunAsUser(Member $runAsUser = null, Member $originalUser = null)
1028
    {
1029
        // No runAsUser was set, so we don't need to do anything.
1030
        if ($runAsUser === null) {
1031
            return;
1032
        }
1033
1034
        // There was no originalUser, so we should make sure that we set the user back to null.
1035
        if (!$originalUser) {
1036
            if (Controller::has_curr()) {
1037
                Security::setCurrentUser(null);
1038
            } else {
1039
                $_SESSION['loggedInAs'] = null;
1040
            }
1041
1042
            return;
1043
        }
1044
1045
        // Okay let's reset our user.
1046
        if (Controller::has_curr()) {
1047
            Security::setCurrentUser($originalUser);
1048
        } else {
1049
            $_SESSION['loggedInAs'] = $originalUser->ID;
1050
        }
1051
    }
1052
1053
    /**
1054
     * Start timer
1055
     */
1056
    protected function markStarted()
1057
    {
1058
        if (!$this->startedAt) {
1059
            $this->startedAt = DBDatetime::now()->getTimestamp();
1060
        }
1061
    }
1062
1063
    /**
1064
     * Is execution time too long?
1065
     *
1066
     * @return bool True if the script has passed the configured time_limit
1067
     */
1068
    protected function hasPassedTimeLimit()
1069
    {
1070
        // Ensure a limit exists
1071
        $limit = static::config()->get('time_limit');
1072
        if (!$limit) {
1073
            return false;
1074
        }
1075
1076
        // Ensure started date is set
1077
        $this->markStarted();
1078
1079
        // Check duration
1080
        $now = DBDatetime::now()->getTimestamp();
1081
        return $now > $this->startedAt + $limit;
1082
    }
1083
1084
    /**
1085
     * Is memory usage too high?
1086
     *
1087
     * @return bool
1088
     */
1089
    protected function isMemoryTooHigh()
1090
    {
1091
        $used = $this->getMemoryUsage();
1092
        $limit = $this->getMemoryLimit();
1093
        return $limit && ($used > $limit);
1094
    }
1095
1096
    /**
1097
     * Get peak memory usage of this application
1098
     *
1099
     * @return float
1100
     */
1101
    protected function getMemoryUsage()
1102
    {
1103
        // Note we use real_usage = false
1104
        // http://stackoverflow.com/questions/15745385/memory-get-peak-usage-with-real-usage
1105
        // Also we use the safer peak memory usage
1106
        return (float)memory_get_peak_usage(false);
1107
    }
1108
1109
    /**
1110
     * Determines the memory limit (in bytes) for this application
1111
     * Limits to the smaller of memory_limit configured via php.ini or silverstripe config
1112
     *
1113
     * @return float Memory limit in bytes
1114
     */
1115
    protected function getMemoryLimit()
1116
    {
1117
        // Limit to smaller of explicit limit or php memory limit
1118
        $limit = $this->parseMemory(static::config()->get('memory_limit'));
1119
        if ($limit) {
1120
            return $limit;
1121
        }
1122
1123
        // Fallback to php memory limit
1124
        $phpLimit = $this->getPHPMemoryLimit();
1125
        if ($phpLimit) {
1126
            return $phpLimit;
1127
        }
1128
    }
1129
1130
    /**
1131
     * Calculate the current memory limit of the server
1132
     *
1133
     * @return float
1134
     */
1135
    protected function getPHPMemoryLimit()
1136
    {
1137
        return $this->parseMemory(trim(ini_get("memory_limit")));
1138
    }
1139
1140
    /**
1141
     * Convert memory limit string to bytes.
1142
     * Based on implementation in install.php5
1143
     *
1144
     * @param string $memString
1145
     *
1146
     * @return float
1147
     */
1148
    protected function parseMemory($memString)
1149
    {
1150
        switch (strtolower(substr($memString, -1))) {
1151
            case "b":
1152
                return round(substr($memString, 0, -1));
1153
            case "k":
1154
                return round(substr($memString, 0, -1) * 1024);
1155
            case "m":
1156
                return round(substr($memString, 0, -1) * 1024 * 1024);
1157
            case "g":
1158
                return round(substr($memString, 0, -1) * 1024 * 1024 * 1024);
1159
            default:
1160
                return round($memString);
1161
        }
1162
    }
1163
1164
    protected function humanReadable($size)
1165
    {
1166
        $filesizename = [" Bytes", " KB", " MB", " GB", " TB", " PB", " EB", " ZB", " YB"];
1167
        return $size ? round($size / pow(1024, ($i = floor(log($size, 1024)))), 2) . $filesizename[$i] : '0 Bytes';
1168
    }
1169
1170
1171
    /**
1172
     * Gets a list of all the current jobs (or jobs that have recently finished)
1173
     *
1174
     * @param string $type
1175
     *          if we're after a particular job list
1176
     * @param int $includeUpUntil
1177
     *          The number of seconds to include jobs that have just finished, allowing a job list to be built that
1178
     *          includes recently finished jobs
1179
     *
1180
     * @return DataList|QueuedJobDescriptor[]
1181
     */
1182
    public function getJobList($type = null, $includeUpUntil = 0)
1183
    {
1184
        return DataObject::get(
1185
            QueuedJobDescriptor::class,
1186
            $this->getJobListFilter($type, $includeUpUntil)
1187
        );
1188
    }
1189
1190
    /**
1191
     * Return the SQL filter used to get the job list - this is used by the UI for displaying the job list...
1192
     *
1193
     * @param string $type
1194
     *          if we're after a particular job list
1195
     * @param int $includeUpUntil
1196
     *          The number of seconds to include jobs that have just finished, allowing a job list to be built that
1197
     *          includes recently finished jobs
1198
     *
1199
     * @return string
1200
     */
1201
    public function getJobListFilter($type = null, $includeUpUntil = 0)
1202
    {
1203
        $util = singleton(QJUtils::class);
1204
1205
        $filter = ['JobStatus <>' => QueuedJob::STATUS_COMPLETE];
1206
        if ($includeUpUntil) {
1207
            $filter['JobFinished > '] = DBDatetime::create()->setValue(
1208
                DBDatetime::now()->getTimestamp() - $includeUpUntil
1209
            )->Rfc2822();
1210
        }
1211
1212
        $filter = $util->dbQuote($filter, ' OR ');
1213
1214
        if ($type) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $type of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
1215
            $filter = $util->dbQuote(['JobType =' => (string)$type]) . ' AND (' . $filter . ')';
1216
        }
1217
1218
        return $filter;
1219
    }
1220
1221
    /**
1222
     * Process the job queue with the current queue runner
1223
     *
1224
     * @param string $queue
1225
     */
1226
    public function runQueue($queue)
1227
    {
1228
        if (!self::config()->get('disable_health_check')) {
1229
            $this->checkJobHealth($queue);
1230
        }
1231
        $this->checkdefaultJobs($queue);
1232
        $this->queueRunner->runQueue($queue);
1233
    }
1234
1235
    /**
1236
     * Process all jobs from a given queue
1237
     *
1238
     * @param string $name The job queue to completely process
1239
     */
1240
    public function processJobQueue($name)
1241
    {
1242
        // Start timer to measure lifetime
1243
        $this->markStarted();
1244
1245
        // Begin main loop
1246
        do {
1247
            if (class_exists(Subsite::class)) {
1248
                // clear subsite back to default to prevent any subsite changes from leaking to
1249
                // subsequent actions
1250
                Subsite::changeSubsite(0);
1251
            }
1252
1253
            $job = $this->getNextPendingJob($name);
1254
            if ($job) {
1255
                $success = $this->runJob($job->ID);
1256
                if (!$success) {
1257
                    // make sure job is null so it doesn't continue the current
1258
                    // processing loop. Next queue executor can pick up where
1259
                    // things left off
1260
                    $job = null;
1261
                }
1262
            }
1263
        } while ($job);
1264
    }
1265
1266
    /**
1267
     * When PHP shuts down, we want to process all of the immediate queue items
1268
     *
1269
     * We use the 'getNextPendingJob' method, instead of just iterating the queue, to ensure
1270
     * we ignore paused or stalled jobs.
1271
     */
1272
    public function onShutdown()
1273
    {
1274
        $this->processJobQueue(QueuedJob::IMMEDIATE);
1275
    }
1276
1277
    /**
1278
     * Get a logger
1279
     *
1280
     * @return LoggerInterface
1281
     */
1282
    public function getLogger()
1283
    {
1284
        return Injector::inst()->get(LoggerInterface::class);
1285
    }
1286
}
1287