Completed
Pull Request — master (#35)
by
unknown
03:37
created

QueuedTasksTable::getStats()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 36
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 23
nc 1
nop 0
dl 0
loc 36
rs 9.552
c 0
b 0
f 0
1
<?php
2
namespace Queue\Model\Table;
3
4
use ArrayObject;
5
use Cake\Core\Configure;
6
use Cake\Event\Event;
7
use Cake\I18n\FrozenTime;
8
use Cake\ORM\Query;
9
use Cake\ORM\Table;
10
use InvalidArgumentException;
11
use Queue\Model\Entity\QueuedTask;
12
use RuntimeException;
13
14
/**
15
 * QueuedTasks Model
16
 *
17
 * @method \Queue\Model\Entity\QueuedTask get($primaryKey, $options = [])
18
 * @method \Queue\Model\Entity\QueuedTask newEntity($data = null, array $options = [])
19
 * @method \Queue\Model\Entity\QueuedTask[] newEntities(array $data, array $options = [])
20
 * @method \Queue\Model\Entity\QueuedTask|bool save(\Cake\Datasource\EntityInterface $entity, $options = [])
21
 * @method \Queue\Model\Entity\QueuedTask saveOrFail(\Cake\Datasource\EntityInterface $entity, $options = [])
22
 * @method \Queue\Model\Entity\QueuedTask patchEntity(\Cake\Datasource\EntityInterface $entity, array $data, array $options = [])
23
 * @method \Queue\Model\Entity\QueuedTask[] patchEntities($entities, array $data, array $options = [])
24
 * @method \Queue\Model\Entity\QueuedTask findOrCreate($search, callable $callback = null, $options = [])
25
 *
26
 * @mixin \Cake\ORM\Behavior\TimestampBehavior
27
 */
28
class QueuedTasksTable extends Table
29
{
30
31
    const DRIVER_MYSQL = 'Mysql';
32
33
    const DRIVER_POSTGRES = 'Postgres';
34
35
    const DRIVER_SQLSERVER = 'Sqlserver';
36
37
    const STATS_LIMIT = 100000;
38
39
    /**
40
     *
41
     * @var string|null
42
     */
43
    protected $_key;
44
45
    /**
     * Configures the table: backing table name, display field, primary key
     * and the Timestamp behavior.
     *
     * @param array $config The configuration for the Table.
     * @return void
     */
    public function initialize(array $config)
    {
        parent::initialize($config);

        // The three setters are independent of each other.
        $this->setPrimaryKey('id');
        $this->setDisplayField('id');
        $this->setTable('queued_tasks');

        $this->addBehavior('Timestamp');
    }
61
62
    /**
     * Returns the name of the connection this table uses.
     *
     * A non-empty `Queue.connection` configuration value takes precedence;
     * otherwise the parent implementation decides.
     *
     * @return string
     */
    public static function defaultConnectionName()
    {
        $configured = Configure::read('Queue.connection');

        return empty($configured) ? parent::defaultConnectionName() : $configured;
    }
76
77
    /**
     * Normalizes an empty-string `data` value to null before marshalling.
     *
     * $event and $options are not used by the body; they are kept because the
     * signature follows CakePHP's Model.beforeMarshal callback.
     *
     * @param \Cake\Event\Event $event Model event
     * @param \ArrayObject $data The data
     * @param \ArrayObject $options The options
     * @return void
     */
    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
    {
        if (!isset($data['data']) || $data['data'] !== '') {
            return;
        }

        $data['data'] = null;
    }
90
91
    /**
     * Adds a new job to the queue.
     *
     * @param string $taskName Task name
     * @param array|null $data Array of data; stored in serialized form
     * @param string|null $notBefore A datetime string understood by strtotime() which indicates
     *   when the job may be executed; when empty, the current time is used
     * @return \Queue\Model\Entity\QueuedTask Saved job entity
     * @throws \InvalidArgumentException When $notBefore cannot be parsed by strtotime().
     */
    public function createJob($taskName, array $data = null, string $notBefore = null)
    {
        $notBeforeTime = $this->getDateTime();

        if (!empty($notBefore)) {
            $timestamp = strtotime($notBefore);
            // strtotime() returns false for unparseable input; passing that on
            // would silently schedule the job for "now".
            if ($timestamp === false) {
                throw new InvalidArgumentException(sprintf('Invalid notBefore value: "%s"', $notBefore));
            }
            $notBeforeTime = $this->getDateTime($timestamp);
        }

        $task = [
            'task' => $taskName,
            'data' => serialize($data),
            'not_before' => $notBeforeTime
        ];

        $queuedTask = $this->newEntity($task);

        return $this->saveOrFail($queuedTask);
    }
115
116
    /**
     * Tells whether at least one not-yet-completed job exists.
     *
     * @param string|null $taskName Restrict the check to this task name; a falsy value checks all tasks
     * @return bool
     */
    public function isQueued($taskName = null)
    {
        $pending = ['completed IS' => null];
        if ($taskName) {
            $pending += ['task' => $taskName];
        }

        $firstMatch = $this->find()
            ->select(['id'])
            ->where($pending)
            ->first();

        return (bool)$firstMatch;
    }
137
138
    /**
     * Counts the jobs that have not been completed yet.
     *
     * Without an argument all pending jobs are counted; with a task name only
     * the pending jobs of that task are counted.
     *
     * @param string|null $taskName Task type to count
     * @return int
     */
    public function getLength($taskName = null)
    {
        $conditions = ['completed IS' => null];
        if ($taskName !== null) {
            $conditions['task'] = $taskName;
        }

        return $this->find('all', ['conditions' => $conditions])->count();
    }
158
159
    /**
     * Builds a `list` query over the distinct task names found in the queue,
     * keyed and valued by the task name.
     *
     * @return \Cake\ORM\Query
     */
    public function getTypes()
    {
        return $this->find('list', [
            'fields' => ['task'],
            'group' => ['task'],
            'keyField' => 'task',
            'valueField' => 'task',
        ]);
    }
179
180
    /**
     * Return some statistics about finished jobs still in the Database.
     *
     * The returned query groups completed jobs by task and selects:
     * - `task`: the task name
     * - `num`: number of completed jobs
     * - `alltime`: average seconds between `created` and `completed`
     * - `runtime`: average seconds between `fetched` and `completed`
     * - `fetchdelay`: average seconds between `not_before` (or `created` when
     *   `not_before` is NULL) and `fetched`
     *
     * The time arithmetic is driver specific: SQL Server and Postgres have
     * dedicated expressions, every other driver gets the MySQL syntax.
     *
     * TO-DO: rewrite as virtual field
     *
     * @return \Cake\ORM\Query
     */
    public function getStats()
    {
        $driverName = $this->_getDriverName();
        $options = [
            'fields' => function (Query $query) use ($driverName) {
                switch ($driverName) {
                    case static::DRIVER_SQLSERVER:
                        $alltimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)";
                        $runtimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)";
                        $fetchdelaySql = "DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)";
                        break;
                    case static::DRIVER_POSTGRES:
                        // Postgres has neither UNIX_TIMESTAMP() nor IF(); use EXTRACT(EPOCH ...) and CASE.
                        $alltimeSql = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM created)';
                        $runtimeSql = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)';
                        $fetchdelaySql = 'EXTRACT(EPOCH FROM fetched) - (CASE WHEN not_before IS NULL THEN EXTRACT(EPOCH FROM created) ELSE EXTRACT(EPOCH FROM not_before) END)';
                        break;
                    default:
                        $alltimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)';
                        $runtimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)';
                        $fetchdelaySql = 'UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))';
                        break;
                }

                return [
                    'task',
                    'num' => $query->func()->count('*'),
                    'alltime' => $query->func()->avg($alltimeSql),
                    'runtime' => $query->func()->avg($runtimeSql),
                    'fetchdelay' => $query->func()->avg($fetchdelaySql)
                ];
            },
            'conditions' => [
                'completed IS NOT' => null
            ],
            'group' => [
                'task'
            ]
        ];

        return $this->find('all', $options);
    }
