Completed
Pull Request — master (#35)
by
unknown
03:38
created

QueuedTasksTable::getDateTime()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 1
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
<?php
2
namespace Queue\Model\Table;
3
4
use ArrayObject;
5
use Cake\Core\Configure;
6
use Cake\Event\Event;
7
use Cake\I18n\FrozenTime;
8
use Cake\ORM\Query;
9
use Cake\ORM\Table;
10
use InvalidArgumentException;
11
use Queue\Model\Entity\QueuedTask;
12
use RuntimeException;
13
14
/**
15
 * QueuedTasks Model
16
 *
17
 * @method \Queue\Model\Entity\QueuedTask get($primaryKey, $options = [])
18
 * @method \Queue\Model\Entity\QueuedTask newEntity($data = null, array $options = [])
19
 * @method \Queue\Model\Entity\QueuedTask[] newEntities(array $data, array $options = [])
20
 * @method \Queue\Model\Entity\QueuedTask|bool save(\Cake\Datasource\EntityInterface $entity, $options = [])
21
 * @method \Queue\Model\Entity\QueuedTask saveOrFail(\Cake\Datasource\EntityInterface $entity, $options = [])
22
 * @method \Queue\Model\Entity\QueuedTask patchEntity(\Cake\Datasource\EntityInterface $entity, array $data, array $options = [])
23
 * @method \Queue\Model\Entity\QueuedTask[] patchEntities($entities, array $data, array $options = [])
24
 * @method \Queue\Model\Entity\QueuedTask findOrCreate($search, callable $callback = null, $options = [])
25
 *
26
 * @mixin \Cake\ORM\Behavior\TimestampBehavior
27
 */
28
class QueuedTasksTable extends Table
29
{
30
31
    const DRIVER_MYSQL = 'Mysql';
32
33
    const DRIVER_POSTGRES = 'Postgres';
34
35
    const DRIVER_SQLSERVER = 'Sqlserver';
36
37
    const STATS_LIMIT = 100000;
38
39
    /**
40
     *
41
     * @var string|null
42
     */
43
    protected $_key;
44
45
    /**
     * Sets up the table: table name, display field, primary key and the
     * Timestamp behavior.
     *
     * @param array $config The configuration for the Table.
     * @return void
     */
    public function initialize(array $config)
    {
        parent::initialize($config);

        // The setters are fluent, so the basic table settings are chained.
        $this
            ->setTable('queued_tasks')
            ->setDisplayField('id')
            ->setPrimaryKey('id');

        $this->addBehavior('Timestamp');
    }
61
62
    /**
     * Returns the connection name to use for this table.
     *
     * The `Queue.connection` configuration value wins when it is non-empty;
     * otherwise the parent's default connection name is used.
     *
     * @return string
     */
    public static function defaultConnectionName()
    {
        $configured = Configure::read('Queue.connection');

        return empty($configured) ? parent::defaultConnectionName() : $configured;
    }
76
77
    /**
     * Normalizes an empty-string `data` value to null before marshalling.
     *
     * $event and $options are not used by this method; they are part of the
     * callback signature and are therefore kept.
     *
     * @param \Cake\Event\Event $event Model event
     * @param \ArrayObject $data The data
     * @param \ArrayObject $options The options
     * @return void
     */
    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
    {
        $hasEmptyPayload = isset($data['data']) && $data['data'] === '';
        if (!$hasEmptyPayload) {
            return;
        }

        $data['data'] = null;
    }
90
91
    /**
     * Adds a new job to the queue.
     *
     * The job data is stored serialized. Without $notBefore the job may run
     * immediately ("now"); otherwise $notBefore is parsed with strtotime().
     *
     * @param string $taskName Task name
     * @param array|null $data Array of data
     * @param string|null $notBefore A datetime which indicates when the job may be executed
     * @return \Queue\Model\Entity\QueuedTask Saved job entity
     * @throws \InvalidArgumentException When $notBefore is not a parseable date/time string.
     */
    public function createJob($taskName, array $data = null, string $notBefore = null)
    {
        $runAt = $this->getDateTime();

        if (!empty($notBefore)) {
            $timestamp = strtotime($notBefore);
            // strtotime() returns false for unparseable input. Previously that
            // false was passed on and the job was silently scheduled for "now".
            if ($timestamp === false) {
                throw new InvalidArgumentException(
                    sprintf('Unable to parse notBefore value "%s".', $notBefore)
                );
            }
            $runAt = $this->getDateTime($timestamp);
        }

        $queuedTask = $this->newEntity([
            'task' => $taskName,
            'data' => serialize($data),
            'not_before' => $runAt,
        ]);

        return $this->saveOrFail($queuedTask);
    }
115
116
    /**
     * Tells whether at least one not-yet-completed job exists.
     *
     * When a (truthy) task name is given, only jobs of that task are considered.
     *
     * @param string|null $taskName Task name to restrict the check to
     *
     * @return bool
     */
    public function isQueued($taskName = null)
    {
        $pending = $this->find()->where(['completed IS' => null]);
        if ($taskName) {
            // A second where() call is AND-ed to the first one.
            $pending->where(['task' => $taskName]);
        }

        $firstMatch = $pending->select(['id'])->first();

        return (bool)$firstMatch;
    }
140
141
    /**
     * Counts the pending (not completed) jobs in the queue.
     *
     * Counts all pending jobs, or only those of the given task type when
     * $taskName is not null.
     *
     * @param string|null $taskName Task type to Count
     * @return int
     */
    public function getLength($taskName = null)
    {
        $conditions = ['completed IS' => null];
        if ($taskName !== null) {
            $conditions['task'] = $taskName;
        }

        return $this->find('all', ['conditions' => $conditions])->count();
    }
161
162
    /**
     * Builds a list query of the distinct task types present in the queue.
     *
     * Both key and value of the resulting list are the task name.
     *
     * @return \Cake\ORM\Query
     */
    public function getTypes()
    {
        return $this->find('list', [
            'fields' => ['task'],
            'group' => ['task'],
            'keyField' => 'task',
            'valueField' => 'task',
        ]);
    }
182
183
    /**
     * Return some statistics about finished jobs still in the Database.
     *
     * The query is grouped by task and selects, per task:
     * - `num`: number of completed jobs,
     * - `alltime`: average seconds between `created` and `completed`,
     * - `runtime`: average seconds between `fetched` and `completed`,
     * - `fetchdelay`: average seconds between `not_before` (or `created` when
     *   `not_before` is NULL) and `fetched`.
     *
     * The SQL used for the time differences depends on the connection driver
     * (SQL Server, Postgres, or the MySQL syntax as default).
     *
     * TO-DO: rewrite as virtual field
     *
     * @return \Cake\ORM\Query
     */
    public function getStats()
    {
        $driverName = $this->_getDriverName();

        $fields = function (Query $query) use ($driverName) {
            switch ($driverName) {
                case static::DRIVER_SQLSERVER:
                    $alltimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)";
                    $runtimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)";
                    $fetchdelaySql = "DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)";
                    break;
                case static::DRIVER_POSTGRES:
                    // Postgres has neither UNIX_TIMESTAMP() nor IF(); use the
                    // same EXTRACT(EPOCH FROM ...) form as requestJob().
                    $alltimeSql = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM created)';
                    $runtimeSql = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)';
                    $fetchdelaySql = 'EXTRACT(EPOCH FROM fetched) - (CASE WHEN not_before IS NULL THEN EXTRACT(EPOCH FROM created) ELSE EXTRACT(EPOCH FROM not_before) END)';
                    break;
                default:
                    $alltimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)';
                    $runtimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)';
                    $fetchdelaySql = 'UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))';
            }

            return [
                'task',
                'num' => $query->func()->count('*'),
                'alltime' => $query->func()->avg($alltimeSql),
                'runtime' => $query->func()->avg($runtimeSql),
                'fetchdelay' => $query->func()->avg($fetchdelaySql),
            ];
        };

        $options = [
            'fields' => $fields,
            'conditions' => [
                'completed IS NOT' => null,
            ],
            'group' => [
                'task',
            ],
        ];

        return $this->find('all', $options);
    }
