Total Complexity | 47 |
Total Lines | 402 |
Duplicated Lines | 0 % |
Changes | 12 | ||
Bugs | 2 | Features | 0 |
Complex classes like QueueShell often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use QueueShell, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
14 | class QueueShell extends AppShell { |
||
15 | |||
16 | /** |
||
17 | * An array of names of models to load. |
||
18 | * |
||
19 | * @var array |
||
20 | */ |
||
21 | public $uses = ['Queue.QueuedTask']; |
||
22 | |||
23 | /** |
||
24 | * A list of available queue tasks and their individual configurations. |
||
25 | * |
||
26 | * @var array |
||
27 | */ |
||
28 | protected $_taskConf; |
||
29 | |||
30 | /** |
||
31 | * Indicates whether or not the worker should exit on the next iteration. |
||
32 | * |
||
33 | * @var bool |
||
34 | */ |
||
35 | private $__exit; |
||
36 | |||
/**
 * Overwrite shell initialize to dynamically load all queue related tasks.
 *
 * Collects task names from the `Console/Command/Task` folders of the
 * application and of every plugin into `$this->tasks`, merges the configured
 * `Queue` settings over the built-in defaults and then calls the parent
 * implementation.
 *
 * @return void
 */
public function initialize() {
    // Check for tasks inside plugins and application
    $paths = App::path('Console/Command/Task');

    foreach ($paths as $path) {
        $Folder = new Folder($path);
        $res = array_merge($this->tasks, $Folder->find('Queue.*\.php'));
        foreach ($res as &$r) {
            // Strip the "Task.php" suffix so only the task name remains.
            $r = basename($r, 'Task.php');
        }
        // Break the reference before copying the array; otherwise the last
        // element of $this->tasks stays bound to $r.
        unset($r);

        $this->tasks = $res;
    }

    $plugins = App::objects('plugin');
    foreach ($plugins as $plugin) {
        $pluginPaths = App::path('Console/Command/Task', $plugin);
        foreach ($pluginPaths as $pluginPath) {
            $Folder = new Folder($pluginPath);
            $res = $Folder->find('Queue.*Task\.php');
            foreach ($res as &$r) {
                // Plugin tasks are registered in "Plugin.TaskName" notation.
                $r = $plugin . '.' . basename($r, 'Task.php');
            }
            // Same as above: never leave a dangling foreach reference behind.
            unset($r);

            $this->tasks = array_merge($this->tasks, $res);
        }
    }

    $conf = Configure::read('Queue');
    if (!is_array($conf)) {
        $conf = [];
    }

    // Merge with default configuration vars; configured values win.
    Configure::write('Queue', array_merge(
            [
                'workers' => 3,
                'sleepTime' => 10,
                'gcprop' => 10,
                'defaultWorkerTimeout' => 2 * MINUTE,
                'defaultWorkerRetries' => 4,
                'workerMaxRuntime' => 0,
                'cleanupTimeout' => DAY,
                'exitWhenNothingToDo' => false,
                'gcOnExit' => true,
            ],
            $conf
        )
    );

    parent::initialize();
}
||
94 | |||
/**
 * Builds the option parser for this shell.
 *
 * Registers the `add`, `runworker`, `stats`, `clean` and `clean_failed`
 * subcommands on the parent's parser and sets the shell description.
 *
 * @return ConsoleOptionParser
 */
public function getOptionParser() {
    $parser = parent::getOptionParser();

    // Subcommand name => definition, in registration order.
    $subcommands = [
        'add' => [
            'help' => __d('queue', 'Tries to call the cli `add()` function on a task.'),
            'parser' => [
                'description' => [
                    __d('queue', 'Tries to call the cli `add()` function on a task.'),
                    __d('queue', 'Tasks may or may not provide this functionality.')
                ],
                'arguments' => [
                    'taskname' => [
                        'help' => __d('queue', 'Name of the task.'),
                        'required' => true,
                        'choices' => $this->taskNames
                    ],
                    'taskdata' => [
                        'help' => __d('queue', 'Data needed by task.'),
                        'required' => false,
                    ]
                ]
            ]
        ],
        'runworker' => [
            'help' => __d('queue', 'Run a queue worker.'),
            'parser' => [
                'description' => [__d('queue', 'Run a queue worker, which will look for a pending task it can execute.')],
                'options' => [
                    'type' => [
                        'short' => 't',
                        'help' => 'Type (comma separated list possible)',
                        'default' => null
                    ]
                ]
            ]
        ],
        'stats' => [
            'help' => __d('queue', 'Display general statistics.'),
            'parser' => [
                'description' => __d('queue', 'Display general statistics.'),
            ]
        ],
        'clean' => [
            'help' => __d('queue', 'Manually call cleanup function to delete task data of completed tasks.'),
            'parser' => [
                'description' => __d('queue', 'Manually call cleanup function to delete task data of completed tasks.')
            ]
        ],
        'clean_failed' => [
            'help' => __d('queue', 'Manually call cleanup function to delete task data of failed tasks.'),
            'parser' => [
                'description' => __d('queue', 'Manually call cleanup function to delete task data of failed tasks.')
            ]
        ],
    ];

    foreach ($subcommands as $command => $definition) {
        $parser->addSubcommand($command, $definition);
    }

    $parser->description(__d('queue', 'CakePHP Queue Plugin.'));

    return $parser;
}
||
152 | |||
/**
 * Resolves the task named by the first CLI argument and calls its add().
 *
 * The camelized name is tried as given and then with a `Queue` prefix. When
 * neither matches a loaded task, an error and the list of available tasks
 * are printed instead.
 *
 * @return void
 */
public function add() {
    $name = Inflector::camelize($this->args[0]);

    // Exact name first, prefixed name second.
    foreach ([$name, 'Queue' . $name] as $candidate) {
        if (in_array($candidate, $this->taskNames)) {
            $this->{$candidate}->add();

            return;
        }
    }

    $this->out(__d('queue', 'Error: Task not Found: %s', $name));
    $this->out('Available Tasks:');
    foreach ($this->taskNames as $loadedTask) {
        $this->out(' * ' . $this->_taskName($loadedTask));
    }
}
||
175 | |||
/**
 * Returns the display form of a task name.
 *
 * A leading `Queue` is removed; any other name is returned unchanged.
 *
 * @param string $task A task name
 * @return string Task name without the leading `Queue`
 */
protected function _taskName($task) {
    $prefix = 'Queue';
    $hasPrefix = strncmp($task, $prefix, strlen($prefix)) === 0;

    return $hasPrefix ? substr($task, strlen($prefix)) : $task;
}
||
190 | |||
/**
 * Run a queue worker loop.
 *
 * Repeatedly requests a job from the QueuedTask model (optionally limited to
 * the task types given via the `type` option), runs it through the matching
 * `Queue<Task>` shell task and marks it done or failed depending on the
 * task's return value.
 *
 * The loop ends when a registered signal sets the exit flag, when the model's
 * `exit` property is true, when `Queue.workerMaxRuntime` has been reached, or
 * when no job is available and `Queue.exitWhenNothingToDo` is enabled.
 *
 * @return void
 */
public function runworker() {
    // Enable garbage collector (PHP >= 5.3)
    if (function_exists('gc_enable')) {
        gc_enable();
    }

    // Register signal handler(s) if possible
    if (function_exists('pcntl_signal')) {
        pcntl_signal(SIGTERM, [$this, 'signalHandler']);
        pcntl_signal(SIGINT, [$this, 'signalHandler']);
    } else {
        $this->err(__d('queue', 'Signal handler(s) could not be registered.'));
    }

    $this->__exit = false;

    $workerStartTime = time();

    $typesParam = $this->param('type');
    $types = is_string($typesParam) ? $this->_stringToArray($typesParam) : [];

    while (!$this->__exit) {
        $this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);

        $data = $this->QueuedTask->requestJob($this->_getTaskConf(), $types);
        if ($this->QueuedTask->exit === true) {
            // The model asked us to stop; leave the loop without further work.
            $this->__exit = true;
            continue;
        }

        if ($data !== false) {
            $jobId = $data['id'];
            $taskname = 'Queue' . $data['task'];
            $this->out(__d('queue', 'Running job of task \'%s\' \'%d\'.', $data['task'], $jobId));

            $taskStartTime = time();
            // NOTE(review): unserialize() is applied to the stored job payload;
            // make sure only trusted code can write to the queue storage.
            $return = $this->{$taskname}->run(unserialize($data['data']));
            $took = time() - $taskStartTime;
            if ($return) {
                $this->QueuedTask->markJobDone($jobId);
                $this->out(
                    __d(
                        'queue',
                        'Job \'%d\' finished (took %s).',
                        $jobId,
                        __dn('queue', '%d second', '%d seconds', $took, $took)
                    )
                );
            } else {
                // Pass the task's failure message along when it provides one.
                $failureMessage = null;
                if (!empty($this->{$taskname}->failureMessage)) {
                    $failureMessage = $this->{$taskname}->failureMessage;
                }
                $this->QueuedTask->markJobFailed($jobId, $failureMessage);
                $this->out(__d('queue', 'Job \'%d\' did not finish, requeued.', $jobId));
            }
        } elseif (Configure::read('Queue.exitWhenNothingToDo')) {
            $this->out(__d('queue', 'Nothing to do, exiting.'));
            $this->__exit = true;
        } else {
            $this->out(
                __d('queue', 'Nothing to do, sleeping for %d second(s).', Configure::read('Queue.sleepTime')),
                1, Shell::VERBOSE
            );

            sleep(Configure::read('Queue.sleepTime'));
        }

        // Check if we are over the maximum runtime and end processing if so.
        // A value of 0 disables the limit.
        $maxRuntime = Configure::read('Queue.workerMaxRuntime');
        $runtime = time() - $workerStartTime;
        if ($maxRuntime != 0 && $runtime >= $maxRuntime) {
            $this->__exit = true;
            $this->out(__d('queue',
                'Reached runtime of %s seconds (max. %s), terminating.',
                $runtime,
                $maxRuntime
            ));
        }

        // Clean up when the worker is about to exit (if `gcOnExit` is set) or
        // at random, controlled by `gcprop`. This must read the `__exit`
        // flag; the previously used `_exit` property does not exist, so the
        // on-exit cleanup never ran.
        if (($this->__exit && Configure::read('Queue.gcOnExit')) || rand(0, 100) > (100 - Configure::read('Queue.gcprop'))) {
            $this->out(__d('queue', 'Performing old job cleanup.'));
            $this->QueuedTask->cleanOldJobs($this->_getTaskConf());
        }
    }
}
||
284 | |||
/**
 * Manually runs the cleanup of old jobs.
 *
 * Prints a notice and hands the task configuration to
 * QueuedTask::cleanOldJobs().
 *
 * @return void
 */
