Completed
Pull Request — master (#35)
by
unknown
03:01
created

QueuedTasksTable::truncate()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 0
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
<?php
2
namespace Queue\Model\Table;
3
4
use ArrayObject;
5
use Cake\Core\Configure;
6
use Cake\Event\Event;
7
use Cake\I18n\FrozenTime;
8
use Cake\ORM\Query;
9
use Cake\ORM\Table;
10
use InvalidArgumentException;
11
use Queue\Model\Entity\QueuedTask;
12
use RuntimeException;
13
14
/**
15
 * QueuedTasks Model
16
 *
17
 * @method \Queue\Model\Entity\QueuedTask get($primaryKey, $options = [])
18
 * @method \Queue\Model\Entity\QueuedTask newEntity($data = null, array $options = [])
19
 * @method \Queue\Model\Entity\QueuedTask[] newEntities(array $data, array $options = [])
20
 * @method \Queue\Model\Entity\QueuedTask|bool save(\Cake\Datasource\EntityInterface $entity, $options = [])
21
 * @method \Queue\Model\Entity\QueuedTask saveOrFail(\Cake\Datasource\EntityInterface $entity, $options = [])
22
 * @method \Queue\Model\Entity\QueuedTask patchEntity(\Cake\Datasource\EntityInterface $entity, array $data, array $options = [])
23
 * @method \Queue\Model\Entity\QueuedTask[] patchEntities($entities, array $data, array $options = [])
24
 * @method \Queue\Model\Entity\QueuedTask findOrCreate($search, callable $callback = null, $options = [])
25
 *
26
 * @mixin \Cake\ORM\Behavior\TimestampBehavior
27
 */
28
class QueuedTasksTable extends Table
29
{
30
31
    const DRIVER_MYSQL = 'Mysql';
32
33
    const DRIVER_POSTGRES = 'Postgres';
34
35
    const DRIVER_SQLSERVER = 'Sqlserver';
36
37
    const STATS_LIMIT = 100000;
38
39
    /**
40
     *
41
     * @var string|null
42
     */
43
    protected $_key;
44
45
    /**
     * Configures the table: underlying database table, display field,
     * primary key and the Timestamp behavior.
     *
     * @param array $config The configuration for the Table.
     * @return void
     */
    public function initialize(array $config)
    {
        parent::initialize($config);

        $this
            ->setTable('queued_tasks')
            ->setDisplayField('id')
            ->setPrimaryKey('id');

        $this->addBehavior('Timestamp');
    }
61
62
    /**
     * Returns the connection name for this table.
     *
     * Uses the 'Queue.connection' configuration value when it is non-empty,
     * otherwise falls back to the parent's default connection name.
     *
     * @return string
     */
    public static function defaultConnectionName()
    {
        $configured = Configure::read('Queue.connection');

        return empty($configured) ? parent::defaultConnectionName() : $configured;
    }
76
77
    /**
     * Normalizes an empty-string 'data' value to null before marshalling.
     *
     * $event and $options are not used by this method; they are part of the
     * callback signature.
     *
     * @param \Cake\Event\Event $event Model event
     * @param \ArrayObject $data The data
     * @param \ArrayObject $options The options
     * @return void
     */
    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
    {
        $hasEmptyPayload = isset($data['data']) && $data['data'] === '';
        if (!$hasEmptyPayload) {
            return;
        }

        $data['data'] = null;
    }
90
91
    /**
     * Adds a new job to the queue.
     *
     * The payload is stored serialized. When $notBefore is empty the job's
     * not_before is set to the current time; otherwise the string is parsed
     * with strtotime() and used as not_before.
     *
     * The entity is persisted via saveOrFail().
     *
     * @param string $taskName Task name
     * @param array|null $data Array of data
     * @param string|null $notBefore A datetime string which indicates when the job may be executed
     * @return \Queue\Model\Entity\QueuedTask Saved job entity
     */
    public function createJob($taskName, array $data = null, string $notBefore = null)
    {
        $task = [
            'task' => $taskName,
            'data' => serialize($data),
            'not_before' => $this->getDateTime()
        ];

        if (!empty($notBefore)) {
            $task['not_before'] = $this->getDateTime(strtotime($notBefore));
        }

        // No debug output here: this method runs for every enqueued job.
        $queuedTask = $this->newEntity($task);

        return $this->saveOrFail($queuedTask);
    }
117
118
    /**
     * Tells whether at least one job that is not completed exists.
     *
     * @param string|null $taskName Restrict the check to this task name (ignored when falsy)
     * @return bool
     */
    public function isQueued($taskName = null)
    {
        $query = $this->find()
            ->select(['id'])
            ->where(['completed IS' => null]);

        if ($taskName) {
            $query->where(['task' => $taskName]);
        }

        return (bool)$query->first();
    }
139
140
    /**
     * Counts the jobs that are not completed yet.
     *
     * Counts all such jobs, or only those of the given task when a name is passed.
     *
     * @param string|null $taskName Task type to count
     * @return int
     */
    public function getLength($taskName = null)
    {
        $conditions = ['completed IS' => null];
        if ($taskName !== null) {
            $conditions['task'] = $taskName;
        }

        $query = $this->find('all', ['conditions' => $conditions]);

        return $query->count();
    }
160
161
    /**
     * Builds a 'list' query of the distinct task names present in the table,
     * keyed and valued by the task name.
     *
     * @return \Cake\ORM\Query
     */
    public function getTypes()
    {
        return $this->find('list', [
            'fields' => ['task'],
            'group' => ['task'],
            'keyField' => 'task',
            'valueField' => 'task'
        ]);
    }
181
182
    /**
     * Return some statistics about finished jobs still in the Database.
     *
     * Per task it selects the number of completed jobs and the averages of
     * completed - created ('alltime'), completed - fetched ('runtime') and
     * fetched - (not_before or created) ('fetchdelay'), in seconds.
     *
     * A dedicated SQL variant exists for SQL Server only; every other driver
     * gets the UNIX_TIMESTAMP()/IF() expressions.
     *
     * TO-DO: rewrite as virtual field
     *
     * @return \Cake\ORM\Query
     */
    public function getStats()
    {
        $driverName = $this->_getDriverName();

        $fields = function (Query $query) use ($driverName) {
            if ($driverName === static::DRIVER_SQLSERVER) {
                $alltimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)";
                $runtimeSql = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)";
                $fetchdelaySql = "DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)";
            } else {
                $alltimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)';
                $runtimeSql = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)';
                $fetchdelaySql = 'UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))';
            }

            return [
                'task',
                'num' => $query->func()->count('*'),
                'alltime' => $query->func()->avg($alltimeSql),
                'runtime' => $query->func()->avg($runtimeSql),
                'fetchdelay' => $query->func()->avg($fetchdelaySql)
            ];
        };

        return $this->find('all', [
            'fields' => $fields,
            'conditions' => [
                'completed IS NOT' => null
            ],
            'group' => [
                'task'
            ]
        ]);
    }
