Completed
Pull Request — master (#35)
by
unknown
03:07
created

QueuedTasksTable::key()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 6
nc 3
nop 0
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
<?php
2
namespace Queue\Model\Table;
3
4
use ArrayObject;
5
use Cake\Core\Configure;
6
use Cake\Event\Event;
7
use Cake\I18n\FrozenTime;
8
use Cake\ORM\Query;
9
use Cake\ORM\Table;
10
use InvalidArgumentException;
11
use Queue\Model\Entity\QueuedTask;
12
use RuntimeException;
13
14
/**
15
 * QueuedTasks Model
16
 *
17
 * @method \Queue\Model\Entity\QueuedTask get($primaryKey, $options = [])
18
 * @method \Queue\Model\Entity\QueuedTask newEntity($data = null, array $options = [])
19
 * @method \Queue\Model\Entity\QueuedTask[] newEntities(array $data, array $options = [])
20
 * @method \Queue\Model\Entity\QueuedTask|bool save(\Cake\Datasource\EntityInterface $entity, $options = [])
21
 * @method \Queue\Model\Entity\QueuedTask saveOrFail(\Cake\Datasource\EntityInterface $entity, $options = [])
22
 * @method \Queue\Model\Entity\QueuedTask patchEntity(\Cake\Datasource\EntityInterface $entity, array $data, array $options = [])
23
 * @method \Queue\Model\Entity\QueuedTask[] patchEntities($entities, array $data, array $options = [])
24
 * @method \Queue\Model\Entity\QueuedTask findOrCreate($search, callable $callback = null, $options = [])
25
 *
26
 * @mixin \Cake\ORM\Behavior\TimestampBehavior
27
 */
28
class QueuedTasksTable extends Table
29
{
30
31
    /**
     * Driver short name (last segment of the driver class name) for MySQL.
     */
    const DRIVER_MYSQL = 'Mysql';

    /**
     * Driver short name (last segment of the driver class name) for PostgreSQL.
     */
    const DRIVER_POSTGRES = 'Postgres';

    /**
     * Driver short name (last segment of the driver class name) for SQL Server.
     */
    const DRIVER_SQLSERVER = 'Sqlserver';

    /**
     * Maximum number of rows read when building the full statistics.
     */
    const STATS_LIMIT = 100000;

    /**
     * Cached worker identifier; null until key() generates one or after clearKey().
     *
     * @var string|null
     */
    protected $_key;
44
45
    /**
     * Configures the table: backing table name, display field, primary key
     * and the Timestamp behavior.
     *
     * @param array $config The configuration for the Table.
     * @return void
     */
    public function initialize(array $config)
    {
        parent::initialize($config);

        // The table setters are fluent, so the basic mapping is configured in one chain.
        $this
            ->setTable('queued_tasks')
            ->setDisplayField('id')
            ->setPrimaryKey('id');

        $this->addBehavior('Timestamp');
    }
61
62
    /**
     * Returns the connection name this table uses.
     *
     * A non-empty 'Queue.connection' configuration value wins; otherwise the
     * parent's default connection name is used.
     *
     * @return string
     */
    public static function defaultConnectionName()
    {
        $configured = Configure::read('Queue.connection');

        return empty($configured) ? parent::defaultConnectionName() : $configured;
    }
76
77
    /**
     * Normalizes an empty-string 'data' value to null before marshalling.
     *
     * $event and $options are not used in the body; they are kept because the
     * signature is the one this callback is declared with.
     *
     * @param \Cake\Event\Event $event The event (unused)
     * @param \ArrayObject $data The request data, modified in place
     * @param \ArrayObject $options Marshalling options (unused)
     * @return void
     */
    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
    {
        if (!isset($data['data'])) {
            return;
        }

        if ($data['data'] === '') {
            $data['data'] = null;
        }
    }
90
91
    /**
     * Adds a new job to the queue.
     *
     * The payload is stored serialized. When no $notBefore is given, the job's
     * not_before is set to the current time.
     *
     * @param string $taskName Task name
     * @param array|null $data Array of data
     * @param string|null $notBefore A datetime string (parsed with strtotime()) which
     *   indicates when the job may be executed
     * @return \Queue\Model\Entity\QueuedTask Saved job entity
     * @throws \InvalidArgumentException If $notBefore cannot be parsed as a date/time.
     */
    public function createJob($taskName, array $data = null, string $notBefore = null)
    {
        $task = [
            'task' => $taskName,
            'data' => serialize($data),
            'not_before' => $this->getDateTime()
        ];

        if (!empty($notBefore)) {
            // strtotime() returns false on unparseable input; fail loudly instead of
            // building the scheduled time from false.
            $timestamp = strtotime($notBefore);
            if ($timestamp === false) {
                throw new InvalidArgumentException(
                    sprintf('Cannot parse "%s" as a date/time for not_before', $notBefore)
                );
            }
            $task['not_before'] = $this->getDateTime($timestamp);
        }

        $queuedTask = $this->newEntity($task);

        return $this->saveOrFail($queuedTask);
    }
115
116
    /**
     * Tells whether at least one not-yet-completed job exists.
     *
     * @param string|null $taskName Restrict the check to this task; any falsy value checks all tasks
     * @return bool True if a matching job with completed = NULL was found
     */
    public function isQueued($taskName = null)
    {
        $conditions = ['completed IS' => null];
        if ($taskName) {
            $conditions['task'] = $taskName;
        }

        $found = $this->find()
            ->select(['id'])
            ->where($conditions)
            ->first();

        return (bool)$found;
    }
140
141
    /**
     * Counts the jobs that have not been completed yet.
     *
     * Counts ALL pending jobs, or only the pending jobs of the given task
     * when $taskName is not null.
     *
     * @param string|null $taskName Task type to count
     * @return int
     */
    public function getLength($taskName = null)
    {
        $conditions = ['completed IS' => null];
        if ($taskName !== null) {
            $conditions['task'] = $taskName;
        }

        return $this->find('all', ['conditions' => $conditions])->count();
    }
161
162
    /**
     * Builds a 'list' query of the distinct task types present in the queue.
     *
     * The task name is used both as key and as value of the list.
     *
     * @return \Cake\ORM\Query
     */
    public function getTypes()
    {
        return $this->find('list', [
            'fields' => ['task'],
            'group' => ['task'],
            'keyField' => 'task',
            'valueField' => 'task',
        ]);
    }
181
182
    /**
     * Return some statistics about finished jobs still in the Database.
     *
     * Per task (completed jobs only) the query selects:
     * - num: number of jobs
     * - alltime: average seconds between created and completed
     * - runtime: average seconds between fetched and completed
     * - fetchdelay: average seconds between not_before (or created, if not_before
     *   is NULL) and fetched
     *
     * The time arithmetic is written per database driver, because the default
     * (MySQL) functions do not exist on SQL Server or PostgreSQL.
     *
     * TO-DO: rewrite as virtual field
     *
     * @return \Cake\ORM\Query
     */
    public function getStats()
    {
        $driverName = $this->_getDriverName();
        $options = [
            'fields' => function (Query $query) use ($driverName) {
                switch ($driverName) {
                    case static::DRIVER_SQLSERVER:
                        $alltimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)";
                        $runtimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)";
                        $fetchdelaySql = "DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)";
                        break;
                    case static::DRIVER_POSTGRES:
                        // PostgreSQL has neither UNIX_TIMESTAMP() nor IF(); use EXTRACT(EPOCH ...) and CASE.
                        $alltimeSql = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM created)';
                        $runtimeSql = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)';
                        $fetchdelaySql = 'EXTRACT(EPOCH FROM fetched) - (CASE WHEN not_before IS NULL THEN EXTRACT(EPOCH FROM created) ELSE EXTRACT(EPOCH FROM not_before) END)';
                        break;
                    default:
                        $alltimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)';
                        $runtimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)';
                        $fetchdelaySql = 'UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))';
                }

                return [
                    'task',
                    'num' => $query->func()->count('*'),
                    'alltime' => $query->func()->avg($alltimeSql),
                    'runtime' => $query->func()->avg($runtimeSql),
                    'fetchdelay' => $query->func()->avg($fetchdelaySql)
                ];
            },
            'conditions' => [
                'completed IS NOT' => null
            ],
            'group' => [
                'task'
            ]
        ];

        return $this->find('all', $options);
    }
