Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like QueuedJobService often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use QueuedJobService, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
51 | class QueuedJobService |
||
52 | { |
||
53 | use Configurable; |
||
54 | use Injectable; |
||
55 | use Extensible; |
||
56 | |||
57 | /** |
||
58 | * @config |
||
59 | * @var int |
||
60 | */ |
||
61 | private static $stall_threshold = 3; |
||
62 | |||
63 | /** |
||
64 | * How much ram will we allow before pausing and releasing the memory? |
||
65 | * |
||
66 | * For instance, set to 268435456 (256MB) to pause this process if used memory exceeds |
||
67 | * this value. This needs to be set to a value lower than the php.ini memory_limit as |
||
68 | * the system will otherwise crash before shutdown can be handled gracefully. |
||
69 | * |
||
70 | * This was increased to 256MB for SilverStripe 4.x as framework uses more memory than 3.x |
||
71 | * |
||
72 | * @var int |
||
73 | * @config |
||
74 | */ |
||
75 | private static $memory_limit = 268435456; |
||
76 | |||
77 | /** |
||
78 | * Optional time limit (in seconds) to run the service before restarting to release resources. |
||
79 | * |
||
80 | * Defaults to no limit. |
||
81 | * |
||
82 | * @var int |
||
83 | * @config |
||
84 | */ |
||
85 | private static $time_limit = 0; |
||
86 | |||
87 | /** |
||
88 | * Disable health checks that usually occur when a runner first picks up a queue. Note that completely disabling |
||
89 | * health checks could result in many jobs that are always marked as running - that will never be restarted. If |
||
90 | * health checks are disabled you may alternatively use the build task |
||
91 | * |
||
92 | * @see \Symbiote\QueuedJobs\Tasks\CheckJobHealthTask |
||
93 | * |
||
94 | * @var bool |
||
95 | * @config |
||
96 | */ |
||
97 | private static $disable_health_check = false; |
||
98 | |||
99 | /** |
||
100 | * Maximum number of jobs that can be initialised at any one time. |
||
101 | * |
||
102 | * Prevents too many jobs getting into this state in case something goes wrong with the child processes. |
||
103 | * We shouldn't have too many jobs in the initialising state anyway. |
||
104 | * |
||
105 | * Valid values: |
||
106 | * 0 - unlimited (default) |
||
107 | * greater than 0 - maximum number of jobs in initialised state |
||
108 | * |
||
109 | * @var int |
||
110 | * @config |
||
111 | */ |
||
112 | private static $max_init_jobs = 0; |
||
113 | |||
114 | /** |
||
115 | * Timestamp (in seconds) when the queue was started |
||
116 | * |
||
117 | * @var int |
||
118 | */ |
||
119 | protected $startedAt = 0; |
||
120 | |||
121 | /** |
||
122 | * Should "immediate" jobs be managed using the shutdown function? |
||
123 | * |
||
124 | * It is recommended you set up an inotify watch and use that for |
||
125 | * triggering immediate jobs. See the wiki for more information |
||
126 | * |
||
127 | * @var boolean |
||
128 | * @config |
||
129 | */ |
||
130 | private static $use_shutdown_function = true; |
||
131 | |||
132 | /** |
||
133 | * The location for immediate jobs to be stored in |
||
134 | * |
||
135 | * @var string |
||
136 | * @config |
||
137 | */ |
||
138 | private static $cache_dir = 'queuedjobs'; |
||
139 | |||
140 | /** |
||
141 | * Maintenance lock file feature enabled / disable setting |
||
142 | * |
||
143 | * @config |
||
144 | * @var bool |
||
145 | */ |
||
146 | private static $lock_file_enabled = false; |
||
147 | |||
148 | /** |
||
149 | * Maintenance lock file name |
||
150 | * |
||
151 | * @config |
||
152 | * @var string |
||
153 | */ |
||
154 | private static $lock_file_name = 'maintenance-lock.txt'; |
||
155 | |||
156 | /** |
||
157 | * Maintenance lock path (relative path starting at the base folder) |
||
158 | * Note that this path needs to point to a folder on a shared drive if multiple instances are used |
||
159 | * |
||
160 | * @config |
||
161 | * @var string |
||
162 | */ |
||
163 | private static $lock_file_path = ''; |
||
164 | |||
165 | /** |
||
166 | * @var DefaultQueueHandler |
||
167 | */ |
||
168 | public $queueHandler; |
||
169 | |||
170 | /** |
||
171 | * |
||
172 | * @var TaskRunnerEngine |
||
173 | */ |
||
174 | public $queueRunner; |
||
175 | |||
176 | /** |
||
177 | * Config controlled list of default/required jobs |
||
178 | * |
||
179 | * @var array |
||
180 | */ |
||
181 | public $defaultJobs = []; |
||
182 | |||
183 | /** |
||
184 | * Register our shutdown handler |
||
185 | */ |
||
186 | public function __construct() |
||
204 | |||
205 | /** |
||
206 | * Adds a job to the queue to be started |
||
207 | * |
||
208 | * @param QueuedJob $job The job to start. |
||
209 | * @param null|string $startAfter The date (in Y-m-d H:i:s format) to start execution after |
||
210 | * @param null|int $userId The ID of a user to execute the job as. Defaults to the current user |
||
211 | * @param null|int $queueName |
||
212 | * @return int |
||
213 | * @throws ValidationException |
||
214 | */ |
||
215 | public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $queueName = null) |
||
278 | |||
279 | /** |
||
280 | * Start a job (or however the queue handler determines it should be started) |
||
281 | * |
||
282 | * @param QueuedJobDescriptor $jobDescriptor |
||
283 | * @param string $startAfter |
||
284 | */ |
||
285 | public function startJob($jobDescriptor, $startAfter = null) |
||
294 | |||
295 | /** |
||
296 | * Check if maximum number of jobs are currently initialised. |
||
297 | * |
||
298 | * @return bool |
||
299 | */ |
||
300 | public function isAtMaxJobs() |
||
317 | |||
318 | /** |
||
319 | * Copies data from a job into a descriptor for persisting |
||
320 | * |
||
321 | * @param QueuedJob $job |
||
322 | * @param QueuedJobDescriptor $jobDescriptor |
||
323 | */ |
||
324 | protected function copyJobToDescriptor($job, $jobDescriptor) |
||
338 | |||
339 | /** |
||
340 | * @param QueuedJobDescriptor $jobDescriptor |
||
341 | * @param QueuedJob $job |
||
342 | */ |
||
343 | protected function copyDescriptorToJob($jobDescriptor, $job) |
||
364 | |||
365 | /** |
||
366 | * Check the current job queues and see if any of the jobs currently in there should be started. If so, |
||
367 | * return the next job that should be executed |
||
368 | * |
||
369 | * @param string $type Job type |
||
370 | * |
||
371 | * @return QueuedJobDescriptor|false |
||
372 | */ |
||
373 | public function getNextPendingJob($type = null) |
||
411 | |||
412 | /** |
||
413 | * Runs an explicit check on all currently running jobs to make sure their "processed" count is incrementing |
||
414 | * between each run. If it's not, then we need to flag it as paused due to an error. |
||
415 | * |
||
416 | * This typically happens when a PHP fatal error is thrown, which can't be picked up by the error |
||
417 | * handler or exception checker; in this case, we detect these stalled jobs later and try to |
||
418 | * fix them |
||
419 | * |
||
420 | * @param int $queue The queue to check against |
||
421 | */ |
||
422 | public function checkJobHealth($queue = null) |
||
474 | |||
/**
 * Checks through all the default jobs that are expected to exist in the queue.
 *
 * For every entry of $this->defaultJobs that has no descriptor with a NEW, INIT, RUN, WAIT or
 * PAUSED status, this logs the fact, optionally e-mails a notification and, when the entry asks
 * for it, queues a replacement job.
 *
 * Each entry is keyed by a title and must provide:
 *  - 'type'   : implementation class of the job
 *  - 'filter' : extra filter fields merged with the implementation to find the descriptor
 * and may provide:
 *  - 'email'    : notification recipient (falls back to Email.queued_job_admin_email)
 *  - 'recreate' : truthy to re-queue the job; this additionally requires 'construct',
 *                 'startDateFormat' and 'startTimeString'
 *
 * @param string|null $queue Kept for backwards compatibility; the lookup below is not
 *                           restricted to a queue.
 */
