Completed
Pull Request — master (#35)
by
unknown
03:22
created

QueuedTasksTable::getFullStats()   B

Complexity

Conditions 9
Paths 42

Size

Total Lines 67
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 41
nc 42
nop 1
dl 0
loc 67
rs 7.7084
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
namespace Queue\Model\Table;
3
4
use ArrayObject;
5
use Cake\Core\Configure;
6
use Cake\Event\Event;
7
use Cake\I18n\FrozenTime;
8
use Cake\ORM\Query;
9
use Cake\ORM\Table;
10
use Queue\Model\Entity\QueuedTask;
11
12
/**
13
 * QueuedTasks Model
14
 *
15
 * @method \Queue\Model\Entity\QueuedTask get($primaryKey, $options = [])
16
 * @method \Queue\Model\Entity\QueuedTask newEntity($data = null, array $options = [])
17
 * @method \Queue\Model\Entity\QueuedTask[] newEntities(array $data, array $options = [])
18
 * @method \Queue\Model\Entity\QueuedTask|bool save(\Cake\Datasource\EntityInterface $entity, $options = [])
19
 * @method \Queue\Model\Entity\QueuedTask saveOrFail(\Cake\Datasource\EntityInterface $entity, $options = [])
20
 * @method \Queue\Model\Entity\QueuedTask patchEntity(\Cake\Datasource\EntityInterface $entity, array $data, array $options = [])
21
 * @method \Queue\Model\Entity\QueuedTask[] patchEntities($entities, array $data, array $options = [])
22
 * @method \Queue\Model\Entity\QueuedTask findOrCreate($search, callable $callback = null, $options = [])
23
 *
24
 * @mixin \Cake\ORM\Behavior\TimestampBehavior
25
 */
26
class QueuedTasksTable extends Table
27
{
28
29
    const DRIVER_MYSQL = 'Mysql';
30
31
    const DRIVER_POSTGRES = 'Postgres';
32
33
    const DRIVER_SQLSERVER = 'Sqlserver';
34
35
    const STATS_LIMIT = 100000;
36
37
    /**
38
     * Initialize method
39
     *
40
     * @param array $config The configuration for the Table.
41
     * @return void
42
     */
43
    public function initialize(array $config)
44
    {
45
        parent::initialize($config);
46
47
        $this->setTable('queued_tasks');
48
        $this->setDisplayField('id');
49
        $this->setPrimaryKey('id');
50
51
        $this->addBehavior('Timestamp');
52
    }
53
54
    /**
55
     * set connection name
56
     *
57
     * @return string
58
     */
59
    public static function defaultConnectionName()
60
    {
61
        $connection = Configure::read('Queue.connection');
62
        if (!empty($connection)) {
63
            return $connection;
64
        }
65
66
        return parent::defaultConnectionName();
67
    }
68
69
    /**
70
     *
71
     * @param \Cake\Event\Event $event Model event
72
     * @param \ArrayObject $data The data
73
     * @param \ArrayObject $options The options
74
     * @return void
75
     */
76
    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
0 ignored issues
show
Unused Code introduced by
The parameter $options is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

76
    public function beforeMarshal(Event $event, ArrayObject $data, /** @scrutinizer ignore-unused */ ArrayObject $options)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $event is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

76
    public function beforeMarshal(/** @scrutinizer ignore-unused */ Event $event, ArrayObject $data, ArrayObject $options)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
77
    {
78
        if (isset($data['data']) && $data['data'] === '') {
79
            $data['data'] = null;
80
        }
81
    }
82
83
    /**
84
     * Adds a new job to the queue.
85
     *
86
     * @param string $taskName Task name
87
     * @param array|null $data Array of data
88
     * @param string $notBefore A datetime which indicates when the job may be executed
89
     * @return \Queue\Model\Entity\QueuedTask Saved job entity
90
     */
91
    public function createJob($taskName, array $data = null, string $notBefore = null)
92
    {
93
        $task = [
94
            'task' => $taskName,
95
            'data' => serialize($data),
96
            'not_before' => $this->getDateTime()
97
        ];
98
99
        if (!empty($notBefore)) {
100
            $task['not_before'] = $this->getDateTime(strtotime($notBefore));
101
        }
102
103
        $queuedTask = $this->newEntity($task);
104
105
        return $this->saveOrFail($queuedTask);
106
    }
107
108
    /**
109
     * Returns the number of items in the queue.
110
     * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
111
     *
112
     * @param string|null $taskName Task type to Count
113
     * @return int
114
     */
115
    public function getLength($taskName = null)
116
    {
117
        $findConf = [
118
            'conditions' => [
119
                'completed IS' => null
120
            ]
121
        ];
122
        if ($taskName !== null) {
123
            $findConf['conditions']['task'] = $taskName;
124
        }
125
126
        return $this->find('all', $findConf)->count();
127
    }
128
129
    /**
130
     * Return a list of all task types in the Queue.
131
     *
132
     * @return \Cake\ORM\Query
133
     */
134
    public function getTypes()
135
    {
136
        $findCond = [
137
            'fields' => [
138
                'task'
139
            ],
140
            'group' => [
141
                'task'
142
            ],
143
            'keyField' => 'task',
144
            'valueField' => 'task'
145
        ];
146
147
        return $this->find('list', $findCond);
148
    }
149
150
    /**
151
     * Return some statistics about finished jobs still in the Database.
152
     * TO-DO: rewrite as virtual field
153
     *
154
     * @return \Cake\ORM\Query
155
     */
156
    public function getStats()
157
    {
158
        $driverName = $this->_getDriverName();
159
        $options = [
160
            'fields' => function (Query $query) use ($driverName) {
161
                $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
162
                $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
163
                $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
164
                switch ($driverName) {
165
                    case static::DRIVER_SQLSERVER:
166
                        $alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
167
                        $runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
168
                        $fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
169
                        break;
170
                }
171
                /**
172
                 *
173
                 * @var \Cake\ORM\Query
174
                 */
175
                return [
176
                    'task',
177
                    'num' => $query->func()->count('*'),
178
                    'alltime' => $alltime,
179
                    'runtime' => $runtime,
180
                    'fetchdelay' => $fetchdelay
181
                ];
182
            },
183
            'conditions' => [
184
                'completed IS NOT' => null
185
            ],
186
            'group' => [
187
                'task'
188
            ]
189
        ];
190
191
        return $this->find('all', $options);
192
    }
193
194
    /**
195
     * Returns [
196
     * 'Task' => [
197
     * 'YYYY-MM-DD' => INT,
198
     * ...
199
     * ]
200
     * ]
201
     *
202
     * @param string|null $taskName The task name
203
     * @return array
204
     */
205
    public function getFullStats($taskName = null)
206
    {
207
        $driverName = $this->_getDriverName();
208
        $fields = function (Query $query) use ($driverName) {
209
            $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
210
            switch ($driverName) {
211
                case static::DRIVER_SQLSERVER:
212
                    $runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
213
                    break;
214
            }
215
216
            return [
217
                'task',
218
                'created',
219
                'duration' => $runtime
220
            ];
221
        };
222
223
        $conditions = [
224
            'completed IS NOT' => null
225
        ];
226
        if ($taskName) {
227
            $conditions['task'] = $taskName;
228
        }
229
230
        $tasks = $this->find()
231
            ->select($fields)
232
            ->where($conditions)
233
            ->enableHydration(false)
234
            ->orderDesc('id')
235
            ->limit(static::STATS_LIMIT)
236
            ->all()
237
            ->toArray();
238
239
        $result = [];
240
241
        $days = [];
242
243
        foreach ($tasks as $task) {
244
            /** @var \DateTime $created */
245
            $created = $task['created'];
246
            $day = $created->format('Y-m-d');
247
            if (!isset($days[$day])) {
248
                $days[$day] = $day;
249
            }
250
251
            $result[$task['task']][$day][] = $task['duration'];
252
        }
253
254
        foreach ($result as $type => $tasks) {
255
            foreach ($tasks as $day => $durations) {
256
                $average = array_sum($durations) / count($durations);
257
                $result[$type][$day] = (int)$average;
258
            }
259
260
            foreach ($days as $day) {
261
                if (isset($result[$type][$day])) {
262
                    continue;
263
                }
264
265
                $result[$type][$day] = 0;
266
            }
267
268
            ksort($result[$type]);
269
        }
270
271
        return $result;
272
    }
273
274
    /**
275
     * Look for a new job that can be processed with the current abilities and
276
     * from the specified group (or any if null).
277
     *
278
     * @param array $capabilities Available QueueWorkerTasks.
279
     * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
280
     * @return \Queue\Model\Entity\QueuedTask|null
281
     */
282
    public function requestJob(array $capabilities, array $types = [])
283
    {
284
        $now = $this->getDateTime();
285
        $nowStr = $now->toDateTimeString();
286
        $driverName = $this->_getDriverName();
287
288
        $query = $this->find();
289
        $age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
290
        switch ($driverName) {
291
            case static::DRIVER_SQLSERVER:
292
                $age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
293
                break;
294
            case static::DRIVER_POSTGRES:
295
                $age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
296
                break;
297
        }
298
        $options = [
299
            'conditions' => [
300
                'completed IS' => null,
301
                'OR' => []
302
            ],
303
            'fields' => [
304
                'age' => $age
305
            ],
306
            'order' => [
307
                'age' => 'ASC',
308
                'id' => 'ASC'
309
            ]
310
        ];
311
312
        if ($types) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $types of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
313
            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
314
        }
315
316
        // Generate the task specific conditions.
317
        foreach ($capabilities as $task) {
318
            list ($plugin, $name) = pluginSplit($task['name']);
319
            $timeoutAt = $now->copy();
320
            $tmp = [
321
                'task' => $name,
322
                'AND' => [
323
                    [
324
                        'OR' => [
325
                            'not_before <=' => $nowStr,
326
                            'not_before IS' => null
327
                        ]
328
                    ],
329
                    [
330
                        'OR' => [
331
                            'fetched <' => $timeoutAt->subSeconds($task['timeout']),
332
                            'fetched IS' => null
333
                        ]
334
                    ]
335
                ],
336
                'failed_count <' => ($task['retries'] + 1)
337
            ];
338
            $options['conditions']['OR'][] = $tmp;
339
        }
340
341
        /** @var \Queue\Model\Entity\QueuedTask|null $task */
342
        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
343
            $task = $query->find('all', $options)
344
                ->enableAutoFields(true)
345
                ->epilog('FOR UPDATE')
346
                ->first();
347
348
            if (!$task) {
349
                return null;
350
            }
351
352
            $key = sha1(microtime());
353
            $task = $this->patchEntity($task, [
0 ignored issues
show
Bug introduced by
It seems like $task can also be of type array; however, parameter $entity of Queue\Model\Table\QueuedTasksTable::patchEntity() does only seem to accept Cake\Datasource\EntityInterface, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

353
            $task = $this->patchEntity(/** @scrutinizer ignore-type */ $task, [
Loading history...
354
                'worker_key' => $key,
355
                'fetched' => $now
356
            ]);
357
358
            return $this->saveOrFail($task);
359
        });
360
361
        if (!$task) {
362
            return null;
363
        }
364
365
        return $task;
366
    }
367
368
    /**
369
     * Mark a task as Completed, removing it from the queue.
370
     *
371
     * @param \Queue\Model\Entity\QueuedTask $task Task
372
     * @return bool Success
373
     */
374
    public function markJobDone(QueuedTask $task)
375
    {
376
        $fields = [
377
            'completed' => $this->getDateTime()
378
        ];
379
        $task = $this->patchEntity($task, $fields);
380
381
        return (bool)$this->save($task);
382
    }
383
384
    /**
385
     * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
386
     *
387
     * @param \Queue\Model\Entity\QueuedTask $task Task
388
     * @param string|null $failureMessage Optional message to append to the failure_message field.
389
     * @return bool Success
390
     */
391
    public function markJobFailed(QueuedTask $task, $failureMessage = null)
392
    {
393
        $fields = [
394
            'failed_count' => $task->failed_count + 1,
395
            'failure_message' => $failureMessage
396
        ];
397
        $task = $this->patchEntity($task, $fields);
398
399
        return (bool)$this->save($task);
400
    }
401
402
    /**
403
     * Reset current jobs
404
     *
405
     * @param int|null $id ID
406
     *
407
     * @return int Success
408
     */
409
    public function reset($id = null)
410
    {
411
        $fields = [
412
            'completed' => null,
413
            'fetched' => null,
414
            'failed_count' => 0,
415
            'worker_key' => null,
416
            'failure_message' => null
417
        ];
418
        $conditions = [
419
            'completed IS' => null
420
        ];
421
        if ($id) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $id of type integer|null is loosely compared to true; this is ambiguous if the integer can be 0. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
422
            $conditions['id'] = $id;
423
        }
424
425
        return $this->updateAll($fields, $conditions);
426
    }
