GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 99e934...9ad84e )
by Robbie
12s
created

QueuedJobService::handleBrokenJobException()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 13
rs 9.8333
c 0
b 0
f 0
cc 1
nc 1
nop 3
1
<?php
2
3
namespace Symbiote\QueuedJobs\Services;
4
5
use Exception;
6
use Monolog\Handler\BufferHandler;
7
use Monolog\Logger;
8
use Psr\Log\LoggerInterface;
9
use SilverStripe\Control\Controller;
10
use SilverStripe\Control\Director;
11
use SilverStripe\Control\Email\Email;
12
use SilverStripe\Core\Config\Config;
13
use SilverStripe\Core\Config\Configurable;
14
use SilverStripe\Core\Extensible;
15
use SilverStripe\Core\Injector\Injectable;
16
use SilverStripe\Core\Injector\Injector;
17
use SilverStripe\ORM\DataList;
18
use SilverStripe\ORM\DataObject;
19
use SilverStripe\ORM\DB;
20
use SilverStripe\ORM\FieldType\DBDatetime;
21
use SilverStripe\ORM\ValidationException;
22
use SilverStripe\Security\Member;
23
use SilverStripe\Security\Security;
24
use SilverStripe\Subsites\Model\Subsite;
25
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor;
26
use Symbiote\QueuedJobs\QJUtils;
27
use Symbiote\QueuedJobs\Tasks\Engines\TaskRunnerEngine;
28
29
/**
30
 * A service that can be used for starting, stopping and listing queued jobs.
31
 *
32
 * When a job is first added, it is initialised, its job type determined, then persisted to the database
33
 *
34
 * When the queues are scanned, a job is reloaded and processed. Ignoring the persistence and reloading, it looks
35
 * something like
36
 *
37
 * job->getJobType();
38
 * job->getJobData();
39
 * data->write();
40
 * job->setup();
41
 * while !job->isComplete
42
 *  job->process();
43
 *  job->getJobData();
44
 *  data->write();
45
 *
46
 *
47
 * @author Marcus Nyeholt <[email protected]>
48
 * @license BSD http://silverstripe.org/bsd-license/
49
 */
50
class QueuedJobService
51
{
52
    use Configurable;
53
    use Injectable;
54
    use Extensible;
55
56
    /**
57
     * @config
58
     * @var int
59
     */
60
    private static $stall_threshold = 3;
61
62
    /**
63
     * How much ram will we allow before pausing and releasing the memory?
64
     *
65
     * For instance, set to 268435456 (256MB) to pause this process if used memory exceeds
66
     * this value. This needs to be set to a value lower than the php.ini memory_limit as
67
     * the system will otherwise crash before shutdown can be handled gracefully.
68
     *
69
     * This was increased to 256MB for SilverStripe 4.x as framework uses more memory than 3.x
70
     *
71
     * @var int
72
     * @config
73
     */
74
    private static $memory_limit = 268435456;
75
76
    /**
77
     * Optional time limit (in seconds) to run the service before restarting to release resources.
78
     *
79
     * Defaults to no limit.
80
     *
81
     * @var int
82
     * @config
83
     */
84
    private static $time_limit = 0;
85
86
    /**
87
     * Disable health checks that usually occur when a runner first picks up a queue. Note that completely disabling
88
     * health checks could result in many jobs that are always marked as running - that will never be restarted. If
89
     * health checks are disabled you may alternatively use the build task
90
     *
91
     * @see \Symbiote\QueuedJobs\Tasks\CheckJobHealthTask
92
     *
93
     * @var bool
94
     * @config
95
     */
96
    private static $disable_health_check = false;
97
98
    /**
99
     * Maximum number of jobs that can be initialised at any one time.
100
     *
101
     * Prevents too many jobs getting into this state in case something goes wrong with the child processes.
102
     * We shouldn't have too many jobs in the initialising state anyway.
103
     *
104
     * Valid values:
105
     * 0 - unlimited (default)
106
     * greater than 0 - maximum number of jobs in initialised state
107
     *
108
     * @var int
109
     * @config
110
     */
111
    private static $max_init_jobs = 0;
112
113
    /**
114
     * Timestamp (in seconds) when the queue was started
115
     *
116
     * @var int
117
     */
118
    protected $startedAt = 0;
119
120
    /**
121
     * Should "immediate" jobs be managed using the shutdown function?
122
     *
123
     * It is recommended you set up an inotify watch and use that for
124
     * triggering immediate jobs. See the wiki for more information
125
     *
126
     * @var boolean
127
     * @config
128
     */
129
    private static $use_shutdown_function = true;
130
131
    /**
132
     * The location for immediate jobs to be stored in
133
     *
134
     * @var string
135
     * @config
136
     */
137
    private static $cache_dir = 'queuedjobs';
138
139
    /**
140
     * @var DefaultQueueHandler
141
     */
142
    public $queueHandler;
143
144
    /**
145
     *
146
     * @var TaskRunnerEngine
147
     */
148
    public $queueRunner;
149
150
    /**
151
     * Config controlled list of default/required jobs
152
     *
153
     * @var array
154
     */
155
    public $defaultJobs = [];
156
157
    /**
     * Set up the service.
     *
     * Registers the onShutdown handler when the `use_shutdown_function` config flag is truthy and the
     * process runs on the CLI, and seeds `Email.queued_job_admin_email` from `Email.admin_email` when
     * the former is empty.
     */
    public function __construct()
    {
        // 'immediate' queued jobs are processed on shutdown if configured, but only in CLI mode
        $wantsShutdownHook = static::config()->get('use_shutdown_function');
        if ($wantsShutdownHook && Director::is_cli()) {
            register_shutdown_function([$this, 'onShutdown']);
        }

        // Nothing more to do when a dedicated queued-job admin address is already configured
        $jobAdminEmail = Config::inst()->get(Email::class, 'queued_job_admin_email');
        if ($jobAdminEmail != '') {
            return;
        }

        Config::modify()->set(
            Email::class,
            'queued_job_admin_email',
            Config::inst()->get(Email::class, 'admin_email')
        );
    }
174
175
    /**
     * Adds a job to the queue to be started.
     *
     * If a descriptor with the same signature already exists in the "new" or "init" state, no new
     * descriptor is written and the ID of the existing one is returned.
     *
     * @param QueuedJob $job The job to start.
     * @param null|string $startAfter The date (in Y-m-d H:i:s format) to start execution after
     * @param null|int $userId The ID of a user to execute the job as. When null, the job's
     *                         getRunAsMemberID() is used, then the current user, then 0
     * @param null|int $queueName When truthy, used as the descriptor's job type instead of the job's own
     * @return int ID of the existing or newly written job descriptor
     * @throws ValidationException
     */
    public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $queueName = null)
    {
        $signature = $job->getSignature();

        // see if we already have this job in a queue
        $duplicate = QueuedJobDescriptor::get()
            ->filter([
                'Signature' => $signature,
                'JobStatus' => [
                    QueuedJob::STATUS_NEW,
                    QueuedJob::STATUS_INIT,
                ],
            ])
            ->first();

        if ($duplicate && $duplicate->ID) {
            return $duplicate->ID;
        }

        $descriptor = new QueuedJobDescriptor();
        $descriptor->JobTitle = $job->getTitle();
        $descriptor->JobType = $queueName ?: $job->getJobType();
        $descriptor->Signature = $signature;
        $descriptor->Implementation = get_class($job);
        $descriptor->StartAfter = $startAfter;

        // no user provided - fallback to job user default
        if ($userId === null) {
            $userId = $job->getRunAsMemberID();
        }

        // still no user - fallback to current user, or 0 when nobody is logged in
        $runAsID = $userId;
        if ($runAsID === null) {
            $currentUser = Security::getCurrentUser();
            $runAsID = ($currentUser && $currentUser->exists()) ? $currentUser->ID : 0;
        }

        $descriptor->RunAsID = $runAsID;

        // use this to populate custom data columns before job is queued
        // note: you can pass arbitrary data to your job and then move it to job descriptor
        // this is useful if you need some data that needs to be exposed as a separate
        // DB column as opposed to serialised data
        $this->extend('updateJobDescriptorBeforeQueued', $descriptor, $job);

        // copy the job's current state onto the descriptor and persist it
        $this->copyJobToDescriptor($job, $descriptor);
        $descriptor->write();

        $this->startJob($descriptor, $startAfter);

        return $descriptor->ID;
    }