224
225
    /**
     * Returns the average run time (completed - fetched, in whole seconds) of
     * completed jobs, per task and per creation day:
     *
     * [
     *     'Task' => [
     *         'YYYY-MM-DD' => INT,
     *         ...
     *     ]
     * ]
     *
     * Only the newest STATS_LIMIT completed jobs (by id) are considered. Every
     * task gets an entry for every day seen in the result set; days on which a
     * task has no jobs are filled with 0. Days are sorted ascending per task.
     *
     * @param string|null $taskName Restrict the statistics to this task; any falsy value includes all tasks
     * @return array
     */
    public function getFullStats($taskName = null)
    {
        $driverName = $this->_getDriverName();
        $fields = function (Query $query) use ($driverName) {
            switch ($driverName) {
                case static::DRIVER_SQLSERVER:
                    $runtimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)";
                    break;
                case static::DRIVER_POSTGRES:
                    // PostgreSQL has no UNIX_TIMESTAMP(); EXTRACT(EPOCH ...) yields seconds as well.
                    $runtimeSql = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)';
                    break;
                default:
                    $runtimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)';
            }

            return [
                'task',
                'created',
                'duration' => $query->newExpr($runtimeSql)
            ];
        };

        $conditions = [
            'completed IS NOT' => null
        ];
        if ($taskName) {
            $conditions['task'] = $taskName;
        }

        $rows = $this->find()
            ->select($fields)
            ->where($conditions)
            ->enableHydration(false)
            ->orderDesc('id')
            ->limit(static::STATS_LIMIT)
            ->all()
            ->toArray();

        $result = [];
        $days = [];

        // Collect the raw durations per task and day, remembering every day seen.
        foreach ($rows as $row) {
            /** @var \DateTime $created */
            $created = $row['created'];
            $day = $created->format('Y-m-d');
            $days[$day] = $day;

            $result[$row['task']][$day][] = $row['duration'];
        }

        foreach ($result as $type => $durationsPerDay) {
            foreach ($durationsPerDay as $day => $durations) {
                $result[$type][$day] = (int)(array_sum($durations) / count($durations));
            }

            // Array union keeps the computed averages and adds 0 for the missing days.
            $result[$type] += array_fill_keys($days, 0);

            ksort($result[$type]);
        }

        return $result;
    }
