Completed
Pull Request — master (#35)
by
unknown
03:07
created

QueueShell::_getTaskConf()   A

Complexity

Conditions 6
Paths 2

Size

Total Lines 27
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 19
nc 2
nop 0
dl 0
loc 27
rs 9.0111
c 0
b 0
f 0
1
<?php
2
namespace Queue\Shell;
3
4
use Cake\Console\Shell;
5
use Cake\Core\Configure;
6
use Cake\I18n\Number;
7
use Cake\Log\Log;
8
use Cake\ORM\Exception\PersistenceFailedException;
9
use Cake\Utility\Inflector;
10
use Cake\Utility\Text;
11
use Exception;
12
use Queue\Model\Entity\QueuedTask;
13
use Queue\Model\QueueException;
14
use Queue\Queue\Config;
15
use Queue\Queue\TaskFinder;
16
use Queue\Shell\Task\AddInterface;
17
use Queue\Shell\Task\QueueTaskInterface;
18
use RuntimeException;
19
use Throwable;
20
declare(ticks = 1);
21
22
/**
23
 * Main shell to init and run queue workers.
24
 *
25
 * @author [email protected]
26
 * @license http://www.opensource.org/licenses/mit-license.php The MIT License
27
 * @link http://github.com/MSeven/cakephp_queue
28
 * @property \Queue\Model\Table\QueuedTasksTable $QueuedTasks
29
 */
30
class QueueShell extends Shell
31
{
32
33
    /**
34
     *
35
     * @var string
36
     */
37
    public $modelClass = 'Queue.QueuedTasks';
38
39
    /**
40
     *
41
     * @var array|null
42
     */
43
    protected $_taskConf;
44
45
    /**
46
     *
47
     * @var int
48
     */
49
    protected $_time = 0;
50
51
    /**
52
     *
53
     * @var bool
54
     */
55
    protected $_exit = false;
56
57
    /**
     * Registers the queue tasks reported by the TaskFinder, then runs the parent initializer.
     *
     * The task list is assigned to $this->tasks before parent::initialize() is called.
     *
     * @return void
     */
    public function initialize()
    {
        $this->tasks = (new TaskFinder())->allAppAndPluginTasks();

        parent::initialize();
    }
69
70
    /**
     * Turns off interactive mode when the "quiet" parameter is set, then runs the parent startup.
     *
     * @return void
     */
    public function startup()
    {
        $quiet = $this->param('quiet');
        if ($quiet) {
            $this->interactive = false;
        }

        parent::startup();
    }
82
83
    /**
     * Builds the shell description: a short intro followed by a bullet list of the loaded tasks.
     *
     * Each entry of $this->taskNames is shown through _taskName(), one per line, tab-indented.
     *
     * @return string
     */
    public function _getDescription()
    {
        $bullets = [];
        foreach ($this->taskNames as $name) {
            $bullets[] = sprintf("\t* %s", $this->_taskName($name));
        }
        $taskList = implode(PHP_EOL, $bullets);

        // The heredoc body and closing marker stay at column 0 so the text is emitted unindented.
        return <<<TEXT
Simple and minimalistic job queue (or deferred-task) system.

Available Tasks:
{$taskList}
TEXT;
    }
103
104
    /**
     * Look for a Queue Task of the passed name and call add() on it.
     *
     * The first CLI argument is camelized and prefixed with "Queue" to find the task.
     * Without an argument, or when no such task is loaded, usage help and the list of
     * available tasks are printed instead. Tasks that do not implement AddInterface
     * are rejected via abort().
     *
     * @return void
     */
    public function add()
    {
        if (count($this->args) === 0) {
            $this->out('Please call like this:');
            $this->out('       bin/cake queue add <taskname>');
            $this->_displayAvailableTasks();

            return;
        }

        $name = Inflector::camelize($this->args[0]);
        $property = 'Queue' . $name;

        if (!in_array($property, $this->taskNames, true)) {
            $this->out('Error: Task not found: ' . $name);
            $this->_displayAvailableTasks();

            return;
        }

        /** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
        $task = $this->{$property};
        $supportsAdd = $task instanceof AddInterface;
        if (!$supportsAdd) {
            $this->abort('This task does not support adding via CLI call');
        }
        $task->add();
    }
133
134
    /**
     * Strips a leading "Queue" from a task name for display.
     * Example: "QueueImage" is shown as "Image"; names without the prefix are returned unchanged.
     *
     * @param string $task Task name
     * @return string Cleaned task name
     */
    protected function _taskName($task)
    {
        $hasPrefix = strncmp($task, 'Queue', 5) === 0;

        return $hasPrefix ? substr($task, 5) : $task;
    }
148
149
    /**
     * Run a QueueWorker loop.
     *
     * Repeatedly requests a job from the QueuedTasks table and runs it via runJob().
     * When no job is available the worker either exits (Queue.exitWhenNothingToDo) or
     * sleeps for Config::sleepTime() seconds. The loop ends when the exit flag is set by
     * a signal handler, by the "nothing to do" branch, or once Queue.workerMaxRuntime
     * seconds have elapsed. Old jobs are cleaned up on exit and, randomly, with a
     * probability derived from Config::gcprob().
     *
     * @return int|null CODE_ERROR when the worker could not be registered, null otherwise
     */
    public function runworker()
    {
        try {
            $pid = $this->_initPid();
        } catch (PersistenceFailedException $exception) {
            $this->err($exception->getMessage());
            $limit = (int)Configure::read('Queue.maxWorkers');
            if ($limit) {
                $this->out('Cannot start worker: Too many workers already/still running on this server (' . $limit . '/' . $limit . ')');
            }

            return static::CODE_ERROR;
        }

        // Enable Garbage Collector (PHP >= 5.3)
        if (function_exists('gc_enable')) {
            gc_enable();
        }
        if (function_exists('pcntl_signal')) {
            // All of these signals request a clean shutdown through the _exit() handler.
            foreach ([SIGTERM, SIGINT, SIGTSTP, SIGQUIT] as $signal) {
                pcntl_signal($signal, [$this, '_exit']);
            }
        }
        $this->_exit = false;

        $startTime = time();
        $types = $this->_stringToArray($this->param('type'));

        while (!$this->_exit) {
            $this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);

            $QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);

            if ($QueuedTask) {
                $this->runJob($QueuedTask, $pid);
            } elseif (Configure::read('Queue.exitWhenNothingToDo')) {
                $this->out('nothing to do, exiting.');
                $this->_exit = true;
            } else {
                $this->out('nothing to do, sleeping.');
                sleep(Config::sleepTime());
            }

            // Check if we are over the maximum runtime and end processing if so.
            $maxRuntime = Configure::readOrFail('Queue.workerMaxRuntime');
            $elapsed = time() - $startTime;
            if ($maxRuntime && $elapsed >= $maxRuntime) {
                $this->_exit = true;
                // The message must be the first argument of out(); the second one is the newline count.
                $this->out('Reached runtime of ' . $elapsed . ' Seconds (Max ' . $maxRuntime . '), terminating.');
            }
            if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
                $this->out(__d('queue', 'Performing old job cleanup.'));
                $this->QueuedTasks->cleanOldJobs();
            }
            $this->hr();
        }

        $this->_deletePid($pid);

        if ($this->param('verbose')) {
            $this->_log('endworker', $pid);
        }
    }