248
249
    /**
     * Start a job (or however the queue handler determines it should be started).
     *
     * A job whose start time lies in the future is passed to the queue handler's scheduleJob();
     * every other job (no start time, or one that is not in the future) goes to startJobOnQueue().
     *
     * @param QueuedJobDescriptor $jobDescriptor
     * @param string $startAfter
     */
    public function startJob($jobDescriptor, $startAfter = null)
    {
        $startsInFuture = $startAfter
            && strtotime($startAfter) > DBDatetime::now()->getTimestamp();

        if (!$startsInFuture) {
            // immediately start it on the queue, however that works
            $this->queueHandler->startJobOnQueue($jobDescriptor);
            return;
        }

        $this->queueHandler->scheduleJob($jobDescriptor, $startAfter);
    }
264
265
    /**
     * Check if the maximum number of jobs are currently initialised.
     *
     * A falsy `max_init_jobs` config value means "no limit", in which case no query is run.
     *
     * @return bool True when the number of descriptors in the init state has reached the limit
     */
    public function isAtMaxJobs()
    {
        $limit = $this->config()->get('max_init_jobs');
        if (!$limit) {
            return false;
        }

        $initialising = QueuedJobDescriptor::get()
            ->filter(['JobStatus' => QueuedJob::STATUS_INIT])
            ->count();

        return $initialising >= $limit;
    }
287
288
    /**
     * Copies the state reported by a job's getJobData() onto a descriptor for persisting.
     *
     * Step counters are copied as-is, job data and messages are stored with serialize(), and a
     * completed job additionally gets its status and finish time set. The descriptor is not written.
     *
     * @param QueuedJob $job
     * @param QueuedJobDescriptor $jobDescriptor
     */
    protected function copyJobToDescriptor($job, $jobDescriptor)
    {
        $state = $job->getJobData();

        $jobDescriptor->TotalSteps = $state->totalSteps;
        $jobDescriptor->StepsProcessed = $state->currentStep;

        $finished = $state->isComplete;
        if ($finished) {
            $jobDescriptor->JobStatus = QueuedJob::STATUS_COMPLETE;
            $jobDescriptor->JobFinished = DBDatetime::now()->Rfc2822();
        }

        $jobDescriptor->SavedJobData = serialize($state->jobData);
        $jobDescriptor->SavedJobMessages = serialize($state->messages);
    }
308
309
    /**
     * Restores a job's state from its persisted descriptor.
     *
     * The saved job data and messages are first read with PHP's unserialize(). Whenever that yields
     * a falsy value (unserialize() returns false on failure, but a stored empty array is falsy too),
     * the raw column is decoded as JSON instead.
     *
     * @param QueuedJobDescriptor $jobDescriptor
     * @param QueuedJob $job
     */
    protected function copyDescriptorToJob($jobDescriptor, $job)
    {
        // switching to php's serialize methods... not sure why this wasn't done from the start!
        // The @ silences the notice unserialize() raises for data that is not in serialize() format.
        $jobData = @unserialize($jobDescriptor->SavedJobData);
        $messages = @unserialize($jobDescriptor->SavedJobMessages);

        // try decoding as json if unserialize() produced nothing usable
        $jobData = $jobData ?: json_decode($jobDescriptor->SavedJobData);
        $messages = $messages ?: json_decode($jobDescriptor->SavedJobMessages);

        $job->setJobData(
            $jobDescriptor->TotalSteps,
            $jobDescriptor->StepsProcessed,
            $jobDescriptor->JobStatus == QueuedJob::STATUS_COMPLETE,
            $jobData,
            $messages
        );
    }
