Completed
Pull Request — master (#35)
by
unknown
03:34
created

QueuedTasksTable::createJob()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 8
nc 2
nop 3
dl 0
loc 15
rs 10
c 0
b 0
f 0
1
<?php
2
namespace Queue\Model\Table;
3
4
use ArrayObject;
5
use Cake\Core\Configure;
6
use Cake\Event\Event;
7
use Cake\I18n\FrozenTime;
8
use Cake\ORM\Query;
9
use Cake\ORM\Table;
10
use InvalidArgumentException;
11
use Queue\Model\Entity\QueuedTask;
12
use RuntimeException;
13
14
/**
15
 * QueuedTasks Model
16
 *
17
 * @method \Queue\Model\Entity\QueuedTask get($primaryKey, $options = [])
18
 * @method \Queue\Model\Entity\QueuedTask newEntity($data = null, array $options = [])
19
 * @method \Queue\Model\Entity\QueuedTask[] newEntities(array $data, array $options = [])
20
 * @method \Queue\Model\Entity\QueuedTask|bool save(\Cake\Datasource\EntityInterface $entity, $options = [])
21
 * @method \Queue\Model\Entity\QueuedTask saveOrFail(\Cake\Datasource\EntityInterface $entity, $options = [])
22
 * @method \Queue\Model\Entity\QueuedTask patchEntity(\Cake\Datasource\EntityInterface $entity, array $data, array $options = [])
23
 * @method \Queue\Model\Entity\QueuedTask[] patchEntities($entities, array $data, array $options = [])
24
 * @method \Queue\Model\Entity\QueuedTask findOrCreate($search, callable $callback = null, $options = [])
25
 *
26
 * @mixin \Cake\ORM\Behavior\TimestampBehavior
27
 */
28
class QueuedTasksTable extends Table
29
{
30
31
    const DRIVER_MYSQL = 'Mysql';
32
33
    const DRIVER_POSTGRES = 'Postgres';
34
35
    const DRIVER_SQLSERVER = 'Sqlserver';
36
37
    const STATS_LIMIT = 100000;
38
39
    /**
40
     *
41
     * @var string|null
42
     */
43
    protected $_key;
44
45
    /**
46
     * Initialize method
47
     *
48
     * @param array $config
49
     *            The configuration for the Table.
50
     * @return void
51
     */
52
    public function initialize(array $config)
53
    {
54
        parent::initialize($config);
55
56
        $this->setTable('queued_tasks');
57
        $this->setDisplayField('id');
58
        $this->setPrimaryKey('id');
59
60
        $this->addBehavior('Timestamp');
61
    }
62
63
    /**
64
     * set connection name
65
     *
66
     * @return string
67
     */
68
    public static function defaultConnectionName()
69
    {
70
        $connection = Configure::read('Queue.connection');
71
        if (! empty($connection)) {
72
            return $connection;
73
        }
74
75
        return parent::defaultConnectionName();
76
    }
77
78
    /**
79
     *
80
     * @param \Cake\Event\Event $event
81
     * @param \ArrayObject $data
82
     * @param \ArrayObject $options
83
     * @return void
84
     */
85
    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
0 ignored issues
show
Unused Code introduced by
The parameter $options is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

85
    public function beforeMarshal(Event $event, ArrayObject $data, /** @scrutinizer ignore-unused */ ArrayObject $options)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $event is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

85
    public function beforeMarshal(/** @scrutinizer ignore-unused */ Event $event, ArrayObject $data, ArrayObject $options)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
86
    {
87
        if (isset($data['data']) && $data['data'] === '') {
88
            $data['data'] = null;
89
        }
90
    }
91
92
    /**
93
     * Adds a new job to the queue.
94
     *
95
     * @param string $taskName
96
     *            Task name
97
     * @param array|null $data
98
     *            Array of data
99
     * @param string $notBefore
100
     *            A datetime which indicates when the job may be executed
101
     * @return \Queue\Model\Entity\QueuedTask Saved job entity
102
     */
103
    public function createJob($taskName, array $data = null, string $notBefore = null)
104
    {
105
        $task = [
106
            'task' => $taskName,
107
            'data' => serialize($data),
108
            'not_before' => $this->getDateTime()
109
        ];
110
111
        if (! empty($notBefore)) {
112
            $task['not_before'] = $this->getDateTime(strtotime($notBefore));
113
        }
114
115
        $queuedTask = $this->newEntity($task);
116
117
        return $this->saveOrFail($queuedTask);
118
    }
119
120
    /**
121
     *
122
     * @param string|null $taskName
123
     *
124
     * @return bool
125
     *
126
     * @throws \InvalidArgumentException
127
     */
128
    public function isQueued($taskName = null)
129
    {
130
        $conditions = [
131
            'completed IS' => null
132
        ];
133
        if ($taskName) {
134
            $conditions['task'] = $taskName;
135
        }
136
137
        return (bool) $this->find()
138
            ->where($conditions)
139
            ->select([
140
            'id'
141
        ])
142
            ->first();
143
    }
144
145
    /**
146
     * Returns the number of items in the queue.
147
     * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
148
     *
149
     * @param string|null $taskName
150
     *            Task type to Count
151
     * @return int
152
     */
153
    public function getLength($taskName = null)
154
    {
155
        $findConf = [
156
            'conditions' => [
157
                'completed IS' => null
158
            ]
159
        ];
160
        if ($taskName !== null) {
161
            $findConf['conditions']['task'] = $taskName;
162
        }
163
164
        return $this->find('all', $findConf)->count();
165
    }
166
167
    /**
168
     * Return a list of all task types in the Queue.
169
     *
170
     * @return \Cake\ORM\Query
171
     */
172
    public function getTypes()
173
    {
174
        $findCond = [
175
            'fields' => [
176
                'task'
177
            ],
178
            'group' => [
179
                'task'
180
            ],
181
            'keyField' => 'task',
182
            'valueField' => 'task'
183
        ];
184
        return $this->find('list', $findCond);
185
    }
186
187
    /**
188
     * Return some statistics about finished jobs still in the Database.
189
     * TO-DO: rewrite as virtual field
190
     *
191
     * @return \Cake\ORM\Query
192
     */
193
    public function getStats()
194
    {
195
        $driverName = $this->_getDriverName();
196
        $options = [
197
            'fields' => function (Query $query) use ($driverName) {
198
                $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
199
                $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
200
                $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
201
                switch ($driverName) {
202
                    case static::DRIVER_SQLSERVER:
203
                        $alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
204
                        $runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
205
                        $fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
206
                        break;
207
                }
208
                /**
209
                 *
210
                 * @var \Cake\ORM\Query
211
                 */
212
                return [
213
                    'task',
214
                    'num' => $query->func()->count('*'),
215
                    'alltime' => $alltime,
216
                    'runtime' => $runtime,
217
                    'fetchdelay' => $fetchdelay
218
                ];
219
            },
220
            'conditions' => [
221
                'completed IS NOT' => null
222
            ],
223
            'group' => [
224
                'task'
225
            ]
226
        ];
227
        return $this->find('all', $options);
228
    }
229
230
    /**
231
     * Returns [
232
     * 'Task' => [
233
     * 'YYYY-MM-DD' => INT,
234
     * ...
235
     * ]
236
     * ]
237
     *
238
     * @param string|null $task
239
     * @return array
240
     */
241
    public function getFullStats($taskName = null)
242
    {
243
        $driverName = $this->_getDriverName();
244
        $fields = function (Query $query) use ($driverName) {
245
            $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
246
            switch ($driverName) {
247
                case static::DRIVER_SQLSERVER:
248
                    $runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
249
                    break;
250
            }
251
252
            return [
253
                'task',
254
                'created',
255
                'duration' => $runtime
256
            ];
257
        };
258
259
        $conditions = [
260
            'completed IS NOT' => null
261
        ];
262
        if ($taskName) {
263
            $conditions['task'] = $taskName;
264
        }
265
266
        $tasks = $this->find()
267
            ->select($fields)
268
            ->where($conditions)
269
            ->enableHydration(false)
270
            ->orderDesc('id')
271
            ->limit(static::STATS_LIMIT)
272
            ->all()
273
            ->toArray();
274
275
        $result = [];
276
277
        $days = [];
278
279
        foreach ($tasks as $task) {
280
            /** @var \DateTime $created */
281
            $created = $task['created'];
282
            $day = $created->format('Y-m-d');
283
            if (! isset($days[$day])) {
284
                $days[$day] = $day;
285
            }
286
287
            $result[$task['task']][$day][] = $task['duration'];
288
        }
289
290
        foreach ($result as $type => $tasks) {
291
            foreach ($tasks as $day => $durations) {
292
                $average = array_sum($durations) / count($durations);
293
                $result[$type][$day] = (int) $average;
294
            }
295
296
            foreach ($days as $day) {
297
                if (isset($result[$type][$day])) {
298
                    continue;
299
                }
300
301
                $result[$type][$day] = 0;
302
            }
303
304
            ksort($result[$type]);
305
        }
306
307
        return $result;
308
    }
309
310
    /**
311
     * Look for a new job that can be processed with the current abilities and
312
     * from the specified group (or any if null).
313
     *
314
     * @param array $capabilities
315
     *            Available QueueWorkerTasks.
316
     * @param array $types
317
     *            Request a job from these types (or exclude certain types), or any otherwise.
318
     * @return \Queue\Model\Entity\QueuedTask|null
319
     */
320
    public function requestJob(array $capabilities, array $types = [])
321
    {
322
        $now = $this->getDateTime();
323
        $nowStr = $now->toDateTimeString();
324
        $driverName = $this->_getDriverName();
325
326
        $query = $this->find();
327
        $age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
328
        switch ($driverName) {
329
            case static::DRIVER_SQLSERVER:
330
                $age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
331
                break;
332
            case static::DRIVER_POSTGRES:
333
                $age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
334
                break;
335
        }
336
        $options = [
337
            'conditions' => [
338
                'completed IS' => null,
339
                'OR' => []
340
            ],
341
            'fields' => [
342
                'age' => $age
343
            ],
344
            'order' => [
345
                'age' => 'ASC',
346
                'id' => 'ASC'
347
            ]
348
        ];
349
350
        if ($types) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $types of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
351
            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
352
        }
353
354
        // Generate the task specific conditions.
355
        foreach ($capabilities as $task) {
356
            list ($plugin, $name) = pluginSplit($task['name']);
357
            $timeoutAt = $now->copy();
358
            $tmp = [
359
                'task' => $name,
360
                'AND' => [
361
                    [
362
                        'OR' => [
363
                            'not_before <' => $nowStr,
364
                            'not_before IS' => null
365
                        ]
366
                    ],
367
                    [
368
                        'OR' => [
369
                            'fetched <' => $timeoutAt->subSeconds($task['timeout']),
370
                            'fetched IS' => null
371
                        ]
372
                    ]
373
                ],
374
                'failed_count <' => ($task['retries'] + 1)
375
            ];
376
            $options['conditions']['OR'][] = $tmp;
377
        }
378
379
        /** @var \Queue\Model\Entity\QueuedTask|null $task */
380
        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
381
            $task = $query->find('all', $options)
382
                ->enableAutoFields(true)
383
                ->epilog('FOR UPDATE')
384
                ->first();
385
386
            if (! $task) {
387
                return null;
388
            }
389
390
            $key = $this->key();
391
            $task = $this->patchEntity($task, [
0 ignored issues
show
Bug introduced by
It seems like $task can also be of type array; however, parameter $entity of Queue\Model\Table\QueuedTasksTable::patchEntity() does only seem to accept Cake\Datasource\EntityInterface, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

391
            $task = $this->patchEntity(/** @scrutinizer ignore-type */ $task, [
Loading history...
392
                'worker_key' => $key,
393
                'fetched' => $now
394
            ]);
395
396
            return $this->saveOrFail($task);
397
        });
398
399
        if (! $task) {
400
            return null;
401
        }
402
403
        return $task;
404
    }
405
406
    /**
407
     * Mark a task as Completed, removing it from the queue.
408
     *
409
     * @param \Queue\Model\Entity\QueuedTask $task
410
     *            Task
411
     * @return bool Success
412
     */
413
    public function markJobDone(QueuedTask $task)
414
    {
415
        $fields = [
416
            'completed' => $this->getDateTime()
417
        ];
418
        $task = $this->patchEntity($task, $fields);
419
420
        return (bool) $this->save($task);
421
    }
422
423
    /**
424
     * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
425
     *
426
     * @param \Queue\Model\Entity\QueuedTask $task
427
     *            Task
428
     * @param string|null $failureMessage
429
     *            Optional message to append to the failure_message field.
430
     * @return bool Success
431
     */
432
    public function markJobFailed(QueuedTask $task, $failureMessage = null)
433
    {
434
        $fields = [
435
            'failed_count' => $task->failed_count + 1,
436
            'failure_message' => $failureMessage
437
        ];
438
        $task = $this->patchEntity($task, $fields);
439
440
        return (bool) $this->save($task);
441
    }
442
443
    /**
444
     * Reset current jobs
445
     *
446
     * @param int|null $id
447
     *
448
     * @return int Success
449
     */
450
    public function reset($id = null)
451
    {
452
        $fields = [
453
            'completed' => null,
454
            'fetched' => null,
455
            'failed_count' => 0,
456
            'worker_key' => null,
457
            'failure_message' => null
458
        ];
459
        $conditions = [
460
            'completed IS' => null
461
        ];
462
        if ($id) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $id of type integer|null is loosely compared to true; this is ambiguous if the integer can be 0. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
463
            $conditions['id'] = $id;
464
        }
465
466
        return $this->updateAll($fields, $conditions);
467
    }