226
227
    /**
     * Returns the average job duration (in whole seconds) per task and day.
     *
     * Result shape:
     * [
     *     'Task' => [
     *         'YYYY-MM-DD' => INT,
     *         ...
     *     ]
     * ]
     *
     * Only completed jobs are considered, limited to the STATS_LIMIT most
     * recent ones (by id). Every task gets an entry for every day seen in the
     * result set; days without jobs for that task are filled with 0.
     *
     * @param string|null $taskName The task name
     * @return array
     */
    public function getFullStats($taskName = null)
    {
        $driverName = $this->_getDriverName();
        $fields = function (Query $query) use ($driverName) {
            switch ($driverName) {
                case static::DRIVER_SQLSERVER:
                    $runtimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)";
                    break;
                case static::DRIVER_POSTGRES:
                    // Postgres has no UNIX_TIMESTAMP(); same form as in requestJob().
                    $runtimeSql = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)';
                    break;
                default:
                    $runtimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)';
            }

            return [
                'task',
                'created',
                'duration' => $query->newExpr($runtimeSql),
            ];
        };

        $conditions = [
            'completed IS NOT' => null,
        ];
        if ($taskName) {
            $conditions['task'] = $taskName;
        }

        $rows = $this->find()
            ->select($fields)
            ->where($conditions)
            ->enableHydration(false)
            ->orderDesc('id')
            ->limit(static::STATS_LIMIT)
            ->all()
            ->toArray();

        // Collect the raw durations per task and day, and remember every day seen.
        $durationsByTask = [];
        $days = [];
        foreach ($rows as $row) {
            /** @var \DateTime $created */
            $created = $row['created'];
            $day = $created->format('Y-m-d');
            $days[$day] = $day;

            $durationsByTask[$row['task']][$day][] = $row['duration'];
        }

        $result = [];
        foreach ($durationsByTask as $type => $durationsByDay) {
            foreach ($durationsByDay as $day => $durations) {
                $result[$type][$day] = (int)(array_sum($durations) / count($durations));
            }

            // Fill the days on which this task had no jobs.
            foreach ($days as $day) {
                if (!isset($result[$type][$day])) {
                    $result[$type][$day] = 0;
                }
            }

            ksort($result[$type]);
        }

        return $result;
    }