334
335
    /**
     * Check the current job queues and see if any of the jobs currently in there should be started. If so,
     * return the next job that should be executed.
     *
     * Order of precedence within the given job type (lowest ID first):
     *  1. a job in the "wait" state is returned straight away;
     *  2. if any job is initialising or running, false is returned;
     *  3. otherwise the first "new" job whose StartAfter is in the past or NULL is returned.
     *
     * @param string $type Job type; defaults to QueuedJob::QUEUED when falsy
     *
     * @return QueuedJobDescriptor|false The result of the final first() call is returned as-is when
     *                                   no job is due (presumably null - confirm against DataList::first())
     */
    public function getNextPendingJob($type = null)
    {
        // Filter jobs by type
        $candidates = QueuedJobDescriptor::get()
            ->filter('JobType', $type ?: QueuedJob::QUEUED)
            ->sort('ID', 'ASC');

        // see if there's any blocked jobs that need to be resumed
        /** @var QueuedJobDescriptor $resumable */
        $resumable = $candidates
            ->filter('JobStatus', QueuedJob::STATUS_WAIT)
            ->first();
        if ($resumable) {
            return $resumable;
        }

        // If there's an existing job either initialising or running, then let's just return false
        // to indicate that we're still executing
        $busyStatuses = [QueuedJob::STATUS_INIT, QueuedJob::STATUS_RUN];
        if ($candidates->filter('JobStatus', $busyStatuses)->first()) {
            return false;
        }

        // Otherwise, let's find any 'new' jobs that are due to execute
        $dueClause = sprintf(
            '"StartAfter" < \'%s\' OR "StartAfter" IS NULL',
            DBDatetime::now()->getValue()
        );

        return $candidates
            ->filter('JobStatus', QueuedJob::STATUS_NEW)
            ->where($dueClause)
            ->first();
    }
381
382
    /**
     * Runs an explicit check on all currently running jobs to make sure their "processed" count is incrementing
     * between each run. If it's not, then we need to flag it as paused due to an error.
     *
     * This typically happens when a PHP fatal error is thrown, which can't be picked up by the error
     * handler or exception checker; in this case, we detect these stalled jobs later and (try to)
     * fix them.
     *
     * Besides handling stalled jobs, this records the current StepsProcessed of every running or
     * initialising job as its LastProcessedCount, and logs an error when any job is in the broken state.
     *
     * @param int $queue The queue to check against; defaults to QueuedJob::QUEUED when falsy
     *
     * @return int The value of count() on the stalled-job list
     */
    public function checkJobHealth($queue = null)
    {
        $queueType = $queue ?: QueuedJob::QUEUED;

        // Select all jobs currently marked as running (or initialising) on this queue
        $activeJobs = QueuedJobDescriptor::get()->filter([
            'JobStatus' => [
                QueuedJob::STATUS_RUN,
                QueuedJob::STATUS_INIT,
            ],
            'JobType' => $queueType,
        ]);

        // If no steps have been processed since the last run, consider it a broken job
        // Only check jobs that have been viewed before. LastProcessedCount defaults to -1 on new jobs.
        $stalledJobs = $activeJobs
            ->filter('LastProcessedCount:GreaterThanOrEqual', 0)
            ->where('"StepsProcessed" = "LastProcessedCount"');
        foreach ($stalledJobs as $stalled) {
            $this->restartStalledJob($stalled);
        }

        // now, remember the current progress of each job so the next check can compare against it
        foreach ($activeJobs as $active) {
            $active->LastProcessedCount = $active->StepsProcessed;
            $active->write();
        }

        // finally, find the list of broken jobs and log an error if there's some found
        $brokenJobs = QueuedJobDescriptor::get()->filter('JobStatus', QueuedJob::STATUS_BROKEN);
        if ($brokenJobs && $brokenJobs->count()) {
            $details = print_r(
                [
                    'errno' => 0,
                    'errstr' => 'Broken jobs were found in the job queue',
                    'errfile' => __FILE__,
                    'errline' => __LINE__,
                    'errcontext' => [],
                ],
                true
            );
            $this->getLogger()->error(
                $details,
                [
                    'file' => __FILE__,
                    'line' => __LINE__,
                ]
            );
        }

        // NOTE(review): this count is taken after the stalled jobs were restarted/paused above. If the
        // list re-queries on count(), those jobs may no longer match the filter - confirm.
        return $stalledJobs->count();
    }
444
445
    /**
     * Checks through all the configured default jobs that are expected to exist.
     *
     * For every entry in $defaultJobs without a matching descriptor in an active state (new, init,
     * run, wait or paused), an info message is logged and a notification email is sent. When the
     * entry has a truthy 'recreate' key the job is queued again.
     *
     * Entries lacking 'filter' or 'type' are logged as errors and skipped. Re-creating a job also
     * needs a 'construct' key (its value may be null) plus 'startDateFormat' and 'startTimeString'.
     *
     * @param mixed $queue Not used by this method; accepted so existing callers keep working
     */
    public function checkDefaultJobs($queue = null)
    {
        if (!count($this->defaultJobs)) {
            return;
        }

        $activeJobs = QueuedJobDescriptor::get()->filter(
            'JobStatus',
            [
                QueuedJob::STATUS_NEW,
                QueuedJob::STATUS_INIT,
                QueuedJob::STATUS_RUN,
                QueuedJob::STATUS_WAIT,
                QueuedJob::STATUS_PAUSED,
            ]
        );

        foreach ($this->defaultJobs as $title => $jobConfig) {
            if (!isset($jobConfig['filter']) || !isset($jobConfig['type'])) {
                $this->getLogger()->error(
                    "Default Job config: $title incorrectly set up. Please check the readme for examples",
                    [
                        'file' => __FILE__,
                        'line' => __LINE__,
                    ]
                );
                continue;
            }

            $job = $activeJobs->filter(array_merge(
                ['Implementation' => $jobConfig['type']],
                $jobConfig['filter']
            ));
            if ($job->count()) {
                // the default job is present in the queue, nothing to do
                continue;
            }

            $this->getLogger()->info(
                "Default Job config: $title was missing from Queue",
                [
                    'file' => __FILE__,
                    'line' => __LINE__,
                ]
            );
            Email::create()
                ->setTo(
                    isset($jobConfig['email'])
                        ? $jobConfig['email']
                        : Config::inst()->get(Email::class, 'queued_job_admin_email')
                )
                ->setFrom(Config::inst()->get(Email::class, 'admin_email'))
                ->setSubject('Default Job "' . $title . '" missing')
                ->setData($jobConfig)
                ->addData('Title', $title)
                ->addData('Site', Director::absoluteBaseURL())
                ->setHTMLTemplate('QueuedJobsDefaultJob')
                ->send();

            if (empty($jobConfig['recreate'])) {
                continue;
            }

            if (!array_key_exists('construct', $jobConfig)
                || !isset($jobConfig['startDateFormat'])
                || !isset($jobConfig['startTimeString'])
            ) {
                $this->getLogger()->error(
                    "Default Job config: $title incorrectly set up. Please check the readme for examples",
                    [
                        'file' => __FILE__,
                        'line' => __LINE__,
                    ]
                );
                continue;
            }

            QueuedJobService::singleton()->queueJob(
                Injector::inst()->createWithArgs($jobConfig['type'], $jobConfig['construct']),
                date($jobConfig['startDateFormat'], strtotime($jobConfig['startTimeString']))
            );
            $this->getLogger()->info(
                "Default Job config: $title has been re-added to the Queue",
                [
                    'file' => __FILE__,
                    'line' => __LINE__,
                ]
            );
        }
    }