468
469
    /**
470
     *
471
     * @param string $taskName
472
     *
473
     * @return int
474
     */
475
    public function rerun($taskName)
476
    {
477
        $fields = [
478
            'completed' => null,
479
            'fetched' => null,
480
            'failed_count' => 0,
481
            'worker_key' => null,
482
            'failure_message' => null
483
        ];
484
        $conditions = [
485
            'completed IS NOT' => null,
486
            'task' => $taskName
487
        ];
488
489
        return $this->updateAll($fields, $conditions);
490
    }
491
492
    /**
493
     * Cleanup/Delete Completed Tasks.
494
     *
495
     * @return void
496
     */
497
    public function cleanOldJobs()
498
    {
499
        if (! Configure::read('Queue.cleanuptimeout')) {
500
            return;
501
        }
502
503
        $this->deleteAll([
504
            'completed <' => time() - (int) Configure::read('Queue.cleanuptimeout')
505
        ]);
506
    }
507
508
    /**
509
     *
510
     * @param \Queue\Model\Entity\QueuedTask $queuedTask
511
     * @param array $taskConfiguration
512
     * @return string
513
     */
514
    public function getFailedStatus($queuedTask, array $taskConfiguration)
515
    {
516
        $failureMessageRequeued = 'requeued';
517
518
        $queuedTaskName = 'Queue' . $queuedTask->task;
519
        if (empty($taskConfiguration[$queuedTaskName])) {
520
            return $failureMessageRequeued;
521
        }
522
        $retries = $taskConfiguration[$queuedTaskName]['retries'];
523
        if ($queuedTask->failed_count <= $retries) {
524
            return $failureMessageRequeued;
525
        }
526
527
        return 'aborted';
528
    }
