Test Failed
Push — newinternal ( 8c4587...b2f220 )
by Michael
15:41 queued 06:17
created

RunJobQueueTask   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 115
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 10
eloc 61
c 2
b 0
f 0
dl 0
loc 115
rs 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
A errorHandler() 0 2 1
A setupTask() 0 9 1
D execute() 0 86 8
1
<?php
2
/******************************************************************************
3
 * Wikipedia Account Creation Assistance tool                                 *
4
 *                                                                            *
5
 * All code in this file is released into the public domain by the ACC        *
6
 * Development Team. Please see team.json for a list of contributors.         *
7
 ******************************************************************************/
8
9
namespace Waca\ConsoleTasks;
10
11
use Exception;
12
use PDO;
13
use Waca\Background\BackgroundTaskBase;
14
use Waca\Background\Task\BotCreationTask;
15
use Waca\Background\Task\UserCreationTask;
16
use Waca\Background\Task\WelcomeUserTask;
17
use Waca\DataObjects\JobQueue;
18
use Waca\Exceptions\ApplicationLogicException;
19
use Waca\Helpers\Logger;
20
use Waca\Tasks\ConsoleTaskBase;
21
22
class RunJobQueueTask extends ConsoleTaskBase
23
{
24
    private $taskList = array(
25
        WelcomeUserTask::class,
26
        BotCreationTask::class,
27
        UserCreationTask::class
28
    );
29
30
    public function execute()
31
    {
32
        $database = $this->getDatabase();
33
34
        // ensure we're running inside a tx here.
35
        if (!$database->hasActiveTransaction()) {
36
            $database->beginTransaction();
37
        }
38
39
        $sql = 'SELECT * FROM jobqueue WHERE status = :status ORDER BY enqueue LIMIT :lim';
40
        $statement = $database->prepare($sql);
41
        $statement->execute(array(':status' => JobQueue::STATUS_READY, ':lim' => 10));
42
        /** @var JobQueue[] $queuedJobs */
43
        $queuedJobs = $statement->fetchAll(PDO::FETCH_CLASS, JobQueue::class);
44
45
        // mark all the jobs as running, and commit the txn so we're not holding onto long-running transactions.
46
        // We'll re-lock the row when we get to it.
47
        foreach ($queuedJobs as $job) {
48
            $job->setDatabase($database);
49
            $job->setStatus(JobQueue::STATUS_WAITING);
50
            $job->setError(null);
51
            $job->setAcknowledged(null);
52
            $job->save();
53
        }
54
55
        $database->commit();
56
57
        set_error_handler(array(RunJobQueueTask::class, 'errorHandler'), E_ALL);
58
59
        foreach ($queuedJobs as $job) {
60
            try {
61
                // refresh from the database
62
                /** @var JobQueue $job */
63
                $job = JobQueue::getById($job->getId(), $database);
64
65
                if ($job->getStatus() !== JobQueue::STATUS_WAITING) {
66
                    continue;
67
                }
68
69
                $database->beginTransaction();
70
                $job->setStatus(JobQueue::STATUS_RUNNING);
71
                $job->save();
72
                $database->commit();
73
74
                $database->beginTransaction();
75
76
                // re-lock the job
77
                $job->setStatus(JobQueue::STATUS_RUNNING);
78
                $job->save();
79
80
                // validate we're allowed to run the requested task (whitelist)
81
                if (!in_array($job->getTask(), $this->taskList)) {
82
                    throw new ApplicationLogicException('Job task not registered');
83
                }
84
85
                // Create a task.
86
                $taskName = $job->getTask();
87
88
                if(!class_exists($taskName)) {
89
                    throw new ApplicationLogicException('Job task does not exist');
90
                }
91
92
                /** @var BackgroundTaskBase $task */
93
                $task = new $taskName;
94
95
                $this->setupTask($task, $job);
96
                $task->run();
97
            }
98
            catch (Exception $ex) {
99
                $database->rollBack();
100
                $database->beginTransaction();
101
102
                /** @var JobQueue $job */
103
                $job = JobQueue::getById($job->getId(), $database);
104
                $job->setDatabase($database);
105
                $job->setStatus(JobQueue::STATUS_FAILED);
106
                $job->setError($ex->getMessage());
107
                $job->setAcknowledged(0);
108
                $job->save();
109
110
                Logger::backgroundJobIssue($this->getDatabase(), $job);
111
112
                $database->commit();
113
            }
114
            finally {
115
                $database->commit();
116
            }
117
        }
118
    }
119
120
    /**
121
     * @param BackgroundTaskBase $task
122
     * @param JobQueue           $job
123
     */
124
    private function setupTask(BackgroundTaskBase $task, JobQueue $job)
125
    {
126
        $task->setJob($job);
127
        $task->setDatabase($this->getDatabase());
128
        $task->setHttpHelper($this->getHttpHelper());
129
        $task->setOauthProtocolHelper($this->getOAuthProtocolHelper());
130
        $task->setEmailHelper($this->getEmailHelper());
131
        $task->setSiteConfiguration($this->getSiteConfiguration());
132
        $task->setNotificationHelper($this->getNotificationHelper());
133
    }
134
135
    public static function errorHandler($errno, $errstr, $errfile, $errline) {
136
        throw new Exception($errfile . "@" . $errline . ": " . $errstr);
137
    }
138
}
139