Completed
Pull Request — master (#35)
by
unknown
02:56
created

QueueShell   F

Complexity

Total Complexity 61

Size/Duplication

Total Lines 519
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 214
dl 0
loc 519
rs 3.52
c 0
b 0
f 0
wmc 61

20 Methods

Rating   Name   Duplication   Size   Complexity  
A startup() 0 7 2
A initialize() 0 6 1
A clean() 0 8 2
A add() 0 21 4
A _exit() 0 4 1
A _stringToArray() 0 9 2
A _logError() 0 11 2
A getDescription() 0 16 2
A _timeNeeded() 0 6 1
A _displayAvailableTasks() 0 5 2
A stats() 0 19 3
A _taskName() 0 7 2
A _getTaskConf() 0 28 6
A _memoryUsage() 0 10 2
B runJob() 0 48 7
A _log() 0 17 4
B runworker() 0 58 11
A getOptionParser() 0 40 1
A settings() 0 15 4
A _time() 0 7 2

How to fix   Complexity   

Complex Class

Complex classes like QueueShell often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use QueueShell, and based on these observations, apply Extract Interface, too.

1
<?php
2
namespace Queue\Shell;
3
4
use Cake\Console\Shell;
5
use Cake\Core\Configure;
6
use Cake\I18n\Number;
7
use Cake\Log\Log;
8
use Cake\Utility\Inflector;
9
use Cake\Utility\Text;
10
use Exception;
11
use Queue\Model\Entity\QueuedTask;
12
use Queue\Model\QueueException;
13
use Queue\Queue\Config;
14
use Queue\Queue\TaskFinder;
15
use Queue\Shell\Task\AddInterface;
16
use Queue\Shell\Task\QueueTaskInterface;
17
use RuntimeException;
18
use Throwable;
19
20
declare(ticks = 1);
21
22
/**
23
 * Main shell to init and run queue workers.
24
 *
25
 * @author [email protected]
26
 * @license http://www.opensource.org/licenses/mit-license.php The MIT License
27
 * @link http://github.com/MSeven/cakephp_queue
28
 * @property \Queue\Model\Table\QueuedTasksTable $QueuedTasks
29
 */
30
class QueueShell extends Shell
31
{
32
33
    /**
34
     *
35
     * @var string
36
     */
37
    public $modelClass = 'Queue.QueuedTasks';
38
39
    /**
40
     *
41
     * @var array|null
42
     */
43
    protected $_taskConf;
44
45
    /**
46
     *
47
     * @var int
48
     */
49
    protected $_time = 0;
50
51
    /**
52
     *
53
     * @var bool
54
     */
55
    protected $_exit = false;
56
57
    /**
58
     * Overwrite shell initialize to dynamically load all Queue Related Tasks.
59
     *
60
     * @return void
61
     */
62
    public function initialize()
63
    {
64
        $taskFinder = new TaskFinder();
65
        $this->tasks = $taskFinder->allAppAndPluginTasks();
66
67
        parent::initialize();
68
    }
69
70
    /**
71
     *
72
     * @return void
73
     */
74
    public function startup()
75
    {
76
        if ($this->param('quiet')) {
77
            $this->interactive = false;
78
        }
79
80
        parent::startup();
81
    }
82
83
    /**
84
     *
85
     * @return string
86
     */
87
    public function getDescription()
88
    {
89
        $tasks = [];
90
        foreach ($this->taskNames as $loadedTask) {
91
            $tasks[] = "\t" . '* ' . $this->_taskName($loadedTask);
92
        }
93
        $tasks = implode(PHP_EOL, $tasks);
94
95
        $text = <<<TEXT
96
Simple and minimalistic job queue (or deferred-task) system.
97
98
Available Tasks:
99
$tasks
100
TEXT;
101
102
        return $text;
103
    }
104
105
    /**
106
     * Look for a Queue Task of hte passed name and try to call add() on it.
107
     * A QueueTask may provide an add function to enable the user to create new jobs via commandline.
108
     *
109
     * @return void
110
     */
111
    public function add()
112
    {
113
        if (count($this->args) < 1) {
114
            $this->out('Please call like this:');
115
            $this->out('       bin/cake queue add <taskname>');
116
            $this->_displayAvailableTasks();
117
118
            return;
119
        }
120
121
        $name = Inflector::camelize($this->args[0]);
122
        if (in_array('Queue' . $name, $this->taskNames, true)) {
123
            /** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
124
            $task = $this->{'Queue' . $name};
125
            if (!($task instanceof AddInterface)) {
126
                $this->abort('This task does not support adding via CLI call');
127
            }
128
            $task->add();
129
        } else {
130
            $this->out('Error: Task not found: ' . $name);
131
            $this->_displayAvailableTasks();
132
        }
133
    }
134
135
    /**
136
     * Output the task without Queue or Task
137
     * example: QueueImageTask becomes Image on display
138
     *
139
     * @param string $task Task name
140
     * @return string Cleaned task name
141
     */
142
    protected function _taskName($task)
143
    {
144
        if (strpos($task, 'Queue') === 0) {
145
            return substr($task, 5);
146
        }
147
148
        return $task;
149
    }
150
151
    /**
152
     * Run a QueueWorker loop.
153
     * Runs a Queue Worker process which will try to find unassigned jobs in the queue
154
     * which it may run and try to fetch and execute them.
155
     *
156
     * @return void
157
     */
158
    public function runworker()
159
    {
160
        // Enable Garbage Collector (PHP >= 5.3)
161
        if (function_exists('gc_enable')) {
162
            gc_enable();
163
        }
164
        if (function_exists('pcntl_signal')) {
165
            pcntl_signal(SIGTERM, [
166
                &$this,
167
                '_exit'
168
            ]);
169
            pcntl_signal(SIGINT, [
170
                &$this,
171
                '_exit'
172
            ]);
173
            pcntl_signal(SIGTSTP, [
174
                &$this,
175
                '_exit'
176
            ]);
177
            pcntl_signal(SIGQUIT, [
178
                &$this,
179
                '_exit'
180
            ]);
181
        }
182
        $this->_exit = false;
183
184
        $startTime = time();
185
        $types = $this->_stringToArray($this->param('type'));
186
187
        while (!$this->_exit) {
188
            $this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);
189
190
            $queuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);
191
192
            if ($queuedTask) {
193
                $this->runJob($queuedTask);
194
            } elseif (Configure::read('Queue.exitWhenNothingToDo')) {
195
                $this->out(__d('queue', 'nothing to do, exiting.'));
196
                $this->_exit = true;
197
            } else {
198
                $this->out(__d('queue', 'nothing to do, sleeping.'));
199
                sleep(Config::sleepTime());
200
            }
201
202
            // check if we are over the maximum runtime and end processing if so.
203
            if (Configure::readOrFail('Queue.workerMaxRuntime') && (time() - $startTime) >= Configure::readOrFail('Queue.workerMaxRuntime')) {
204
                $this->_exit = true;
205
                $this->out(__d('queue', 'Reached runtime of ' . (time() - $startTime) . ' Seconds (Max ' . Configure::readOrFail('Queue.workerMaxRuntime') . '), terminating.'));
206
            }
207
            if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
208
                $this->out(__d('queue', 'Performing old job cleanup.'));
209
                $this->QueuedTasks->cleanOldJobs();
210
            }
211
            $this->hr();
212
        }
213
214
        if ($this->param('verbose')) {
215
            $this->_log('endworker');
216
        }
217
    }
218
219
    /**
220
     *
221
     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
222
     * @return void
223
     */
224
    protected function runJob(QueuedTask $queuedTask)
225
    {
226
        $this->out('Running Job of type "' . $queuedTask->task . '"');
227
        $this->_log('job ' . $queuedTask->task . ', id ' . $queuedTask->id, null, false);
228
        $taskName = 'Queue' . $queuedTask->task;
229
230
        try {
231
            $this->_time = time();
232
233
            $data = unserialize($queuedTask->data);
234
            /** @var \Queue\Shell\Task\QueueTask $task */
235
            $task = $this->{$taskName};
236
            if (!$task instanceof QueueTaskInterface) {
0 ignored issues
show
introduced by
$task is always a sub-type of Queue\Shell\Task\QueueTaskInterface.
Loading history...
237
                throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
238
            }
239
240
            $return = $task->run((array)$data, $queuedTask->id);
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $return is correct as $task->run((array)$data, $queuedTask->id) targeting Queue\Shell\Task\QueueTaskInterface::run() seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
241
            if ($return !== null) {
0 ignored issues
show
introduced by
The condition $return !== null is always true.
Loading history...
242
                trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
243
            }
244
            $failureMessage = $taskName . ' failed';
245
        } catch (Throwable $e) {
246
            $return = false;
247
248
            $failureMessage = get_class($e) . ': ' . $e->getMessage();
249
            if (!($e instanceof QueueException)) {
250
                $failureMessage .= "\n" . $e->getTraceAsString();
251
            }
252
253
            $this->_logError($taskName . ' (job ' . $queuedTask->id . ')' . "\n" . $failureMessage);
254
        } catch (Exception $e) {
255
            $return = false;
256
257
            $failureMessage = get_class($e) . ': ' . $e->getMessage();
258
            $this->_logError($taskName . "\n" . $failureMessage);
259
        }
260
261
        if ($return === false) {
0 ignored issues
show
introduced by
The condition $return === false is always true.
Loading history...
262
            $this->QueuedTasks->markJobFailed($queuedTask, $failureMessage);
263
            $failedStatus = $this->QueuedTasks->getFailedStatus($queuedTask, $this->_getTaskConf());
264
            $this->_log('job ' . $queuedTask->task . ', id ' . $queuedTask->id . ' failed and ' . $failedStatus);
265
            $this->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedTask->failed . '.');
0 ignored issues
show
Bug Best Practice introduced by
The property failed does not exist on Queue\Model\Entity\QueuedTask. Since you implemented __get, consider adding a @property annotation.
Loading history...
266
267
            return;
268
        }
269
270
        $this->QueuedTasks->markJobDone($queuedTask);
271
        $this->out('Job Finished.');
272
    }