229
230
    /**
     * Runs a single queued job and records whether it finished or failed.
     *
     * The task object is looked up as "Queue" + the job's task name and must implement
     * QueueTaskInterface. A job counts as failed when run() throws or (legacy behaviour)
     * returns false; it is then marked failed with a failure message, otherwise marked done.
     *
     * @param \Queue\Model\Entity\QueuedTask $QueuedTask Job to execute
     * @param string $pid Identifier of the worker process, used for logging
     * @return void
     */
    protected function runJob(QueuedTask $QueuedTask, $pid)
    {
        $this->out('Running Job of type "' . $QueuedTask->task . '"');
        $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, $pid, false);
        $taskName = 'Queue' . $QueuedTask->task;

        try {
            // Start of the measurement used by _timeNeeded() in the log helpers.
            $this->_time = time();

            // SECURITY NOTE(review): unserialize() can instantiate arbitrary classes from the payload.
            // Presumably the data column is only ever written by trusted application code - confirm.
            $data = unserialize($QueuedTask->data);
            /** @var \Queue\Shell\Task\QueueTask $task */
            $task = $this->{$taskName};
            if (!$task instanceof QueueTaskInterface) {
                throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
            }

            $return = $task->run((array)$data, $QueuedTask->id);
            if ($return !== null) {
                trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
            }
            // Only used when a legacy task signalled failure by returning false.
            $failureMessage = $taskName . ' failed';
        } catch (Throwable $e) {
            $return = false;
            $failureMessage = $this->_reportJobException($e, $taskName, $QueuedTask, $pid);
        } catch (Exception $e) {
            // Only reachable on runtimes where Exception is not a Throwable (PHP 5);
            // report it with the same context as the branch above.
            $return = false;
            $failureMessage = $this->_reportJobException($e, $taskName, $QueuedTask, $pid);
        }

        if ($return === false) {
            $this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
            $failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
            $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus, $pid);
            $this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');

            return;
        }

        $this->QueuedTasks->markJobDone($QueuedTask);
        $this->out('Job Finished.');
    }

    /**
     * Builds the failure message for an exception raised by a job and writes it to the error log.
     *
     * The stack trace is appended unless the exception is a QueueException.
     *
     * @param \Throwable|\Exception $e Exception raised while running the job
     * @param string $taskName Task property name ("Queue" + task)
     * @param \Queue\Model\Entity\QueuedTask $QueuedTask Job that failed
     * @param string $pid Identifier of the worker process
     * @return string Failure message to store with the job
     */
    protected function _reportJobException($e, $taskName, QueuedTask $QueuedTask, $pid)
    {
        $failureMessage = get_class($e) . ': ' . $e->getMessage();
        if (!($e instanceof QueueException)) {
            $failureMessage .= "\n" . $e->getTraceAsString();
        }

        $this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage, $pid);

        return $failureMessage;
    }
