Completed
Pull Request — master (#35)
by
unknown
03:57
created

QueueShell::startup()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 0
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
<?php
2
namespace Queue\Shell;
3
4
use Cake\Console\Shell;
5
use Cake\Core\Configure;
6
use Cake\I18n\Number;
7
use Cake\Log\Log;
8
use Cake\ORM\Exception\PersistenceFailedException;
9
use Cake\Utility\Inflector;
10
use Cake\Utility\Text;
11
use Exception;
12
use Queue\Model\Entity\QueuedTask;
13
use Queue\Model\QueueException;
14
use Queue\Queue\Config;
15
use Queue\Queue\TaskFinder;
16
use Queue\Shell\Task\AddInterface;
17
use Queue\Shell\Task\QueueTaskInterface;
18
use RuntimeException;
19
use Throwable;
20
declare(ticks = 1);
21
22
/**
23
 * Main shell to init and run queue workers.
24
 *
25
 * @author [email protected]
26
 * @license http://www.opensource.org/licenses/mit-license.php The MIT License
27
 * @link http://github.com/MSeven/cakephp_queue
28
 * @property \Queue\Model\Table\QueuedTasksTable $QueuedTasks
29
 */
30
class QueueShell extends Shell
31
{
32
33
    /**
34
     *
35
     * @var string
36
     */
37
    public $modelClass = 'Queue.QueuedTasks';
38
39
    /**
40
     *
41
     * @var array|null
42
     */
43
    protected $_taskConf;
44
45
    /**
46
     *
47
     * @var int
48
     */
49
    protected $_time = 0;
50
51
    /**
52
     *
53
     * @var bool
54
     */
55
    protected $_exit = false;
56
57
    /**
58
     * Overwrite shell initialize to dynamically load all Queue Related Tasks.
59
     *
60
     * @return void
61
     */
62
    public function initialize()
63
    {
64
        $taskFinder = new TaskFinder();
65
        $this->tasks = $taskFinder->allAppAndPluginTasks();
66
67
        parent::initialize();
68
    }
69
70
    /**
71
     *
72
     * @return void
73
     */
74
    public function startup()
75
    {
76
        if ($this->param('quiet')) {
77
            $this->interactive = false;
78
        }
79
80
        parent::startup();
81
    }
82
83
    /**
84
     *
85
     * @return string
86
     */
87
    public function _getDescription()
88
    {
89
        $tasks = [];
90
        foreach ($this->taskNames as $loadedTask) {
91
            $tasks[] = "\t" . '* ' . $this->_taskName($loadedTask);
92
        }
93
        $tasks = implode(PHP_EOL, $tasks);
94
95
        $text = <<<TEXT
96
Simple and minimalistic job queue (or deferred-task) system.
97
98
Available Tasks:
99
$tasks
100
TEXT;
101
        return $text;
102
    }
103
104
    /**
105
     * Look for a Queue Task of hte passed name and try to call add() on it.
106
     * A QueueTask may provide an add function to enable the user to create new jobs via commandline.
107
     *
108
     * @return void
109
     */
110
    public function add()
111
    {
112
        if (count($this->args) < 1) {
113
            $this->out('Please call like this:');
114
            $this->out('       bin/cake queue add <taskname>');
115
            $this->_displayAvailableTasks();
116
117
            return;
118
        }
119
120
        $name = Inflector::camelize($this->args[0]);
121
        if (in_array('Queue' . $name, $this->taskNames, true)) {
122
            /** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
123
            $task = $this->{'Queue' . $name};
124
            if (! ($task instanceof AddInterface)) {
125
                $this->abort('This task does not support adding via CLI call');
126
            }
127
            $task->add();
128
        } else {
129
            $this->out('Error: Task not found: ' . $name);
130
            $this->_displayAvailableTasks();
131
        }
132
    }
133
134
    /**
135
     * Output the task without Queue or Task
136
     * example: QueueImageTask becomes Image on display
137
     *
138
     * @param string $task
139
     *            Task name
140
     * @return string Cleaned task name
141
     */
142
    protected function _taskName($task)
143
    {
144
        if (strpos($task, 'Queue') === 0) {
145
            return substr($task, 5);
146
        }
147
        return $task;
148
    }
149
150
    /**
151
     * Run a QueueWorker loop.
152
     * Runs a Queue Worker process which will try to find unassigned jobs in the queue
153
     * which it may run and try to fetch and execute them.
154
     *
155
     * @return int|null
156
     */
157
    public function runworker()
158
    {
159
        try {
160
            $pid = $this->_initPid();
161
        } catch (PersistenceFailedException $exception) {
162
            $this->err($exception->getMessage());
163
            $limit = (int) Configure::read('Queue.maxWorkers');
164
            if ($limit) {
165
                $this->out('Cannot start worker: Too many workers already/still running on this server (' . $limit . '/' . $limit . ')');
166
            }
167
            return static::CODE_ERROR;
168
        }
169
170
        // Enable Garbage Collector (PHP >= 5.3)
171
        if (function_exists('gc_enable')) {
172
            gc_enable();
173
        }
174
        if (function_exists('pcntl_signal')) {
175
            pcntl_signal(SIGTERM, [
176
                &$this,
177
                '_exit'
178
            ]);
179
            pcntl_signal(SIGINT, [
180
                &$this,
181
                '_exit'
182
            ]);
183
            pcntl_signal(SIGTSTP, [
184
                &$this,
185
                '_exit'
186
            ]);
187
            pcntl_signal(SIGQUIT, [
188
                &$this,
189
                '_exit'
190
            ]);
191
        }
192
        $this->_exit = false;
193
194
        $startTime = time();
195
        $types = $this->_stringToArray($this->param('type'));
196
197
        while (! $this->_exit) {
198
            $this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);
199
200
            $QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);
201
202
            if ($QueuedTask) {
203
                $this->runJob($QueuedTask, $pid);
204
            } elseif (Configure::read('Queue.exitWhenNothingToDo')) {
205
                $this->out('nothing to do, exiting.');
206
                $this->_exit = true;
207
            } else {
208
                $this->out('nothing to do, sleeping.');
209
                sleep(Config::sleepTime());
210
            }
211
212
            // check if we are over the maximum runtime and end processing if so.
213
            if (Configure::readOrFail('Queue.workerMaxRuntime') && (time() - $startTime) >= Configure::readOrFail('Queue.workerMaxRuntime')) {
214
                $this->_exit = true;
215
                $this->out('queue', 'Reached runtime of ' . (time() - $startTime) . ' Seconds (Max ' . Configure::readOrFail('Queue.workerMaxRuntime') . '), terminating.');
0 ignored issues
show
Bug introduced by
'Reached runtime of ' . ...e') . '), terminating.' of type string is incompatible with the type integer expected by parameter $newlines of Cake\Console\Shell::out(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

215
                $this->out('queue', /** @scrutinizer ignore-type */ 'Reached runtime of ' . (time() - $startTime) . ' Seconds (Max ' . Configure::readOrFail('Queue.workerMaxRuntime') . '), terminating.');
Loading history...
216
            }
217
            if ($this->_exit || mt_rand(0, 100) > (100 - (int) Config::gcprob())) {
218
                $this->out(__d('queue', 'Performing old job cleanup.'));
219
                $this->QueuedTasks->cleanOldJobs();
220
            }
221
            $this->hr();
222
        }
223
224
        $this->_deletePid($pid);
225
226
        if ($this->param('verbose')) {
227
            $this->_log('endworker', $pid);
228
        }
229
    }
230
231
    /**
232
     *
233
     * @param \Queue\Model\Entity\QueuedTask $QueuedTask
234
     * @param string $pid
235
     * @return void
236
     */
237
    protected function runJob(QueuedTask $QueuedTask, $pid)
238
    {
239
        $this->out('Running Job of type "' . $QueuedTask->task . '"');
240
        $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, $pid, false);
241
        $taskName = 'Queue' . $QueuedTask->task;
242
243
        try {
244
            $this->_time = time();
245
246
            $data = unserialize($QueuedTask->data);
247
            /** @var \Queue\Shell\Task\QueueTask $task */
248
            $task = $this->{$taskName};
249
            if (! $task instanceof QueueTaskInterface) {
0 ignored issues
show
introduced by
$task is always a sub-type of Queue\Shell\Task\QueueTaskInterface.
Loading history...
250
                throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
251
            }
252
253
            $return = $task->run((array) $data, $QueuedTask->id);
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $return is correct as $task->run((array)$data, $QueuedTask->id) targeting Queue\Shell\Task\QueueTaskInterface::run() seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
254
            if ($return !== null) {
0 ignored issues
show
introduced by
The condition $return !== null is always true.
Loading history...
255
                trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
256
            }
257
            $failureMessage = $taskName . ' failed';
258
        } catch (Throwable $e) {
259
            $return = false;
260
261
            $failureMessage = get_class($e) . ': ' . $e->getMessage();
262
            if (! ($e instanceof QueueException)) {
263
                $failureMessage .= "\n" . $e->getTraceAsString();
264
            }
265
266
            $this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage, $pid);
267
        } catch (Exception $e) {
268
            $return = false;
269
270
            $failureMessage = get_class($e) . ': ' . $e->getMessage();
271
            $this->_logError($taskName . "\n" . $failureMessage, $pid);
272
        }