304
305
    /**
     * Look for a new job that can be processed with the current abilities and
     * from the specified group (or any if null).
     *
     * Candidates are jobs that are not completed and match at least one capability:
     * same task name, not_before in the past (or NULL), not fetched or fetched
     * longer ago than the task's timeout, and failed_count not above the task's
     * retries. They are ordered by "age" (seconds from now until not_before,
     * 0 if NULL) and then by id.
     *
     * The first candidate is selected with a FOR UPDATE epilog inside a
     * transaction, stamped with this worker's key and the fetch time, and saved.
     *
     * @param array $capabilities Available QueueWorkerTasks; each entry is read for its
     *   'name', 'timeout' and 'retries' keys.
     * @param array $types Request a job from these types (or exclude certain types
     *   by prefixing them with "-"), or any otherwise.
     * @return \Queue\Model\Entity\QueuedTask|null The claimed job, or null if none is available
     */
    public function requestJob(array $capabilities, array $types = [])
    {
        $now = $this->getDateTime();
        $nowStr = $now->toDateTimeString();
        $driverName = $this->_getDriverName();

        $query = $this->find();

        // All drivers must compute the same quantity: not_before minus now, in seconds.
        switch ($driverName) {
            case static::DRIVER_SQLSERVER:
                $ageSql = 'ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)';
                break;
            case static::DRIVER_POSTGRES:
                // Operand order matters: (now - not_before) would invert the sort order
                // compared to the other drivers.
                $ageSql = 'COALESCE((EXTRACT(EPOCH FROM not_before) - EXTRACT(EPOCH FROM now())), 0)';
                break;
            default:
                $ageSql = 'IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)';
        }
        $age = $query->newExpr()->add($ageSql);

        $options = [
            'conditions' => [
                'completed IS' => null,
                'OR' => []
            ],
            'fields' => [
                'age' => $age
            ],
            'order' => [
                'age' => 'ASC',
                'id' => 'ASC'
            ]
        ];

        if (!empty($types)) {
            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
        }

        // Generate the task specific conditions.
        foreach ($capabilities as $task) {
            // Only the task name is needed; the plugin prefix is discarded.
            list(, $name) = pluginSplit($task['name']);
            $timeoutAt = $now->copy();
            $options['conditions']['OR'][] = [
                'task' => $name,
                'AND' => [
                    [
                        'OR' => [
                            'not_before <' => $nowStr,
                            'not_before IS' => null
                        ]
                    ],
                    [
                        'OR' => [
                            'fetched <' => $timeoutAt->subSeconds($task['timeout']),
                            'fetched IS' => null
                        ]
                    ]
                ],
                'failed_count <' => ($task['retries'] + 1)
            ];
        }

        /** @var \Queue\Model\Entity\QueuedTask|null $task */
        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
            /** @var \Queue\Model\Entity\QueuedTask|null $task */
            $task = $query->find('all', $options)
                ->enableAutoFields(true)
                ->epilog('FOR UPDATE')
                ->first();

            if (!$task) {
                return null;
            }

            // Claim the job for this worker.
            $task = $this->patchEntity($task, [
                'worker_key' => $this->key(),
                'fetched' => $now
            ]);

            return $this->saveOrFail($task);
        });

        return $task ?: null;
    }