public function clean() {
    $notice = __d('queue', 'Deleting old completed jobs, that have had cleanup timeout.');
    $this->out($notice);

    $taskConf = $this->_getTaskConf();
    $this->QueuedTask->cleanOldJobs($taskConf);
}
||
294 | |||
/**
 * Manually runs the cleanup of failed jobs.
 *
 * Prints a notice and hands the task configuration to
 * QueuedTask::cleanFailedJobs().
 *
 * @return void
 */
public function clean_failed() {
    $notice = __d('queue', 'Deleting failed Jobs, that have had maximum worker retries.');
    $this->out($notice);

    $taskConf = $this->_getTaskConf();
    $this->QueuedTask->cleanFailedJobs($taskConf);
}
||
304 | |||
305 | /** |
||
306 | * Displays some statistics about finished Jobs. |
||
307 | * |
||
308 | * @return void |
||
309 | */ |
||
310 | public function stats() { |
||
338 | } |
||
339 | } |
||
340 | |||
/**
 * Builds (once) and returns the configuration of every loaded queue task.
 *
 * Each entry is keyed by task name and holds `name`, `plugin`, `timeout`,
 * `retries` and `cleanupTimeout`. The last three come from the task object's
 * property of the same name when it declares one, otherwise from the
 * corresponding `Queue.*` configuration value. The result is kept in
 * `$this->_taskConf` and reused on later calls.
 *
 * @return array A list of available queue tasks and their individual configurations
 */
