Completed
Pull Request — master (#35)
by
unknown
03:29
created

QueuedTasksTable::getFailedStatus()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 8
nc 3
nop 2
dl 0
loc 14
rs 10
c 0
b 0
f 0
1
<?php
2
namespace Queue\Model\Table;
3
4
use ArrayObject;
5
use Cake\Core\Configure;
6
use Cake\Event\Event;
7
use Cake\I18n\FrozenTime;
8
use Cake\ORM\Query;
9
use Cake\ORM\Table;
10
use InvalidArgumentException;
11
use Queue\Model\Entity\QueuedTask;
12
use RuntimeException;
13
14
/**
15
 * QueuedTasks Model
16
 *
17
 * @method \Queue\Model\Entity\QueuedTask get($primaryKey, $options = [])
18
 * @method \Queue\Model\Entity\QueuedTask newEntity($data = null, array $options = [])
19
 * @method \Queue\Model\Entity\QueuedTask[] newEntities(array $data, array $options = [])
20
 * @method \Queue\Model\Entity\QueuedTask|bool save(\Cake\Datasource\EntityInterface $entity, $options = [])
21
 * @method \Queue\Model\Entity\QueuedTask saveOrFail(\Cake\Datasource\EntityInterface $entity, $options = [])
22
 * @method \Queue\Model\Entity\QueuedTask patchEntity(\Cake\Datasource\EntityInterface $entity, array $data, array $options = [])
23
 * @method \Queue\Model\Entity\QueuedTask[] patchEntities($entities, array $data, array $options = [])
24
 * @method \Queue\Model\Entity\QueuedTask findOrCreate($search, callable $callback = null, $options = [])
25
 *
26
 * @mixin \Cake\ORM\Behavior\TimestampBehavior
27
 */
28
class QueuedTasksTable extends Table
29
{
30
31
    /**
     * Short driver name for MySQL (last segment of the driver class name).
     */
    const DRIVER_MYSQL = 'Mysql';

    /**
     * Short driver name for PostgreSQL; requestJob() switches on it.
     */
    const DRIVER_POSTGRES = 'Postgres';

    /**
     * Short driver name for SQL Server; the statistics methods and
     * requestJob() switch on it to emit DATEDIFF based SQL.
     */
    const DRIVER_SQLSERVER = 'Sqlserver';

    /**
     * Maximum number of rows getFullStats() loads for its aggregation.
     */
    const STATS_LIMIT = 100000;
38
39
    /**
     * Configures the table: backing table name, display field, primary key
     * and the Timestamp behavior.
     *
     * @param array $config The configuration for the Table.
     * @return void
     */
    public function initialize(array $config)
    {
        parent::initialize($config);

        $this
            ->setTable('queued_tasks')
            ->setDisplayField('id')
            ->setPrimaryKey('id')
            ->addBehavior('Timestamp');
    }
55
56
    /**
     * Resolves the connection name used by this table.
     *
     * The "Queue.connection" configuration value wins when it is non-empty;
     * otherwise the parent's default connection name is used.
     *
     * @return string
     */
    public static function defaultConnectionName()
    {
        $configured = Configure::read('Queue.connection');

        return empty($configured) ? parent::defaultConnectionName() : $configured;
    }
70
71
    /**
     * Normalizes an empty-string "data" value to null before marshalling.
     *
     * $event and $options are not used in the body; they are kept because
     * the signature is the one the marshalling callback is declared with.
     *
     * @param \Cake\Event\Event $event Model event
     * @param \ArrayObject $data The data
     * @param \ArrayObject $options The options
     * @return void
     */
    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
    {
        // Nothing to normalize unless "data" is present and exactly ''.
        if (!isset($data['data']) || $data['data'] !== '') {
            return;
        }

        $data['data'] = null;
    }
84
85
    /**
     * Adds a new job to the queue.
     *
     * The payload is stored serialized. When $notBefore is empty the job's
     * "not_before" is set to the current time; otherwise the string is parsed
     * with strtotime().
     *
     * @param string $taskName Task name
     * @param array|null $data Array of data
     * @param string|null $notBefore A datetime string (anything strtotime() understands)
     *   which indicates when the job may be executed
     * @return \Queue\Model\Entity\QueuedTask Saved job entity
     * @throws \InvalidArgumentException When $notBefore cannot be parsed as a date/time.
     */
    public function createJob($taskName, array $data = null, string $notBefore = null)
    {
        $task = [
            'task' => $taskName,
            'data' => serialize($data),
            'not_before' => $this->getDateTime(),
        ];

        if (!empty($notBefore)) {
            // strtotime() yields false for unparsable input; fail loudly here
            // instead of handing false on to getDateTime().
            $timestamp = strtotime($notBefore);
            if ($timestamp === false) {
                throw new InvalidArgumentException(sprintf('Invalid "notBefore" date/time string: %s', $notBefore));
            }

            $task['not_before'] = $this->getDateTime($timestamp);
        }

        $queuedTask = $this->newEntity($task);

        return $this->saveOrFail($queuedTask);
    }
109
110
    /**
     * Counts the jobs that are not completed yet.
     *
     * With a task name only jobs of that type are counted, otherwise all
     * uncompleted jobs are counted.
     *
     * @param string|null $taskName Task type to Count
     * @return int
     */
    public function getLength($taskName = null)
    {
        $conditions = ['completed IS' => null];
        if ($taskName !== null) {
            $conditions['task'] = $taskName;
        }

        $query = $this->find('all', ['conditions' => $conditions]);

        return $query->count();
    }
130
131
    /**
     * Builds a "list" query of the distinct task types present in the queue,
     * keyed and valued by the task name.
     *
     * @return \Cake\ORM\Query
     */
    public function getTypes()
    {
        return $this->find('list', [
            'fields' => ['task'],
            'group' => ['task'],
            'keyField' => 'task',
            'valueField' => 'task',
        ]);
    }
151
152
    /**
     * Return some statistics about finished jobs still in the Database.
     *
     * Per task the query selects the number of completed jobs plus three
     * averages (in seconds): created -> completed ("alltime"),
     * fetched -> completed ("runtime") and not_before/created -> fetched
     * ("fetchdelay"). SQL Server gets DATEDIFF based expressions, every other
     * driver gets the UNIX_TIMESTAMP based ones.
     *
     * TO-DO: rewrite as virtual field
     *
     * @return \Cake\ORM\Query
     */
    public function getStats()
    {
        $useDateDiff = $this->_getDriverName() === static::DRIVER_SQLSERVER;

        $selectFields = function (Query $query) use ($useDateDiff) {
            $functions = $query->func();

            if ($useDateDiff) {
                $alltime = $functions->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
                $runtime = $functions->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
                $fetchdelay = $functions->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
            } else {
                $alltime = $functions->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
                $runtime = $functions->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
                $fetchdelay = $functions->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
            }

            return [
                'task',
                'num' => $functions->count('*'),
                'alltime' => $alltime,
                'runtime' => $runtime,
                'fetchdelay' => $fetchdelay,
            ];
        };

        return $this->find('all', [
            'fields' => $selectFields,
            'conditions' => ['completed IS NOT' => null],
            'group' => ['task'],
        ]);
    }
195
196
    /**
     * Average job duration per task and per creation day.
     *
     * Returns [
     * 'Task' => [
     * 'YYYY-MM-DD' => INT,
     * ...
     * ]
     * ]
     *
     * Only completed jobs are considered, newest first, capped at
     * STATS_LIMIT rows. Every task gets an entry for every day seen in the
     * result set; days without jobs of that task are reported as 0. The days
     * of each task are sorted by key.
     *
     * @param string|null $taskName The task name
     * @return array
     */
    public function getFullStats($taskName = null)
    {
        $useDateDiff = $this->_getDriverName() === static::DRIVER_SQLSERVER;

        $selectFields = function (Query $query) use ($useDateDiff) {
            $durationSql = $useDateDiff
                ? "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)"
                : 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)';

            return [
                'task',
                'created',
                'duration' => $query->newExpr($durationSql),
            ];
        };

        $where = ['completed IS NOT' => null];
        if ($taskName) {
            $where['task'] = $taskName;
        }

        $rows = $this->find()
            ->select($selectFields)
            ->where($where)
            ->enableHydration(false)
            ->orderDesc('id')
            ->limit(static::STATS_LIMIT)
            ->all()
            ->toArray();

        // Bucket the raw durations by task and creation day, remembering
        // every day that occurs anywhere in the result set.
        $durationsByTask = [];
        $seenDays = [];
        foreach ($rows as $row) {
            /** @var \DateTime $created */
            $created = $row['created'];
            $day = $created->format('Y-m-d');

            $seenDays[$day] = $day;
            $durationsByTask[$row['task']][$day][] = $row['duration'];
        }

        $stats = [];
        foreach ($durationsByTask as $type => $perDay) {
            $averages = [];
            foreach ($perDay as $day => $durations) {
                $averages[$day] = (int)(array_sum($durations) / count($durations));
            }

            // Array union keeps the computed averages and adds 0 for days
            // on which this task had no completed job.
            $averages += array_fill_keys($seenDays, 0);
            ksort($averages);

            $stats[$type] = $averages;
        }

        return $stats;
    }
275
276
    /**
     * Look for a new job that can be processed with the current abilities and
     * from the specified group (or any if null).
     *
     * A candidate must be uncompleted, match one of the given capabilities,
     * be due (not_before in the past or null), not be held by another worker
     * (fetched is null or older than the task's timeout) and have fewer
     * failures than "retries" + 1. The first candidate is selected with a
     * "FOR UPDATE" epilog inside a transaction, stamped with a fresh
     * worker_key and the current time as "fetched", and saved.
     *
     * @param array $capabilities Available QueueWorkerTasks; each entry is read for
     *   its "name", "timeout" and "retries" keys.
     * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
     * @return \Queue\Model\Entity\QueuedTask|null The claimed job, or null when none is available.
     */
    public function requestJob(array $capabilities, array $types = [])
    {
        $now = $this->getDateTime();
        $nowStr = $now->toDateTimeString();
        $driverName = $this->_getDriverName();

        $query = $this->find();

        // NOTE(review): the default (MySQL) expression yields not_before - now,
        // the Postgres one now - not_before; confirm the intended sign.
        $age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
        switch ($driverName) {
            case static::DRIVER_SQLSERVER:
                $age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
                break;
            case static::DRIVER_POSTGRES:
                $age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
                break;
        }

        $options = [
            'conditions' => [
                'completed IS' => null,
                'OR' => [],
            ],
            'fields' => [
                'age' => $age,
            ],
            'order' => [
                'age' => 'ASC',
                'id' => 'ASC',
            ],
        ];

        if (!empty($types)) {
            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
        }

        // Generate the task specific conditions.
        foreach ($capabilities as $task) {
            // Only the task part of a "Plugin.Task" name is matched.
            list(, $name) = pluginSplit($task['name']);

            $options['conditions']['OR'][] = [
                'task' => $name,
                'AND' => [
                    [
                        'OR' => [
                            'not_before <' => $nowStr,
                            'not_before IS' => null,
                        ],
                    ],
                    [
                        'OR' => [
                            'fetched <' => $now->copy()->subSeconds($task['timeout']),
                            'fetched IS' => null,
                        ],
                    ],
                ],
                'failed_count <' => ($task['retries'] + 1),
            ];
        }

        /** @var \Queue\Model\Entity\QueuedTask|null $task */
        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
            /** @var \Queue\Model\Entity\QueuedTask|null $task */
            $task = $query->find('all', $options)
                ->enableAutoFields(true)
                ->epilog('FOR UPDATE')
                ->first();

            if (!$task) {
                return null;
            }

            $task = $this->patchEntity($task, [
                'worker_key' => sha1(microtime()),
                'fetched' => $now,
            ]);

            return $this->saveOrFail($task);
        });

        return $task ?: null;
    }