225
226
    /**
     * Returns the average run duration per task and creation day:
     *
     * [
     *     'Task' => [
     *         'YYYY-MM-DD' => INT,
     *         ...
     *     ]
     * ]
     *
     * Only completed jobs are considered, limited to the STATS_LIMIT most
     * recent ones by id. Every task gets an entry for every day seen in the
     * result set; days without jobs for that task are reported as 0. Days are
     * sorted ascending per task.
     *
     * @param string|null $taskName The task name
     * @return array
     */
    public function getFullStats($taskName = null)
    {
        $driverName = $this->_getDriverName();
        $fields = function (Query $query) use ($driverName) {
            if ($driverName === static::DRIVER_SQLSERVER) {
                $duration = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
            } else {
                $duration = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
            }

            return [
                'task',
                'created',
                'duration' => $duration
            ];
        };

        $conditions = ['completed IS NOT' => null];
        if ($taskName) {
            $conditions['task'] = $taskName;
        }

        $rows = $this->find()
            ->select($fields)
            ->where($conditions)
            ->enableHydration(false)
            ->orderDesc('id')
            ->limit(static::STATS_LIMIT)
            ->all()
            ->toArray();

        // Collect raw durations per task and day, remembering every day seen.
        $result = [];
        $days = [];
        foreach ($rows as $row) {
            /** @var \DateTime $created */
            $created = $row['created'];
            $day = $created->format('Y-m-d');
            $days[$day] = $day;

            $result[$row['task']][$day][] = $row['duration'];
        }

        $emptyDays = array_fill_keys($days, 0);
        foreach ($result as $type => $durationsPerDay) {
            // Collapse each day's durations into their integer average.
            foreach ($durationsPerDay as $day => $durations) {
                $result[$type][$day] = (int)(array_sum($durations) / count($durations));
            }

            // Array union keeps existing averages and adds 0 for missing days.
            $result[$type] += $emptyDays;

            ksort($result[$type]);
        }

        return $result;
    }