public function checkDefaultJobs($queue = null)
{
    if (!count($this->defaultJobs)) {
        return;
    }

    $activeJobs = QueuedJobDescriptor::get()->filter(
        'JobStatus',
        [
            QueuedJob::STATUS_NEW,
            QueuedJob::STATUS_INIT,
            QueuedJob::STATUS_RUN,
            QueuedJob::STATUS_WAIT,
            QueuedJob::STATUS_PAUSED,
        ]
    );

    foreach ($this->defaultJobs as $title => $jobConfig) {
        if (!isset($jobConfig['filter']) || !isset($jobConfig['type'])) {
            $this->getLogger()->error(
                "Default Job config: $title incorrectly set up. Please check the readme for examples",
                [
                    'file' => __FILE__,
                    'line' => __LINE__,
                ]
            );
            continue;
        }

        $job = $activeJobs->filter(array_merge(
            ['Implementation' => $jobConfig['type']],
            $jobConfig['filter']
        ));
        if ($job->count()) {
            // the default job is present, nothing to do
            continue;
        }

        $this->getLogger()->info(
            "Default Job config: $title was missing from Queue",
            [
                'file' => __FILE__,
                'line' => __LINE__,
            ]
        );

        // Look the setting up against the Email class itself (as restartStalledJob() does);
        // the bare string 'Email' is not the class the rest of this file configures.
        $adminEmail = Config::inst()->get(Email::class, 'queued_job_admin_email');
        $to = isset($jobConfig['email']) ? $jobConfig['email'] : $adminEmail;

        if ($to) {
            Email::create()
                ->setTo($to)
                ->setFrom($adminEmail)
                ->setSubject('Default Job "' . $title . '" missing')
                ->setData($jobConfig)
                ->addData('Title', $title)
                ->addData('Site', Director::absoluteBaseURL())
                ->setHTMLTemplate('QueuedJobsDefaultJob')
                ->send();
        }

        if (empty($jobConfig['recreate'])) {
            continue;
        }

        // 'construct' may legitimately be null, hence array_key_exists rather than isset
        if (
            !array_key_exists('construct', $jobConfig)
            || !isset($jobConfig['startDateFormat'])
            || !isset($jobConfig['startTimeString'])
        ) {
            $this->getLogger()->error(
                "Default Job config: $title incorrectly set up. Please check the readme for examples",
                [
                    'file' => __FILE__,
                    'line' => __LINE__,
                ]
            );
            continue;
        }

        QueuedJobService::singleton()->queueJob(
            Injector::inst()->createWithArgs($jobConfig['type'], $jobConfig['construct']),
            date($jobConfig['startDateFormat'], strtotime($jobConfig['startTimeString']))
        );
        $this->getLogger()->info(
            "Default Job config: $title has been re-added to the Queue",
            [
                'file' => __FILE__,
                'line' => __LINE__,
            ]
        );
    }
}
||
563 | |||
/**
 * Attempt to restart a stalled job.
 *
 * While the job's ResumeCounts is below the configured stall_threshold the job is restarted;
 * once the threshold is reached the job is paused instead. Either way the outcome is logged
 * and, if Email.queued_job_admin_email is configured, mailed to that address.
 *
 * @param QueuedJobDescriptor $stalledJob
 *
 * @return bool True if a restart was requested, false if the job was paused instead
 */