370
371
    /**
     * Marks a task as completed by stamping "completed" with the current time
     * and saving it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @return bool Success
     */
    public function markJobDone(QueuedTask $task)
    {
        $completedTask = $this->patchEntity($task, ['completed' => $this->getDateTime()]);
        $saved = $this->save($completedTask);

        return (bool)$saved;
    }
386
387
    /**
     * Marks a job as failed: increments its failed counter, stores the given
     * failure message and saves it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @param string|null $failureMessage Optional message stored in the failure_message field.
     * @return bool Success
     */
    public function markJobFailed(QueuedTask $task, $failureMessage = null)
    {
        $failedTask = $this->patchEntity($task, [
            'failed_count' => $task->failed_count + 1,
            'failure_message' => $failureMessage,
        ]);
        $saved = $this->save($failedTask);

        return (bool)$saved;
    }
404
405
    /**
     * Resets uncompleted jobs: clears completed/fetched/worker_key/failure_message
     * and sets failed_count back to 0.
     *
     * Any falsy $id (null, but also 0) applies the reset to every uncompleted
     * job; otherwise only the job with that id is touched.
     *
     * @param int|null $id ID
     *
     * @return int Result of updateAll()
     */
    public function reset($id = null)
    {
        $where = ['completed IS' => null];
        if (!empty($id)) {
            $where['id'] = $id;
        }

        $blankState = [
            'completed' => null,
            'fetched' => null,
            'failed_count' => 0,
            'worker_key' => null,
            'failure_message' => null,
        ];

        return $this->updateAll($blankState, $where);
    }