305
306
    /**
     * Look for a new job that can be processed with the current abilities and
     * from the specified group (or any if null).
     *
     * Inside a transaction the first matching row is selected with a
     * 'FOR UPDATE' epilog, stamped with this worker's key and the current time
     * as 'fetched', and saved.
     *
     * @param array $capabilities Available QueueWorkerTasks; each entry is read for 'name', 'timeout' and 'retries'.
     * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
     * @return \Queue\Model\Entity\QueuedTask|null
     */
    public function requestJob(array $capabilities, array $types = [])
    {
        $now = $this->getDateTime();
        $nowStr = $now->toDateTimeString();
        $driverName = $this->_getDriverName();

        $query = $this->find();

        // NOTE(review): the Postgres variant computes now - not_before while the
        // other two compute not_before - now, so the 'age' ordering looks inverted
        // on Postgres - confirm whether that is intended.
        // NOTE(review): the default variant wraps the timestamp in double quotes,
        // which presumably relies on MySQL's default SQL mode - confirm.
        if ($driverName === static::DRIVER_SQLSERVER) {
            $ageSql = 'ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)';
        } elseif ($driverName === static::DRIVER_POSTGRES) {
            $ageSql = 'COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)';
        } else {
            $ageSql = 'IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)';
        }
        $age = $query->newExpr()->add($ageSql);

        $conditions = [
            'completed IS' => null,
            'OR' => []
        ];
        if ($types) {
            $conditions = $this->addFilter($conditions, 'task', $types);
        }

        // One OR branch per capability: matching task name, due (or unscheduled),
        // not fetched within the task's timeout, and retries not exhausted.
        foreach ($capabilities as $capability) {
            $name = pluginSplit($capability['name'])[1];
            $timeoutAt = $now->copy();
            $conditions['OR'][] = [
                'task' => $name,
                'AND' => [
                    [
                        'OR' => [
                            'not_before <' => $nowStr,
                            'not_before IS' => null
                        ]
                    ],
                    [
                        'OR' => [
                            'fetched <' => $timeoutAt->subSeconds($capability['timeout']),
                            'fetched IS' => null
                        ]
                    ]
                ],
                'failed_count <' => ($capability['retries'] + 1)
            ];
        }

        $options = [
            'conditions' => $conditions,
            'fields' => [
                'age' => $age
            ],
            'order' => [
                'age' => 'ASC',
                'id' => 'ASC'
            ]
        ];

        /** @var \Queue\Model\Entity\QueuedTask|null $job */
        $job = $this->getConnection()->transactional(function () use ($query, $options, $now) {
            $candidate = $query->find('all', $options)
                ->enableAutoFields(true)
                ->epilog('FOR UPDATE')
                ->first();

            if (!$candidate) {
                return null;
            }

            $key = $this->key();
            $candidate = $this->patchEntity($candidate, [
                'worker_key' => $key,
                'fetched' => $now
            ]);

            return $this->saveOrFail($candidate);
        });

        return $job ?: null;
    }
399
400
    /**
     * Marks a task as completed by setting its 'completed' field to the
     * current time and saving it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @return bool Success
     */
    public function markJobDone(QueuedTask $task)
    {
        $task = $this->patchEntity($task, ['completed' => $this->getDateTime()]);
        $saved = $this->save($task);

        return (bool)$saved;
    }
415
416
    /**
     * Marks a job as failed: increments its failed counter by one and patches
     * the given message into the failure_message field, then saves it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @param string|null $failureMessage Optional message patched into the failure_message field.
     * @return bool Success
     */
    public function markJobFailed(QueuedTask $task, $failureMessage = null)
    {
        $task = $this->patchEntity($task, [
            'failed_count' => $task->failed_count + 1,
            'failure_message' => $failureMessage
        ]);
        $saved = $this->save($task);

        return (bool)$saved;
    }
433
434
    /**
     * Resets jobs that are not completed: clears completed, fetched,
     * worker_key and failure_message and sets failed_count back to 0.
     *
     * With an ID only that job is affected; with null all jobs that are not
     * completed are reset.
     *
     * @param int|null $id ID of a single job, or null for all unfinished jobs
     *
     * @return int Value returned by updateAll() for the update
     */
    public function reset($id = null)
    {
        $fields = [
            'completed' => null,
            'fetched' => null,
            'failed_count' => 0,
            'worker_key' => null,
            'failure_message' => null
        ];
        $conditions = [
            'completed IS' => null
        ];
        // Strict null check: a falsy ID such as 0 must narrow the update to that
        // ID instead of silently widening it to every unfinished job.
        if ($id !== null) {
            $conditions['id'] = $id;
        }

        return $this->updateAll($fields, $conditions);
    }