528
529
    /**
     * Attempt to restart a stalled job.
     *
     * A job whose ResumeCounts is still below the `stall_threshold` config value is restarted and a
     * warning is logged; otherwise the job is paused and an error is logged. In both cases an email
     * is sent to `Email.queued_job_admin_email` when that address is configured.
     *
     * @param QueuedJobDescriptor $stalledJob
     *
     * @return bool True if a restart was triggered, false if the job was paused instead
     */
    protected function restartStalledJob($stalledJob)
    {
        $restarted = $stalledJob->ResumeCounts < static::config()->get('stall_threshold');

        if ($restarted) {
            $stalledJob->restart();
            $logLevel = 'warning';
            $message = _t(
                __CLASS__ . '.STALLED_JOB_RESTART_MSG',
                'A job named {name} (#{id}) appears to have stalled. It will be stopped and restarted, please '
                . 'login to make sure it has continued',
                ['name' => $stalledJob->JobTitle, 'id' => $stalledJob->ID]
            );
        } else {
            $stalledJob->pause();
            $logLevel = 'error';
            $message = _t(
                __CLASS__ . '.STALLED_JOB_MSG',
                'A job named {name} (#{id}) appears to have stalled. It has been paused, please login to check it',
                ['name' => $stalledJob->JobTitle, 'id' => $stalledJob->ID]
            );
        }

        $this->getLogger()->log(
            $logLevel,
            $message,
            [
                'file' => __FILE__,
                'line' => __LINE__,
            ]
        );

        $from = Config::inst()->get(Email::class, 'admin_email');
        $to = Config::inst()->get(Email::class, 'queued_job_admin_email');
        $subject = _t(__CLASS__ . '.STALLED_JOB', 'Stalled job');

        // Only notify when a recipient address is configured
        if ($to) {
            $mail = Email::create($from, $to, $subject)
                ->setData([
                    'JobID' => $stalledJob->ID,
                    'Message' => $message,
                    'Site' => Director::absoluteBaseURL(),
                ])
                ->setHTMLTemplate('QueuedJobsStalledJob');
            $mail->send();
        }

        return $restarted;
    }
580
581
    /**
     * Prepares the given jobDescriptor for execution. Returns the job that
     * will actually be run in a state ready for executing.
     *
     * The descriptor is written twice: once after its status is set to "init", and once more after
     * the job's setup()/prepareForRestart() so that anything the job changed is persisted.
     *
     * Note that this is called each time a job is picked up to be executed from the cron
     * job - meaning that jobs that are paused and restarted will have 'prepareForRestart()' called
     * on them (or 'setup()' again if no steps were processed yet), so your job MUST detect that and
     * act accordingly.
     *
     * @param QueuedJobDescriptor $jobDescriptor
     *          The Job descriptor of a job to prepare for execution
     *
     * @return QueuedJob|boolean The prepared job (this implementation returns the job or throws)
     * @throws Exception When the injector yields nothing for the descriptor's Implementation
     */
    protected function initialiseJob(QueuedJobDescriptor $jobDescriptor)
    {
        // create the job class
        $implementation = $jobDescriptor->Implementation;
        /** @var QueuedJob $job */
        $job = Injector::inst()->create($implementation);
        if (!$job) {
            throw new Exception("Implementation $implementation no longer exists");
        }

        $jobDescriptor->JobStatus = QueuedJob::STATUS_INIT;
        $jobDescriptor->write();

        // make sure the data is there
        $this->copyDescriptorToJob($jobDescriptor, $job);

        // a job that has not processed any step yet needs 'setup', any other one a 'restart'
        $isFreshStart = $jobDescriptor->StepsProcessed <= 0;
        if ($isFreshStart) {
            $job->setup();
        } else {
            $job->prepareForRestart();
        }

        // make sure the descriptor is up to date with anything changed
        $this->copyJobToDescriptor($job, $jobDescriptor);
        $jobDescriptor->write();

        return $job;
    }
625
626
    /**
     * Given a {@link QueuedJobDescriptor} mark the job as initialised. Works sort of like a mutex.
     * Currently a database lock isn't entirely achievable, due to database adapters not supporting locks.
     * This may still have a race condition, but this should minimise the possibility.
     * Side effect is the job status will be changed to "Initialised".
     *
     * Assumption is the job has a status of "Queued" or "Wait".
     *
     * @param QueuedJobDescriptor $jobDescriptor
     *
     * @return boolean False when the update query throws, or when it affected no row and the given
     *                 descriptor object is not already in the init state; true otherwise
     */
    protected function grabMutex(QueuedJobDescriptor $jobDescriptor)
    {
        // write the status and determine if any rows were affected, for protection against a
        // potential race condition where two or more processes init the same job at once.
        // This deliberately does not use write() as that would always update LastEdited
        // and thus the row would always be affected.
        try {
            $sql = sprintf(
                'UPDATE "QueuedJobDescriptor" SET "JobStatus" = \'%s\' WHERE "ID" = %s',
                QueuedJob::STATUS_INIT,
                $jobDescriptor->ID
            );
            DB::query($sql);
        } catch (Exception $e) {
            return false;
        }

        $rowsChanged = DB::get_conn()->affectedRows();

        // An unchanged row is only acceptable when this descriptor was already marked as init
        return $rowsChanged !== 0 || $jobDescriptor->JobStatus === QueuedJob::STATUS_INIT;
    }
660
661
    /**
662
     * Start the actual execution of a job.
663
     * The assumption is the jobID refers to a {@link QueuedJobDescriptor} that is status set as "Queued".
664
     *
665
     * This method will continue executing until the job says it's completed
666
     *
667
     * @param int $jobId
668
     *          The ID of the job to start executing
669
     *
670
     * @return boolean
671
     * @throws Exception
672
     */