273
274
    /**
275
     * Manually trigger a Finished job cleanup.
276
     *
277
     * @return void
278
     */
279
    public function clean()
280
    {
281
        if (!Configure::read('Queue.cleanupTimeout')) {
282
            $this->abort('You disabled cleanuptimout in config. Aborting.');
283
        }
284
285
        $this->out('Deleting old jobs, that have finished before ' . date('Y-m-d H:i:s', time() - (int)Configure::read('Queue.cleanupTimeout')));
286
        $this->QueuedTasks->cleanOldJobs();
287
    }
288
289
    /**
290
     * Display current settings
291
     *
292
     * @return void
293
     */
294
    public function settings()
295
    {
296
        $this->out('Current Settings:');
297
        $conf = (array)Configure::read('Queue');
298
        foreach ($conf as $key => $val) {
299
            if ($val === false) {
300
                $val = 'no';
301
            }
302
            if ($val === true) {
303
                $val = 'yes';
304
            }
305
            $this->out('* ' . $key . ': ' . print_r($val, true));
306
        }
307
308
        $this->out();
309
    }
310
311
    /**
312
     * Display some statistics about Finished Jobs.
313
     *
314
     * @return void
315
     */
316
    public function stats()
317
    {
318
        $this->out('Jobs currently in the queue:');
319
320
        $types = $this->QueuedTasks->getTypes()->toArray();
321
        foreach ($types as $type) {
322
            $this->out('      ' . str_pad($type, 20, ' ', STR_PAD_RIGHT) . ': ' . $this->QueuedTasks->getLength($type));
323
        }
324
        $this->hr();
325
        $this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
326
        $this->hr();
327
        $this->out('Finished job statistics:');
328
        $data = $this->QueuedTasks->getStats();
329
        foreach ($data as $item) {
330
            $this->out(' ' . $item['task'] . ': ');
331
            $this->out('   Finished Jobs in Database: ' . $item['num']);
332
            $this->out('   Average Job existence    : ' . str_pad(Number::precision($item['alltime']), 8, ' ', STR_PAD_LEFT) . 's');
333
            $this->out('   Average Execution delay  : ' . str_pad(Number::precision($item['fetchdelay']), 8, ' ', STR_PAD_LEFT) . 's');
334
            $this->out('   Average Execution time   : ' . str_pad(Number::precision($item['runtime']), 8, ' ', STR_PAD_LEFT) . 's');
335
        }
336
    }