284
285
    /**
     * Manually trigger a cleanup of old jobs.
     *
     * Aborts when Queue.cleanupTimeout is not configured (falsy); otherwise prints the
     * cut-off timestamp (now minus the timeout in seconds) and asks the table to clean up.
     *
     * @return void
     */
    public function clean()
    {
        $timeout = Configure::read('Queue.cleanupTimeout');
        if (!$timeout) {
            $this->abort('You disabled cleanuptimout in config. Aborting.');
        }

        $cutOff = date('Y-m-d H:i:s', time() - (int)Configure::read('Queue.cleanupTimeout'));
        $this->out('Deleting old jobs, that have finished before ' . $cutOff);
        $this->QueuedTasks->cleanOldJobs();
    }
299
300
    /**
     * Prints every entry of the "Queue" configuration, one per line.
     *
     * Boolean values are shown as "yes"/"no"; everything else is rendered with print_r().
     *
     * @return void
     */
    public function settings()
    {
        $this->out('Current Settings:');

        foreach ((array)Configure::read('Queue') as $key => $val) {
            if (is_bool($val)) {
                $val = $val ? 'yes' : 'no';
            }
            $this->out('* ' . $key . ': ' . print_r($val, true));
        }

        $this->out();
    }
321
322
    /**
     * Prints queue statistics: pending jobs per type, the total of unfinished jobs,
     * and per-task averages for finished jobs as returned by QueuedTasks::getStats().
     *
     * @return void
     */
    public function stats()
    {
        $this->out('Jobs currently in the queue:');

        $queuedTasks = $this->QueuedTasks;
        foreach ($queuedTasks->getTypes()->toArray() as $type) {
            $label = str_pad($type, 20, ' ', STR_PAD_RIGHT);
            $this->out('      ' . $label . ': ' . $queuedTasks->getLength($type));
        }
        $this->hr();
        $this->out('Total unfinished jobs: ' . $queuedTasks->getLength());
        $this->hr();
        $this->out('Finished job statistics:');

        // Output label => key of the stats row holding the averaged value.
        $averages = [
            '   Average Job existence    : ' => 'alltime',
            '   Average Execution delay  : ' => 'fetchdelay',
            '   Average Execution time   : ' => 'runtime',
        ];
        foreach ($queuedTasks->getStats() as $item) {
            $this->out(' ' . $item['task'] . ': ');
            $this->out('   Finished Jobs in Database: ' . $item['num']);
            foreach ($averages as $label => $field) {
                $value = str_pad(Number::precision($item[$field]), 8, ' ', STR_PAD_LEFT);
                $this->out($label . $value . 's');
            }
        }
    }