protected function restartStalledJob($stalledJob)
{
    $restarted = $stalledJob->ResumeCounts < static::config()->get('stall_threshold');

    if ($restarted) {
        $stalledJob->restart();
        $logLevel = 'warning';
        $message = _t(
            __CLASS__ . '.STALLED_JOB_RESTART_MSG',
            'A job named {name} (#{id}) appears to have stalled. It will be stopped and restarted, please '
            . 'login to make sure it has continued',
            ['name' => $stalledJob->JobTitle, 'id' => $stalledJob->ID]
        );
    } else {
        $stalledJob->pause();
        $logLevel = 'error';
        $message = _t(
            __CLASS__ . '.STALLED_JOB_MSG',
            'A job named {name} (#{id}) appears to have stalled. It has been paused, please login to check it',
            ['name' => $stalledJob->JobTitle, 'id' => $stalledJob->ID]
        );
    }

    $this->getLogger()->log(
        $logLevel,
        $message,
        [
            'file' => __FILE__,
            'line' => __LINE__,
        ]
    );

    $from = Config::inst()->get(Email::class, 'admin_email');
    $to = Config::inst()->get(Email::class, 'queued_job_admin_email');
    $subject = _t(__CLASS__ . '.STALLED_JOB', 'Stalled job');

    // notification is optional: only send when a recipient has been configured
    if ($to) {
        $mail = Email::create($from, $to, $subject)
            ->setData([
                'JobID' => $stalledJob->ID,
                'Message' => $message,
                'Site' => Director::absoluteBaseURL(),
            ])
            ->setHTMLTemplate('QueuedJobsStalledJob');
        $mail->send();
    }

    // The docblock has always promised a boolean; previously nothing was returned.
    return $restarted;
}
||
615 | |||
/**
 * Builds the job object for a descriptor and brings both into a runnable state.
 *
 * The descriptor is flagged as initialised and saved, its stored data is copied onto a freshly
 * created job instance, and the job is then given the chance to set itself up. Because this
 * happens every time a job is picked up, a job that was paused and resumed goes through here
 * again: it receives prepareForRestart() once steps have been processed, setup() otherwise.
 * Whatever the job changed during that call is copied back and saved on the descriptor.
 *
 * @param QueuedJobDescriptor $jobDescriptor
 *          The Job descriptor of a job to prepare for execution
 *
 * @return QueuedJob|boolean
 * @throws Exception If no job object could be created for the descriptor's implementation
 */