273
274
        if ($return === false) {
0 ignored issues
show
introduced by
The condition $return === false is always true.
Loading history...
275
            $this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
276
            $failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
277
            $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus, $pid);
278
            $this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');
0 ignored issues
show
Bug Best Practice introduced by
The property failed does not exist on Queue\Model\Entity\QueuedTask. Since you implemented __get, consider adding a @property annotation.
Loading history...
279
            return;
280
        }
281
282
        $this->QueuedTasks->markJobDone($QueuedTask);
283
        $this->out('Job Finished.');
284
    }
285
286
    /**
287
     * Manually trigger a Finished job cleanup.
288
     *
289
     * @return void
290
     */
291
    public function clean()
292
    {
293
        if (! Configure::read('Queue.cleanupTimeout')) {
294
            $this->abort('You disabled cleanuptimout in config. Aborting.');
295
        }
296
297
        $this->out('Deleting old jobs, that have finished before ' . date('Y-m-d H:i:s', time() - (int) Configure::read('Queue.cleanupTimeout')));
298
        $this->QueuedTasks->cleanOldJobs();
299
    }
300
301
    /**
302
     * Display current settings
303
     *
304
     * @return void
305
     */
306
    public function settings()
307
    {
308
        $this->out('Current Settings:');
309
        $conf = (array) Configure::read('Queue');
310
        foreach ($conf as $key => $val) {
311
            if ($val === false) {
312
                $val = 'no';
313
            }
314
            if ($val === true) {
315
                $val = 'yes';
316
            }
317
            $this->out('* ' . $key . ': ' . print_r($val, true));
318
        }
319
320
        $this->out();
321
    }
