Passed
Pull Request — master (#35)
by
unknown
03:02
created

QueueShell::_memoryUsage()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nc 2
nop 0
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
<?php
2
namespace Queue\Shell;
3
4
use Cake\Console\Shell;
5
use Cake\Core\Configure;
6
use Cake\I18n\Number;
7
use Cake\Log\Log;
8
use Cake\ORM\Exception\PersistenceFailedException;
9
use Cake\Utility\Inflector;
10
use Cake\Utility\Text;
11
use Exception;
12
use Queue\Model\Entity\QueuedTask;
13
use Queue\Model\QueueException;
14
use Queue\Queue\Config;
15
use Queue\Queue\TaskFinder;
16
use Queue\Shell\Task\AddInterface;
17
use Queue\Shell\Task\QueueTaskInterface;
18
use RuntimeException;
19
use Throwable;
20
21
declare(ticks = 1);
22
23
/**
24
 * Main shell to init and run queue workers.
25
 *
26
 * @author [email protected]
27
 * @license http://www.opensource.org/licenses/mit-license.php The MIT License
28
 * @link http://github.com/MSeven/cakephp_queue
29
 * @property \Queue\Model\Table\QueuedTasksTable $QueuedTasks
30
 */
31
class QueueShell extends Shell
32
{
33
34
    /**
     * Name of the table class this shell works with (exposed as $this->QueuedTasks).
     *
     * @var string
     */
    public $modelClass = 'Queue.QueuedTasks';

    /**
     * Per-task configuration cache; null until _getTaskConf() builds it.
     *
     * @var array|null
     */
    protected $_taskConf;

    /**
     * Unix timestamp recorded when the current job started; 0 while no job has run yet.
     *
     * @var int
     */
    protected $_time = 0;

    /**
     * When true, the worker loop in runworker() stops after the current iteration.
     *
     * @var bool
     */
    protected $_exit = false;

    /**
     * Identifier of this worker process as stored by _initPid().
     *
     * Declared explicitly so the class does not rely on a dynamically
     * created property.
     *
     * @var string|null
     */
    protected $_pid;
57
58
    /**
     * Registers every task reported by the TaskFinder on this shell, then
     * hands over to the parent initialization.
     *
     * @return void
     */
    public function initialize()
    {
        $this->tasks = (new TaskFinder())->allAppAndPluginTasks();

        parent::initialize();
    }
70
71
    /**
     * Switches the shell to non-interactive mode when the "quiet" parameter
     * is set, then runs the parent startup.
     *
     * @return void
     */
    public function startup()
    {
        $isQuiet = (bool)$this->param('quiet');
        if ($isQuiet) {
            $this->interactive = false;
        }

        parent::startup();
    }
83
84
    /**
     * Builds the help description: a short intro followed by one bullet per
     * loaded task, shown without its "Queue" prefix.
     *
     * @return string
     */
    public function getDescription()
    {
        $bullets = array_map(function ($loadedTask) {
            return "\t" . '* ' . $this->_taskName($loadedTask);
        }, $this->taskNames);
        $tasks = implode(PHP_EOL, $bullets);

        // The closing marker stays at column 0 so the heredoc also parses on PHP < 7.3.
        return <<<TEXT
Simple and minimalistic job queue (or deferred-task) system.

Available Tasks:
$tasks
TEXT;
    }
105
106
    /**
     * Looks for a Queue task with the passed name and calls add() on it.
     * A task may provide add() so users can create new jobs from the command line.
     *
     * Without an argument, or with an unknown task name, usage information and
     * the list of available tasks are printed instead.
     *
     * @return void
     */
    public function add()
    {
        if (count($this->args) === 0) {
            $this->out('Please call like this:');
            $this->out('       bin/cake queue add <taskname>');
            $this->_displayAvailableTasks();

            return;
        }

        $name = Inflector::camelize($this->args[0]);
        $taskKey = 'Queue' . $name;

        if (!in_array($taskKey, $this->taskNames, true)) {
            $this->out('Error: Task not found: ' . $name);
            $this->_displayAvailableTasks();

            return;
        }

        /** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
        $task = $this->{$taskKey};
        if (!($task instanceof AddInterface)) {
            $this->abort('This task does not support adding via CLI call');
        }
        $task->add();
    }
135
136
    /**
     * Returns the task name without a leading "Queue".
     * Example: "QueueImage" is displayed as "Image".
     *
     * @param string $task Task name
     * @return string Cleaned task name
     */
    protected function _taskName($task)
    {
        $hasPrefix = strpos($task, 'Queue') === 0;

        // 5 is the length of the "Queue" prefix.
        return $hasPrefix ? substr($task, 5) : $task;
    }
151
152
    /**
     * Run a QueueWorker loop.
     *
     * Repeatedly requests a job from the queue and runs it until the exit flag
     * is set: by a signal handler, by the "Queue.exitWhenNothingToDo" setting
     * when no job is available, or by exceeding "Queue.workerMaxRuntime".
     * Old jobs are cleaned up on exit and, randomly, during the loop.
     *
     * @return int|null CODE_ERROR when the worker could not be started, null otherwise
     */
    public function runworker()
    {
        try {
            $pid = $this->_initPid();
        } catch (PersistenceFailedException $exception) {
            $this->err($exception->getMessage());
            $limit = (int)Configure::read('Queue.maxWorkers');
            if ($limit) {
                $this->out('Cannot start worker: Too many workers already/still running on this server (' . $limit . '/' . $limit . ')');
            }

            return static::CODE_ERROR;
        }

        // Enable Garbage Collector (PHP >= 5.3)
        if (function_exists('gc_enable')) {
            gc_enable();
        }
        if (function_exists('pcntl_signal')) {
            // Each of these signals only sets the exit flag, so the current job can finish.
            foreach ([SIGTERM, SIGINT, SIGTSTP, SIGQUIT] as $signal) {
                pcntl_signal($signal, [$this, '_exit']);
            }
        }
        $this->_exit = false;

        $startTime = time();
        $types = $this->_stringToArray($this->param('type'));

        while (!$this->_exit) {
            $this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);

            $QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);

            if ($QueuedTask) {
                $this->runJob($QueuedTask, $pid);
            } elseif (Configure::read('Queue.exitWhenNothingToDo')) {
                $this->out('nothing to do, exiting.');
                $this->_exit = true;
            } else {
                $this->out('nothing to do, sleeping.');
                sleep(Config::sleepTime());
            }

            // check if we are over the maximum runtime and end processing if so.
            $maxRuntime = Configure::readOrFail('Queue.workerMaxRuntime');
            $elapsed = time() - $startTime;
            if ($maxRuntime && $elapsed >= $maxRuntime) {
                $this->_exit = true;
                // The message must be the first argument of out(); the second one is the newline count.
                $this->out('Reached runtime of ' . $elapsed . ' Seconds (Max ' . $maxRuntime . '), terminating.');
            }
            // Always clean up when exiting; otherwise only when the random draw exceeds 100 - gcprob.
            if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
                $this->out(__d('queue', 'Performing old job cleanup.'));
                $this->QueuedTasks->cleanOldJobs();
            }
            $this->hr();
        }

        $this->_deletePid($pid);

        if ($this->param('verbose')) {
            $this->_log('endworker', $pid);
        }
    }