348
349
    /**
     * Builds the option parser with the shell description and its subcommands.
     *
     * "clean" and "add" take no options; "stats", "settings" and "runworker"
     * additionally accept the -t/--type option.
     *
     * @return \Cake\Console\ConsoleOptionParser
     */
    public function getOptionParser()
    {
        // A 'dry-run' boolean option (short 'd') was sketched here once but is not enabled.
        $plainParser = [
            'options' => [],
        ];

        $typedParser = $plainParser;
        $typedParser['options']['type'] = [
            'short' => 't',
            'help' => 'Type (comma separated list possible)',
            'default' => null,
        ];

        // Subcommand name => [help text, parser definition], registered in this order.
        $subcommands = [
            'clean' => ['Remove old jobs (cleanup)', $plainParser],
            'add' => ['Add Job', $plainParser],
            'stats' => ['Stats', $typedParser],
            'settings' => ['Settings', $typedParser],
            'runworker' => ['Run Worker', $typedParser],
        ];

        $parser = parent::getOptionParser()->setDescription($this->_getDescription());
        foreach ($subcommands as $command => $definition) {
            // Re-assign on every call so the result matches the original fluent chain.
            $parser = $parser->addSubcommand($command, [
                'help' => $definition[0],
                'parser' => $definition[1],
            ]);
        }

        return $parser;
    }
395
396
    /**
     * Writes an info-level message to the "queue" log scope, if Queue.log is enabled.
     *
     * @param string $message Text to log
     * @param string|null $pid PID of the process; appended as " (pid X)" when truthy
     * @param bool $addDetails Whether to append elapsed time and memory usage
     * @return void
     */
    protected function _log($message, $pid = null, $addDetails = true)
    {
        $enabled = Configure::read('Queue.log');
        if (!$enabled) {
            return;
        }

        if ($addDetails) {
            $message .= sprintf(' [%s, %s]', $this->_timeNeeded(), $this->_memoryUsage());
        }
        if ($pid) {
            $message .= sprintf(' (pid %s)', $pid);
        }

        Log::write('info', $message, ['scope' => 'queue']);
    }
423
424
    /**
     * Writes an error-level log entry, always suffixed with elapsed time and memory usage.
     *
     * Unlike _log() this neither checks Queue.log nor passes a log scope.
     *
     * @param string $message Text to log
     * @param string|null $pid PID of the process; appended as " (pid X)" when truthy
     * @return void
     */
    protected function _logError($message, $pid = null)
    {
        $message .= sprintf(' [%s, %s]', $this->_timeNeeded(), $this->_memoryUsage());

        if ($pid) {
            $message .= sprintf(' (pid %s)', $pid);
        }

        Log::write('error', $message);
    }
442
443
    /**
     * Returns the list of available QueueTasks with their individual configuration.
     *
     * Built once and cached in $this->_taskConf. For every entry of $this->tasks the
     * result holds name (task name without its first five characters, i.e. the "Queue"
     * prefix), plugin, timeout, retries and cleanupTimeout. The last three come from a
     * property of the same name on the task object when it exists, otherwise from the
     * Config defaults.
     *
     * @return array
     */
    protected function _getTaskConf()
    {
        if (is_array($this->_taskConf)) {
            return $this->_taskConf;
        }

        $this->_taskConf = [];
        foreach ($this->tasks as $task) {
            list($pluginName, $taskName) = pluginSplit($task);

            $instance = $this->{$taskName};
            $this->_taskConf[$taskName] = [
                'name' => substr($taskName, 5),
                'plugin' => $pluginName,
                'timeout' => property_exists($instance, 'timeout')
                    ? $instance->timeout
                    : Config::defaultWorkerTimeout(),
                'retries' => property_exists($instance, 'retries')
                    ? $instance->retries
                    : Config::defaultWorkerRetries(),
                'cleanupTimeout' => property_exists($instance, 'cleanupTimeout')
                    ? $instance->cleanupTimeout
                    : Config::cleanupTimeout(),
            ];
        }

        return $this->_taskConf;
    }