398
399
    /**
     * Mark a task as completed by setting its 'completed' field to the current time.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @return bool Success of the save
     */
    public function markJobDone(QueuedTask $task)
    {
        $completedTask = $this->patchEntity($task, ['completed' => $this->getDateTime()]);
        $saved = $this->save($completedTask);

        return (bool)$saved;
    }
414
415
    /**
     * Mark a job as failed: increments failed_count by one and stores the
     * failure message.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @param string|null $failureMessage Optional message; it replaces the current
     *   failure_message value (null when omitted).
     * @return bool Success of the save
     */
    public function markJobFailed(QueuedTask $task, $failureMessage = null)
    {
        $failedTask = $this->patchEntity($task, [
            'failed_count' => $task->failed_count + 1,
            'failure_message' => $failureMessage,
        ]);
        $saved = $this->save($failedTask);

        return (bool)$saved;
    }
432
433
    /**
     * Reset jobs that are not completed: clears completed, fetched, worker_key
     * and failure_message and sets failed_count back to 0.
     *
     * @param int|null $id Restrict the reset to this job id. Any falsy value
     *   (null, 0, ...) resets all not-completed jobs.
     * @return int Result of updateAll()
     */
    public function reset($id = null)
    {
        $conditions = ['completed IS' => null];
        // Deliberately a truthiness check: a falsy id means "no restriction".
        if ($id) {
            $conditions['id'] = $id;
        }

        $cleared = [
            'completed' => null,
            'fetched' => null,
            'failed_count' => 0,
            'worker_key' => null,
            'failure_message' => null,
        ];

        return $this->updateAll($cleared, $conditions);
    }
458
459
    /**
     * Re-queues all completed jobs of a task: clears completed, fetched,
     * worker_key and failure_message and sets failed_count back to 0.
     *
     * @param string $taskName Task whose completed jobs should run again
     * @return int Result of updateAll()
     */
    public function rerun($taskName)
    {
        $cleared = [
            'completed' => null,
            'fetched' => null,
            'failed_count' => 0,
            'worker_key' => null,
            'failure_message' => null,
        ];

        return $this->updateAll($cleared, [
            'completed IS NOT' => null,
            'task' => $taskName,
        ]);
    }
481
482
    /**
     * Cleanup/Delete completed tasks older than 'Queue.cleanuptimeout' seconds.
     *
     * Does nothing when that configuration value is missing or falsy.
     *
     * @return void
     */
    public function cleanOldJobs()
    {
        $cleanupTimeout = Configure::read('Queue.cleanuptimeout');
        if (!$cleanupTimeout) {
            return;
        }

        // Unix timestamp before which completed jobs are removed.
        $threshold = time() - (int)$cleanupTimeout;

        $this->deleteAll(['completed <' => $threshold]);
    }