322
323
    /**
324
     * Display some statistics about Finished Jobs.
325
     *
326
     * @return void
327
     */
328
    public function stats()
329
    {
330
        $this->out('Jobs currently in the queue:');
331
332
        $types = $this->QueuedTasks->getTypes()->toArray();
333
        foreach ($types as $type) {
334
            $this->out('      ' . str_pad($type, 20, ' ', STR_PAD_RIGHT) . ': ' . $this->QueuedTasks->getLength($type));
335
        }
336
        $this->hr();
337
        $this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
338
        $this->hr();
339
        $this->out('Finished job statistics:');
340
        $data = $this->QueuedTasks->getStats();
341
        foreach ($data as $item) {
342
            $this->out(' ' . $item['task'] . ': ');
343
            $this->out('   Finished Jobs in Database: ' . $item['num']);
344
            $this->out('   Average Job existence    : ' . str_pad(Number::precision($item['alltime']), 8, ' ', STR_PAD_LEFT) . 's');
345
            $this->out('   Average Execution delay  : ' . str_pad(Number::precision($item['fetchdelay']), 8, ' ', STR_PAD_LEFT) . 's');
346
            $this->out('   Average Execution time   : ' . str_pad(Number::precision($item['runtime']), 8, ' ', STR_PAD_LEFT) . 's');
347
        }
348
    }
