enwikipedia-acc /
waca
| 1 | <?php |
||
| 2 | /****************************************************************************** |
||
| 3 | * Wikipedia Account Creation Assistance tool * |
||
| 4 | * ACC Development Team. Please see team.json for a list of contributors. * |
||
| 5 | * * |
||
| 6 | * This is free and unencumbered software released into the public domain. * |
||
| 7 | * Please see LICENSE.md for the full licencing statement. * |
||
| 8 | ******************************************************************************/ |
||
| 9 | |||
| 10 | namespace Waca\ConsoleTasks; |
||
| 11 | |||
| 12 | use Exception; |
||
| 13 | use PDO; |
||
| 14 | use Waca\Background\BackgroundTaskBase; |
||
| 15 | use Waca\Background\Task\BotCreationTask; |
||
| 16 | use Waca\Background\Task\UserCreationTask; |
||
| 17 | use Waca\Background\Task\WelcomeUserTask; |
||
| 18 | use Waca\DataObjects\JobQueue; |
||
| 19 | use Waca\ExceptionHandler; |
||
| 20 | use Waca\Exceptions\ApplicationLogicException; |
||
| 21 | use Waca\Helpers\Logger; |
||
| 22 | use Waca\PdoDatabase; |
||
| 23 | use Waca\Tasks\ConsoleTaskBase; |
||
| 24 | |||
/**
 * Console task which runs one batch of the background job queue.
 *
 * A single run does three things, in order:
 *  1. claims up to one batch of READY jobs by marking them WAITING;
 *  2. runs each claimed job in its own transaction, recording a failure (or re-queueing the job for a retry)
 *     if the task throws;
 *  3. promotes QUEUED jobs to READY so that they are picked up by the *next* run.
 */
class RunJobQueueTask extends ConsoleTaskBase
{
    /**
     * Whitelist of task classes which a job queue entry is allowed to instantiate.
     *
     * @var string[]
     */
    private $taskList = [
        WelcomeUserTask::class,
        BotCreationTask::class,
        UserCreationTask::class,
    ];

    /**
     * Runs one batch of READY jobs, then stages QUEUED jobs for the next run.
     *
     * PHP errors raised from this point onwards are converted into exceptions by {@see errorHandler()}, so that
     * they are recorded against the job which triggered them.
     */
    public function execute()
    {
        $database = $this->getDatabase();

        // ensure we're running inside a tx here.
        if (!$database->hasActiveTransaction()) {
            $database->beginTransaction();
        }

        $sql = 'SELECT * FROM jobqueue WHERE status = :status ORDER BY enqueue LIMIT :lim';
        $statement = $database->prepare($sql);
        $statement->execute([
            ':status' => JobQueue::STATUS_READY,
            ':lim'    => $this->getSiteConfiguration()->getJobQueueBatchSize(),
        ]);

        /** @var JobQueue[] $queuedJobs */
        $queuedJobs = $statement->fetchAll(PDO::FETCH_CLASS, JobQueue::class);

        // mark all the jobs as waiting, and commit the txn so we're not holding onto long-running transactions.
        // We'll re-lock the row when we get to it.
        foreach ($queuedJobs as $job) {
            $job->setDatabase($database);
            $job->setStatus(JobQueue::STATUS_WAITING);
            $job->setError(null);
            $job->setAcknowledged(null);
            $job->save();
        }

        $database->commit();

        set_error_handler([self::class, 'errorHandler'], E_ALL);

        foreach ($queuedJobs as $job) {
            try {
                // refresh from the database
                /** @var JobQueue $job */
                $job = JobQueue::getById($job->getId(), $database);

                // the status changed since we claimed the job above, so leave this job alone.
                if ($job->getStatus() !== JobQueue::STATUS_WAITING) {
                    continue;
                }

                // publish the RUNNING status in its own short transaction...
                $database->beginTransaction();
                $job->setStatus(JobQueue::STATUS_RUNNING);
                $job->save();
                $database->commit();

                // ...then open the transaction the task itself will run in.
                $database->beginTransaction();

                // re-lock the job
                $job->setStatus(JobQueue::STATUS_RUNNING);
                $job->save();

                // validate we're allowed to run the requested task (whitelist)
                $taskName = $job->getTask();

                if (!in_array($taskName, $this->taskList, true)) {
                    throw new ApplicationLogicException('Job task not registered');
                }

                if (!class_exists($taskName)) {
                    throw new ApplicationLogicException('Job task does not exist');
                }

                // Create a task.
                /** @var BackgroundTaskBase $task */
                $task = new $taskName;

                $this->setupTask($task, $job);
                $task->run();
            }
            catch (Exception $ex) {
                // The exception may have been raised while no transaction was open (for example while refreshing
                // the job), so only roll back when there is something to roll back.
                if ($database->hasActiveTransaction()) {
                    $database->rollBack();
                }

                $this->recordJobFailure($database, $job, $ex);
            }
            finally {
                $database->commit();
            }
        }

        $this->stageQueuedTasks($database);
    }