protected function initialiseJob(QueuedJobDescriptor $jobDescriptor)
{
    $implementation = $jobDescriptor->Implementation;

    /** @var QueuedJob $job */
    $job = Injector::inst()->create($implementation);
    if (!$job) {
        throw new Exception("Implementation $implementation no longer exists");
    }

    // flag the descriptor as initialised before touching the job itself
    $jobDescriptor->JobStatus = QueuedJob::STATUS_INIT;
    $jobDescriptor->write();

    // hydrate the job from the persisted descriptor
    $this->copyDescriptorToJob($jobDescriptor, $job);

    // nothing processed yet means a first run, anything else is a resume
    $isFirstRun = $jobDescriptor->StepsProcessed <= 0;
    if ($isFirstRun) {
        $job->setup();
    } else {
        $job->prepareForRestart();
    }

    // persist whatever setup()/prepareForRestart() changed on the job
    $this->copyJobToDescriptor($job, $jobDescriptor);
    $jobDescriptor->write();

    return $job;
}
||
660 | |||
/**
 * Tries to claim a {@link QueuedJobDescriptor} for this process by marking it as initialised.
 *
 * This behaves roughly like a mutex. A real database lock is not used (not every database
 * adapter supports one), so a small race window may remain; checking the number of affected
 * rows keeps it as narrow as possible. As a side effect the job status becomes "Initialised".
 *
 * Assumption is the job has a status of "Queued" or "Wait".
 *
 * @param QueuedJobDescriptor $jobDescriptor
 *
 * @return boolean True if the job was claimed (or was already initialised), false otherwise
 */
protected function grabMutex(QueuedJobDescriptor $jobDescriptor)
{
    // statuses that must never be overwritten with "Initialised"
    $blockedStatuses = [
        QueuedJob::STATUS_RUN,
        QueuedJob::STATUS_COMPLETE,
        QueuedJob::STATUS_PAUSED,
        QueuedJob::STATUS_CANCELLED,
    ];
    $blockedList = "'" . implode("', '", $blockedStatuses) . "'";

    // A raw UPDATE is used on purpose: write() would always bump LastEdited, so the row would
    // always count as affected and two processes could not tell who won the job.
    $sql = sprintf(
        'UPDATE "QueuedJobDescriptor" SET "JobStatus" = \'%s\' WHERE "ID" = %s'
        . ' AND "JobFinished" IS NULL AND "JobStatus" NOT IN (%s)',
        QueuedJob::STATUS_INIT,
        $jobDescriptor->ID,
        $blockedList
    );

    try {
        DB::query($sql);
    } catch (Exception $e) {
        return false;
    }

    // No row changed and the descriptor was not already initialised: someone else has the job.
    return DB::get_conn()->affectedRows() !== 0
        || $jobDescriptor->JobStatus === QueuedJob::STATUS_INIT;
}
||
698 | |||
/**
 * Start the actual execution of a job.
 * The assumption is the jobID refers to a {@link QueuedJobDescriptor} that is status set as "Queued".
 *
 * This method keeps calling process() on the job until the job reports it has finished, or until
 * the run is interrupted: the descriptor disappears or is no longer in the running state, the
 * job throws or stalls, or the memory / time limits are exceeded.
 *
 * The error handler, the nested config and the "run as" user are always restored, even when an
 * exception escapes this method.
 *
 * @param int $jobId
 *          The ID of the job to start executing
 *
 * @return boolean False if the run was interrupted before the job finished; true otherwise
 *                 (including when the job could not be claimed via grabMutex())
 * @throws Exception If no descriptor exists for $jobId
 */