349
350
    /**
351
     * Get option parser method to parse commandline options
352
     *
353
     * @return \Cake\Console\ConsoleOptionParser
354
     */
355
    public function getOptionParser()
356
    {
357
        $subcommandParser = [
358
            'options' => [ /*
359
                             * 'dry-run'=> array(
360
                             * 'short' => 'd',
361
                             * 'help' => 'Dry run the update, no jobs will actually be added.',
362
                             * 'boolean' => true
363
                             * ),
364
                             */
365
            ]
366
        ];
367
        $subcommandParserFull = $subcommandParser;
368
        $subcommandParserFull['options']['type'] = [
369
            'short' => 't',
370
            'help' => 'Type (comma separated list possible)',
371
            'default' => null
372
        ];
373
374
        return parent::getOptionParser()->setDescription($this->_getDescription())
375
            ->addSubcommand('clean', [
376
            'help' => 'Remove old jobs (cleanup)',
377
            'parser' => $subcommandParser
378
        ])
379
            ->addSubcommand('add', [
380
            'help' => 'Add Job',
381
            'parser' => $subcommandParser
382
        ])
383
            ->addSubcommand('stats', [
384
            'help' => 'Stats',
385
            'parser' => $subcommandParserFull
386
        ])
387
            ->addSubcommand('settings', [
388
            'help' => 'Settings',
389
            'parser' => $subcommandParserFull
390
        ])
391
            ->addSubcommand('runworker', [
392
            'help' => 'Run Worker',
393
            'parser' => $subcommandParserFull
394
        ]);
395
    }
396
397
    /**
398
     * Timestamped log.
399
     *
400
     * @param string $message
401
     *            Log type
402
     * @param string|null $pid
403
     *            PID of the process
404
     * @param bool $addDetails
405
     * @return void
406
     */
407
    protected function _log($message, $pid = null, $addDetails = true)
408
    {
409
        if (! Configure::read('Queue.log')) {
410
            return;
411
        }
412
413
        if ($addDetails) {
414
            $timeNeeded = $this->_timeNeeded();
415
            $memoryUsage = $this->_memoryUsage();
416
            $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
417
        }
418
419
        if ($pid) {
420
            $message .= ' (pid ' . $pid . ')';
421
        }
422
        Log::write('info', $message, [
423
            'scope' => 'queue'
424
        ]);
425
    }
426
427
    /**
428
     *
429
     * @param string $message
430
     * @param string|null $pid
431
     *            PID of the process
432
     * @return void
433
     */
434
    protected function _logError($message, $pid = null)
435
    {
436
        $timeNeeded = $this->_timeNeeded();
437
        $memoryUsage = $this->_memoryUsage();
438
        $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
439
440
        if ($pid) {
441
            $message .= ' (pid ' . $pid . ')';
442
        }
443
444
        Log::write('error', $message);
445
    }
446
447
    /**
448
     * Returns a List of available QueueTasks and their individual configurations.
449
     *
450
     * @return array
451
     */
452
    protected function _getTaskConf()