223
224
    /**
     * Returns the average runtime (in whole seconds) of completed jobs per
     * task and creation day, based on the newest STATS_LIMIT completed jobs.
     *
     * Returns [
     * 'Task' => [
     * 'YYYY-MM-DD' => INT,
     * ...
     * ]
     * ]
     *
     * Every task gets an entry for every day seen in the result set; days on
     * which a task has no completed job are reported as 0. Days are sorted
     * ascending per task.
     *
     * The runtime expression is driver specific: SQL Server and Postgres have
     * dedicated expressions, every other driver gets the MySQL syntax.
     *
     * @param string|null $taskName The task name; a falsy value includes all tasks
     * @return array
     */
    public function getFullStats($taskName = null)
    {
        $driverName = $this->_getDriverName();
        $fields = function (Query $query) use ($driverName) {
            switch ($driverName) {
                case static::DRIVER_SQLSERVER:
                    $runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
                    break;
                case static::DRIVER_POSTGRES:
                    // Postgres has no UNIX_TIMESTAMP(); EXTRACT(EPOCH ...) yields seconds as well.
                    $runtime = $query->newExpr('EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)');
                    break;
                default:
                    $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
                    break;
            }

            return [
                'task',
                'created',
                'duration' => $runtime
            ];
        };

        $conditions = [
            'completed IS NOT' => null
        ];
        if ($taskName) {
            $conditions['task'] = $taskName;
        }

        $tasks = $this->find()
            ->select($fields)
            ->where($conditions)
            ->enableHydration(false)
            ->orderDesc('id')
            ->limit(static::STATS_LIMIT)
            ->all()
            ->toArray();

        $result = [];

        // Every day seen across all tasks, used below to zero-fill gaps.
        $days = [];

        foreach ($tasks as $task) {
            /** @var \DateTime $created */
            $created = $task['created'];
            $day = $created->format('Y-m-d');
            $days[$day] = $day;

            $result[$task['task']][$day][] = $task['duration'];
        }

        foreach ($result as $type => $durationsPerDay) {
            foreach ($durationsPerDay as $day => $durations) {
                $average = array_sum($durations) / count($durations);
                $result[$type][$day] = (int)$average;
            }

            foreach ($days as $day) {
                if (isset($result[$type][$day])) {
                    continue;
                }

                $result[$type][$day] = 0;
            }

            ksort($result[$type]);
        }

        return $result;
    }
