Completed
Pull Request — master (#35)
by
unknown
03:32
created

QueueShell::runJob()   B

Complexity

Conditions 7
Paths 34

Size

Total Lines 48
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 31
nc 34
nop 1
dl 0
loc 48
rs 8.4906
c 0
b 0
f 0
1
<?php
2
namespace Queue\Shell;
3
4
use Cake\Console\Shell;
5
use Cake\Core\Configure;
6
use Cake\I18n\Number;
7
use Cake\Log\Log;
8
use Cake\ORM\Exception\PersistenceFailedException;
9
use Cake\Utility\Inflector;
10
use Cake\Utility\Text;
11
use Exception;
12
use Queue\Model\Entity\QueuedTask;
13
use Queue\Model\QueueException;
14
use Queue\Queue\Config;
15
use Queue\Queue\TaskFinder;
16
use Queue\Shell\Task\AddInterface;
17
use Queue\Shell\Task\QueueTaskInterface;
18
use RuntimeException;
19
use Throwable;
20
21
declare(ticks = 1);
22
23
/**
24
 * Main shell to init and run queue workers.
25
 *
26
 * @author [email protected]
27
 * @license http://www.opensource.org/licenses/mit-license.php The MIT License
28
 * @link http://github.com/MSeven/cakephp_queue
29
 * @property \Queue\Model\Table\QueuedTasksTable $QueuedTasks
30
 */
31
class QueueShell extends Shell
32
{
33
34
    /**
     * Model identifier for this shell; the code reaches the table as $this->QueuedTasks
     * (presumably resolved by the framework from this value — confirm against Shell).
     *
     * @var string
     */
    public $modelClass = 'Queue.QueuedTasks';

    /**
     * Per-task configuration cache filled lazily by _getTaskConf(); null until first built.
     *
     * @var array|null
     */
    protected $_taskConf;

    /**
     * Unix timestamp recorded when the current job started (set in runJob()),
     * used by _timeNeeded() to report how long a job took.
     *
     * @var int
     */
    protected $_time = 0;

    /**
     * Stop flag for the worker loop in runworker(); true ends the loop.
     *
     * @var bool
     */
    protected $_exit = false;
57
58
    /**
     * Registers all queue tasks of the app and its plugins, then runs the parent initialization.
     *
     * The task list returned by TaskFinder::allAppAndPluginTasks() is assigned to $this->tasks
     * before parent::initialize() is called.
     *
     * @return void
     */
    public function initialize()
    {
        $this->tasks = (new TaskFinder())->allAppAndPluginTasks();

        parent::initialize();
    }
70
71
    /**
     * Turns off interactive mode when the "quiet" option is set, then runs the parent startup.
     *
     * @return void
     */
    public function startup()
    {
        $quiet = $this->param('quiet');
        if ($quiet) {
            $this->interactive = false;
        }

        parent::startup();
    }
83
84
    /**
     * Builds the help text of the shell: a short intro followed by one bullet per loaded task.
     *
     * Task names are shown through _taskName(), i.e. without their "Queue" prefix.
     *
     * @return string
     */
    public function getDescription()
    {
        $bullets = array_map(function ($loadedTask) {
            return "\t" . '* ' . $this->_taskName($loadedTask);
        }, $this->taskNames);
        $tasks = implode(PHP_EOL, $bullets);

        return <<<TEXT
Simple and minimalistic job queue (or deferred-task) system.

Available Tasks:
$tasks
TEXT;
    }
105
106
    /**
     * Looks up the queue task named by the first CLI argument and calls add() on it.
     *
     * A task opts in to creation via the command line by implementing AddInterface.
     * Without an argument, or with an unknown task name, usage/error output and the
     * list of available tasks are printed instead.
     *
     * @return void
     */
    public function add()
    {
        if (count($this->args) < 1) {
            $this->out('Please call like this:');
            $this->out('       bin/cake queue add <taskname>');
            $this->_displayAvailableTasks();

            return;
        }

        $name = Inflector::camelize($this->args[0]);
        $property = 'Queue' . $name;

        if (!in_array($property, $this->taskNames, true)) {
            $this->out('Error: Task not found: ' . $name);
            $this->_displayAvailableTasks();

            return;
        }

        /** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
        $task = $this->{$property};
        if (!($task instanceof AddInterface)) {
            $this->abort('This task does not support adding via CLI call');
        }
        $task->add();
    }
135
136
    /**
     * Returns the display form of a task name by removing a leading "Queue".
     *
     * Names that do not start with "Queue" are returned unchanged.
     *
     * @param string $task Task name
     * @return string Cleaned task name
     */
    protected function _taskName($task)
    {
        $prefix = 'Queue';
        $prefixLength = strlen($prefix);

        if (strncmp($task, $prefix, $prefixLength) !== 0) {
            return $task;
        }

        return substr($task, $prefixLength);
    }
151
152
    /**
     * Run a QueueWorker loop.
     *
     * Repeatedly requests an unassigned job of the configured types and executes it via runJob().
     * When no job is available the worker either exits (Queue.exitWhenNothingToDo) or sleeps for
     * Config::sleepTime() seconds. The loop ends when the stop flag is set: by a signal handler,
     * by the "nothing to do" exit, or once Queue.workerMaxRuntime seconds have elapsed.
     *
     * @return int|null Nothing is returned explicitly by this implementation.
     */
    public function runworker()
    {
        // Enable Garbage Collector (PHP >= 5.3)
        if (function_exists('gc_enable')) {
            gc_enable();
        }
        if (function_exists('pcntl_signal')) {
            // Each of these signals only raises the stop flag; the loop below then ends
            // after the current iteration. Objects are handles, so no reference is needed.
            foreach ([SIGTERM, SIGINT, SIGTSTP, SIGQUIT] as $signal) {
                pcntl_signal($signal, [$this, '_exit']);
            }
        }
        $this->_exit = false;

        $startTime = time();
        $types = $this->_stringToArray($this->param('type'));

        while (!$this->_exit) {
            $this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);

            $QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);

            if ($QueuedTask) {
                $this->runJob($QueuedTask);
            } elseif (Configure::read('Queue.exitWhenNothingToDo')) {
                $this->out(__d('queue', 'nothing to do, exiting.'));
                $this->_exit = true;
            } else {
                $this->out(__d('queue', 'nothing to do, sleeping.'));
                sleep(Config::sleepTime());
            }

            // Check if we are over the maximum runtime and end processing if so.
            // The setting is read once per iteration; a falsy value disables the limit.
            $maxRuntime = Configure::readOrFail('Queue.workerMaxRuntime');
            $elapsed = time() - $startTime;
            if ($maxRuntime && $elapsed >= $maxRuntime) {
                $this->_exit = true;
                // Placeholders keep the translation key constant; the values are passed as
                // strings so the formatter does not apply locale number grouping to them.
                $this->out(__d(
                    'queue',
                    'Reached runtime of {0} Seconds (Max {1}), terminating.',
                    (string)$elapsed,
                    (string)$maxRuntime
                ));
            }
            // Always clean up when leaving; otherwise only at random, driven by Config::gcprob().
            if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
                $this->out(__d('queue', 'Performing old job cleanup.'));
                $this->QueuedTasks->cleanOldJobs();
            }
            $this->hr();
        }

        if ($this->param('verbose')) {
            $this->_log('endworker');
        }
    }