673
    public function runJob($jobId)
674
    {
675
        // first retrieve the descriptor
676
        /** @var QueuedJobDescriptor $jobDescriptor */
677
        $jobDescriptor = DataObject::get_by_id(
678
            QueuedJobDescriptor::class,
679
            (int)$jobId
680
        );
681
        if (!$jobDescriptor) {
682
            throw new Exception("$jobId is invalid");
683
        }
684
685
        // now lets see whether we have a current user to run as. Typically, if the job is executing via the CLI,
686
        // we want it to actually execute as the RunAs user - however, if running via the web (which is rare...), we
687
        // want to ensure that the current user has admin privileges before switching. Otherwise, we just run it
688
        // as the currently logged in user and hope for the best
689
690
        // We need to use $_SESSION directly because SS ties the session to a controller that no longer exists at
691
        // this point of execution in some circumstances
692
        $originalUserID = isset($_SESSION['loggedInAs']) ? $_SESSION['loggedInAs'] : 0;
693
        /** @var Member|null $originalUser */
694
        $originalUser = $originalUserID
695
            ? DataObject::get_by_id(Member::class, $originalUserID)
696
            : null;
697
        $runAsUser = null;
698
699
        // If the Job has requested that we run it as a particular user, then we should try and do that.
700
        if ($jobDescriptor->RunAs() !== null) {
701
            $runAsUser = $this->setRunAsUser($jobDescriptor->RunAs(), $originalUser);
702
        }
703
704
        // set up a custom error handler for this processing
705
        $errorHandler = new JobErrorHandler();
706
707
        $job = null;
708
709
        $broken = false;
710
711
        // Push a config context onto the stack for the duration of this job run.
712
        Config::nest();
713
714
        if ($this->grabMutex($jobDescriptor)) {
715
            try {
716
                $job = $this->initialiseJob($jobDescriptor);
717
718
                // get the job ready to begin.
719
                if (!$jobDescriptor->JobStarted) {
720
                    $jobDescriptor->JobStarted = DBDatetime::now()->Rfc2822();
721
                } else {
722
                    $jobDescriptor->JobRestarted = DBDatetime::now()->Rfc2822();
0 ignored issues
show
Documentation introduced by
The property JobRestarted does not exist on object<Symbiote\QueuedJo...ts\QueuedJobDescriptor>. Since you implemented __set, maybe consider adding a @property annotation.

Since your code implements the magic setter __set, this function will be called for any write access on an undefined variable. You can add the @property annotation to your class or interface to document the existence of this variable.

<?php

/**
 * Example of a class that exposes a fixed set of virtual properties through magic accessors.
 *
 * @property int $x
 * @property int $y
 * @property string $text
 */
class MyLabel
{
    /** @var array Values of the virtual properties, keyed by property name */
    private $properties = array();

    /** @var array Names that may be read and written through __get/__set */
    private $allowedProperties = array('x', 'y', 'text');

    /**
     * Reads a virtual property.
     *
     * @param string $name
     * @return mixed The stored value, or null when the name is not allowed or was never set
     */
    public function __get($name)
    {
        // Look the value up on the instance; a bare $properties would be an undefined local
        if (isset($this->properties[$name]) && in_array($name, $this->allowedProperties, true)) {
            return $this->properties[$name];
        } else {
            return null;
        }
    }

    /**
     * Writes a virtual property.
     *
     * @param string $name
     * @param mixed $value
     * @throws \LogicException When $name is not one of the allowed properties
     */
    public function __set($name, $value)
    {
        if (in_array($name, $this->allowedProperties, true)) {
            // Store on the instance so the value survives this call
            $this->properties[$name] = $value;
        } else {
            throw new \LogicException("Property $name is not defined.");
        }
    }

}

Since the property has write access only, you can use the @property-write annotation instead.

Of course, you may also just have mistyped another name, in which case you should fix the error.

See also the PhpDoc documentation for @property.

Loading history...
723
                }
724
725
                // Only write to job as "Running" if 'isComplete' was NOT set to true
726
                // during setup() or prepareForRestart()
727
                if (!$job->jobFinished()) {
728
                    $jobDescriptor->JobStatus = QueuedJob::STATUS_RUN;
729
                    $jobDescriptor->write();
730
                }
731
732
                $lastStepProcessed = 0;
733
                // have we stalled at all?
734
                $stallCount = 0;
735
736
                if (class_exists(Subsite::class) && !empty($job->SubsiteID)) {
737
                    Subsite::changeSubsite($job->SubsiteID);
738
739
                    // lets set the base URL as far as Director is concerned so that our URLs are correct
740
                    /** @var Subsite $subsite */
741
                    $subsite = DataObject::get_by_id(Subsite::class, $job->SubsiteID);
742
                    if ($subsite && $subsite->exists()) {
743
                        $domain = $subsite->domain();
744
                        $base = rtrim(Director::protocol() . $domain, '/') . '/';
745
746
                        Config::modify()->set(Director::class, 'alternate_base_url', $base);
747
                    }
748
                }
749
750
                // while not finished
751
                while (!$job->jobFinished() && !$broken) {
752
                    // see that we haven't been set to 'paused' or otherwise by another process
753
                    /** @var QueuedJobDescriptor $jobDescriptor */
754
                    $jobDescriptor = DataObject::get_by_id(
755
                        QueuedJobDescriptor::class,
756
                        (int)$jobId
757
                    );
758
                    if (!$jobDescriptor || !$jobDescriptor->exists()) {
759
                        $broken = true;
760
                        $this->getLogger()->error(
761
                            print_r(
762
                                [
763
                                    'errno' => 0,
764
                                    'errstr' => 'Job descriptor ' . $jobId . ' could not be found',
765
                                    'errfile' => __FILE__,
766
                                    'errline' => __LINE__,
767
                                    'errcontext' => [],
768
                                ],
769
                                true
770
                            ),
771
                            [
772
                                'file' => __FILE__,
773
                                'line' => __LINE__,
774
                            ]
775
                        );
776
                        break;
777
                    }
778 View Code Duplication
                    if ($jobDescriptor->JobStatus != QueuedJob::STATUS_RUN) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
779
                        // we've been paused by something, so we'll just exit
780
                        $job->addMessage(_t(
781
                            __CLASS__ . '.JOB_PAUSED',
782
                            'Job paused at {time}',
783
                            ['time' => DBDatetime::now()->Rfc2822()]
784
                        ));
785
                        $broken = true;
786
                    }
787
788
                    if (!$broken) {
789
                        // Inject real-time log handler
790
                        $logger = Injector::inst()->get(LoggerInterface::class);
791
                        if ($logger instanceof Logger) {
792
                            // Check if there is already a handler
793
                            $exists = false;
794
                            foreach ($logger->getHandlers() as $handler) {
795
                                if ($handler instanceof QueuedJobHandler) {
796
                                    $exists = true;
797
                                    break;
798
                                }
799
                            }
800
801
                            if (!$exists) {
802
                                // Add the handler
803
                                /** @var QueuedJobHandler $queuedJobHandler */
804
                                $queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor);
805
806
                                // We only write for every 100 file
807
                                $bufferHandler = new BufferHandler(
808
                                    $queuedJobHandler,
809
                                    100,
810
                                    Logger::DEBUG,
811
                                    true,
812
                                    true
813
                                );
814
815
                                $logger->pushHandler($bufferHandler);
816
                            }
817
                        } else {
818
                            if ($logger instanceof LoggerInterface) {
819
                                $logger->warning(
820
                                    'Monolog not found, messages will not output while the job is running'
821
                                );
822
                            }
823
                        }
824
825
                        // Collect output as job messages as well as sending it to the screen after processing
826
                        $obLogger = function ($buffer, $phase) use ($job, $jobDescriptor) {
0 ignored issues
show
Unused Code introduced by
The parameter $phase is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
827
                            $job->addMessage($buffer);
828
                            if ($jobDescriptor) {
829
                                $this->copyJobToDescriptor($job, $jobDescriptor);
830
                                $jobDescriptor->write();
831
                            }
832
                            return $buffer;
833
                        };
834
                        ob_start($obLogger, 256);
835
836
                        try {
837
                            $job->process();
838
                        } catch (Exception $e) {
839
                            // okay, we'll just catch this exception for now
840
                            $job->addMessage(
841
                                _t(
842
                                    __CLASS__ . '.JOB_EXCEPT',
843
                                    'Job caused exception {message} in {file} at line {line}',
844
                                    [
845
                                        'message' => $e->getMessage(),
846
                                        'file' => $e->getFile(),
847
                                        'line' => $e->getLine(),
848
                                    ]
849
                                )
850
                            );
851
                            $this->getLogger()->error(
852
                                $e->getMessage(),
853
                                [
854
                                    'exception' => $e,
855
                                ]
856
                            );
857
                            $jobDescriptor->JobStatus =  QueuedJob::STATUS_BROKEN;
858
                            $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e);
859
                        }
860
861
                        // Write any remaining batched messages at the end
862
                        if (isset($bufferHandler)) {
863
                            $bufferHandler->flush();
864
                        }
865
866
                        ob_end_flush();
867
868
                        // now check the job state
869
                        $data = $job->getJobData();
870
                        if ($data->currentStep == $lastStepProcessed) {
871
                            $stallCount++;
872
                        }
873
874 View Code Duplication
                        if ($stallCount > static::config()->get('stall_threshold')) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
875
                            $broken = true;
876
                            $job->addMessage(
877
                                _t(
878
                                    __CLASS__ . '.JOB_STALLED',
879
                                    'Job stalled after {attempts} attempts - please check',
880
                                    ['attempts' => $stallCount]
881
                                )
882
                            );
883
                            $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN;
884
                        }
885
886
                        // now we'll be good and check our memory usage. If it is too high, we'll set the job to
887
                        // a 'Waiting' state, and let the next processing run pick up the job.
888
                        if ($this->isMemoryTooHigh()) {
889
                            $job->addMessage(
890
                                _t(
891
                                    __CLASS__ . '.MEMORY_RELEASE',
892
                                    'Job releasing memory and waiting ({used} used)',
893
                                    ['used' => $this->humanReadable($this->getMemoryUsage())]
894
                                )
895
                            );
896
                            if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) {
897
                                $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT;
898
                            }
899
                            $broken = true;
900
                        }