306
307
    /**
     * Look for a new job that can be processed with the current abilities and
     * from the specified group (or any if null).
     *
     * A job qualifies when it is not completed, matches one of the given
     * capabilities (task name, `not_before` in the past or NULL, `fetched`
     * older than the task timeout or NULL, `failed_count` not above the
     * task's retries) and passes the optional type filter. The job is
     * selected with `FOR UPDATE` inside a transaction and then stamped with
     * this worker's key and the fetch time.
     *
     * NOTE(review): the Postgres `age` expression computes now - not_before,
     * while the MySQL and SQL Server expressions compute not_before - now, so
     * the sort direction appears to differ between drivers - confirm intent.
     *
     * @param array $capabilities Available QueueWorkerTasks.
     * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
     * @return \Queue\Model\Entity\QueuedTask|null
     */
    public function requestJob(array $capabilities, array $types = [])
    {
        $now = $this->getDateTime();
        $nowStr = $now->toDateTimeString();

        $query = $this->find();

        switch ($this->_getDriverName()) {
            case static::DRIVER_SQLSERVER:
                $ageSql = 'ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)';
                break;
            case static::DRIVER_POSTGRES:
                $ageSql = 'COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)';
                break;
            default:
                $ageSql = 'IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)';
        }
        $age = $query->newExpr()->add($ageSql);

        $conditions = [
            'completed IS' => null,
            'OR' => [],
        ];
        if (!empty($types)) {
            $conditions = $this->addFilter($conditions, 'task', $types);
        }

        // Generate the task specific conditions.
        foreach ($capabilities as $capability) {
            // Only the task name is needed; the plugin part is discarded.
            $name = pluginSplit($capability['name'])[1];
            $timeoutAt = $now->copy()->subSeconds($capability['timeout']);

            $conditions['OR'][] = [
                'task' => $name,
                'AND' => [
                    [
                        'OR' => [
                            'not_before <' => $nowStr,
                            'not_before IS' => null,
                        ],
                    ],
                    [
                        'OR' => [
                            'fetched <' => $timeoutAt,
                            'fetched IS' => null,
                        ],
                    ],
                ],
                'failed_count <' => ($capability['retries'] + 1),
            ];
        }

        $options = [
            'conditions' => $conditions,
            'fields' => [
                'age' => $age,
            ],
            'order' => [
                'age' => 'ASC',
                'id' => 'ASC',
            ],
        ];

        /** @var \Queue\Model\Entity\QueuedTask|null $job */
        $job = $this->getConnection()->transactional(function () use ($query, $options, $now) {
            $candidate = $query->find('all', $options)
                ->enableAutoFields(true)
                ->epilog('FOR UPDATE')
                ->first();

            if (!$candidate) {
                return null;
            }

            $candidate = $this->patchEntity($candidate, [
                'worker_key' => $this->key(),
                'fetched' => $now,
            ]);

            return $this->saveOrFail($candidate);
        });

        return $job ?: null;
    }