303
304
    /**
     * Look for a new job that can be processed with the current abilities and
     * from the specified types (or any if empty), and claim it for this worker.
     *
     * A job is eligible when it is not completed, its `not_before` is in the
     * past or NULL, it has never been fetched or its fetch is older than the
     * task's timeout, and its `failed_count` does not exceed the task's retries.
     * The selected row is read with `FOR UPDATE` inside a transaction and is
     * stamped with this worker's key and the current time as `fetched`.
     *
     * @param array $capabilities Available QueueWorkerTasks; each entry is read for the
     *   keys `name`, `timeout` and `retries`.
     * @param array $types Request a job from these types (or exclude certain types by
     *   prefixing them with "-"), or any otherwise.
     * @return \Queue\Model\Entity\QueuedTask|null
     */
    public function requestJob(array $capabilities, array $types = [])
    {
        $now = $this->getDateTime();
        $nowStr = $now->toDateTimeString();
        $driverName = $this->_getDriverName();

        $query = $this->find();
        switch ($driverName) {
            case static::DRIVER_SQLSERVER:
                $ageSql = 'ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)';
                break;
            case static::DRIVER_POSTGRES:
                // NOTE(review): this computes now - not_before, whereas the MySQL and
                // SQL Server variants compute not_before - now, so the ASC ordering
                // appears to be inverted on Postgres - confirm which is intended.
                $ageSql = 'COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)';
                break;
            default:
                $ageSql = 'IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)';
                break;
        }
        $age = $query->newExpr()->add($ageSql);

        $options = [
            'conditions' => [
                'completed IS' => null,
                'OR' => []
            ],
            'fields' => [
                'age' => $age
            ],
            'order' => [
                'age' => 'ASC',
                'id' => 'ASC'
            ]
        ];

        if (!empty($types)) {
            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
        }

        // Generate the task specific conditions.
        foreach ($capabilities as $task) {
            // Only the task name is needed; the plugin prefix is discarded.
            list(, $name) = pluginSplit($task['name']);
            $timeoutAt = $now->copy();
            $tmp = [
                'task' => $name,
                'AND' => [
                    [
                        'OR' => [
                            'not_before <' => $nowStr,
                            'not_before IS' => null
                        ]
                    ],
                    [
                        'OR' => [
                            'fetched <' => $timeoutAt->subSeconds($task['timeout']),
                            'fetched IS' => null
                        ]
                    ]
                ],
                'failed_count <' => ($task['retries'] + 1)
            ];
            $options['conditions']['OR'][] = $tmp;
        }

        /** @var \Queue\Model\Entity\QueuedTask|null $task */
        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
            /** @var \Queue\Model\Entity\QueuedTask|null $task */
            $task = $query->find('all', $options)
                ->enableAutoFields(true)
                ->epilog('FOR UPDATE')
                ->first();

            if (!$task) {
                return null;
            }

            $task = $this->patchEntity($task, [
                'worker_key' => $this->key(),
                'fetched' => $now
            ]);

            return $this->saveOrFail($task);
        });

        return $task ?: null;
    }