901
902
                        // Also check if we are running too long
903
                        if ($this->hasPassedTimeLimit()) {
904
                            $job->addMessage(_t(
905
                                __CLASS__ . '.TIME_LIMIT',
906
                                'Queue has passed time limit and will restart before continuing'
907
                            ));
908
                            if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) {
909
                                $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT;
910
                            }
911
                            $broken = true;
912
                        }
913
                    }
914
915
                    if ($jobDescriptor) {
916
                        $this->copyJobToDescriptor($job, $jobDescriptor);
917
                        $jobDescriptor->write();
918 View Code Duplication
                    } else {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
919
                        $this->getLogger()->error(
920
                            print_r(
921
                                [
922
                                    'errno' => 0,
923
                                    'errstr' => 'Job descriptor has been set to null',
924
                                    'errfile' => __FILE__,
925
                                    'errline' => __LINE__,
926
                                    'errcontext' => [],
927
                                ],
928
                                true
929
                            ),
930
                            [
931
                                'file' => __FILE__,
932
                                'line' => __LINE__,
933
                            ]
934
                        );
935
                        $broken = true;
936
                    }
937
                }
938
939
                // a last final save. The job is complete by now
940
                if ($jobDescriptor) {
941
                    $jobDescriptor->write();
942
                }
943
944
                if ($job->jobFinished()) {
945
                    /** @var AbstractQueuedJob|QueuedJob $job */
946
                    $job->afterComplete();
947
                    $jobDescriptor->cleanupJob();
948
949
                    $this->extend('updateJobDescriptorAndJobOnCompletion', $jobDescriptor, $job);
950
                }
951
            } catch (Exception $e) {
952
                // PHP 5.6 exception handling
953
                $this->handleBrokenJobException($jobDescriptor, $job, $e);
954
                $broken = true;
955
            } catch (\Throwable $e) {
0 ignored issues
show
Bug introduced by
The class Throwable does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

Loading history...
956
                // PHP 7 Error handling)
957
                $this->handleBrokenJobException($jobDescriptor, $job, $e);
958
                $broken = true;
959
            }
960
        }
961
962
        $errorHandler->clear();
963
964
        Config::unnest();
965
966
        $this->unsetRunAsUser($runAsUser, $originalUser);