400
401
    /**
     * Mark a task as Completed by setting its `completed` time to now and
     * saving it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @return bool Success
     */
    public function markJobDone(QueuedTask $task)
    {
        $completedTask = $this->patchEntity($task, [
            'completed' => $this->getDateTime(),
        ]);

        return $this->save($completedTask) !== false;
    }
416
417
    /**
     * Mark a job as Failed: increments its failed-counter, stores the failure
     * message and saves it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @param string|null $failureMessage Optional message stored in the failure_message field.
     * @return bool Success
     */
    public function markJobFailed(QueuedTask $task, $failureMessage = null)
    {
        $failedTask = $this->patchEntity($task, [
            'failed_count' => $task->failed_count + 1,
            'failure_message' => $failureMessage,
        ]);

        return $this->save($failedTask) !== false;
    }
434
435
    /**
     * Reset current jobs
     *
     * Clears `completed`, `fetched`, `worker_key` and `failure_message` and
     * sets `failed_count` back to 0 for all not-completed jobs, or only for
     * the job with the given ID.
     *
     * @param int|null $id ID of a single job; null resets all pending jobs
     *
     * @return int Result of updateAll()
     */
    public function reset($id = null)
    {
        $fields = [
            'completed' => null,
            'fetched' => null,
            'failed_count' => 0,
            'worker_key' => null,
            'failure_message' => null,
        ];
        $conditions = [
            'completed IS' => null,
        ];
        // Strict null check: with a loose truthiness test an ID of 0 was
        // treated as "no ID" and every pending job was reset.
        if ($id !== null) {
            $conditions['id'] = $id;
        }

        return $this->updateAll($fields, $conditions);
    }
460
461
    /**
     * Puts all completed jobs of the given task back into the queue.
     *
     * Clears `completed`, `fetched`, `worker_key` and `failure_message` and
     * sets `failed_count` to 0 for every completed job of that task.
     *
     * @param string $taskName Task name
     *
     * @return int
     */
    public function rerun($taskName)
    {
        $cleared = [
            'completed' => null,
            'fetched' => null,
            'failed_count' => 0,
            'worker_key' => null,
            'failure_message' => null,
        ];

        return $this->updateAll($cleared, [
            'completed IS NOT' => null,
            'task' => $taskName,
        ]);
    }
483
484
    /**
     * Cleanup/Delete Completed Tasks.
     *
     * Deletes jobs whose `completed` time is older than
     * `Queue.cleanuptimeout` seconds. Does nothing when that setting is
     * empty.
     *
     * @return void
     */
    public function cleanOldJobs()
    {
        $cleanupTimeout = Configure::read('Queue.cleanuptimeout');
        if (!$cleanupTimeout) {
            return;
        }

        // Build the threshold with getDateTime(), the same way the values
        // written to `completed` are built, instead of comparing the
        // datetime column against a raw integer timestamp.
        $threshold = $this->getDateTime()->subSeconds((int)$cleanupTimeout);

        $this->deleteAll([
            'completed <' => $threshold,
        ]);
    }