337
338
    /**
339
     * Get option parser method to parse commandline options
340
     *
341
     * @return \Cake\Console\ConsoleOptionParser
342
     */
343
    public function getOptionParser()
344
    {
345
        $subcommandParser = [
346
            'options' => [
347
                /*
348
                 * 'dry-run'=> array(
349
                 * 'short' => 'd',
350
                 * 'help' => 'Dry run the update, no jobs will actually be added.',
351
                 * 'boolean' => true
352
                 * ),
353
                 */
354
            ]
355
        ];
356
        $subcommandParserFull = $subcommandParser;
357
        $subcommandParserFull['options']['type'] = [
358
            'short' => 't',
359
            'help' => 'Type (comma separated list possible)',
360
            'default' => null
361
        ];
362
363
        return parent::getOptionParser()->setDescription($this->getDescription())
364
            ->addSubcommand('clean', [
365
                'help' => 'Remove old jobs (cleanup)',
366
                'parser' => $subcommandParser
367
            ])
368
            ->addSubcommand('add', [
369
                'help' => 'Add Job',
370
                'parser' => $subcommandParser
371
            ])
372
            ->addSubcommand('stats', [
373
                'help' => 'Stats',
374
                'parser' => $subcommandParserFull
375
            ])
376
            ->addSubcommand('settings', [
377
                'help' => 'Settings',
378
                'parser' => $subcommandParserFull
379
            ])
380
            ->addSubcommand('runworker', [
381
                'help' => 'Run Worker',
382
                'parser' => $subcommandParserFull
383
            ]);
384
    }