967
968
        return !$broken;
969
    }
970
971
    /**
     * Records an exception that escaped a job run and flags the job descriptor as broken.
     *
     * The exception is logged, the descriptor's JobStatus is set to QueuedJob::STATUS_BROKEN, the
     * 'updateJobDescriptorAndJobOnException' extension hook is invoked, and the descriptor is written.
     *
     * @param QueuedJobDescriptor $jobDescriptor Descriptor of the job that failed
     * @param QueuedJob $job The job instance that was being run
     * @param Exception|\Throwable $e The exception or error that broke the job
     */
    protected function handleBrokenJobException(QueuedJobDescriptor $jobDescriptor, QueuedJob $job, $e)
    {
        // An exception that breaks a job is a failure, not routine information, so it is logged at
        // error level - the same level runJob() uses for exceptions thrown by the job's process() call.
        $this->getLogger()->error(
            $e->getMessage(),
            [
                'exception' => $e,
            ]
        );

        $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN;

        // Give extensions a chance to adjust the descriptor/job before the broken state is persisted.
        $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e);
        $jobDescriptor->write();
    }
989
990
    /**
     * Switches the active member to the one a job asked to be run as.
     *
     * @param Member $runAsUser The member the job should be executed as
     * @param Member|null $originalUser The member that was active before the job started, if any
     * @return null|Member The member that was switched to, or null when no switch was made
     */
    protected function setRunAsUser(Member $runAsUser, Member $originalUser = null)
    {
        // A member that does not exist cannot be switched to.
        $isUsable = $runAsUser !== null && $runAsUser->exists();
        if (!$isUsable) {
            return null;
        }

        // Nothing to switch when the requested member is the one already logged in.
        $alreadyActive = $originalUser && $originalUser->ID === $runAsUser->ID;
        if ($alreadyActive) {
            return null;
        }

        // Either nobody is logged in or somebody else is: change the active member so the job
        // executes in the requested context. Without a current controller the session is written directly.
        if (!Controller::has_curr()) {
            $_SESSION['loggedInAs'] = $runAsUser->ID;
        } else {
            Security::setCurrentUser($runAsUser);
        }

        // Explicit coupling to the optional SecurityContext class: when it is available it is told
        // about the member as well.
        if (class_exists('SecurityContext')) {
            singleton('SecurityContext')->setMember($runAsUser);
        }

        return $runAsUser;
    }
1023
1024
    /**
     * Restores the member that was active before setRunAsUser() switched users.
     *
     * @param Member|null $runAsUser The member returned by setRunAsUser(); null means no switch happened
     * @param Member|null $originalUser The member to restore, or null to restore "nobody logged in"
     */
    protected function unsetRunAsUser(Member $runAsUser = null, Member $originalUser = null)
    {
        // No switch was made, so there is nothing to undo.
        if ($runAsUser === null) {
            return;
        }

        // Either the original member, or null when nobody was logged in beforehand.
        $restoreTo = $originalUser ? $originalUser : null;

        if (Controller::has_curr()) {
            Security::setCurrentUser($restoreTo);
            return;
        }

        // No current controller: write the session value directly.
        $_SESSION['loggedInAs'] = $restoreTo ? $restoreTo->ID : null;
    }
1053
1054
    /**
     * Records the moment processing started, unless a start time has already been recorded.
     */
    protected function markStarted()
    {
        // Keep the first recorded start time; later calls must not reset it.
        if ($this->startedAt) {
            return;
        }

        $this->startedAt = DBDatetime::now()->getTimestamp();
    }
1063
1064
    /**
     * Checks whether processing has been running for longer than the configured 'time_limit'.
     *
     * @return bool True if the script has passed the configured time_limit; false if no limit is set
     */
    protected function hasPassedTimeLimit()
    {
        $timeLimit = static::config()->get('time_limit');

        // Without a configured limit there is nothing to exceed.
        if (!$timeLimit) {
            return false;
        }

        // Make sure a start time exists before measuring against it.
        $this->markStarted();

        $currentTime = DBDatetime::now()->getTimestamp();
        $deadline = $this->startedAt + $timeLimit;

        return $currentTime > $deadline;
    }
1084
1085
    /**
     * Is memory usage too high?
     *
     * Compares the value of getMemoryUsage() against the value of getMemoryLimit().
     *
     * @return bool True only when a positive limit is in effect and usage exceeds it
     */
    protected function isMemoryTooHigh()
    {
        $used = $this->getMemoryUsage();
        $limit = $this->getMemoryLimit();

        // Only a positive limit is a real limit. PHP reports "unlimited" as memory_limit = -1; a plain
        // truthiness check would accept -1 and then report every job as over the limit.
        return $limit > 0 && $used > $limit;
    }
1096
1097
    /**
     * Reports the peak memory usage of this application.
     *
     * @return float Peak memory usage as returned by memory_get_peak_usage(), cast to float
     */
    protected function getMemoryUsage()
    {
        // real_usage is deliberately false, see
        // http://stackoverflow.com/questions/15745385/memory-get-peak-usage-with-real-usage
        // Peak usage is used as the safer figure.
        $peakUsage = memory_get_peak_usage(false);

        return (float)$peakUsage;
    }
1109
1110
    /**
     * Determines the memory limit (in bytes) for this application.
     *
     * The 'memory_limit' config value is used when it parses to a positive number; otherwise the
     * PHP memory limit is used when that is positive.
     *
     * @return float Memory limit in bytes, or 0.0 when no positive limit is in effect
     */
    protected function getMemoryLimit()
    {
        // An explicitly configured limit takes precedence
        $limit = $this->parseMemory(static::config()->get('memory_limit'));
        if ($limit > 0) {
            return $limit;
        }

        // Fallback to php memory limit. PHP expresses "unlimited" as -1, which must not be
        // handed back as if it were a real limit.
        $phpLimit = $this->getPHPMemoryLimit();
        if ($phpLimit > 0) {
            return $phpLimit;
        }

        // No limit applies; return an explicit falsy float instead of falling off the end.
        return 0.0;
    }
1130
1131
    /**
     * Reads PHP's own "memory_limit" ini setting and converts it with parseMemory().
     *
     * @return float
     */
    protected function getPHPMemoryLimit()
    {
        $iniValue = trim(ini_get("memory_limit"));

        return $this->parseMemory($iniValue);
    }