397
398
    /**
     * Marks a task as completed by stamping its `completed` field with the
     * current time and saving it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @return bool Success
     */
    public function markJobDone(QueuedTask $task)
    {
        $completedTask = $this->patchEntity($task, ['completed' => $this->getDateTime()]);

        return (bool)$this->save($completedTask);
    }
413
414
    /**
     * Marks a job as failed: increments its failed counter, stores the given
     * failure message and saves it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @param string|null $failureMessage Optional message written to the failure_message field.
     * @return bool Success
     */
    public function markJobFailed(QueuedTask $task, $failureMessage = null)
    {
        $failedTask = $this->patchEntity($task, [
            'failed_count' => $task->failed_count + 1,
            'failure_message' => $failureMessage,
        ]);

        return (bool)$this->save($failedTask);
    }
431
432
    /**
     * Reset current jobs.
     *
     * Clears `fetched`, `worker_key` and `failure_message`, and sets
     * `failed_count` back to 0, on jobs that are not completed. Without an ID
     * all such jobs are reset; with an ID only that job is.
     *
     * @param int|null $id ID of a single job to reset, or null for all pending jobs
     *
     * @return int Result of updateAll()
     */
    public function reset($id = null)
    {
        $fields = [
            'completed' => null,
            'fetched' => null,
            'failed_count' => 0,
            'worker_key' => null,
            'failure_message' => null
        ];
        $conditions = [
            'completed IS' => null
        ];
        // Strict check: an ID of 0 must not be mistaken for "no ID" and reset every job.
        if ($id !== null) {
            $conditions['id'] = $id;
        }

        return $this->updateAll($fields, $conditions);
    }
457
458
    /**
     * Puts all completed jobs of the given task back into the queue by
     * clearing their completion, fetch, worker and failure fields.
     *
     * @param string $taskName Task name
     *
     * @return int
     */
    public function rerun($taskName)
    {
        $completedOfTask = [
            'completed IS NOT' => null,
            'task' => $taskName,
        ];
        $cleared = [
            'completed' => null,
            'fetched' => null,
            'failed_count' => 0,
            'worker_key' => null,
            'failure_message' => null,
        ];

        return $this->updateAll($cleared, $completedOfTask);
    }