385
386
    /**
387
     * Timestamped log.
388
     *
389
     * @param string $message Log type
390
     * @param string|null $pid PID of the process
391
     * @param bool $addDetails Details
392
     * @return void
393
     */
394
    protected function _log($message, $pid = null, $addDetails = true)
395
    {
396
        if (!Configure::read('Queue.log')) {
397
            return;
398
        }
399
400
        if ($addDetails) {
401
            $timeNeeded = $this->_timeNeeded();
402
            $memoryUsage = $this->_memoryUsage();
403
            $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
404
        }
405
406
        if ($pid) {
407
            $message .= ' (pid ' . $pid . ')';
408
        }
409
        Log::write('info', $message, [
410
            'scope' => 'queue'
411
        ]);
412
    }
413
414
    /**
415
     *
416
     * @param string $message Message
417
     * @param string|null $pid PID of the process
418
     * @return void
419
     */
420
    protected function _logError($message, $pid = null)
421
    {
422
        $timeNeeded = $this->_timeNeeded();
423
        $memoryUsage = $this->_memoryUsage();
424
        $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
425
426
        if ($pid) {
427
            $message .= ' (pid ' . $pid . ')';
428
        }
429
430
        Log::write('error', $message);
431
    }
432
433
    /**
434
     * Returns a List of available QueueTasks and their individual configurations.
435
     *
436
     * @return array
437
     */
438
    protected function _getTaskConf()
439
    {
440
        if (!is_array($this->_taskConf)) {
441
            $this->_taskConf = [];
442
            foreach ($this->tasks as $task) {
443
                list ($pluginName, $taskName) = pluginSplit($task);
444
445
                $this->_taskConf[$taskName]['name'] = substr($taskName, 5);
446
                $this->_taskConf[$taskName]['plugin'] = $pluginName;
447
                if (property_exists($this->{$taskName}, 'timeout')) {
448
                    $this->_taskConf[$taskName]['timeout'] = $this->{$taskName}->timeout;
449
                } else {
450
                    $this->_taskConf[$taskName]['timeout'] = Config::defaultWorkerTimeout();
451
                }
452
                if (property_exists($this->{$taskName}, 'retries')) {
453
                    $this->_taskConf[$taskName]['retries'] = $this->{$taskName}->retries;
454
                } else {
455
                    $this->_taskConf[$taskName]['retries'] = Config::defaultWorkerRetries();
456
                }
457
                if (property_exists($this->{$taskName}, 'cleanupTimeout')) {
458
                    $this->_taskConf[$taskName]['cleanupTimeout'] = $this->{$taskName}->cleanupTimeout;
459
                } else {
460
                    $this->_taskConf[$taskName]['cleanupTimeout'] = Config::cleanupTimeout();
461
                }
462
            }
463
        }
464
465
        return $this->_taskConf;
466
    }
467
468
    /**
469
     * Signal handling to queue worker for clean shutdown
470
     *
471
     * @param int $signal The signal
472
     * @return void
473
     */
474
    protected function _exit($signal)
475
    {
476
        $this->out(__d('queue', 'Caught signal {0}, exiting.', $signal));
477
        $this->_exit = true;
478
    }
479
480
    /**
481
     *
482
     * @return void
483
     */
484
    protected function _displayAvailableTasks()
485
    {
486
        $this->out('Available Tasks:');
487
        foreach ($this->taskNames as $loadedTask) {
488
            $this->out("\t" . '* ' . $this->_taskName($loadedTask));
489
        }
490
    }
491
492
    /**
493
     *
494
     * @return string Memory usage in MB.
495
     */
496
    protected function _memoryUsage()
497
    {
498
        $limit = ini_get('memory_limit');
499
500
        $used = number_format(memory_get_peak_usage(true) / (1024 * 1024), 0) . 'MB';
501
        if ($limit !== '-1') {
502
            $used .= '/' . $limit;
503
        }
504
505
        return $used;
506
    }
507
508
    /**
509
     *
510
     * @return string
511
     */
512
    protected function _timeNeeded()
513
    {
514
        $diff = $this->_time() - $this->_time($this->_time);
515
        $seconds = max($diff, 1);
516
517
        return $seconds . 's';
518
    }
519
520
    /**
521
     *
522
     * @param int|null $providedTime Provided time
523
     *
524
     * @return int
525
     */
526
    protected function _time($providedTime = null)
527
    {
528
        if ($providedTime !== null) {
529
            return $providedTime;
530
        }
531
532
        return time();
533
    }
534
535
    /**
536
     *
537
     * @param string|null $param String to convert
538
     * @return array
539
     */
540
    protected function _stringToArray($param)
541
    {
542
        if (!$param) {
543
            return [];
544
        }
545
546
        $array = Text::tokenize($param);
547
548
        return array_filter($array);
549
    }
550
}
551