public function runJob($jobId)
{
    // first retrieve the descriptor
    /** @var QueuedJobDescriptor $jobDescriptor */
    $jobDescriptor = DataObject::get_by_id(
        QueuedJobDescriptor::class,
        (int)$jobId
    );
    if (!$jobDescriptor) {
        throw new Exception("$jobId is invalid");
    }

    // now lets see whether we have a current user to run as. Typically, if the job is executing via the CLI,
    // we want it to actually execute as the RunAs user - however, if running via the web (which is rare...), we
    // want to ensure that the current user has admin privileges before switching. Otherwise, we just run it
    // as the currently logged in user and hope for the best

    // We need to use $_SESSION directly because SS ties the session to a controller that no longer exists at
    // this point of execution in some circumstances
    $originalUserID = isset($_SESSION['loggedInAs']) ? $_SESSION['loggedInAs'] : 0;
    /** @var Member|null $originalUser */
    $originalUser = $originalUserID
        ? DataObject::get_by_id(Member::class, $originalUserID)
        : null;
    $runAsUser = null;

    // If the Job has requested that we run it as a particular user, then we should try and do that.
    if ($jobDescriptor->RunAs() !== null) {
        $runAsUser = $this->setRunAsUser($jobDescriptor->RunAs(), $originalUser);
    }

    // set up a custom error handler for this processing
    $errorHandler = new JobErrorHandler();

    $job = null;

    $broken = false;

    // Push a config context onto the stack for the duration of this job run.
    Config::nest();

    try {
        if ($this->grabMutex($jobDescriptor)) {
            // Exception/Error raised while preparing or running the job, handled after the try
            $failure = null;

            try {
                $job = $this->initialiseJob($jobDescriptor);

                // get the job ready to begin.
                if (!$jobDescriptor->JobStarted) {
                    $jobDescriptor->JobStarted = DBDatetime::now()->Rfc2822();
                } else {
                    $jobDescriptor->JobRestarted = DBDatetime::now()->Rfc2822();
                }

                // Only write to job as "Running" if 'isComplete' was NOT set to true
                // during setup() or prepareForRestart()
                if (!$job->jobFinished()) {
                    $jobDescriptor->JobStatus = QueuedJob::STATUS_RUN;
                    $jobDescriptor->write();
                }

                // NOTE(review): $lastStepProcessed is never updated inside the loop, so the stall
                // check below only ever compares against step 0 - confirm whether that is intended.
                $lastStepProcessed = 0;
                // have we stalled at all?
                $stallCount = 0;

                if (class_exists(Subsite::class) && !empty($job->SubsiteID)) {
                    Subsite::changeSubsite($job->SubsiteID);

                    // lets set the base URL as far as Director is concerned so that our URLs are correct
                    /** @var Subsite $subsite */
                    $subsite = DataObject::get_by_id(Subsite::class, $job->SubsiteID);
                    if ($subsite && $subsite->exists()) {
                        $domain = $subsite->domain();
                        $base = rtrim(Director::protocol() . $domain, '/') . '/';

                        Config::modify()->set(Director::class, 'alternate_base_url', $base);
                    }
                }

                // while not finished
                while (!$job->jobFinished() && !$broken) {
                    // see that we haven't been set to 'paused' or otherwise by another process
                    /** @var QueuedJobDescriptor $jobDescriptor */
                    $jobDescriptor = DataObject::get_by_id(
                        QueuedJobDescriptor::class,
                        (int)$jobId
                    );
                    if (!$jobDescriptor || !$jobDescriptor->exists()) {
                        $broken = true;
                        $this->getLogger()->error(
                            print_r(
                                [
                                    'errno' => 0,
                                    'errstr' => 'Job descriptor ' . $jobId . ' could not be found',
                                    'errfile' => __FILE__,
                                    'errline' => __LINE__,
                                    'errcontext' => [],
                                ],
                                true
                            ),
                            [
                                'file' => __FILE__,
                                'line' => __LINE__,
                            ]
                        );
                        break;
                    }
                    if ($jobDescriptor->JobStatus != QueuedJob::STATUS_RUN) {
                        // we've been paused by something, so we'll just exit
                        $job->addMessage(_t(
                            __CLASS__ . '.JOB_PAUSED',
                            'Job paused at {time}',
                            ['time' => DBDatetime::now()->Rfc2822()]
                        ));
                        $broken = true;
                    }

                    if (!$broken) {
                        // Inject real-time log handler
                        $logger = Injector::inst()->get(LoggerInterface::class);
                        if ($logger instanceof Logger) {
                            // Check if there is already a handler
                            $exists = false;
                            foreach ($logger->getHandlers() as $handler) {
                                if ($handler instanceof QueuedJobHandler) {
                                    $exists = true;
                                    break;
                                }
                            }

                            if (!$exists) {
                                // Add the handler
                                /** @var QueuedJobHandler $queuedJobHandler */
                                $queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor);

                                // Buffer log records: the wrapped handler is given a buffer limit of 100
                                $bufferHandler = new BufferHandler(
                                    $queuedJobHandler,
                                    100,
                                    Logger::DEBUG,
                                    true,
                                    true
                                );

                                $logger->pushHandler($bufferHandler);
                            }
                        } elseif ($logger instanceof LoggerInterface) {
                            $logger->warning(
                                'Monolog not found, messages will not output while the job is running'
                            );
                        }

                        // Collect output as job messages as well as sending it to the screen after processing
                        $obLogger = function ($buffer, $phase) use ($job, $jobDescriptor) {
                            $job->addMessage($buffer);
                            if ($jobDescriptor) {
                                $this->copyJobToDescriptor($job, $jobDescriptor);
                                $jobDescriptor->write();
                            }
                            return $buffer;
                        };
                        ob_start($obLogger, 256);

                        try {
                            $job->process();
                        } catch (Exception $e) {
                            // okay, we'll just catch this exception for now
                            $job->addMessage(
                                _t(
                                    __CLASS__ . '.JOB_EXCEPT',
                                    'Job caused exception {message} in {file} at line {line}',
                                    [
                                        'message' => $e->getMessage(),
                                        'file' => $e->getFile(),
                                        'line' => $e->getLine(),
                                    ]
                                )
                            );
                            $this->getLogger()->error(
                                $e->getMessage(),
                                [
                                    'exception' => $e,
                                ]
                            );
                            $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN;
                            $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e);
                        } finally {
                            // Runs on every path (including a PHP 7 Error escaping process()) so the
                            // output buffer opened above is never left dangling.

                            // Write any remaining batched messages at the end
                            if (isset($bufferHandler)) {
                                $bufferHandler->flush();
                            }

                            ob_end_flush();
                        }

                        // now check the job state
                        $data = $job->getJobData();
                        if ($data->currentStep == $lastStepProcessed) {
                            $stallCount++;
                        }

                        if ($stallCount > static::config()->get('stall_threshold')) {
                            $broken = true;
                            $job->addMessage(
                                _t(
                                    __CLASS__ . '.JOB_STALLED',
                                    'Job stalled after {attempts} attempts - please check',
                                    ['attempts' => $stallCount]
                                )
                            );
                            $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN;
                        }

                        // now we'll be good and check our memory usage. If it is too high, we'll set the job to
                        // a 'Waiting' state, and let the next processing run pick up the job.
                        if ($this->isMemoryTooHigh()) {
                            $job->addMessage(
                                _t(
                                    __CLASS__ . '.MEMORY_RELEASE',
                                    'Job releasing memory and waiting ({used} used)',
                                    ['used' => $this->humanReadable($this->getMemoryUsage())]
                                )
                            );
                            if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) {
                                $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT;
                            }
                            $broken = true;
                        }

                        // Also check if we are running too long
                        if ($this->hasPassedTimeLimit()) {
                            $job->addMessage(_t(
                                __CLASS__ . '.TIME_LIMIT',
                                'Queue has passed time limit and will restart before continuing'
                            ));
                            if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) {
                                $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT;
                            }
                            $broken = true;
                        }
                    }

                    if ($jobDescriptor) {
                        $this->copyJobToDescriptor($job, $jobDescriptor);
                        $jobDescriptor->write();
                    } else {
                        $this->getLogger()->error(
                            print_r(
                                [
                                    'errno' => 0,
                                    'errstr' => 'Job descriptor has been set to null',
                                    'errfile' => __FILE__,
                                    'errline' => __LINE__,
                                    'errcontext' => [],
                                ],
                                true
                            ),
                            [
                                'file' => __FILE__,
                                'line' => __LINE__,
                            ]
                        );
                        $broken = true;
                    }
                }

                // a last final save. The job is complete by now
                if ($jobDescriptor) {
                    $jobDescriptor->write();
                }

                if ($job->jobFinished()) {
                    /** @var AbstractQueuedJob|QueuedJob $job */
                    $job->afterComplete();
                    $jobDescriptor->cleanupJob();

                    $this->extend('updateJobDescriptorAndJobOnCompletion', $jobDescriptor, $job);
                }
            } catch (Exception $e) {
                // PHP 5.6 exception handling
                $failure = $e;
            } catch (\Throwable $e) {
                // PHP 7 Error handling
                $failure = $e;
            }

            if ($failure !== null) {
                $broken = true;

                if ($jobDescriptor && $job instanceof QueuedJob) {
                    $this->handleBrokenJobException($jobDescriptor, $job, $failure);
                } else {
                    // initialiseJob() failed before a job object existed (or the descriptor has
                    // vanished). handleBrokenJobException() requires both objects, so record the
                    // failure here instead of triggering a TypeError from inside the catch path.
                    $this->getLogger()->error(
                        $failure->getMessage(),
                        [
                            'exception' => $failure,
                        ]
                    );
                    if ($jobDescriptor) {
                        $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN;
                        $jobDescriptor->write();
                    }
                }
            }
        }
    } finally {
        // Always undo the per-run state, even if the failure handling above throws.
        $errorHandler->clear();

        Config::unnest();

        $this->unsetRunAsUser($runAsUser, $originalUser);
    }

    return !$broken;
}
||
1008 | |||
/**
 * Records that a job run was aborted by an exception or error.
 *
 * The message is logged, the descriptor is flagged as broken, extensions are notified through
 * the 'updateJobDescriptorAndJobOnException' hook, and the descriptor is then saved.
 *
 * @param QueuedJobDescriptor $jobDescriptor Descriptor of the job that failed
 * @param QueuedJob $job The job instance that was being run
 * @param Exception|\Throwable $e What was thrown
 */