497
498
    /**
     * Returns the status label for a failed job: 'requeued' or 'aborted'.
     *
     * The task configuration is looked up under 'Queue' . <task name>. Without
     * a (non-empty) configuration, or while failed_count does not exceed the
     * configured 'retries', the job counts as 'requeued'; otherwise 'aborted'.
     *
     * @param \Queue\Model\Entity\QueuedTask $queuedTask The failed job
     * @param array $taskConfiguration Task configurations keyed by 'Queue' . task name
     * @return string
     */
    public function getFailedStatus($queuedTask, array $taskConfiguration)
    {
        $configKey = 'Queue' . $queuedTask->task;
        if (empty($taskConfiguration[$configKey])) {
            return 'requeued';
        }

        $allowedRetries = $taskConfiguration[$configKey]['retries'];

        return $queuedTask->failed_count <= $allowedRetries ? 'requeued' : 'aborted';
    }
519
520
    /**
     * Generates a unique Identifier for the current worker thread.
     *
     * Useful to identify the currently running processes for this thread.
     * The identifier is generated once and cached until clearKey() is called.
     *
     * It is a 40 character hexadecimal string built from random bytes, so that
     * workers started at the same instant do not end up with the same key
     * (a purely time-derived key would collide in that case).
     *
     * @return string Identifier
     * @throws \RuntimeException If no key could be generated.
     */
    public function key()
    {
        if ($this->_key !== null) {
            return $this->_key;
        }

        try {
            // 20 random bytes => 40 hex characters.
            $key = bin2hex(random_bytes(20));
        } catch (\Exception $e) {
            // random_bytes() throws when no source of randomness is available.
            throw new RuntimeException('Invalid key generated', 0, $e);
        }

        $this->_key = $key;

        return $this->_key;
    }
539
540
    /**
     * Forgets the cached worker identifier, so that the next key() call
     * generates a new one.
     *
     * @return void
     */
    public function clearKey()
    {
        $this->_key = null;
    }
549
550
    /**
     * Truncates the table by executing the truncate statements that the
     * table schema generates for this table's connection.
     *
     * @return void
     */
    public function truncate()
    {
        // Go through the accessor, like the rest of this class, instead of the raw property.
        $connection = $this->getConnection();

        foreach ($this->getSchema()->truncateSql($connection) as $statement) {
            $connection->execute($statement);
        }
    }
562
563
    /**
     * Get the short name of the connection's driver: the last
     * backslash-separated segment of the configured 'driver' value.
     *
     * @return string
     */
    protected function _getDriverName()
    {
        $driverClass = $this->getConnection()->config()['driver'];
        $segments = explode('\\', $driverClass);

        return end($segments);
    }
575
576
    /**
     * Adds include/exclude conditions for a field to a conditions array.
     *
     * Values starting with "-" are exclusions (the dash is stripped) and end
     * up in a "<key> NOT IN" condition; all other values end up in a
     * "<key> IN" condition. A condition is only added if its list is non-empty.
     *
     * @param array $conditions Existing conditions
     * @param string $key Field name to filter on
     * @param array $values Values to include, or to exclude when prefixed with "-"
     * @return array The conditions including the added filters
     */
    protected function addFilter(array $conditions, $key, array $values)
    {
        $include = [];
        $exclude = [];
        foreach ($values as $value) {
            $isExclusion = substr($value, 0, 1) === '-';
            if ($isExclusion) {
                $exclude[] = substr($value, 1);
                continue;
            }
            $include[] = $value;
        }

        if ($include) {
            $conditions[$key . ' IN'] = $include;
        }
        if ($exclude) {
            $conditions[$key . ' NOT IN'] = $exclude;
        }

        return $conditions;
    }
604
605
    /**
     * Returns a DateTime object from different input.
     *
     * Objects are returned unchanged; anything else is passed to the
     * FrozenTime constructor. Called without argument, null is passed on.
     *
     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore
     *
     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
     */
    protected function getDateTime($notBefore = null)
    {
        return is_object($notBefore) ? $notBefore : new FrozenTime($notBefore);
    }
622
}
623