480
481
    /**
     * Deletes jobs whose `completed` value is older than the configured
     * `Queue.cleanuptimeout` (in seconds). Does nothing when that setting is
     * empty.
     *
     * @return void
     */
    public function cleanOldJobs()
    {
        $timeout = Configure::read('Queue.cleanuptimeout');
        if (!$timeout) {
            return;
        }

        $threshold = time() - (int)$timeout;
        $this->deleteAll(['completed <' => $threshold]);
    }
496
497
    /**
     * Tells whether a failed task counts as "requeued" or "aborted".
     *
     * The task's configuration is looked up under the key "Queue" + task name.
     * Without such a configuration, or while failed_count does not exceed the
     * configured retries, the status is "requeued"; otherwise "aborted".
     *
     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
     * @param array $taskConfiguration Task configuration
     * @return string
     */
    public function getFailedStatus($queuedTask, array $taskConfiguration)
    {
        $configKey = 'Queue' . $queuedTask->task;
        if (empty($taskConfiguration[$configKey])) {
            return 'requeued';
        }

        $allowedRetries = $taskConfiguration[$configKey]['retries'];
        $withinRetries = $queuedTask->failed_count <= $allowedRetries;

        return $withinRetries ? 'requeued' : 'aborted';
    }
518
519
    /**
     * Returns the identifier of the current worker, generating it on first use.
     *
     * The identifier is the SHA-1 hash of microtime() and is cached on the
     * instance until clearKey() is called.
     *
     * @return string Identifier
     * @throws \RuntimeException When the generated key is empty.
     */
    public function key()
    {
        if ($this->_key === null) {
            $this->_key = sha1(microtime());
            if (!$this->_key) {
                throw new RuntimeException('Invalid key generated');
            }
        }

        return $this->_key;
    }
538
539
    /**
     * Forgets the cached worker identifier so that the next key() call
     * generates a new one.
     *
     * @return void
     */
    public function clearKey()
    {
        $this->_key = null;
    }
548
549
    /**
     * Executes every statement the table schema's truncateSql() returns
     * on this table's connection.
     *
     * @return void
     */
    public function truncate()
    {
        $connection = $this->_connection;
        $statements = $this->getSchema()->truncateSql($connection);

        foreach ($statements as $statement) {
            $connection->execute($statement);
        }
    }
561
562
    /**
     * Returns the short driver name, i.e. the last backslash-separated segment
     * of the connection's configured `driver` value.
     *
     * @return string
     */
    protected function _getDriverName()
    {
        $driver = $this->getConnection()->config()['driver'];
        $segments = explode('\\', $driver);

        return array_pop($segments);
    }
574
575
    /**
     * Adds include/exclude conditions for a field to a conditions array.
     *
     * Values starting with "-" are exclusions (the dash is stripped) and end up
     * in a "<key> NOT IN" condition; all other values end up in a "<key> IN"
     * condition. A condition is only added when it has at least one value.
     *
     * @param array $conditions Conditions
     * @param string $key Key
     * @param array $values Values
     * @return array
     */
    protected function addFilter(array $conditions, $key, array $values)
    {
        $buckets = ['IN' => [], 'NOT IN' => []];

        foreach ($values as $value) {
            if (strpos($value, '-') === 0) {
                $buckets['NOT IN'][] = substr($value, 1);
                continue;
            }
            $buckets['IN'][] = $value;
        }

        foreach ($buckets as $operator => $items) {
            if ($items) {
                $conditions[$key . ' ' . $operator] = $items;
            }
        }

        return $conditions;
    }
603
604
    /**
     * Turns different kinds of input into a date/time object.
     *
     * An object is handed back unchanged; anything else is passed to the
     * FrozenTime constructor. Without argument this will be "now".
     *
     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
     *
     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
     */
    protected function getDateTime($notBefore = null)
    {
        return is_object($notBefore) ? $notBefore : new FrozenTime($notBefore);
    }
621
}
622