430
431
    /**
     * Puts all completed jobs of the given task back into the queue by
     * clearing completed/fetched/worker_key/failure_message and setting
     * failed_count to 0.
     *
     * @param string $taskName Task name
     *
     * @return int Result of updateAll()
     */
    public function rerun($taskName)
    {
        $blankState = [
            'completed' => null,
            'fetched' => null,
            'failed_count' => 0,
            'worker_key' => null,
            'failure_message' => null,
        ];

        return $this->updateAll($blankState, [
            'completed IS NOT' => null,
            'task' => $taskName,
        ]);
    }
453
454
    /**
     * Deletes jobs whose "completed" value is older than
     * "Queue.cleanuptimeout" seconds.
     *
     * Does nothing when that configuration value is empty/falsy.
     *
     * @return void
     */
    public function cleanOldJobs()
    {
        $cleanupTimeout = Configure::read('Queue.cleanuptimeout');
        if (!$cleanupTimeout) {
            return;
        }

        // The threshold is passed as a unix timestamp (seconds).
        $threshold = time() - (int)$cleanupTimeout;
        $this->deleteAll(['completed <' => $threshold]);
    }
469
470
    /**
     * Classifies a failed task as "requeued" or "aborted".
     *
     * The configuration is looked up under 'Queue' . $queuedTask->task. A task
     * without configuration, or whose failed_count does not exceed the
     * configured "retries", is reported as "requeued"; anything else as
     * "aborted".
     *
     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
     * @param array $taskConfiguration Task configuration
     * @return string
     */
    public function getFailedStatus($queuedTask, array $taskConfiguration)
    {
        $configKey = 'Queue' . $queuedTask->task;
        if (empty($taskConfiguration[$configKey])) {
            return 'requeued';
        }

        $maxRetries = $taskConfiguration[$configKey]['retries'];

        return $queuedTask->failed_count <= $maxRetries ? 'requeued' : 'aborted';
    }