protected function handleBrokenJobException(QueuedJobDescriptor $jobDescriptor, QueuedJob $job, $e)
{
    // the failure is only logged here; nothing is re-thrown
    $logContext = ['exception' => $e];
    $this->getLogger()->info($e->getMessage(), $logContext);

    $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN;

    // give extensions a chance to adjust descriptor/job before the broken state is persisted
    $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e);

    $jobDescriptor->write();
}
||
1027 | |||
1028 | /** |
||
1029 | * @param Member $runAsUser |
||
1030 | * @param Member|null $originalUser |
||
1031 | * @return null|Member |
||
1032 | */ |
||
1033 | protected function setRunAsUser(Member $runAsUser, Member $originalUser = null) |
||
1061 | |||
1062 | /** |
||
1063 | * @param Member|null $runAsUser |
||
1064 | * @param Member|null $originalUser |
||
1065 | */ |
||
1066 | protected function unsetRunAsUser(Member $runAsUser = null, Member $originalUser = null) |
||
1091 | |||
1092 | /** |
||
1093 | * Start timer |
||
1094 | */ |
||
1095 | protected function markStarted() |
||
1101 | |||
1102 | /** |
||
1103 | * Is execution time too long? |
||
1104 | * |
||
1105 | * @return bool True if the script has passed the configured time_limit |
||
1106 | */ |
||
1107 | protected function hasPassedTimeLimit() |
||
1122 | |||
1123 | /** |
||
1124 | * Is memory usage too high? |
||
1125 | * |
||
1126 | * @return bool |
||
1127 | */ |
||
1128 | protected function isMemoryTooHigh() |
||
1134 | |||
1135 | /** |
||
1136 | * Get peak memory usage of this application |
||
1137 | * |
||
1138 | * @return float |
||
1139 | */ |
||
1140 | protected function getMemoryUsage() |
||
1147 | |||
1148 | /** |
||
1149 | * Determines the memory limit (in bytes) for this application |
||
1150 | * Limits to the smaller of memory_limit configured via php.ini or silverstripe config |
||
1151 | * |
||
1152 | * @return float Memory limit in bytes |
||
1153 | */ |
||
1154 | protected function getMemoryLimit() |
||
1168 | |||
1169 | /** |
||
1170 | * Calculate the current memory limit of the server |
||
1171 | * |
||
1172 | * @return float |
||
1173 | */ |
||
1174 | protected function getPHPMemoryLimit() |
||
1178 | |||
1179 | /** |
||
1180 | * Convert memory limit string to bytes. |
||
1181 | * Based on implementation in install.php5 |
||
1182 | * |
||
1183 | * @param string $memString |
||
1184 | * |
||
1185 | * @return float |
||
1186 | */ |
||
1187 | protected function parseMemory($memString) |
||
1202 | |||
1203 | protected function humanReadable($size) |
||
1208 | |||
1209 | |||
1210 | /** |
||
1211 | * Gets a list of all the current jobs (or jobs that have recently finished) |
||
1212 | * |
||
1213 | * @param string $type |
||
1214 | * if we're after a particular job list |
||
1215 | * @param int $includeUpUntil |
||
1216 | * The number of seconds to include jobs that have just finished, allowing a job list to be built that |
||
1217 | * includes recently finished jobs |
||
1218 | * |
||
1219 | * @return DataList|QueuedJobDescriptor[] |
||
1220 | */ |
||
1221 | public function getJobList($type = null, $includeUpUntil = 0) |
||
1228 | |||
1229 | /** |
||
1230 | * Return the SQL filter used to get the job list - this is used by the UI for displaying the job list... |
||
1231 | * |
||
1232 | * @param string $type |
||
1233 | * if we're after a particular job list |
||
1234 | * @param int $includeUpUntil |
||
1235 | * The number of seconds to include jobs that have just finished, allowing a job list to be built that |
||
1236 | * includes recently finished jobs |
||
1237 | * |
||
1238 | * @return string |
||
1239 | */ |
||
1240 | public function getJobListFilter($type = null, $includeUpUntil = 0) |
||
1259 | |||
1260 | /** |
||
1261 | * Process the job queue with the current queue runner |
||
1262 | * |
||
1263 | * @param string $queue |
||
1264 | */ |
||
1265 | public function runQueue($queue) |
||
1273 | |||
1274 | /** |
||
1275 | * Process all jobs from a given queue |
||
1276 | * |
||
1277 | * @param string $name The job queue to completely process |
||
1278 | */ |
||
1279 | public function processJobQueue($name) |
||
1304 | |||
1305 | /** |
||
1306 | * When PHP shuts down, we want to process all of the immediate queue items |
||
1307 | * |
||
1308 | * We use the 'getNextPendingJob' method, instead of just iterating the queue, to ensure |
||
1309 | * we ignore paused or stalled jobs. |
||
1310 | */ |
||
1311 | public function onShutdown() |
||
1315 | |||
1316 | /** |
||
1317 | * Get a logger |
||
1318 | * |
||
1319 | * @return LoggerInterface |
||
1320 | */ |
||
1321 | public function getLogger() |
||
1325 | |||
1326 | public function enableMaintenanceLock() |
||
1337 | |||
1338 | View Code Duplication | public function disableMaintenanceLock() |
|
1351 | |||
1352 | /** |
||
1353 | * @return bool |
||
1354 | */ |
||
1355 | View Code Duplication | public function isMaintenanceLockActive() |
|
1365 | |||
1366 | /** |
||
1367 | * @return string |
||
1368 | */ |
||
1369 | private function lockFilePath() |
||
1378 | } |
||
1379 |
Since your code implements the magic setter `__set`, this function will be called for any write access on an undefined variable. You can add the `@property` annotation to your class or interface to document the existence of this variable.
Since the property has write access only, you can use the `@property-write` annotation instead.
Of course, you may also just have mistyped another name, in which case you should fix the error.
See also the PhpDoc documentation for @property.