233
234
    /**
     * Executes a single queued job and records the outcome.
     *
     * The job payload is unserialized and passed to the matching "Queue<task>"
     * shell task. Any thrown error, or an explicit false return value, marks
     * the job as failed; everything else marks it as done.
     *
     * @param \Queue\Model\Entity\QueuedTask $QueuedTask Queued task
     * @param string $pid PID of the process
     * @return void
     */
    protected function runJob(QueuedTask $QueuedTask, $pid)
    {
        $this->out('Running Job of type "' . $QueuedTask->task . '"');
        $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, $pid, false);
        $taskName = 'Queue' . $QueuedTask->task;

        try {
            // Start of the job; _timeNeeded() measures against this value.
            $this->_time = time();

            // NOTE(review): unserialize() is only safe while the stored payload is trusted.
            $data = unserialize($QueuedTask->data);
            /** @var \Queue\Shell\Task\QueueTask $task */
            $task = $this->{$taskName};
            if (!$task instanceof QueueTaskInterface) {
                throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
            }

            $return = $task->run((array)$data, $QueuedTask->id);
            if ($return !== null) {
                // Older tasks may still return a value instead of throwing on failure.
                trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
            }
            $failureMessage = $taskName . ' failed';
        } catch (Throwable $e) {
            $return = false;

            $failureMessage = get_class($e) . ': ' . $e->getMessage();
            // QueueException failures are logged without a stack trace.
            if (!($e instanceof QueueException)) {
                $failureMessage .= "\n" . $e->getTraceAsString();
            }

            $this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage, $pid);
        } catch (Exception $e) {
            // Only reachable on runtimes where Exception does not implement Throwable.
            $return = false;

            $failureMessage = get_class($e) . ': ' . $e->getMessage();
            $this->_logError($taskName . "\n" . $failureMessage, $pid);
        }

        if ($return !== false) {
            $this->QueuedTasks->markJobDone($QueuedTask);
            $this->out('Job Finished.');

            return;
        }

        $this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
        $failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
        $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus, $pid);
        $this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');
    }