427
428
    /**
429
     *
430
     * @param string $taskName Task name
431
     *
432
     * @return int
433
     */
434
    public function rerun($taskName)
435
    {
436
        $fields = [
437
            'completed' => null,
438
            'fetched' => null,
439
            'failed_count' => 0,
440
            'worker_key' => null,
441
            'failure_message' => null
442
        ];
443
        $conditions = [
444
            'completed IS NOT' => null,
445
            'task' => $taskName
446
        ];
447
448
        return $this->updateAll($fields, $conditions);
449
    }
450
451
    /**
452
     * Cleanup/Delete Completed Tasks.
453
     *
454
     * @return void
455
     */
456
    public function cleanOldJobs()
457
    {
458
        if (!Configure::read('Queue.cleanuptimeout')) {
459
            return;
460
        }
461
462
        $this->deleteAll([
463
            'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
464
        ]);
465
    }
466
467
    /**
468
     *
469
     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
470
     * @param array $taskConfiguration Task configuration
471
     * @return string
472
     */
473
    public function getFailedStatus($queuedTask, array $taskConfiguration)
474
    {
475
        $failureMessageRequeued = 'requeued';
476
477
        $queuedTaskName = 'Queue' . $queuedTask->task;
478
        if (empty($taskConfiguration[$queuedTaskName])) {
479
            return $failureMessageRequeued;
480
        }
481
        $retries = $taskConfiguration[$queuedTaskName]['retries'];
482
        if ($queuedTask->failed_count <= $retries) {
483
            return $failureMessageRequeued;
484
        }
485
486
        return 'aborted';
487
    }