219
220
    /**
     * Executes a single queued job and records the outcome.
     *
     * The task object is looked up as "Queue" + task name and must implement QueueTaskInterface.
     * A job counts as failed when run() throws or (legacy behavior) returns false; it is then
     * marked failed with a message, otherwise it is marked done.
     *
     * @param \Queue\Model\Entity\QueuedTask $QueuedTask Queued task
     * @return void
     */
    protected function runJob(QueuedTask $QueuedTask)
    {
        $this->out('Running Job of type "' . $QueuedTask->task . '"');
        $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, null, false);
        $taskName = 'Queue' . $QueuedTask->task;

        try {
            $this->_time = time();

            // A job without payload must reach the task as an empty array: unserialize()
            // returns false for an empty value and (array)false would be [false].
            // NOTE(review): unserialize() can instantiate objects; this assumes the stored
            // payload was written by the application itself — confirm it is never user-controlled.
            $payload = $QueuedTask->data;
            $data = ($payload === null || $payload === '') ? [] : unserialize($payload);

            /** @var \Queue\Shell\Task\QueueTask $task */
            $task = $this->{$taskName};
            if (!$task instanceof QueueTaskInterface) {
                throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
            }

            $return = $task->run((array)$data, $QueuedTask->id);
            if ($return !== null) {
                // Legacy tasks signalled failure through the return value.
                trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
            }
            $failureMessage = $taskName . ' failed';
        } catch (Throwable $e) {
            $return = false;

            $failureMessage = get_class($e) . ': ' . $e->getMessage();
            // QueueException is treated as an expected failure: no stack trace is attached.
            if (!($e instanceof QueueException)) {
                $failureMessage .= "\n" . $e->getTraceAsString();
            }

            $this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage);
        } catch (Exception $e) {
            // Only reachable where Throwable does not exist (every Exception is a Throwable
            // otherwise) — presumably kept as a fallback for older PHP versions.
            $return = false;

            $failureMessage = get_class($e) . ': ' . $e->getMessage();
            $this->_logError($taskName . "\n" . $failureMessage);
        }

        if ($return === false) {
            $this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
            $failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
            $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus);
            $this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');

            return;
        }

        $this->QueuedTasks->markJobDone($QueuedTask);
        $this->out('Job Finished.');
    }