499
500
    /**
     * Tells whether a failed job counts as "requeued" or "aborted".
     *
     * The task configuration is looked up under the key 'Queue' + task name.
     * Without a configuration entry, or while the job's failed_count does not
     * exceed the configured retries, the status is 'requeued'.
     *
     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
     * @param array $taskConfiguration Task configuration
     * @return string
     */
    public function getFailedStatus($queuedTask, array $taskConfiguration)
    {
        $configKey = 'Queue' . $queuedTask->task;

        $isConfigured = !empty($taskConfiguration[$configKey]);
        if ($isConfigured && $queuedTask->failed_count > $taskConfiguration[$configKey]['retries']) {
            return 'aborted';
        }

        return 'requeued';
    }
521
522
    /**
     * Generates a unique Identifier for the current worker thread.
     *
     * Useful to identify the currently running processes for this thread.
     * The key is generated once and then reused until clearKey() is called.
     *
     * @return string Identifier (40 hexadecimal characters)
     * @throws \RuntimeException When no key could be generated.
     */
    public function key()
    {
        if ($this->_key !== null) {
            return $this->_key;
        }

        // A key derived only from microtime() is identical for workers that
        // start within the same microsecond. 20 random bytes give the same
        // 40-character hex format that sha1() produced.
        try {
            $this->_key = bin2hex(random_bytes(20));
        } catch (\Exception $e) {
            throw new RuntimeException('Invalid key generated', 0, $e);
        }

        return $this->_key;
    }
541
542
    /**
     * Forgets the stored worker identifier, so the next call to key()
     * generates a new one.
     *
     * @return void
     */
    public function clearKey()
    {
        $this->_key = null;
    }
551
552
    /**
     * Truncates the table by executing every statement the schema returns
     * from truncateSql() on this table's connection.
     *
     * @return void
     */
    public function truncate()
    {
        $connection = $this->_connection;
        $statements = $this->getSchema()->truncateSql($connection);

        foreach ($statements as $statement) {
            $connection->execute($statement);
        }
    }
564
565
    /**
     * Returns the short name of the connection's driver, i.e. the last
     * backslash-separated segment of the configured `driver` value.
     *
     * @return string
     */
    protected function _getDriverName()
    {
        $driver = $this->getConnection()->config()['driver'];
        $segments = explode('\\', $driver);

        return end($segments);
    }
577
578
    /**
     * Adds IN / NOT IN conditions for the given key.
     *
     * Values starting with '-' are exclusions (the '-' is stripped) and end
     * up in "<key> NOT IN"; all other values end up in "<key> IN". A
     * condition is only added when its list is non-empty.
     *
     * @param array $conditions Conditions
     * @param string $key Key
     * @param array $values Values
     * @return array
     */
    protected function addFilter(array $conditions, $key, array $values)
    {
        $buckets = [
            ' IN' => [],
            ' NOT IN' => [],
        ];
        foreach ($values as $value) {
            $isExclusion = substr($value, 0, 1) === '-';
            if ($isExclusion) {
                $buckets[' NOT IN'][] = substr($value, 1);
                continue;
            }
            $buckets[' IN'][] = $value;
        }

        // The bucket order (IN first, then NOT IN) keeps the resulting key order.
        foreach ($buckets as $operator => $list) {
            if (!empty($list)) {
                $conditions[$key . $operator] = $list;
            }
        }

        return $conditions;
    }
606
607
    /**
     * Returns a DateTime object from different input.
     *
     * Objects are returned unchanged; any other value (including null, which
     * means "now") is passed to the FrozenTime constructor.
     *
     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
     *
     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
     */
    protected function getDateTime($notBefore = null)
    {
        return is_object($notBefore) ? $notBefore : new FrozenTime($notBefore);
    }
624
}
625