491
492
    /**
     * Truncates the table by executing every statement the schema generates
     * for a truncate on this table's connection.
     *
     * @return void
     */
    public function truncate()
    {
        $connection = $this->_connection;
        $statements = $this->getSchema()->truncateSql($connection);

        foreach ($statements as $statement) {
            $connection->execute($statement);
        }
    }
504
505
    /**
     * Returns the short driver name: the last backslash-separated segment of
     * the connection's configured "driver" value.
     *
     * @return string
     */
    protected function _getDriverName()
    {
        $driver = $this->getConnection()->config()['driver'];
        $segments = explode('\\', $driver);

        return array_pop($segments);
    }
517
518
    /**
     * Adds IN / NOT IN conditions for the given key.
     *
     * Values starting with "-" are exclusions (the dash is stripped) and end
     * up in "<key> NOT IN"; all other values end up in "<key> IN". A condition
     * is only added when its list is non-empty.
     *
     * @param array $conditions Conditions
     * @param string $key Key
     * @param array $values Values
     * @return array
     */
    protected function addFilter(array $conditions, $key, array $values)
    {
        $wanted = [];
        $unwanted = [];
        foreach ($values as $candidate) {
            if (strncmp($candidate, '-', 1) === 0) {
                $unwanted[] = substr($candidate, 1);
                continue;
            }

            $wanted[] = $candidate;
        }

        if (!empty($wanted)) {
            $conditions[$key . ' IN'] = $wanted;
        }
        if (!empty($unwanted)) {
            $conditions[$key . ' NOT IN'] = $unwanted;
        }

        return $conditions;
    }
546
547
    /**
     * Returns a DateTime object from different input.
     *
     * Objects are handed back untouched; any other value (including null,
     * which means "now") is passed to the FrozenTime constructor.
     *
     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
     *
     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
     */
    protected function getDateTime($notBefore = null)
    {
        return is_object($notBefore) ? $notBefore : new FrozenTime($notBefore);
    }
564
}
565