274
275
    /**
     * Manually triggers the cleanup of old jobs.
     *
     * Aborts when Queue.cleanupTimeout is not set; otherwise prints the cut-off time
     * (now minus the configured timeout) and asks the table to clean old jobs.
     *
     * @return void
     */
    public function clean()
    {
        $cleanupTimeout = Configure::read('Queue.cleanupTimeout');
        if (!$cleanupTimeout) {
            $this->abort('You disabled cleanuptimout in config. Aborting.');
        }

        $cutOff = date('Y-m-d H:i:s', time() - (int)$cleanupTimeout);
        $this->out('Deleting old jobs, that have finished before ' . $cutOff);
        $this->QueuedTasks->cleanOldJobs();
    }
289
290
    /**
     * Prints every entry of the "Queue" configuration, one per line.
     *
     * Boolean values are shown as "yes"/"no"; everything else is rendered with print_r().
     *
     * @return void
     */
    public function settings()
    {
        $this->out('Current Settings:');

        foreach ((array)Configure::read('Queue') as $key => $val) {
            if ($val === false) {
                $shown = 'no';
            } elseif ($val === true) {
                $shown = 'yes';
            } else {
                $shown = $val;
            }
            $this->out('* ' . $key . ': ' . print_r($shown, true));
        }

        $this->out();
    }
311
312
    /**
     * Prints queue statistics: pending jobs per type, the total of unfinished jobs,
     * and per-task figures for finished jobs.
     *
     * @return void
     */
    public function stats()
    {
        $this->out('Jobs currently in the queue:');

        foreach ($this->QueuedTasks->getTypes()->toArray() as $type) {
            $this->out('      ' . str_pad($type, 20) . ': ' . $this->QueuedTasks->getLength($type));
        }
        $this->hr();
        $this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
        $this->hr();
        $this->out('Finished job statistics:');

        // Output label => key of the stats row holding the value.
        $averages = [
            '   Average Job existence    : ' => 'alltime',
            '   Average Execution delay  : ' => 'fetchdelay',
            '   Average Execution time   : ' => 'runtime',
        ];
        foreach ($this->QueuedTasks->getStats() as $item) {
            $this->out(' ' . $item['task'] . ': ');
            $this->out('   Finished Jobs in Database: ' . $item['num']);
            foreach ($averages as $label => $field) {
                $this->out($label . str_pad(Number::precision($item[$field]), 8, ' ', STR_PAD_LEFT) . 's');
            }
        }
    }
338
339
    /**
     * Builds the option parser with the shell description and its subcommands.
     *
     * "clean" and "add" take no extra options; "stats", "settings" and "runworker"
     * additionally accept a "type" option (short: -t).
     *
     * @return \Cake\Console\ConsoleOptionParser
     */
    public function getOptionParser()
    {
        $plainParser = [
            'options' => [],
        ];
        $typedParser = [
            'options' => [
                'type' => [
                    'short' => 't',
                    'help' => 'Type (comma separated list possible)',
                    'default' => null,
                ],
            ],
        ];

        // Subcommand name => its help text and parser definition, in registration order.
        $subcommands = [
            'clean' => ['help' => 'Remove old jobs (cleanup)', 'parser' => $plainParser],
            'add' => ['help' => 'Add Job', 'parser' => $plainParser],
            'stats' => ['help' => 'Stats', 'parser' => $typedParser],
            'settings' => ['help' => 'Settings', 'parser' => $typedParser],
            'runworker' => ['help' => 'Run Worker', 'parser' => $typedParser],
        ];

        $parser = parent::getOptionParser()->setDescription($this->getDescription());
        foreach ($subcommands as $name => $definition) {
            $parser = $parser->addSubcommand($name, $definition);
        }

        return $parser;
    }
386
387
    /**
     * Writes an info-level message to the "queue" log scope.
     *
     * Nothing is written unless Queue.log is enabled in the configuration.
     *
     * @param string $message Text to log
     * @param string|null $pid PID of the process, appended when given
     * @param bool $addDetails Whether to append elapsed time and memory usage
     * @return void
     */
    protected function _log($message, $pid = null, $addDetails = true)
    {
        if (!Configure::read('Queue.log')) {
            return;
        }

        $suffix = '';
        if ($addDetails) {
            $suffix .= ' [' . $this->_timeNeeded() . ', ' . $this->_memoryUsage() . ']';
        }
        if ($pid) {
            $suffix .= ' (pid ' . $pid . ')';
        }

        Log::write('info', $message . $suffix, ['scope' => 'queue']);
    }