459
460
    /**
     * Puts all completed jobs of the given task back into the queue by
     * clearing completed, fetched, worker_key and failure_message and
     * setting failed_count to 0.
     *
     * @param string $taskName Task name
     *
     * @return int
     */
    public function rerun($taskName)
    {
        $completedJobsOfTask = [
            'completed IS NOT' => null,
            'task' => $taskName
        ];

        return $this->updateAll(
            [
                'completed' => null,
                'fetched' => null,
                'failed_count' => 0,
                'worker_key' => null,
                'failure_message' => null
            ],
            $completedJobsOfTask
        );
    }
482
483
    /**
     * Deletes jobs whose 'completed' value is older than the configured
     * 'Queue.cleanuptimeout' (in seconds, relative to time()).
     *
     * Does nothing when the timeout is not configured or falsy.
     *
     * @return void
     */
    public function cleanOldJobs()
    {
        $timeout = Configure::read('Queue.cleanuptimeout');
        if (!$timeout) {
            return;
        }

        $threshold = time() - (int)$timeout;
        $this->deleteAll(['completed <' => $threshold]);
    }
498
499
    /**
     * Returns the status label for a failed job.
     *
     * The task configuration is looked up under 'Queue' . task name. The
     * result is 'aborted' only when such a configuration exists and the job's
     * failed_count exceeds its 'retries'; in every other case it is 'requeued'.
     *
     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
     * @param array $taskConfiguration Task configuration
     * @return string
     */
    public function getFailedStatus($queuedTask, array $taskConfiguration)
    {
        $configKey = 'Queue' . $queuedTask->task;

        $isConfigured = !empty($taskConfiguration[$configKey]);
        if ($isConfigured && $queuedTask->failed_count > $taskConfiguration[$configKey]['retries']) {
            return 'aborted';
        }

        return 'requeued';
    }
520
521
    /**
     * Generates a unique Identifier for the current worker thread.
     *
     * Useful to identify the currently running processes for this thread.
     * The identifier is generated once and cached in $_key until clearKey()
     * is called. It is a 40 character hexadecimal string built from random
     * bytes, so that workers started at the same instant do not end up with
     * the same key.
     *
     * @return string Identifier
     * @throws \RuntimeException When no random bytes could be gathered.
     */
    public function key()
    {
        if ($this->_key !== null) {
            return $this->_key;
        }

        try {
            // 20 random bytes => 40 hex characters, the same shape as a sha1 hash.
            $this->_key = bin2hex(random_bytes(20));
        } catch (\Exception $e) {
            throw new RuntimeException('Invalid key generated', 0, $e);
        }

        return $this->_key;
    }
540
541
    /**
     * Resets the worker identifier.
     *
     * Sets the cached key back to null; key() generates a fresh identifier
     * the next time it finds the cached value to be null.
     *
     * @return void
     */
    public function clearKey()
    {
        $this->_key = null;
    }
550
551
    /**
     * Empties the table by executing every statement the table schema's
     * truncateSql() produces for this table's connection.
     *
     * @return void
     */
    public function truncate()
    {
        $connection = $this->_connection;
        $statements = $this->getSchema()->truncateSql($connection);

        foreach ($statements as $statement) {
            $connection->execute($statement);
        }
    }
563
564
    /**
     * Returns the last namespace segment of the connection's configured
     * 'driver' value, i.e. the short driver name.
     *
     * @return string
     */
    protected function _getDriverName()
    {
        $driverClass = $this->getConnection()->config()['driver'];
        $segments = explode('\\', $driverClass);

        return array_pop($segments);
    }
576
577
    /**
     * Adds IN / NOT IN conditions for the given key.
     *
     * Values starting with '-' are exclusions (the leading '-' is stripped);
     * all other values are inclusions. A condition is only added for a
     * non-empty list.
     *
     * @param array $conditions Conditions
     * @param string $key Key
     * @param array $values Values
     * @return array
     */
    protected function addFilter(array $conditions, $key, array $values)
    {
        $included = [];
        $excluded = [];
        foreach ($values as $value) {
            if (strpos($value, '-') === 0) {
                $excluded[] = substr($value, 1);
                continue;
            }

            $included[] = $value;
        }

        if (!empty($included)) {
            $conditions[$key . ' IN'] = $included;
        }
        if (!empty($excluded)) {
            $conditions[$key . ' NOT IN'] = $excluded;
        }

        return $conditions;
    }
605
606
    /**
     * Returns a DateTime object from different input.
     *
     * Any object is handed back unchanged; everything else is passed to the
     * FrozenTime constructor. Without argument this will be "now".
     *
     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
     *
     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
     */
    protected function getDateTime($notBefore = null)
    {
        return is_object($notBefore) ? $notBefore : new FrozenTime($notBefore);
    }
623
}
624