1 | <?php |
||
2 | App::uses('AppModel', 'Model'); |
||
3 | |||
/**
 * QueuedTask Model.
 *
 * Stores queue jobs and provides the operations used around them: creating
 * jobs, handing a pending job to a worker, recording completion or failure,
 * reporting statistics and deleting old or permanently failed jobs.
 */
class QueuedTask extends AppModel {

	/**
	 * The (translation) domain to be used for extracted validation messages in models.
	 *
	 * @var string
	 */
	public $validationDomain = 'queue';

	/**
	 * Adds a new job to the queue.
	 *
	 * @param string $taskName A queue task name
	 * @param array $data Any data; stored serialized in the `data` field
	 * @param ?string $notBefore A datetime (any format `strtotime()` understands) before which
	 *   the job must not be executed. Empty or unparseable values mean "now".
	 * @return mixed On success `Model::$data` if its not empty or true, false on failure
	 */
	public function createJob(string $taskName, array $data, $notBefore = null) {
		$notBeforeTimestamp = time();
		if (!empty($notBefore)) {
			$parsed = strtotime($notBefore);
			// strtotime() returns false for unparseable input. Passing that to date()
			// would silently schedule the job for the Unix epoch, so keep "now" instead.
			if ($parsed !== false) {
				$notBeforeTimestamp = $parsed;
			}
		}

		$job = [
			'task' => $taskName,
			'data' => serialize($data),
			'not_before' => date('Y-m-d H:i:s', $notBeforeTimestamp),
		];

		$this->create();

		return $this->save($job);
	}

	/**
	 * Looks for a new job that can be processed with the current abilities
	 * and claims it for the calling worker.
	 *
	 * Each capability entry is read for the keys `name`, `timeout` and `retries`.
	 *
	 * @param array $capabilities Available queue worker tasks.
	 * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
	 * @return mixed Job data or false.
	 */
	public function requestJob($capabilities, array $types = []) {
		$now = time();
		$nowFormatted = date('Y-m-d H:i:s', $now);

		// The virtual field stays registered on the model after this call; the
		// claimed job returned below therefore also carries an `age` value.
		$this->virtualFields['age'] = 'IFNULL(TIMESTAMPDIFF(SECOND, NOW(), not_before), 0)';

		$conditions = [
			'completed' => null,
			'OR' => [],
		];
		if (!empty($types)) {
			$conditions = $this->_addFilter($conditions, 'task', $types);
		}

		// Generate the job specific conditions and remember the smallest timeout.
		$minTimeout = null;
		foreach ($capabilities as $task) {
			list(, $name) = pluginSplit($task['name']);
			$timeout = $task['timeout'];
			if ($minTimeout === null || $timeout < $minTimeout) {
				$minTimeout = $timeout;
			}

			$conditions['OR'][] = [
				'task' => $name,
				'AND' => [
					'not_before <=' => $nowFormatted,
					[
						'OR' => [
							'fetched <' => date('Y-m-d H:i:s', $now - $timeout),
							'fetched' => null,
						],
					],
				],
				'failed_count <' => ($task['retries'] + 1),
			];
		}

		// Without a single capability there is nothing this worker could process.
		if (empty($conditions['OR'])) {
			return false;
		}

		// First, find a list of a few of the oldest unfinished jobs.
		$fields = ['id', 'fetched', 'age'];
		$order = ['age' => 'ASC', 'id' => 'ASC'];
		$limit = Configure::read('Queue.workers');
		$candidates = $this->find('all', compact('conditions', 'fields', 'order', 'limit'));
		if (empty($candidates)) {
			return false;
		}

		// Generate a list of their ids and note which ones had been fetched before.
		$idlist = [];
		$wasFetched = [];
		foreach ($candidates as $item) {
			$candidateId = $item[$this->name]['id'];
			$idlist[] = $candidateId;
			if (!empty($item[$this->name]['fetched'])) {
				$wasFetched[$candidateId] = true;
			}
		}

		// Generate a unique identifier for the current worker thread. Random bytes
		// avoid the collisions a microtime() based hash allows between workers that
		// start in the same microsecond; the result is 40 hex characters like sha1().
		$key = bin2hex(random_bytes(20));

		// Try to update one of the found jobs with the key of this worker.
		// Every candidate already passed its own task's timeout check above, so the
		// guard here uses the smallest timeout: it still rejects a job another worker
		// claimed in the meantime without blocking candidates of short-timeout tasks.
		$this->query(
			'UPDATE ' . $this->tablePrefix . $this->table . ' SET worker_key = "' . $key .
			'", fetched = "' . $nowFormatted . '" WHERE ' .
			'id IN(' . implode(',', $idlist) . ') AND ' .
			'(worker_key IS NULL OR fetched <= "' . date('Y-m-d H:i:s', $now - $minTimeout) . '") ' .
			'ORDER BY ' . $this->virtualFields['age'] . ' ASC LIMIT 1'
		);

		// Read which one actually got updated, which is the job we are supposed to execute.
		$job = $this->find('first', ['conditions' => ['worker_key' => $key]]);
		if (empty($job)) {
			return false;
		}

		// If the job had an existing fetched timestamp, increment the failure counter.
		if (isset($wasFetched[$job[$this->name]['id']])) {
			$job[$this->name]['failed_count'] += 1;
			$job[$this->name]['failure_message'] = 'Restart after timeout';
			$this->save($job);
		}

		return $job[$this->name];
	}