289
290
    /**
     * Manually triggers the cleanup of old jobs.
     *
     * Aborts when "Queue.cleanupTimeout" is not configured (falsy).
     *
     * @return void
     */
    public function clean()
    {
        $cleanupTimeout = Configure::read('Queue.cleanupTimeout');
        if (!$cleanupTimeout) {
            $this->abort('You disabled cleanuptimout in config. Aborting.');
        }

        $threshold = date('Y-m-d H:i:s', time() - (int)$cleanupTimeout);
        $this->out('Deleting old jobs, that have finished before ' . $threshold);
        $this->QueuedTasks->cleanOldJobs();
    }
304
305
    /**
     * Prints every entry of the "Queue" configuration, showing booleans as
     * "yes"/"no" and everything else via print_r().
     *
     * @return void
     */
    public function settings()
    {
        $this->out('Current Settings:');

        foreach ((array)Configure::read('Queue') as $key => $val) {
            if (is_bool($val)) {
                $val = $val ? 'yes' : 'no';
            }
            $this->out('* ' . $key . ': ' . print_r($val, true));
        }

        $this->out();
    }
326
327
    /**
     * Prints queue statistics: the number of queued jobs per type, the total
     * of unfinished jobs, and the averages for finished jobs per task.
     *
     * @return void
     */
    public function stats()
    {
        // Formats a duration right-aligned to 8 characters with an "s" suffix.
        $seconds = function ($value) {
            return str_pad(Number::precision($value), 8, ' ', STR_PAD_LEFT) . 's';
        };

        $this->out('Jobs currently in the queue:');

        foreach ($this->QueuedTasks->getTypes()->toArray() as $type) {
            $label = str_pad($type, 20, ' ', STR_PAD_RIGHT);
            $this->out('      ' . $label . ': ' . $this->QueuedTasks->getLength($type));
        }
        $this->hr();
        $this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
        $this->hr();
        $this->out('Finished job statistics:');

        foreach ($this->QueuedTasks->getStats() as $item) {
            $this->out(' ' . $item['task'] . ': ');
            $this->out('   Finished Jobs in Database: ' . $item['num']);
            $this->out('   Average Job existence    : ' . $seconds($item['alltime']));
            $this->out('   Average Execution delay  : ' . $seconds($item['fetchdelay']));
            $this->out('   Average Execution time   : ' . $seconds($item['runtime']));
        }
    }
