Completed
Push — irc-link-update ( 5c98e1 )
by Simon
17s queued 13s
created

RunJobQueueTask::stageQueuedTasks()   A

Complexity

Conditions 3
Paths 12

Size

Total Lines 28
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
eloc 16
c 0
b 0
f 0
dl 0
loc 28
ccs 0
cts 21
cp 0
rs 9.7333
cc 3
nc 12
nop 1
crap 12
1
<?php
2
/******************************************************************************
3
 * Wikipedia Account Creation Assistance tool                                 *
4
 *                                                                            *
5
 * All code in this file is released into the public domain by the ACC        *
6
 * Development Team. Please see team.json for a list of contributors.         *
7
 ******************************************************************************/
8
9
namespace Waca\ConsoleTasks;
10
11
use Exception;
12
use PDO;
13
use Waca\Background\BackgroundTaskBase;
14
use Waca\Background\Task\BotCreationTask;
15
use Waca\Background\Task\UserCreationTask;
16
use Waca\Background\Task\WelcomeUserTask;
17
use Waca\DataObjects\JobQueue;
18
use Waca\ExceptionHandler;
19
use Waca\Exceptions\ApplicationLogicException;
20
use Waca\Helpers\Logger;
21
use Waca\PdoDatabase;
22
use Waca\Tasks\ConsoleTaskBase;
23
24
class RunJobQueueTask extends ConsoleTaskBase
25
{
26
    private $taskList = array(
27
        WelcomeUserTask::class,
28
        BotCreationTask::class,
29
        UserCreationTask::class
30
    );
31
32
    public function execute()
33
    {
34
        $database = $this->getDatabase();
35
36
        // ensure we're running inside a tx here.
37
        if (!$database->hasActiveTransaction()) {
38
            $database->beginTransaction();
39
        }
40
41
        $sql = 'SELECT * FROM jobqueue WHERE status = :status ORDER BY enqueue LIMIT :lim';
42
        $statement = $database->prepare($sql);
43
        $statement->execute(array(
44
            ':status' => JobQueue::STATUS_READY,
45
            ':lim' => $this->getSiteConfiguration()->getJobQueueBatchSize()
46
        ));
47
48
        /** @var JobQueue[] $queuedJobs */
49
        $queuedJobs = $statement->fetchAll(PDO::FETCH_CLASS, JobQueue::class);
50
51
        // mark all the jobs as running, and commit the txn so we're not holding onto long-running transactions.
52
        // We'll re-lock the row when we get to it.
53
        foreach ($queuedJobs as $job) {
54
            $job->setDatabase($database);
55
            $job->setStatus(JobQueue::STATUS_WAITING);
56
            $job->setError(null);
57
            $job->setAcknowledged(null);
58
            $job->save();
59
        }
60
61
        $database->commit();
62
63
        set_error_handler(array(RunJobQueueTask::class, 'errorHandler'), E_ALL);
64
65
        foreach ($queuedJobs as $job) {
66
            try {
67
                // refresh from the database
68
                /** @var JobQueue $job */
69
                $job = JobQueue::getById($job->getId(), $database);
70
71
                if ($job->getStatus() !== JobQueue::STATUS_WAITING) {
72
                    continue;
73
                }
74
75
                $database->beginTransaction();
76
                $job->setStatus(JobQueue::STATUS_RUNNING);
77
                $job->save();
78
                $database->commit();
79
80
                $database->beginTransaction();
81
82
                // re-lock the job
83
                $job->setStatus(JobQueue::STATUS_RUNNING);
84
                $job->save();
85
86
                // validate we're allowed to run the requested task (whitelist)
87
                if (!in_array($job->getTask(), $this->taskList)) {
88
                    throw new ApplicationLogicException('Job task not registered');
89
                }
90
91
                // Create a task.
92
                $taskName = $job->getTask();
93
94
                if (!class_exists($taskName)) {
95
                    throw new ApplicationLogicException('Job task does not exist');
96
                }
97
98
                /** @var BackgroundTaskBase $task */
99
                $task = new $taskName;
100
101
                $this->setupTask($task, $job);
102
                $task->run();
103
            }
104
            catch (Exception $ex) {
105
                $database->rollBack();
106
                $database->beginTransaction();
107
108
                /** @var JobQueue $job */
109
                $job = JobQueue::getById($job->getId(), $database);
110
                $job->setDatabase($database);
111
                $job->setStatus(JobQueue::STATUS_FAILED);
112
                $job->setError($ex->getMessage());
113
                $job->setAcknowledged(0);
114
                $job->save();
115
116
                Logger::backgroundJobIssue($this->getDatabase(), $job);
117
118
                $database->commit();
119
            }
120
            finally {
121
                $database->commit();
122
            }
123
        }
124
125
        $this->stageQueuedTasks($database);
126
    }
127
128
    /**
129
     * @param BackgroundTaskBase $task
130
     * @param JobQueue           $job
131
     */
132
    private function setupTask(BackgroundTaskBase $task, JobQueue $job)
133
    {
134
        $task->setJob($job);
135
        $task->setDatabase($this->getDatabase());
136
        $task->setHttpHelper($this->getHttpHelper());
137
        $task->setOauthProtocolHelper($this->getOAuthProtocolHelper());
138
        $task->setEmailHelper($this->getEmailHelper());
139
        $task->setSiteConfiguration($this->getSiteConfiguration());
140
        $task->setNotificationHelper($this->getNotificationHelper());
141
    }
142
143
    /** @noinspection PhpUnusedParameterInspection */
144
    public static function errorHandler($errno, $errstr, $errfile, $errline)
145
    {
146
        throw new Exception($errfile . "@" . $errline . ": " . $errstr);
147
    }
148
149
    /**
150
     * Stages tasks for execution during the *next* jobqueue run.
151
     *
152
     * This is to build in some delay between enqueue and execution to allow for accidentally-triggered tasks to be
153
     * cancelled.
154
     *
155
     * @param PdoDatabase $database
156
     */
157
    protected function stageQueuedTasks(PdoDatabase $database): void
158
    {
159
        try {
160
            $database->beginTransaction();
161
162
            $sql = 'SELECT * FROM jobqueue WHERE status = :status ORDER BY enqueue LIMIT :lim';
163
            $statement = $database->prepare($sql);
164
165
            // use a larger batch size than the main runner, but still keep it limited in case things go crazy.
166
            $statement->execute(array(
167
                ':status' => JobQueue::STATUS_QUEUED,
168
                ':lim' => $this->getSiteConfiguration()->getJobQueueBatchSize() * 2
169
            ));
170
171
            /** @var JobQueue[] $queuedJobs */
172
            $queuedJobs = $statement->fetchAll(PDO::FETCH_CLASS, JobQueue::class);
173
174
            foreach ($queuedJobs as $job) {
175
                $job->setDatabase($database);
176
                $job->setStatus(JobQueue::STATUS_READY);
177
                $job->save();
178
            }
179
180
            $database->commit();
181
        }
182
        catch (Exception $ex) {
183
            $database->rollBack();
184
            ExceptionHandler::logExceptionToDisk($ex, $this->getSiteConfiguration());
185
        }
186
    }
187
}
188