	/**
	 * Marks a job as completed by setting its `completed` timestamp to the current time.
	 *
	 * The record itself is kept; see cleanOldJobs() for deleting completed jobs.
	 *
	 * @param int $id A job id
	 * @return mixed On success `Model::$data` if its not empty or true, false on failure
	 */
	public function markJobDone($id) {
		$this->id = $id;

		return $this->saveField('completed', date('Y-m-d H:i:s'), true);
	}

	/**
	 * Marks a job as failed: increments the failed-counter and stores the failure message.
	 *
	 * @param int $id A job id
	 * @param string $failureMessage The message written to the failure message field,
	 *   replacing any previous one (optional)
	 * @return bool Success
	 * @todo Remove / reimplement getDataSource()->value
	 * @suppress PhanUndeclaredMethod
	 */
	public function markJobFailed($id, $failureMessage = null) {
		$conditions = ['id' => $id];
		$fields = [
			// Passed as an SQL expression so the increment happens in the database.
			'failed_count' => 'failed_count + 1',
			// NOTE(review): the second argument of value() is presumably a column type,
			// not a column name - confirm against the datasource in use.
			'failure_message' => $this->getDataSource()->value($failureMessage, 'failure_message'),
		];

		return $this->updateAll($fields, $conditions);
	}

	/**
	 * Returns the number of items in the queue.
	 *
	 * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed task.
	 *
	 * @param string $taskName A task name to count
	 * @return int The number of pending jobs
	 */
	public function getLength($taskName = null) : int {
		$conditions = ['completed' => null];
		if (!empty($taskName)) {
			$conditions['task'] = $taskName;
		}

		return (int)$this->find('count', compact('conditions'));
	}

	/**
	 * Return a list of all task names in the queue.
	 *
	 * @return array A list of task names, keyed by task name
	 */
	public function getTypes() : array {
		$fields = ['task', 'task'];
		$group = ['task'];

		$types = $this->find('list', compact('fields', 'group'));

		// Guard the declared return type in case find() does not yield an array.
		return is_array($types) ? $types : [];
	}

	/**
	 * Calculates some statistics for finished jobs (that are still in the database).
	 *
	 * Per task: the number of jobs plus the averages, in seconds, of the total
	 * time (created to completed), the run time (fetched to completed) and the
	 * fetch delay (not_before, or created when not_before is NULL, to fetched).
	 *
	 * @return array An array with statistics
	 */
	public function getStats() : array {
		$fields = [
			'task',
			'COUNT(id) AS num',
			'AVG(UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)) AS alltime',
			'AVG(UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)) AS runtime',
			'AVG(UNIX_TIMESTAMP(fetched) - IF(not_before IS NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))) AS fetchdelay',
		];
		$conditions = ['NOT' => ['completed' => null]];
		$group = ['task'];

		$stats = $this->find('all', compact('fields', 'conditions', 'group'));

		// Guard the declared return type in case find() does not yield an array.
		return is_array($stats) ? $stats : [];
	}

	/**
	 * Cleanups / delete completed jobs with given capabilities after cleanup timeout.
	 *
	 * Each capability entry is read for the keys `name` and `cleanupTimeout`.
	 * Processing stops at the first task whose deletion fails.
	 *
	 * @param array $capabilities Available queue worker tasks.
	 * @return bool Success
	 */
	public function cleanOldJobs(array $capabilities) : bool {
		$now = time();
		foreach ($capabilities as $task) {
			list(, $name) = pluginSplit($task['name']);
			$conditions = [
				'task' => $name,
				'completed <' => date('Y-m-d H:i:s', $now - $task['cleanupTimeout']),
			];
			if (!$this->deleteAll($conditions, false)) {
				return false;
			}
		}

		return true;
	}

	/**
	 * Cleanups / delete failed jobs with given capabilities after maximum retries.
	 *
	 * Each capability entry is read for the keys `name` and `retries`.
	 * Processing stops at the first task whose deletion fails.
	 *
	 * @param array $capabilities Available queue worker tasks.
	 * @return bool Success
	 */
	public function cleanFailedJobs(array $capabilities) : bool {
		foreach ($capabilities as $task) {
			list(, $name) = pluginSplit($task['name']);
			$conditions = [
				'task' => $name,
				'failed_count >' => $task['retries'],
			];
			if (!$this->deleteAll($conditions, false)) {
				return false;
			}
		}

		return true;
	}

	/**
	 * Filters field `key` based on the provided values. Values prefixed with '-' are excluded.
	 *
	 * @param array $conditions Conditions
	 * @param string $key Key
	 * @param array $values Values
	 * @return array the conditions
	 */
	protected function _addFilter(array $conditions, $key, array $values) : array {
		$include = [];
		$exclude = [];
		foreach ($values as $value) {
			if (substr($value, 0, 1) === '-') {
				// Drop the marker; the rest is the value to exclude.
				$exclude[] = substr($value, 1);
			} else {
				$include[] = $value;
			}
		}

		if (!empty($include)) {
			$conditions[$key . ' IN'] = $include;
		}
		if (!empty($exclude)) {
			$conditions[$key . ' NOT IN'] = $exclude;
		}

		return $conditions;
	}

}
||
288 |
This check flags implicit conversions of arrays to boolean values in a condition. In PHP an empty array is loosely equal (but not identical) to false, which is not always apparent to the reader.
Consider making the check explicit by using `empty(...)` or `!empty(...)` instead.