488
489
    /**
490
     * truncate()
491
     *
492
     * @return void
493
     */
494
    public function truncate()
495
    {
496
        $sql = $this->getSchema()->truncateSql($this->_connection);
497
        foreach ($sql as $snippet) {
498
            $this->_connection->execute($snippet);
499
        }
500
    }
501
502
    /**
503
     * get the name of the driver
504
     *
505
     * @return string
506
     */
507
    protected function _getDriverName()
508
    {
509
        $className = explode('\\', $this->getConnection()->config()['driver']);
510
        $name = end($className);
511
512
        return $name;
513
    }
514
515
    /**
516
     *
517
     * @param array $conditions Conditions
518
     * @param string $key Key
519
     * @param array $values Values
520
     * @return array
521
     */
522
    protected function addFilter(array $conditions, $key, array $values)
523
    {
524
        $include = [];
525
        $exclude = [];
526
        foreach ($values as $value) {
527
            if (substr($value, 0, 1) === '-') {
528
                $exclude[] = substr($value, 1);
529
            } else {
530
                $include[] = $value;
531
            }
532
        }
533
534
        if ($include) {
535
            $conditions[$key . ' IN'] = $include;
536
        }
537
        if ($exclude) {
538
            $conditions[$key . ' NOT IN'] = $exclude;
539
        }
540
541
        return $conditions;
542
    }
543
544
    /**
545
     * Returns a DateTime object from different input.
546
     *
547
     * Without argument this will be "now".
548
     *
549
     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
550
     *
551
     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
552
     */
553
    protected function getDateTime($notBefore = null)
554
    {
555
        if (is_object($notBefore)) {
556
            return $notBefore;
557
        }
558
559
        return new FrozenTime($notBefore);
560
    }
561
}
562