529
530
    /**
531
     * Generates a unique Identifier for the current worker thread.
532
     *
533
     * Useful to identify the currently running processes for this thread.
534
     *
535
     * @return string Identifier
536
     */
537
    public function key()
538
    {
539
        if ($this->_key !== null) {
540
            return $this->_key;
541
        }
542
        $this->_key = sha1(microtime());
543
        if (! $this->_key) {
544
            throw new RuntimeException('Invalid key generated');
545
        }
546
547
        return $this->_key;
548
    }
549
550
    /**
551
     * Resets worker Identifier
552
     *
553
     * @return void
554
     */
555
    public function clearKey()
556
    {
557
        $this->_key = null;
558
    }
559
560
    /**
561
     * truncate()
562
     *
563
     * @return void
564
     */
565
    public function truncate()
566
    {
567
        $sql = $this->getSchema()->truncateSql($this->_connection);
568
        foreach ($sql as $snippet) {
569
            $this->_connection->execute($snippet);
570
        }
571
    }
572
573
    /**
574
     * get the name of the driver
575
     *
576
     * @return string
577
     */
578
    protected function _getDriverName()
579
    {
580
        $className = explode('\\', $this->getConnection()->config()['driver']);
581
        $name = end($className);
582
583
        return $name;
584
    }
585
586
    /**
587
     *
588
     * @param array $conditions
589
     * @param string $key
590
     * @param array $values
591
     * @return array
592
     */
593
    protected function addFilter(array $conditions, $key, array $values)
594
    {
595
        $include = [];
596
        $exclude = [];
597
        foreach ($values as $value) {
598
            if (substr($value, 0, 1) === '-') {
599
                $exclude[] = substr($value, 1);
600
            } else {
601
                $include[] = $value;
602
            }
603
        }
604
605
        if ($include) {
606
            $conditions[$key . ' IN'] = $include;
607
        }
608
        if ($exclude) {
609
            $conditions[$key . ' NOT IN'] = $exclude;
610
        }
611
612
        return $conditions;
613
    }
614
615
    /**
616
     * Returns a DateTime object from different input.
617
     *
618
     * Without argument this will be "now".
619
     *
620
     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore
621
     *
622
     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
623
     */
624
    protected function getDateTime($notBefore = null)
625
    {
626
        if (is_object($notBefore)) {
627
            return $notBefore;
628
        }
629
630
        return new FrozenTime($notBefore);
631
    }
632
}
633