1140
1141
    /**
     * Convert memory limit string to bytes.
     *
     * A trailing "b", "k", "m" or "g" (case-insensitive) multiplies the number in front of it by
     * 1, 1024, 1024^2 or 1024^3 respectively; a value without one of these suffixes is taken as bytes.
     *
     * @param string|int|float|null $memString Memory value such as "128M", "1g", "2048" or "-1"
     *
     * @return float Rounded number of bytes; 0.0 for null, empty or non-numeric input
     */
    protected function parseMemory($memString)
    {
        // Normalise first so null/false/int input never reaches substr()/round() as a non-string
        $memString = trim((string)$memString);
        if ($memString === '') {
            return 0.0;
        }

        $multipliers = [
            'b' => 1,
            'k' => 1024,
            'm' => 1024 * 1024,
            'g' => 1024 * 1024 * 1024,
        ];

        $suffix = strtolower(substr($memString, -1));
        if (isset($multipliers[$suffix])) {
            $number = trim(substr($memString, 0, -1));
            $multiplier = $multipliers[$suffix];
        } else {
            $number = $memString;
            $multiplier = 1;
        }

        // A bare suffix ("M") or other non-numeric text cannot be used in arithmetic
        if (!is_numeric($number)) {
            return 0.0;
        }

        return round((float)$number * $multiplier);
    }
1164
1165
    /**
     * Formats a byte count as a short human readable string, e.g. 1536 becomes "1.5 KB".
     *
     * The value is divided by 1024 until it is below 1024 or the largest known unit (YB) is reached,
     * then rounded to two decimal places.
     *
     * @param int|float $size Size in bytes
     *
     * @return string The scaled value followed by its unit; '0 Bytes' for an empty size
     */
    protected function humanReadable($size)
    {
        $filesizename = [" Bytes", " KB", " MB", " GB", " TB", " PB", " EB", " ZB", " YB"];
        if (!$size) {
            return '0 Bytes';
        }

        // Step through the units by repeated division so the index always stays inside the table,
        // including for sizes below one byte or beyond the largest unit.
        $value = (float)$size;
        $unit = 0;
        $lastUnit = count($filesizename) - 1;
        while ($value >= 1024 && $unit < $lastUnit) {
            $value /= 1024;
            $unit++;
        }

        return round($value, 2) . $filesizename[$unit];
    }
1170
1171
1172
    /**
     * Fetches the job descriptors matching the filter built by getJobListFilter().
     *
     * @param string $type
     *          if we're after a particular job list
     * @param int $includeUpUntil
     *          The number of seconds to include jobs that have just finished, allowing a job list to be built that
     *          includes recently finished jobs
     *
     * @return DataList|QueuedJobDescriptor[]
     */
    public function getJobList($type = null, $includeUpUntil = 0)
    {
        $filter = $this->getJobListFilter($type, $includeUpUntil);

        return DataObject::get(QueuedJobDescriptor::class, $filter);
    }
1190
1191
    /**
     * Builds the SQL filter used to get the job list - this is used by the UI for displaying the job list.
     *
     * The filter matches jobs whose status is not "complete", OR (when $includeUpUntil is given) jobs
     * that finished within the last $includeUpUntil seconds. When $type is given, the result is
     * additionally restricted to that job type.
     *
     * @param string $type
     *          if we're after a particular job list
     * @param int $includeUpUntil
     *          The number of seconds to include jobs that have just finished, allowing a job list to be built that
     *          includes recently finished jobs
     *
     * @return string
     */
    public function getJobListFilter($type = null, $includeUpUntil = 0)
    {
        $quoter = singleton(QJUtils::class);

        $conditions = ['JobStatus <>' => QueuedJob::STATUS_COMPLETE];
        if ($includeUpUntil) {
            // Anything finished after this point in time still counts as "recent"
            $cutoff = DBDatetime::now()->getTimestamp() - $includeUpUntil;
            $conditions['JobFinished > '] = DBDatetime::create()->setValue($cutoff)->Rfc2822();
        }

        $where = $quoter->dbQuote($conditions, ' OR ');

        // No type requested: the status/finished conditions are the whole filter
        if (!$type) {
            return $where;
        }

        return $quoter->dbQuote(['JobType =' => (string)$type]) . ' AND (' . $where . ')';
    }
1221
1222
    /**
     * Process the job queue with the current queue runner.
     *
     * Runs checkJobHealth() first unless the 'disable_health_check' config flag is set, then
     * checkdefaultJobs(), and finally delegates to the configured queue runner.
     *
     * @param string $queue
     */
    public function runQueue($queue)
    {
        $skipHealthCheck = self::config()->get('disable_health_check');
        if (!$skipHealthCheck) {
            $this->checkJobHealth($queue);
        }

        $this->checkdefaultJobs($queue);
        $this->queueRunner->runQueue($queue);
    }
1235
1236
    /**
     * Process all jobs from a given queue.
     *
     * Jobs are fetched one at a time via getNextPendingJob() and run with runJob(). Processing stops
     * as soon as no pending job is returned or a job run reports failure.
     *
     * @param string $name The job queue to completely process
     */
    public function processJobQueue($name)
    {
        // Start timer to measure lifetime
        $this->markStarted();

        while (true) {
            if (class_exists(Subsite::class)) {
                // Reset to the default subsite so a subsite change made by one job cannot leak
                // into subsequent actions
                Subsite::changeSubsite(0);
            }

            $pending = $this->getNextPendingJob($name);
            if (!$pending) {
                break;
            }

            // Stop at the first unsuccessful run; the next queue executor can pick up where
            // things left off
            if (!$this->runJob($pending->ID)) {
                break;
            }
        }
    }
1266
1267
    /**
     * When PHP shuts down, process all of the immediate queue items.
     *
     * This goes through processJobQueue(), which fetches work with getNextPendingJob() rather than
     * iterating the queue directly, so that paused or stalled jobs are ignored.
     */
    public function onShutdown()
    {
        $immediateQueue = QueuedJob::IMMEDIATE;

        $this->processJobQueue($immediateQueue);
    }
1277
1278
    /**
     * Fetches the logger that the injector provides for LoggerInterface.
     *
     * @return LoggerInterface
     */
    public function getLogger()
    {
        $injector = Injector::inst();

        return $injector->get(LoggerInterface::class);
    }
1287
}
1288