453
    {
454
        if (! is_array($this->_taskConf)) {
455
            $this->_taskConf = [];
456
            foreach ($this->tasks as $task) {
457
                list ($pluginName, $taskName) = pluginSplit($task);
458
459
                $this->_taskConf[$taskName]['name'] = substr($taskName, 5);
460
                $this->_taskConf[$taskName]['plugin'] = $pluginName;
461
                if (property_exists($this->{$taskName}, 'timeout')) {
462
                    $this->_taskConf[$taskName]['timeout'] = $this->{$taskName}->timeout;
463
                } else {
464
                    $this->_taskConf[$taskName]['timeout'] = Config::defaultWorkerTimeout();
465
                }
466
                if (property_exists($this->{$taskName}, 'retries')) {
467
                    $this->_taskConf[$taskName]['retries'] = $this->{$taskName}->retries;
468
                } else {
469
                    $this->_taskConf[$taskName]['retries'] = Config::defaultWorkerRetries();
470
                }
471
                if (property_exists($this->{$taskName}, 'cleanupTimeout')) {
472
                    $this->_taskConf[$taskName]['cleanupTimeout'] = $this->{$taskName}->cleanupTimeout;
473
                } else {
474
                    $this->_taskConf[$taskName]['cleanupTimeout'] = Config::cleanupTimeout();
475
                }
476
            }
477
        }
478
        return $this->_taskConf;
479
    }
480
481
    /**
482
     * Signal handling to queue worker for clean shutdown
483
     *
484
     * @param int $signal
485
     * @return void
486
     */
487
    protected function _exit($signal)
488
    {
489
        $this->out(__d('queue', 'Caught %d signal, exiting.', $signal));
490
        $this->_exit = true;
491
    }
492
493
    /**
494
     *
495
     * @return void
496
     */
497
    protected function _displayAvailableTasks()
498
    {
499
        $this->out('Available Tasks:');
500
        foreach ($this->taskNames as $loadedTask) {
501
            $this->out("\t" . '* ' . $this->_taskName($loadedTask));
502
        }
503
    }
504
505
    /**
506
     *
507
     * @return string
508
     */
509
    protected function _initPid()
510
    {
511
        $this->_pid = $this->_retrievePid();
0 ignored issues
show
Bug Best Practice introduced by
The property _pid does not exist. Although not strictly required by PHP, it is generally a best practice to declare properties explicitly.
Loading history...
512
        return $this->_pid;
513
    }
514
515
    /**
516
     *
517
     * @return string
518
     */
519
    protected function _retrievePid()
520
    {
521
        if (function_exists('posix_getpid')) {
522
            $pid = (string) posix_getpid();
523
        } else {
524
            $pid = $this->QueuedTasks->key();
525
        }
526
        return $pid;
527
    }
528
529
    /**
530
     *
531
     * @return string Memory usage in MB.
532
     */
533
    protected function _memoryUsage()
534
    {
535
        $limit = ini_get('memory_limit');
536
537
        $used = number_format(memory_get_peak_usage(true) / (1024 * 1024), 0) . 'MB';
538
        if ($limit !== '-1') {
539
            $used .= '/' . $limit;
540
        }
541
542
        return $used;
543
    }
544
545
    /**
546
     *
547
     * @param string|null $pid
548
     *
549
     * @return void
550
     */
551
    protected function _deletePid($pid)
552
    {
553
        if (! $pid) {
554
            $pid = $this->_pid;
555
        }
556
        if (! $pid) {
557
            return;
558
        }
559
    }
560
561
    /**
562
     *
563
     * @return string
564
     */
565
    protected function _timeNeeded()
566
    {
567
        $diff = $this->_time() - $this->_time($this->_time);
568
        $seconds = max($diff, 1);
569
570
        return $seconds . 's';
571
    }
572
573
    /**
574
     *
575
     * @param int|null $providedTime
576
     *
577
     * @return int
578
     */
579
    protected function _time($providedTime = null)
580
    {
581
        if ($providedTime) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $providedTime of type integer|null is loosely compared to true; this is ambiguous if the integer can be 0. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
582
            return $providedTime;
583
        }
584
585
        return time();
586
    }
587
588
    /**
589
     *
590
     * @param string|null $param
591
     * @return array
592
     */
593
    protected function _stringToArray($param)
594
    {
595
        if (! $param) {
596
            return [];
597
        }
598
599
        $array = Text::tokenize($param);
600
601
        return array_filter($array);
602
    }
603
}
604