    /**
     * Records the outcome of a job whose task threw an exception.
     *
     * Failures whose message contains 'mwoauth-invalid-authorization' put the job back to READY so that it is
     * retried; every other failure marks the job as FAILED with the exception message. If recording the failure
     * itself throws, the exception is logged to disk instead.
     *
     * @param PdoDatabase $database
     * @param JobQueue    $job The job which was being run when the exception was thrown
     * @param Exception   $ex  The exception thrown while running the job
     */
    private function recordJobFailure(PdoDatabase $database, JobQueue $job, Exception $ex)
    {
        try {
            $database->beginTransaction();

            // reload the job, since the in-memory copy may hold changes which were just rolled back.
            /** @var JobQueue $job */
            $job = JobQueue::getById($job->getId(), $database);
            $job->setDatabase($database);

            if (str_contains($ex->getMessage(), 'mwoauth-invalid-authorization')) {
                $job->setAcknowledged(1);
                $job->setError('Receieved error with authorization headers. Will retry');
                $job->setStatus(JobQueue::STATUS_READY);
            }
            else {
                $job->setStatus(JobQueue::STATUS_FAILED);
                $job->setError($ex->getMessage());
                $job->setAcknowledged(0);
            }

            $job->save();

            Logger::backgroundJobIssue($this->getDatabase(), $job);

            $database->commit();
        }
        catch (Exception $handlerException) {
            // oops, something went horribly wrong trying to handle this in a nice way; let's just fall back to
            // logging this to disk for a tool root to investigate.
            ExceptionHandler::logExceptionToDisk($handlerException, $this->getSiteConfiguration());
        }
    }

    /**
     * Hands the job, and the helpers held by this console task, to a freshly-created background task.
     *
     * @param BackgroundTaskBase $task
     * @param JobQueue           $job
     */
    private function setupTask(BackgroundTaskBase $task, JobQueue $job)
    {
        $task->setJob($job);
        $task->setDatabase($this->getDatabase());
        $task->setHttpHelper($this->getHttpHelper());
        $task->setOauthProtocolHelper($this->getOAuthProtocolHelper());
        $task->setEmailHelper($this->getEmailHelper());
        $task->setSiteConfiguration($this->getSiteConfiguration());
        $task->setNotificationHelper($this->getNotificationHelper());
    }

    /**
     * Error handler which converts a PHP error into an exception.
     *
     * Installed by {@see execute()} for E_ALL. The exception message has the form "<file>@<line>: <message>".
     *
     * @param int    $errno   Level of the error raised (unused)
     * @param string $errstr  Error message
     * @param string $errfile File in which the error was raised
     * @param int    $errline Line on which the error was raised
     *
     * @throws Exception always
     * @noinspection PhpUnusedParameterInspection
     */
    public static function errorHandler($errno, $errstr, $errfile, $errline)
    {
        throw new Exception($errfile . "@" . $errline . ": " . $errstr);
    }

    /**
     * Stages tasks for execution during the *next* jobqueue run.
     *
     * This is to build in some delay between enqueue and execution to allow for accidentally-triggered tasks to be
     * cancelled.
     *
     * @param PdoDatabase $database
     */
    protected function stageQueuedTasks(PdoDatabase $database): void
    {
        try {
            $database->beginTransaction();

            $sql = 'SELECT * FROM jobqueue WHERE status = :status ORDER BY enqueue LIMIT :lim';
            $statement = $database->prepare($sql);

            // use a larger batch size than the main runner, but still keep it limited in case things go crazy.
            $statement->execute([
                ':status' => JobQueue::STATUS_QUEUED,
                ':lim'    => $this->getSiteConfiguration()->getJobQueueBatchSize() * 2,
            ]);

            /** @var JobQueue[] $queuedJobs */
            $queuedJobs = $statement->fetchAll(PDO::FETCH_CLASS, JobQueue::class);

            foreach ($queuedJobs as $job) {
                $job->setDatabase($database);
                $job->setStatus(JobQueue::STATUS_READY);
                $job->save();
            }

            $database->commit();
        }
        catch (Exception $ex) {
            // only roll back when a transaction is actually open; beginTransaction() itself may have failed.
            if ($database->hasActiveTransaction()) {
                $database->rollBack();
            }

            ExceptionHandler::logExceptionToDisk($ex, $this->getSiteConfiguration());
        }
    }
}
||
| 202 |