protected function _getTaskConf() {
    if (is_array($this->_taskConf)) {
        return $this->_taskConf;
    }

    // Task property => configuration key used when the task lacks the property.
    $fallbackKeys = [
        'timeout' => 'Queue.defaultWorkerTimeout',
        'retries' => 'Queue.defaultWorkerRetries',
        'cleanupTimeout' => 'Queue.cleanupTimeout',
    ];

    $this->_taskConf = [];
    foreach ($this->tasks as $task) {
        list($pluginName, $taskName) = pluginSplit($task);
        $taskObject = $this->{$taskName};

        $entry = [
            // Drop the first five characters of the task name.
            'name' => substr($taskName, 5),
            'plugin' => $pluginName,
        ];
        foreach ($fallbackKeys as $property => $configKey) {
            $entry[$property] = property_exists($taskObject, $property)
                ? $taskObject->{$property}
                : Configure::read($configKey);
        }

        $this->_taskConf[$taskName] = $entry;
    }

    return $this->_taskConf;
}
||
375 | |||
/**
 * Signal handler (for SIGTERM and SIGINT signal)
 *
 * Prints which signal was caught and sets the exit flag that the worker loop
 * checks. Any other signal number is ignored.
 *
 * @param int $signalNumber A signal number
 * @return void
 */
public function signalHandler($signalNumber) {
    if ($signalNumber == SIGTERM) {
        $caught = sprintf('SIGTERM (%d)', SIGTERM);
    } elseif ($signalNumber == SIGINT) {
        $caught = sprintf('SIGINT (%d)', SIGINT);
    } else {
        // Not a signal this worker reacts to.
        return;
    }

    $this->out(__d('queue', 'Caught %s signal, exiting.', $caught));

    $this->__exit = true;
}
||
396 | |||
397 | /** |
||
398 | * Converts string to array |
||
399 | * |
||
400 | * @param string|null $param String to convert |
||
401 | * @return array |
||
402 | */ |
||
403 | protected function _stringToArray(string $param = null) : array { |
||
416 | } |
||
417 | |||
418 | } |
||
419 |