Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like QueuedJobService often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use QueuedJobService, and based on these observations, apply Extract Interface, too.
| 1 | <?php |
||
| 51 | class QueuedJobService |
||
| 52 | { |
||
| 53 | use Configurable; |
||
| 54 | use Injectable; |
||
| 55 | use Extensible; |
||
| 56 | |||
| 57 | /** |
||
| 58 | * @config |
||
| 59 | * @var int |
||
| 60 | */ |
||
| 61 | private static $stall_threshold = 3; |
||
| 62 | |||
| 63 | /** |
||
| 64 | * How much ram will we allow before pausing and releasing the memory? |
||
| 65 | * |
||
| 66 | * For instance, set to 268435456 (256MB) to pause this process if used memory exceeds |
||
| 67 | * this value. This needs to be set to a value lower than the php_ini max_memory as |
||
| 68 | * the system will otherwise crash before shutdown can be handled gracefully. |
||
| 69 | * |
||
| 70 | * This was increased to 256MB for SilverStripe 4.x as framework uses more memory than 3.x |
||
| 71 | * |
||
| 72 | * @var int |
||
| 73 | * @config |
||
| 74 | */ |
||
| 75 | private static $memory_limit = 268435456; |
||
| 76 | |||
| 77 | /** |
||
| 78 | * Optional time limit (in seconds) to run the service before restarting to release resources. |
||
| 79 | * |
||
| 80 | * Defaults to no limit. |
||
| 81 | * |
||
| 82 | * @var int |
||
| 83 | * @config |
||
| 84 | */ |
||
| 85 | private static $time_limit = 0; |
||
| 86 | |||
| 87 | /** |
||
| 88 | * Disable health checks that usually occur when a runner first picks up a queue. Note that completely disabling |
||
| 89 | * health checks could result in many jobs that are always marked as running - that will never be restarted. If |
||
| 90 | * this option is disabled you may alternatively use the build task |
||
| 91 | * |
||
| 92 | * @see \Symbiote\QueuedJobs\Tasks\CheckJobHealthTask |
||
| 93 | * |
||
| 94 | * @var bool |
||
| 95 | * @config |
||
| 96 | */ |
||
| 97 | private static $disable_health_check = false; |
||
| 98 | |||
| 99 | /** |
||
| 100 | * Maximum number of jobs that can be initialised at any one time. |
||
| 101 | * |
||
| 102 | * Prevents too many jobs getting into this state in case something goes wrong with the child processes. |
||
| 103 | * We shouldn't have too many jobs in the initialising state anyway. |
||
| 104 | * |
||
| 105 | * Valid values: |
||
| 106 | * 0 - unlimited (default) |
||
| 107 | * greater than 0 - maximum number of jobs in initialised state |
||
| 108 | * |
||
| 109 | * @var int |
||
| 110 | * @config |
||
| 111 | */ |
||
| 112 | private static $max_init_jobs = 0; |
||
| 113 | |||
| 114 | /** |
||
| 115 | * Timestamp (in seconds) when the queue was started |
||
| 116 | * |
||
| 117 | * @var int |
||
| 118 | */ |
||
| 119 | protected $startedAt = 0; |
||
| 120 | |||
| 121 | /** |
||
| 122 | * Should "immediate" jobs be managed using the shutdown function? |
||
| 123 | * |
||
| 124 | * It is recommended you set up an inotify watch and use that for |
||
| 125 | * triggering immediate jobs. See the wiki for more information |
||
| 126 | * |
||
| 127 | * @var boolean |
||
| 128 | * @config |
||
| 129 | */ |
||
| 130 | private static $use_shutdown_function = true; |
||
| 131 | |||
| 132 | /** |
||
| 133 | * The location for immediate jobs to be stored in |
||
| 134 | * |
||
| 135 | * @var string |
||
| 136 | * @config |
||
| 137 | */ |
||
| 138 | private static $cache_dir = 'queuedjobs'; |
||
| 139 | |||
| 140 | /** |
||
| 141 | * Maintenance lock file feature enabled / disable setting |
||
| 142 | * |
||
| 143 | * @config |
||
| 144 | * @var bool |
||
| 145 | */ |
||
| 146 | private static $lock_file_enabled = false; |
||
| 147 | |||
| 148 | /** |
||
| 149 | * Maintenance lock file name |
||
| 150 | * |
||
| 151 | * @config |
||
| 152 | * @var string |
||
| 153 | */ |
||
| 154 | private static $lock_file_name = 'maintenance-lock.txt'; |
||
| 155 | |||
| 156 | /** |
||
| 157 | * Maintenance lock path (relative path starting at the base folder) |
||
| 158 | * Note that this path needs to point to a folder on a shared drive if multiple instances are used |
||
| 159 | * |
||
| 160 | * @config |
||
| 161 | * @var string |
||
| 162 | */ |
||
| 163 | private static $lock_file_path = ''; |
||
| 164 | |||
| 165 | /** |
||
| 166 | * @var DefaultQueueHandler |
||
| 167 | */ |
||
| 168 | public $queueHandler; |
||
| 169 | |||
| 170 | /** |
||
| 171 | * |
||
| 172 | * @var TaskRunnerEngine |
||
| 173 | */ |
||
| 174 | public $queueRunner; |
||
| 175 | |||
| 176 | /** |
||
| 177 | * Config controlled list of default/required jobs |
||
| 178 | * |
||
| 179 | * @var array |
||
| 180 | */ |
||
| 181 | public $defaultJobs = []; |
||
| 182 | |||
| 183 | /** |
||
| 184 | * Register our shutdown handler |
||
| 185 | */ |
||
| 186 | public function __construct() |
||
| 204 | |||
| 205 | /** |
||
| 206 | * Adds a job to the queue to be started |
||
| 207 | * |
||
| 208 | * @param QueuedJob $job The job to start. |
||
| 209 | * @param null|string $startAfter The date (in Y-m-d H:i:s format) to start execution after |
||
| 210 | * @param null|int $userId The ID of a user to execute the job as. Defaults to the current user |
||
| 211 | * @param null|int $queueName |
||
| 212 | * @return int |
||
| 213 | * @throws ValidationException |
||
| 214 | */ |
||
| 215 | public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $queueName = null) |
||
| 278 | |||
| 279 | /** |
||
| 280 | * Start a job (or however the queue handler determines it should be started) |
||
| 281 | * |
||
| 282 | * @param QueuedJobDescriptor $jobDescriptor |
||
| 283 | * @param string $startAfter |
||
| 284 | */ |
||
| 285 | public function startJob($jobDescriptor, $startAfter = null) |
||
| 294 | |||
| 295 | /** |
||
| 296 | * Check if maximum number of jobs are currently initialised. |
||
| 297 | * |
||
| 298 | * @return bool |
||
| 299 | */ |
||
| 300 | public function isAtMaxJobs() |
||
| 317 | |||
| 318 | /** |
||
| 319 | * Copies data from a job into a descriptor for persisting |
||
| 320 | * |
||
| 321 | * @param QueuedJob $job |
||
| 322 | * @param QueuedJobDescriptor $jobDescriptor |
||
| 323 | */ |
||
| 324 | protected function copyJobToDescriptor($job, $jobDescriptor) |
||
| 338 | |||
| 339 | /** |
||
| 340 | * @param QueuedJobDescriptor $jobDescriptor |
||
| 341 | * @param QueuedJob $job |
||
| 342 | */ |
||
| 343 | protected function copyDescriptorToJob($jobDescriptor, $job) |
||
| 364 | |||
| 365 | /** |
||
| 366 | * Check the current job queues and see if any of the jobs currently in there should be started. If so, |
||
| 367 | * return the next job that should be executed |
||
| 368 | * |
||
| 369 | * @param string $type Job type |
||
| 370 | * |
||
| 371 | * @return QueuedJobDescriptor|false |
||
| 372 | */ |
||
| 373 | public function getNextPendingJob($type = null) |
||
| 411 | |||
| 412 | /** |
||
| 413 | * Runs an explicit check on all currently running jobs to make sure their "processed" count is incrementing |
||
| 414 | * between each run. If it's not, then we need to flag it as paused due to an error. |
||
| 415 | * |
||
| 416 | * This typically happens when a PHP fatal error is thrown, which can't be picked up by the error |
||
| 417 | * handler or exception checker; in this case, we detect these stalled jobs later and fix (try) to |
||
| 418 | * fix them |
||
| 419 | * |
||
| 420 | * @param int $queue The queue to check against |
||
| 421 | */ |
||
| 422 | public function checkJobHealth($queue = null) |
||
| 474 | |||
| 475 | /** |
||
| 476 | * Checks through ll the scheduled jobs that are expected to exist |
||
| 477 | */ |
||
| 478 | public function checkDefaultJobs($queue = null) |
||
| 479 | { |
||
| 480 | $queue = $queue ?: QueuedJob::QUEUED; |
||
| 481 | if (count($this->defaultJobs)) { |
||
| 482 | $activeJobs = QueuedJobDescriptor::get()->filter( |
||
| 483 | 'JobStatus', |
||
| 484 | [ |
||
| 485 | QueuedJob::STATUS_NEW, |
||
| 486 | QueuedJob::STATUS_INIT, |
||
| 487 | QueuedJob::STATUS_RUN, |
||
| 488 | QueuedJob::STATUS_WAIT, |
||
| 489 | QueuedJob::STATUS_PAUSED, |
||
| 490 | ] |
||
| 491 | ); |
||
| 492 | foreach ($this->defaultJobs as $title => $jobConfig) { |
||
| 493 | if (!isset($jobConfig['filter']) || !isset($jobConfig['type'])) { |
||
| 494 | $this->getLogger()->error( |
||
| 495 | "Default Job config: $title incorrectly set up. Please check the readme for examples", |
||
| 496 | [ |
||
| 497 | 'file' => __FILE__, |
||
| 498 | 'line' => __LINE__, |
||
| 499 | ] |
||
| 500 | ); |
||
| 501 | continue; |
||
| 502 | } |
||
| 503 | $job = $activeJobs->filter(array_merge( |
||
| 504 | ['Implementation' => $jobConfig['type']], |
||
| 505 | $jobConfig['filter'] |
||
| 506 | )); |
||
| 507 | if (!$job->count()) { |
||
| 508 | $this->getLogger()->info( |
||
| 509 | "Default Job config: $title was missing from Queue", |
||
| 510 | [ |
||
| 511 | 'file' => __FILE__, |
||
| 512 | 'line' => __LINE__, |
||
| 513 | ] |
||
| 514 | ); |
||
| 515 | |||
| 516 | $to = isset($jobConfig['email']) |
||
| 517 | ? $jobConfig['email'] |
||
| 518 | : Config::inst()->get('Email', 'queued_job_admin_email'); |
||
| 519 | |||
| 520 | if ($to) { |
||
| 521 | Email::create() |
||
| 522 | ->setTo($to) |
||
| 523 | ->setFrom(Config::inst()->get('Email', 'queued_job_admin_email')) |
||
| 524 | ->setSubject('Default Job "' . $title . '" missing') |
||
| 525 | ->setData($jobConfig) |
||
| 526 | ->addData('Title', $title) |
||
| 527 | ->addData('Site', Director::absoluteBaseURL()) |
||
| 528 | ->setHTMLTemplate('QueuedJobsDefaultJob') |
||
| 529 | ->send(); |
||
| 530 | } |
||
| 531 | |||
| 532 | if (isset($jobConfig['recreate']) && $jobConfig['recreate']) { |
||
| 533 | if ( |
||
| 534 | !array_key_exists('construct', $jobConfig) |
||
| 535 | || !isset($jobConfig['startDateFormat']) |
||
| 536 | || !isset($jobConfig['startTimeString']) |
||
| 537 | ) { |
||
| 538 | $this->getLogger()->error( |
||
| 539 | "Default Job config: $title incorrectly set up. Please check the readme for examples", |
||
| 540 | [ |
||
| 541 | 'file' => __FILE__, |
||
| 542 | 'line' => __LINE__, |
||
| 543 | ] |
||
| 544 | ); |
||
| 545 | continue; |
||
| 546 | } |
||
| 547 | QueuedJobService::singleton()->queueJob( |
||
| 548 | Injector::inst()->createWithArgs($jobConfig['type'], $jobConfig['construct']), |
||
| 549 | date($jobConfig['startDateFormat'], strtotime($jobConfig['startTimeString'])) |
||
| 550 | ); |
||
| 551 | $this->getLogger()->info( |
||
| 552 | "Default Job config: $title has been re-added to the Queue", |
||
| 553 | [ |
||
| 554 | 'file' => __FILE__, |
||
| 555 | 'line' => __LINE__, |
||
| 556 | ] |
||
| 557 | ); |
||
| 558 | } |
||
| 559 | } |
||
| 560 | } |
||
| 561 | } |
||
| 562 | } |
||
| 563 | |||
| 564 | /** |
||
| 565 | * Attempt to restart a stalled job |
||
| 566 | * |
||
| 567 | * @param QueuedJobDescriptor $stalledJob |
||
| 568 | * |
||
| 569 | * @return bool True if the job was successfully restarted |
||
| 570 | */ |
||
| 571 | protected function restartStalledJob($stalledJob) |
||
| 572 | { |
||
| 573 | if ($stalledJob->ResumeCounts < static::config()->get('stall_threshold')) { |
||
| 574 | $stalledJob->restart(); |
||
| 575 | $logLevel = 'warning'; |
||
| 576 | $message = _t( |
||
| 577 | __CLASS__ . '.STALLED_JOB_RESTART_MSG', |
||
| 578 | 'A job named {name} (#{id}) appears to have stalled. It will be stopped and restarted, please ' |
||
| 579 | . 'login to make sure it has continued', |
||
| 580 | ['name' => $stalledJob->JobTitle, 'id' => $stalledJob->ID] |
||
| 581 | ); |
||
| 582 | } else { |
||
| 583 | $stalledJob->pause(); |
||
| 584 | $logLevel = 'error'; |
||
| 585 | $message = _t( |
||
| 586 | __CLASS__ . '.STALLED_JOB_MSG', |
||
| 587 | 'A job named {name} (#{id}) appears to have stalled. It has been paused, please login to check it', |
||
| 588 | ['name' => $stalledJob->JobTitle, 'id' => $stalledJob->ID] |
||
| 589 | ); |
||
| 590 | } |
||
| 591 | |||
| 592 | $this->getLogger()->log( |
||
| 593 | $logLevel, |
||
| 594 | $message, |
||
| 595 | [ |
||
| 596 | 'file' => __FILE__, |
||
| 597 | 'line' => __LINE__, |
||
| 598 | ] |
||
| 599 | ); |
||
| 600 | $from = Config::inst()->get(Email::class, 'admin_email'); |
||
| 601 | $to = Config::inst()->get(Email::class, 'queued_job_admin_email'); |
||
| 602 | $subject = _t(__CLASS__ . '.STALLED_JOB', 'Stalled job'); |
||
| 603 | |||
| 604 | if ($to) { |
||
| 605 | $mail = Email::create($from, $to, $subject) |
||
| 606 | ->setData([ |
||
| 607 | 'JobID' => $stalledJob->ID, |
||
| 608 | 'Message' => $message, |
||
| 609 | 'Site' => Director::absoluteBaseURL(), |
||
| 610 | ]) |
||
| 611 | ->setHTMLTemplate('QueuedJobsStalledJob'); |
||
| 612 | $mail->send(); |
||
| 613 | } |
||
| 614 | } |
||
| 615 | |||
| 616 | /** |
||
| 617 | * Prepares the given jobDescriptor for execution. Returns the job that |
||
| 618 | * will actually be run in a state ready for executing. |
||
| 619 | * |
||
| 620 | * Note that this is called each time a job is picked up to be executed from the cron |
||
| 621 | * job - meaning that jobs that are paused and restarted will have 'setup()' called on them again, |
||
| 622 | * so your job MUST detect that and act accordingly. |
||
| 623 | * |
||
| 624 | * @param QueuedJobDescriptor $jobDescriptor |
||
| 625 | * The Job descriptor of a job to prepare for execution |
||
| 626 | * |
||
| 627 | * @return QueuedJob|boolean |
||
| 628 | * @throws Exception |
||
| 629 | */ |
||
| 630 | protected function initialiseJob(QueuedJobDescriptor $jobDescriptor) |
||
| 631 | { |
||
| 632 | // create the job class |
||
| 633 | $impl = $jobDescriptor->Implementation; |
||
| 634 | /** @var QueuedJob $job */ |
||
| 635 | $job = Injector::inst()->create($impl); |
||
| 636 | /* @var $job QueuedJob */ |
||
| 637 | if (!$job) { |
||
| 638 | throw new Exception("Implementation $impl no longer exists"); |
||
| 639 | } |
||
| 640 | |||
| 641 | $jobDescriptor->JobStatus = QueuedJob::STATUS_INIT; |
||
| 642 | $jobDescriptor->write(); |
||
| 643 | |||
| 644 | // make sure the data is there |
||
| 645 | $this->copyDescriptorToJob($jobDescriptor, $job); |
||
| 646 | |||
| 647 | // see if it needs 'setup' or 'restart' called |
||
| 648 | if ($jobDescriptor->StepsProcessed <= 0) { |
||
| 649 | $job->setup(); |
||
| 650 | } else { |
||
| 651 | $job->prepareForRestart(); |
||
| 652 | } |
||
| 653 | |||
| 654 | // make sure the descriptor is up to date with anything changed |
||
| 655 | $this->copyJobToDescriptor($job, $jobDescriptor); |
||
| 656 | $jobDescriptor->write(); |
||
| 657 | |||
| 658 | return $job; |
||
| 659 | } |
||
| 660 | |||
| 661 | /** |
||
| 662 | * Given a {@link QueuedJobDescriptor} mark the job as initialised. Works sort of like a mutex. |
||
| 663 | * Currently a database lock isn't entirely achievable, due to database adapters not supporting locks. |
||
| 664 | * This may still have a race condition, but this should minimise the possibility. |
||
| 665 | * Side effect is the job status will be changed to "Initialised". |
||
| 666 | * |
||
| 667 | * Assumption is the job has a status of "Queued" or "Wait". |
||
| 668 | * |
||
| 669 | * @param QueuedJobDescriptor $jobDescriptor |
||
| 670 | * |
||
| 671 | * @return boolean |
||
| 672 | */ |
||
| 673 | protected function grabMutex(QueuedJobDescriptor $jobDescriptor) |
||
| 674 | { |
||
| 675 | // write the status and determine if any rows were affected, for protection against a |
||
| 676 | // potential race condition where two or more processes init the same job at once. |
||
| 677 | // This deliberately does not use write() as that would always update LastEdited |
||
| 678 | // and thus the row would always be affected. |
||
| 679 | try { |
||
| 680 | DB::query(sprintf( |
||
| 681 | 'UPDATE "QueuedJobDescriptor" SET "JobStatus" = \'%s\' WHERE "ID" = %s' |
||
| 682 | . ' AND "JobFinished" IS NULL AND "JobStatus" NOT IN (%s)', |
||
| 683 | QueuedJob::STATUS_INIT, |
||
| 684 | $jobDescriptor->ID, |
||
| 685 | "'" . QueuedJob::STATUS_RUN . "', '" . QueuedJob::STATUS_COMPLETE . "', '" |
||
| 686 | . QueuedJob::STATUS_PAUSED . "', '" . QueuedJob::STATUS_CANCELLED . "'" |
||
| 687 | )); |
||
| 688 | } catch (Exception $e) { |
||
| 689 | return false; |
||
| 690 | } |
||
| 691 | |||
| 692 | if (DB::get_conn()->affectedRows() === 0 && $jobDescriptor->JobStatus !== QueuedJob::STATUS_INIT) { |
||
| 693 | return false; |
||
| 694 | } |
||
| 695 | |||
| 696 | return true; |
||
| 697 | } |
||
| 698 | |||
| 699 | /** |
||
| 700 | * Start the actual execution of a job. |
||
| 701 | * The assumption is the jobID refers to a {@link QueuedJobDescriptor} that is status set as "Queued". |
||
| 702 | * |
||
| 703 | * This method will continue executing until the job says it's completed |
||
| 704 | * |
||
| 705 | * @param int $jobId |
||
| 706 | * The ID of the job to start executing |
||
| 707 | * |
||
| 708 | * @return boolean |
||
| 709 | * @throws Exception |
||
| 710 | */ |
||
| 711 | public function runJob($jobId) |
||
| 712 | { |
||
| 713 | // first retrieve the descriptor |
||
| 714 | /** @var QueuedJobDescriptor $jobDescriptor */ |
||
| 715 | $jobDescriptor = DataObject::get_by_id( |
||
| 716 | QueuedJobDescriptor::class, |
||
| 717 | (int)$jobId |
||
| 718 | ); |
||
| 719 | if (!$jobDescriptor) { |
||
| 720 | throw new Exception("$jobId is invalid"); |
||
| 721 | } |
||
| 722 | |||
| 723 | // now lets see whether we have a current user to run as. Typically, if the job is executing via the CLI, |
||
| 724 | // we want it to actually execute as the RunAs user - however, if running via the web (which is rare...), we |
||
| 725 | // want to ensure that the current user has admin privileges before switching. Otherwise, we just run it |
||
| 726 | // as the currently logged in user and hope for the best |
||
| 727 | |||
| 728 | // We need to use $_SESSION directly because SS ties the session to a controller that no longer exists at |
||
| 729 | // this point of execution in some circumstances |
||
| 730 | $originalUserID = isset($_SESSION['loggedInAs']) ? $_SESSION['loggedInAs'] : 0; |
||
| 731 | /** @var Member|null $originalUser */ |
||
| 732 | $originalUser = $originalUserID |
||
| 733 | ? DataObject::get_by_id(Member::class, $originalUserID) |
||
| 734 | : null; |
||
| 735 | $runAsUser = null; |
||
| 736 | |||
| 737 | // If the Job has requested that we run it as a particular user, then we should try and do that. |
||
| 738 | if ($jobDescriptor->RunAs() !== null) { |
||
| 739 | $runAsUser = $this->setRunAsUser($jobDescriptor->RunAs(), $originalUser); |
||
| 740 | } |
||
| 741 | |||
| 742 | // set up a custom error handler for this processing |
||
| 743 | $errorHandler = new JobErrorHandler(); |
||
| 744 | |||
| 745 | $job = null; |
||
| 746 | |||
| 747 | $broken = false; |
||
| 748 | |||
| 749 | // Push a config context onto the stack for the duration of this job run. |
||
| 750 | Config::nest(); |
||
| 751 | |||
| 752 | if ($this->grabMutex($jobDescriptor)) { |
||
| 753 | try { |
||
| 754 | $job = $this->initialiseJob($jobDescriptor); |
||
| 755 | |||
| 756 | // get the job ready to begin. |
||
| 757 | if (!$jobDescriptor->JobStarted) { |
||
| 758 | $jobDescriptor->JobStarted = DBDatetime::now()->Rfc2822(); |
||
| 759 | } else { |
||
| 760 | $jobDescriptor->JobRestarted = DBDatetime::now()->Rfc2822(); |
||
| 761 | } |
||
| 762 | |||
| 763 | // Only write to job as "Running" if 'isComplete' was NOT set to true |
||
| 764 | // during setup() or prepareForRestart() |
||
| 765 | if (!$job->jobFinished()) { |
||
| 766 | $jobDescriptor->JobStatus = QueuedJob::STATUS_RUN; |
||
| 767 | $jobDescriptor->write(); |
||
| 768 | } |
||
| 769 | |||
| 770 | $lastStepProcessed = 0; |
||
| 771 | // have we stalled at all? |
||
| 772 | $stallCount = 0; |
||
| 773 | |||
| 774 | if (class_exists(Subsite::class) && !empty($job->SubsiteID)) { |
||
| 775 | Subsite::changeSubsite($job->SubsiteID); |
||
| 776 | |||
| 777 | // lets set the base URL as far as Director is concerned so that our URLs are correct |
||
| 778 | /** @var Subsite $subsite */ |
||
| 779 | $subsite = DataObject::get_by_id(Subsite::class, $job->SubsiteID); |
||
| 780 | if ($subsite && $subsite->exists()) { |
||
| 781 | $domain = $subsite->domain(); |
||
| 782 | $base = rtrim(Director::protocol() . $domain, '/') . '/'; |
||
| 783 | |||
| 784 | Config::modify()->set(Director::class, 'alternate_base_url', $base); |
||
| 785 | } |
||
| 786 | } |
||
| 787 | |||
| 788 | // while not finished |
||
| 789 | while (!$job->jobFinished() && !$broken) { |
||
| 790 | // see that we haven't been set to 'paused' or otherwise by another process |
||
| 791 | /** @var QueuedJobDescriptor $jobDescriptor */ |
||
| 792 | $jobDescriptor = DataObject::get_by_id( |
||
| 793 | QueuedJobDescriptor::class, |
||
| 794 | (int)$jobId |
||
| 795 | ); |
||
| 796 | if (!$jobDescriptor || !$jobDescriptor->exists()) { |
||
| 797 | $broken = true; |
||
| 798 | $this->getLogger()->error( |
||
| 799 | print_r( |
||
| 800 | [ |
||
| 801 | 'errno' => 0, |
||
| 802 | 'errstr' => 'Job descriptor ' . $jobId . ' could not be found', |
||
| 803 | 'errfile' => __FILE__, |
||
| 804 | 'errline' => __LINE__, |
||
| 805 | 'errcontext' => [], |
||
| 806 | ], |
||
| 807 | true |
||
| 808 | ), |
||
| 809 | [ |
||
| 810 | 'file' => __FILE__, |
||
| 811 | 'line' => __LINE__, |
||
| 812 | ] |
||
| 813 | ); |
||
| 814 | break; |
||
| 815 | } |
||
| 816 | View Code Duplication | if ($jobDescriptor->JobStatus != QueuedJob::STATUS_RUN) { |
|
| 817 | // we've been paused by something, so we'll just exit |
||
| 818 | $job->addMessage(_t( |
||
| 819 | __CLASS__ . '.JOB_PAUSED', |
||
| 820 | 'Job paused at {time}', |
||
| 821 | ['time' => DBDatetime::now()->Rfc2822()] |
||
| 822 | )); |
||
| 823 | $broken = true; |
||
| 824 | } |
||
| 825 | |||
| 826 | if (!$broken) { |
||
| 827 | // Inject real-time log handler |
||
| 828 | $logger = Injector::inst()->get(LoggerInterface::class); |
||
| 829 | if ($logger instanceof Logger) { |
||
| 830 | // Check if there is already a handler |
||
| 831 | $exists = false; |
||
| 832 | foreach ($logger->getHandlers() as $handler) { |
||
| 833 | if ($handler instanceof QueuedJobHandler) { |
||
| 834 | $exists = true; |
||
| 835 | break; |
||
| 836 | } |
||
| 837 | } |
||
| 838 | |||
| 839 | if (!$exists) { |
||
| 840 | // Add the handler |
||
| 841 | /** @var QueuedJobHandler $queuedJobHandler */ |
||
| 842 | $queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor); |
||
| 843 | |||
| 844 | // We only write for every 100 file |
||
| 845 | $bufferHandler = new BufferHandler( |
||
| 846 | $queuedJobHandler, |
||
| 847 | 100, |
||
| 848 | Logger::DEBUG, |
||
| 849 | true, |
||
| 850 | true |
||
| 851 | ); |
||
| 852 | |||
| 853 | $logger->pushHandler($bufferHandler); |
||
| 854 | } |
||
| 855 | } else { |
||
| 856 | if ($logger instanceof LoggerInterface) { |
||
| 857 | $logger->warning( |
||
| 858 | 'Monolog not found, messages will not output while the job is running' |
||
| 859 | ); |
||
| 860 | } |
||
| 861 | } |
||
| 862 | |||
| 863 | // Collect output as job messages as well as sending it to the screen after processing |
||
| 864 | $obLogger = function ($buffer, $phase) use ($job, $jobDescriptor) { |
||
| 865 | $job->addMessage($buffer); |
||
| 866 | if ($jobDescriptor) { |
||
| 867 | $this->copyJobToDescriptor($job, $jobDescriptor); |
||
| 868 | $jobDescriptor->write(); |
||
| 869 | } |
||
| 870 | return $buffer; |
||
| 871 | }; |
||
| 872 | ob_start($obLogger, 256); |
||
| 873 | |||
| 874 | try { |
||
| 875 | $job->process(); |
||
| 876 | } catch (Exception $e) { |
||
| 877 | // okay, we'll just catch this exception for now |
||
| 878 | $job->addMessage( |
||
| 879 | _t( |
||
| 880 | __CLASS__ . '.JOB_EXCEPT', |
||
| 881 | 'Job caused exception {message} in {file} at line {line}', |
||
| 882 | [ |
||
| 883 | 'message' => $e->getMessage(), |
||
| 884 | 'file' => $e->getFile(), |
||
| 885 | 'line' => $e->getLine(), |
||
| 886 | ] |
||
| 887 | ) |
||
| 888 | ); |
||
| 889 | $this->getLogger()->error( |
||
| 890 | $e->getMessage(), |
||
| 891 | [ |
||
| 892 | 'exception' => $e, |
||
| 893 | ] |
||
| 894 | ); |
||
| 895 | $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN; |
||
| 896 | $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e); |
||
| 897 | } |
||
| 898 | |||
| 899 | // Write any remaining batched messages at the end |
||
| 900 | if (isset($bufferHandler)) { |
||
| 901 | $bufferHandler->flush(); |
||
| 902 | } |
||
| 903 | |||
| 904 | ob_end_flush(); |
||
| 905 | |||
| 906 | // now check the job state |
||
| 907 | $data = $job->getJobData(); |
||
| 908 | if ($data->currentStep == $lastStepProcessed) { |
||
| 909 | $stallCount++; |
||
| 910 | } |
||
| 911 | |||
| 912 | View Code Duplication | if ($stallCount > static::config()->get('stall_threshold')) { |
|
| 913 | $broken = true; |
||
| 914 | $job->addMessage( |
||
| 915 | _t( |
||
| 916 | __CLASS__ . '.JOB_STALLED', |
||
| 917 | 'Job stalled after {attempts} attempts - please check', |
||
| 918 | ['attempts' => $stallCount] |
||
| 919 | ) |
||
| 920 | ); |
||
| 921 | $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN; |
||
| 922 | } |
||
| 923 | |||
| 924 | // now we'll be good and check our memory usage. If it is too high, we'll set the job to |
||
| 925 | // a 'Waiting' state, and let the next processing run pick up the job. |
||
| 926 | if ($this->isMemoryTooHigh()) { |
||
| 927 | $job->addMessage( |
||
| 928 | _t( |
||
| 929 | __CLASS__ . '.MEMORY_RELEASE', |
||
| 930 | 'Job releasing memory and waiting ({used} used)', |
||
| 931 | ['used' => $this->humanReadable($this->getMemoryUsage())] |
||
| 932 | ) |
||
| 933 | ); |
||
| 934 | if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) { |
||
| 935 | $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT; |
||
| 936 | } |
||
| 937 | $broken = true; |
||
| 938 | } |
||
| 939 | |||
| 940 | // Also check if we are running too long |
||
| 941 | if ($this->hasPassedTimeLimit()) { |
||
| 942 | $job->addMessage(_t( |
||
| 943 | __CLASS__ . '.TIME_LIMIT', |
||
| 944 | 'Queue has passed time limit and will restart before continuing' |
||
| 945 | )); |
||
| 946 | if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) { |
||
| 947 | $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT; |
||
| 948 | } |
||
| 949 | $broken = true; |
||
| 950 | } |
||
| 951 | } |
||
| 952 | |||
| 953 | if ($jobDescriptor) { |
||
| 954 | $this->copyJobToDescriptor($job, $jobDescriptor); |
||
| 955 | $jobDescriptor->write(); |
||
| 956 | View Code Duplication | } else { |
|
| 957 | $this->getLogger()->error( |
||
| 958 | print_r( |
||
| 959 | [ |
||
| 960 | 'errno' => 0, |
||
| 961 | 'errstr' => 'Job descriptor has been set to null', |
||
| 962 | 'errfile' => __FILE__, |
||
| 963 | 'errline' => __LINE__, |
||
| 964 | 'errcontext' => [], |
||
| 965 | ], |
||
| 966 | true |
||
| 967 | ), |
||
| 968 | [ |
||
| 969 | 'file' => __FILE__, |
||
| 970 | 'line' => __LINE__, |
||
| 971 | ] |
||
| 972 | ); |
||
| 973 | $broken = true; |
||
| 974 | } |
||
| 975 | } |
||
| 976 | |||
| 977 | // a last final save. The job is complete by now |
||
| 978 | if ($jobDescriptor) { |
||
| 979 | $jobDescriptor->write(); |
||
| 980 | } |
||
| 981 | |||
| 982 | if ($job->jobFinished()) { |
||
| 983 | /** @var AbstractQueuedJob|QueuedJob $job */ |
||
| 984 | $job->afterComplete(); |
||
| 985 | $jobDescriptor->cleanupJob(); |
||
| 986 | |||
| 987 | $this->extend('updateJobDescriptorAndJobOnCompletion', $jobDescriptor, $job); |
||
| 988 | } |
||
| 989 | } catch (Exception $e) { |
||
| 990 | // PHP 5.6 exception handling |
||
| 991 | $this->handleBrokenJobException($jobDescriptor, $job, $e); |
||
| 992 | $broken = true; |
||
| 993 | } catch (\Throwable $e) { |
||
| 994 | // PHP 7 Error handling) |
||
| 995 | $this->handleBrokenJobException($jobDescriptor, $job, $e); |
||
| 996 | $broken = true; |
||
| 997 | } |
||
| 998 | } |
||
| 999 | |||
| 1000 | $errorHandler->clear(); |
||
| 1001 | |||
| 1002 | Config::unnest(); |
||
| 1003 | |||
| 1004 | $this->unsetRunAsUser($runAsUser, $originalUser); |
||
| 1005 | |||
| 1006 | return !$broken; |
||
| 1007 | } |
||
| 1008 | |||
| 1009 | /** |
||
| 1010 | * @param QueuedJobDescriptor $jobDescriptor |
||
| 1011 | * @param QueuedJob $job |
||
| 1012 | * @param Exception|\Throwable $e |
||
| 1013 | */ |
||
| 1014 | protected function handleBrokenJobException(QueuedJobDescriptor $jobDescriptor, QueuedJob $job, $e) |
||
| 1015 | { |
||
| 1016 | // okay, we'll just catch this exception for now |
||
| 1017 | $this->getLogger()->info( |
||
| 1018 | $e->getMessage(), |
||
| 1019 | [ |
||
| 1020 | 'exception' => $e, |
||
| 1021 | ] |
||
| 1022 | ); |
||
| 1023 | $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN; |
||
| 1024 | $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e); |
||
| 1025 | $jobDescriptor->write(); |
||
| 1026 | } |
||
| 1027 | |||
| 1028 | /** |
||
| 1029 | * @param Member $runAsUser |
||
| 1030 | * @param Member|null $originalUser |
||
| 1031 | * @return null|Member |
||
| 1032 | */ |
||
| 1033 | protected function setRunAsUser(Member $runAsUser, Member $originalUser = null) |
||
| 1061 | |||
| 1062 | /** |
||
| 1063 | * @param Member|null $runAsUser |
||
| 1064 | * @param Member|null $originalUser |
||
| 1065 | */ |
||
| 1066 | protected function unsetRunAsUser(Member $runAsUser = null, Member $originalUser = null) |
||
| 1091 | |||
| 1092 | /** |
||
| 1093 | * Start timer |
||
| 1094 | */ |
||
| 1095 | protected function markStarted() |
||
| 1101 | |||
| 1102 | /** |
||
| 1103 | * Is execution time too long? |
||
| 1104 | * |
||
| 1105 | * @return bool True if the script has passed the configured time_limit |
||
| 1106 | */ |
||
| 1107 | protected function hasPassedTimeLimit() |
||
| 1122 | |||
| 1123 | /** |
||
| 1124 | * Is memory usage too high? |
||
| 1125 | * |
||
| 1126 | * @return bool |
||
| 1127 | */ |
||
| 1128 | protected function isMemoryTooHigh() |
||
| 1134 | |||
| 1135 | /** |
||
| 1136 | * Get peak memory usage of this application |
||
| 1137 | * |
||
| 1138 | * @return float |
||
| 1139 | */ |
||
| 1140 | protected function getMemoryUsage() |
||
| 1147 | |||
| 1148 | /** |
||
| 1149 | * Determines the memory limit (in bytes) for this application |
||
| 1150 | * Limits to the smaller of memory_limit configured via php.ini or silverstripe config |
||
| 1151 | * |
||
| 1152 | * @return float Memory limit in bytes |
||
| 1153 | */ |
||
| 1154 | protected function getMemoryLimit() |
||
| 1168 | |||
| 1169 | /** |
||
| 1170 | * Calculate the current memory limit of the server |
||
| 1171 | * |
||
| 1172 | * @return float |
||
| 1173 | */ |
||
| 1174 | protected function getPHPMemoryLimit() |
||
| 1178 | |||
| 1179 | /** |
||
| 1180 | * Convert memory limit string to bytes. |
||
| 1181 | * Based on implementation in install.php5 |
||
| 1182 | * |
||
| 1183 | * @param string $memString |
||
| 1184 | * |
||
| 1185 | * @return float |
||
| 1186 | */ |
||
| 1187 | protected function parseMemory($memString) |
||
| 1202 | |||
| 1203 | protected function humanReadable($size) |
||
| 1208 | |||
| 1209 | |||
| 1210 | /** |
||
| 1211 | * Gets a list of all the current jobs (or jobs that have recently finished) |
||
| 1212 | * |
||
| 1213 | * @param string $type |
||
| 1214 | * if we're after a particular job list |
||
| 1215 | * @param int $includeUpUntil |
||
| 1216 | * The number of seconds to include jobs that have just finished, allowing a job list to be built that |
||
| 1217 | * includes recently finished jobs |
||
| 1218 | * |
||
| 1219 | * @return DataList|QueuedJobDescriptor[] |
||
| 1220 | */ |
||
| 1221 | public function getJobList($type = null, $includeUpUntil = 0) |
||
| 1228 | |||
| 1229 | /** |
||
| 1230 | * Return the SQL filter used to get the job list - this is used by the UI for displaying the job list... |
||
| 1231 | * |
||
| 1232 | * @param string $type |
||
| 1233 | * if we're after a particular job list |
||
| 1234 | * @param int $includeUpUntil |
||
| 1235 | * The number of seconds to include jobs that have just finished, allowing a job list to be built that |
||
| 1236 | * includes recently finished jobs |
||
| 1237 | * |
||
| 1238 | * @return string |
||
| 1239 | */ |
||
| 1240 | public function getJobListFilter($type = null, $includeUpUntil = 0) |
||
| 1259 | |||
| 1260 | /** |
||
| 1261 | * Process the job queue with the current queue runner |
||
| 1262 | * |
||
| 1263 | * @param string $queue |
||
| 1264 | */ |
||
| 1265 | public function runQueue($queue) |
||
| 1273 | |||
| 1274 | /** |
||
| 1275 | * Process all jobs from a given queue |
||
| 1276 | * |
||
| 1277 | * @param string $name The job queue to completely process |
||
| 1278 | */ |
||
| 1279 | public function processJobQueue($name) |
||
| 1304 | |||
| 1305 | /** |
||
| 1306 | * When PHP shuts down, we want to process all of the immediate queue items |
||
| 1307 | * |
||
| 1308 | * We use the 'getNextPendingJob' method, instead of just iterating the queue, to ensure |
||
| 1309 | * we ignore paused or stalled jobs. |
||
| 1310 | */ |
||
| 1311 | public function onShutdown() |
||
| 1315 | |||
| 1316 | /** |
||
| 1317 | * Get a logger |
||
| 1318 | * |
||
| 1319 | * @return LoggerInterface |
||
| 1320 | */ |
||
| 1321 | public function getLogger() |
||
| 1325 | |||
| 1326 | public function enableMaintenanceLock() |
||
| 1337 | |||
| 1338 | View Code Duplication | public function disableMaintenanceLock() |
|
| 1351 | |||
| 1352 | /** |
||
| 1353 | * @return bool |
||
| 1354 | */ |
||
| 1355 | View Code Duplication | public function isMaintenanceLockActive() |
|
| 1365 | |||
| 1366 | /** |
||
| 1367 | * @return string |
||
| 1368 | */ |
||
| 1369 | private function lockFilePath() |
||
| 1378 | } |
||
| 1379 |
Since your code implements the magic setter
_set, this function will be called for any write access on an undefined variable. You can add the@propertyannotation to your class or interface to document the existence of this variable.Since the property has write access only, you can use the @property-write annotation instead.
Of course, you may also just have mistyped another name, in which case you should fix the error.
See also the PhpDoc documentation for @property.