353
354
    /**
     * Builds the option parser with the shell description and one subcommand
     * per public action.
     *
     * "clean" and "add" take no options; "stats", "settings" and "runworker"
     * additionally accept --type / -t.
     *
     * @return \Cake\Console\ConsoleOptionParser
     */
    public function getOptionParser()
    {
        // No shared options yet (a boolean "dry-run" option was sketched here but never enabled).
        $plainSpec = [
            'options' => []
        ];
        $typedSpec = $plainSpec;
        $typedSpec['options']['type'] = [
            'short' => 't',
            'help' => 'Type (comma separated list possible)',
            'default' => null
        ];

        // Registration order is kept: clean, add, stats, settings, runworker.
        $subcommands = [
            'clean' => ['Remove old jobs (cleanup)', $plainSpec],
            'add' => ['Add Job', $plainSpec],
            'stats' => ['Stats', $typedSpec],
            'settings' => ['Settings', $typedSpec],
            'runworker' => ['Run Worker', $typedSpec]
        ];

        $parser = parent::getOptionParser()->setDescription($this->getDescription());
        foreach ($subcommands as $command => $definition) {
            $parser = $parser->addSubcommand($command, [
                'help' => $definition[0],
                'parser' => $definition[1]
            ]);
        }

        return $parser;
    }
401
402
    /**
     * Writes an info-level entry to the "queue" log scope.
     *
     * Does nothing unless "Queue.log" is enabled. Elapsed time and memory
     * usage are appended when $addDetails is true, the pid when one is given.
     *
     * @param string $message Log type
     * @param string|null $pid PID of the process
     * @param bool $addDetails Details
     * @return void
     */
    protected function _log($message, $pid = null, $addDetails = true)
    {
        $enabled = Configure::read('Queue.log');
        if (!$enabled) {
            return;
        }

        if ($addDetails) {
            $message .= ' [' . $this->_timeNeeded() . ', ' . $this->_memoryUsage() . ']';
        }
        if ($pid) {
            $message .= ' (pid ' . $pid . ')';
        }

        Log::write('info', $message, ['scope' => 'queue']);
    }
429
430
    /**
     * Writes an error-level log entry, always suffixed with elapsed time and
     * memory usage, plus the pid when one is given.
     *
     * Unlike _log(), this is not gated by "Queue.log" and passes no scope.
     *
     * @param string $message Message
     * @param string|null $pid PID of the process
     * @return void
     */
    protected function _logError($message, $pid = null)
    {
        $details = ' [' . $this->_timeNeeded() . ', ' . $this->_memoryUsage() . ']';
        $suffix = $pid ? ' (pid ' . $pid . ')' : '';

        Log::write('error', $message . $details . $suffix);
    }
448
449
    /**
     * Returns the configuration of every registered task, keyed by task name.
     *
     * The list is built on first use and cached. Each entry holds the name
     * without its "Queue" prefix, the plugin, and timeout / retries /
     * cleanupTimeout taken from the task's own properties when they exist,
     * otherwise from the Config defaults.
     *
     * @return array
     */
    protected function _getTaskConf()
    {
        if (is_array($this->_taskConf)) {
            return $this->_taskConf;
        }

        $this->_taskConf = [];
        foreach ($this->tasks as $task) {
            list($pluginName, $taskName) = pluginSplit($task);

            // 5 is the length of the "Queue" prefix.
            $this->_taskConf[$taskName]['name'] = substr($taskName, 5);
            $this->_taskConf[$taskName]['plugin'] = $pluginName;

            $this->_taskConf[$taskName]['timeout'] = property_exists($this->{$taskName}, 'timeout')
                ? $this->{$taskName}->timeout
                : Config::defaultWorkerTimeout();

            $this->_taskConf[$taskName]['retries'] = property_exists($this->{$taskName}, 'retries')
                ? $this->{$taskName}->retries
                : Config::defaultWorkerRetries();

            $this->_taskConf[$taskName]['cleanupTimeout'] = property_exists($this->{$taskName}, 'cleanupTimeout')
                ? $this->{$taskName}->cleanupTimeout
                : Config::cleanupTimeout();
        }

        return $this->_taskConf;
    }