414
415
    /**
     * Writes an error-level log entry, always suffixed with elapsed time and memory usage.
     *
     * @param string $message Message
     * @param string|null $pid PID of the process, appended when given
     * @return void
     */
    protected function _logError($message, $pid = null)
    {
        $entry = $message . ' [' . $this->_timeNeeded() . ', ' . $this->_memoryUsage() . ']';
        if ($pid) {
            $entry .= ' (pid ' . $pid . ')';
        }

        Log::write('error', $entry);
    }
433
434
    /**
     * Returns the configuration of all available queue tasks, keyed by task name.
     *
     * The list is built on first use and cached in $this->_taskConf. For each task the
     * values of its timeout, retries and cleanupTimeout properties are used when the
     * task object declares them; otherwise the defaults from Config apply.
     *
     * @return array
     */
    protected function _getTaskConf()
    {
        if (is_array($this->_taskConf)) {
            return $this->_taskConf;
        }

        $this->_taskConf = [];
        foreach ($this->tasks as $task) {
            list($pluginName, $taskName) = pluginSplit($task);
            $taskObject = $this->{$taskName};

            $this->_taskConf[$taskName] = [
                // Task name without its leading 5-character "Queue" prefix.
                'name' => substr($taskName, 5),
                'plugin' => $pluginName,
                'timeout' => property_exists($taskObject, 'timeout')
                    ? $taskObject->timeout
                    : Config::defaultWorkerTimeout(),
                'retries' => property_exists($taskObject, 'retries')
                    ? $taskObject->retries
                    : Config::defaultWorkerRetries(),
                'cleanupTimeout' => property_exists($taskObject, 'cleanupTimeout')
                    ? $taskObject->cleanupTimeout
                    : Config::cleanupTimeout(),
            ];
        }

        return $this->_taskConf;
    }
468
469
    /**
     * Signal handler for a clean worker shutdown.
     *
     * Registered in runworker() for the termination signals; it reports the signal and
     * raises the stop flag so the worker loop ends after the current iteration.
     *
     * @param int $signal The signal
     * @return void
     */
    protected function _exit($signal)
    {
        // The default translation formatter interpolates {0}-style placeholders; a
        // sprintf-style "%d" would be printed literally. Passing the number as a string
        // avoids locale-specific number formatting.
        $this->out(__d('queue', 'Caught {0} signal, exiting.', (string)$signal));
        $this->_exit = true;
    }
480
481
    /**
     * Prints a heading followed by one bullet line per loaded task (names without "Queue" prefix).
     *
     * @return void
     */
    protected function _displayAvailableTasks()
    {
        $this->out('Available Tasks:');

        foreach ($this->taskNames as $taskName) {
            $bullet = "\t" . '* ' . $this->_taskName($taskName);
            $this->out($bullet);
        }
    }
492
493
    /**
     * Reports the peak memory usage of the process.
     *
     * The configured memory_limit is appended after a slash unless it is unlimited ("-1").
     *
     * @return string Memory usage in MB.
     */
    protected function _memoryUsage()
    {
        $peak = number_format(memory_get_peak_usage(true) / (1024 * 1024), 0) . 'MB';
        $limit = ini_get('memory_limit');

        if ($limit === '-1') {
            return $peak;
        }

        return $peak . '/' . $limit;
    }
508
509
    /**
     * Returns the seconds elapsed since the recorded job start time, as a string with "s" suffix.
     *
     * At least one second is always reported.
     *
     * @return string
     */
    protected function _timeNeeded()
    {
        $now = $this->_time();
        $started = $this->_time($this->_time);

        return max($now - $started, 1) . 's';
    }
520
521
    /**
     * Returns the given timestamp, or the current time when none is given.
     *
     * @param int|null $providedTime Provided time
     *
     * @return int
     */
    protected function _time($providedTime = null)
    {
        return $providedTime !== null ? $providedTime : time();
    }
535
536
    /**
     * Splits a string into its tokens via Text::tokenize() and drops empty entries.
     *
     * An empty or missing value yields an empty array.
     *
     * @param string|null $param String to convert
     * @return array
     */
    protected function _stringToArray($param)
    {
        if ($param) {
            return array_filter(Text::tokenize($param));
        }

        return [];
    }
551
}
552