476
477
    /**
     * Signal handler: announces the received signal and flags the worker loop to stop.
     *
     * Only sets the exit flag; runworker() checks the flag at the top of its loop,
     * so a job that is currently running is not interrupted by this handler.
     *
     * @param int $signal Number of the received signal
     * @return void
     */
    protected function _exit($signal)
    {
        // CakePHP 3 translation functions substitute ICU-style {0} placeholders;
        // a sprintf-style %d would be printed literally by the default formatter.
        $this->out(__d('queue', 'Caught {0} signal, exiting.', $signal));
        $this->_exit = true;
    }
488
489
    /**
     * Prints an "Available Tasks:" heading followed by one tab-indented bullet per loaded task,
     * using the display name from _taskName().
     *
     * @return void
     */
    protected function _displayAvailableTasks()
    {
        $this->out('Available Tasks:');

        foreach ($this->taskNames as $name) {
            $this->out(sprintf("\t* %s", $this->_taskName($name)));
        }
    }
500
501
    /**
     * Identifier of the current worker process, assigned by _initPid().
     *
     * Declared explicitly so it is not created as a dynamic property at runtime.
     *
     * @var string|null
     */
    protected $_pid;

    /**
     * Determines the identifier of this worker process and remembers it in $this->_pid.
     *
     * @return string
     */
    protected function _initPid()
    {
        $this->_pid = $this->_retrievePid();

        return $this->_pid;
    }
510
511
    /**
     * Returns an identifier for this worker process.
     *
     * Uses the OS process id (as a string) when posix_getpid() is available,
     * otherwise falls back to QueuedTasks::key().
     *
     * @return string
     */
    protected function _retrievePid()
    {
        if (!function_exists('posix_getpid')) {
            return $this->QueuedTasks->key();
        }

        return (string)posix_getpid();
    }
524
525
    /**
     * Formats the peak memory usage in whole megabytes, e.g. "12MB".
     *
     * When memory_limit is anything other than "-1" the configured limit is appended
     * after a slash, e.g. "12MB/128M".
     *
     * @return string Memory usage in MB.
     */
    protected function _memoryUsage()
    {
        $limit = ini_get('memory_limit');
        $peakMegabytes = memory_get_peak_usage(true) / (1024 * 1024);
        $usage = number_format($peakMegabytes, 0) . 'MB';

        return $limit === '-1' ? $usage : $usage . '/' . $limit;
    }
540
541
    /**
     * Resolves the pid to clean up, falling back to $this->_pid when none is passed.
     *
     * NOTE(review): as written, nothing is deleted here - the method only resolves the
     * pid and returns. Presumably the actual removal was dropped or is still to be added.
     *
     * @param string|null $pid
     *
     * @return void
     */
    protected function _deletePid($pid)
    {
        $resolved = $pid ?: $this->_pid;
        if (!$resolved) {
            return;
        }
    }
556
557
    /**
     * Returns the seconds elapsed since $this->_time, formatted like "3s".
     *
     * The result is never below "1s". While $this->_time is still 0, _time() substitutes
     * the current time for it, so the difference is 0 and "1s" is reported.
     *
     * @return string
     */
    protected function _timeNeeded()
    {
        $now = $this->_time();
        $startedAt = $this->_time($this->_time);

        return max($now - $startedAt, 1) . 's';
    }
568
569
    /**
     * Returns the given timestamp, or the current time when none is provided.
     *
     * Any falsy value - including 0, the initial value of $this->_time - counts as
     * "not provided"; _timeNeeded() relies on that, so the check is deliberately loose.
     *
     * @param int|null $providedTime
     *
     * @return int
     */
    protected function _time($providedTime = null)
    {
        return $providedTime ? $providedTime : time();
    }
583
584
    /**
     * Splits an option value into a list using Text::tokenize() and drops falsy entries.
     *
     * A falsy input (null, empty string, "0") yields an empty array.
     *
     * @param string|null $param
     * @return array
     */
    protected function _stringToArray($param)
    {
        $tokens = $param ? Text::tokenize($param) : [];

        return array_filter($tokens);
    }
599
}
600