483
484
    /**
     * Signal handler: announces the signal and sets the exit flag so the
     * worker loop stops on its next check.
     *
     * @param int $signal The signal
     * @return void
     */
    protected function _exit($signal)
    {
        // NOTE(review): "%d" is only substituted by a sprintf-style formatter - confirm the configured one.
        $notice = __d('queue', 'Caught %d signal, exiting.', $signal);
        $this->out($notice);

        $this->_exit = true;
    }
495
496
    /**
     * Prints a bullet list of all loaded tasks, each shown without its
     * "Queue" prefix.
     *
     * @return void
     */
    protected function _displayAvailableTasks()
    {
        $this->out('Available Tasks:');

        foreach ($this->taskNames as $taskName) {
            $bullet = "\t" . '* ' . $this->_taskName($taskName);
            $this->out($bullet);
        }
    }
507
508
    /**
     * Determines the identifier of this worker process, remembers it on the
     * shell and returns it.
     *
     * @return string
     */
    protected function _initPid()
    {
        $pid = $this->_retrievePid();
        $this->_pid = $pid;

        return $pid;
    }
518
519
    /**
     * Returns the operating-system process id as a string when the posix
     * extension is available, otherwise a key obtained from the QueuedTasks table.
     *
     * @return string
     */
    protected function _retrievePid()
    {
        if (!function_exists('posix_getpid')) {
            // NOTE(review): key() is not defined in this file - confirm the table (or a behavior) provides it.
            return $this->QueuedTasks->key();
        }

        return (string)posix_getpid();
    }
533
534
    /**
     * Returns the peak memory usage in whole megabytes, e.g. "12MB", followed
     * by "/<memory_limit>" unless the limit is unlimited ("-1").
     *
     * @return string Memory usage in MB.
     */
    protected function _memoryUsage()
    {
        $peakMegabytes = memory_get_peak_usage(true) / (1024 * 1024);
        $usage = number_format($peakMegabytes, 0) . 'MB';

        $limit = ini_get('memory_limit');

        return $limit === '-1' ? $usage : $usage . '/' . $limit;
    }
549
550
    /**
     * Resolves the pid to clean up, falling back to the one stored on the shell.
     *
     * Currently a stub: nothing is deleted for either outcome.
     *
     * @param string|null $pid PID of the process
     *
     * @return void
     */
    protected function _deletePid($pid)
    {
        $pid = $pid ?: $this->_pid;

        if (!$pid) {
            return;
        }
    }
565
566
    /**
     * Returns the seconds elapsed since the current job started, as a string
     * with an "s" suffix. The value is never reported below one second.
     *
     * @return string
     */
    protected function _timeNeeded()
    {
        $now = $this->_time();
        $startedAt = $this->_time($this->_time);

        return max($now - $startedAt, 1) . 's';
    }
577
578
    /**
     * Returns the provided timestamp, or the current time when none is given.
     *
     * The truthiness check is deliberate: the initial start time of 0 must
     * also fall back to time(), so do not tighten it to a null comparison.
     *
     * @param int|null $providedTime Provided time
     *
     * @return int
     */
    protected function _time($providedTime = null)
    {
        return $providedTime ?: time();
    }
592
593
    /**
     * Tokenizes a list string into an array and drops empty entries.
     * An empty or missing value yields an empty array.
     *
     * @param string|null $param String to convert
     * @return array
     */
    protected function _stringToArray($param)
    {
        if ($param) {
            // array_filter() keeps the original keys of the remaining tokens.
            return array_filter(Text::tokenize($param));
        }

        return [];
    }
608
}
609