Completed
Pull Request — master (#35)
by
unknown
03:32
created

QueuedTasksTable::requestJob()   B

Complexity

Conditions 7
Paths 24

Size

Total Lines 84
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 52
nc 24
nop 2
dl 0
loc 84
rs 8.1138
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

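Commonly applied refactorings include Extract Method: move a cohesive fragment of the long method into a new, well-named method and call it from the original.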

1
<?php
2
namespace Queue\Model\Table;
3
4
use ArrayObject;
5
use Cake\Core\Configure;
6
use Cake\Event\Event;
7
use Cake\I18n\FrozenTime;
8
use Cake\ORM\Query;
9
use Cake\ORM\Table;
10
use InvalidArgumentException;
11
use Queue\Model\Entity\QueuedTask;
12
use RuntimeException;
13
14
/**
15
 * QueuedTasks Model
16
 *
17
 * @method \Queue\Model\Entity\QueuedTask get($primaryKey, $options = [])
18
 * @method \Queue\Model\Entity\QueuedTask newEntity($data = null, array $options = [])
19
 * @method \Queue\Model\Entity\QueuedTask[] newEntities(array $data, array $options = [])
20
 * @method \Queue\Model\Entity\QueuedTask|bool save(\Cake\Datasource\EntityInterface $entity, $options = [])
21
 * @method \Queue\Model\Entity\QueuedTask saveOrFail(\Cake\Datasource\EntityInterface $entity, $options = [])
22
 * @method \Queue\Model\Entity\QueuedTask patchEntity(\Cake\Datasource\EntityInterface $entity, array $data, array $options = [])
23
 * @method \Queue\Model\Entity\QueuedTask[] patchEntities($entities, array $data, array $options = [])
24
 * @method \Queue\Model\Entity\QueuedTask findOrCreate($search, callable $callback = null, $options = [])
25
 *
26
 * @mixin \Cake\ORM\Behavior\TimestampBehavior
27
 */
28
class QueuedTasksTable extends Table
29
{
30
31
    const DRIVER_MYSQL = 'Mysql';
32
33
    const DRIVER_POSTGRES = 'Postgres';
34
35
    const DRIVER_SQLSERVER = 'Sqlserver';
36
37
    const STATS_LIMIT = 100000;
38
39
    /**
     * Sets up the table: database table name, display field, primary key
     * and the Timestamp behavior.
     *
     * @param array $config The configuration for the Table.
     * @return void
     */
    public function initialize(array $config)
    {
        parent::initialize($config);

        // The table setters are fluent, so the basic settings are chained.
        $this->setTable('queued_tasks')
            ->setDisplayField('id')
            ->setPrimaryKey('id');

        $this->addBehavior('Timestamp');
    }
55
56
    /**
     * Returns the connection name stored under the `Queue.connection`
     * configuration key, or the parent's default connection name when that
     * setting is empty.
     *
     * @return string
     */
    public static function defaultConnectionName()
    {
        $configured = Configure::read('Queue.connection');

        return empty($configured) ? parent::defaultConnectionName() : $configured;
    }
70
71
    /**
     * Replaces an empty-string `data` value with null before marshalling.
     *
     * $event and $options are not read by this method; static analysis reports
     * them as unused. They are presumably kept so the signature matches what the
     * framework passes to beforeMarshal listeners - confirm before removing them.
     *
     * @param \Cake\Event\Event $event Model event
     * @param \ArrayObject $data The data
     * @param \ArrayObject $options The options
     * @return void
     */
    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
    {
        // Nothing to normalize unless `data` is present and exactly ''.
        if (!isset($data['data']) || $data['data'] !== '') {
            return;
        }

        $data['data'] = null;
    }
84
85
    /**
     * Adds a new job to the queue.
     *
     * The payload is stored in serialized form. Without $notBefore the job's
     * `not_before` is the value of getDateTime() at creation time; otherwise
     * $notBefore is parsed with strtotime().
     *
     * @param string $taskName Task name
     * @param array|null $data Array of data
     * @param string|null $notBefore A datetime string understood by strtotime() which indicates when the job may be executed
     * @return \Queue\Model\Entity\QueuedTask Saved job entity
     * @throws \InvalidArgumentException When $notBefore cannot be parsed by strtotime().
     */
    public function createJob($taskName, array $data = null, string $notBefore = null)
    {
        $task = [
            'task' => $taskName,
            'data' => serialize($data),
            'not_before' => $this->getDateTime()
        ];

        if (!empty($notBefore)) {
            $timestamp = strtotime($notBefore);
            if ($timestamp === false) {
                // strtotime() signals an unparsable string with false; reject it here
                // instead of handing `false` on to getDateTime() as if it were a timestamp.
                throw new InvalidArgumentException(sprintf('Invalid "notBefore" value: %s', $notBefore));
            }

            $task['not_before'] = $this->getDateTime($timestamp);
        }

        $queuedTask = $this->newEntity($task);

        return $this->saveOrFail($queuedTask);
    }
109
110
    /**
     * Counts the jobs whose `completed` column is NULL.
     *
     * With a task name only jobs of that task are counted; with null all
     * uncompleted jobs are counted.
     *
     * @param string|null $taskName Task type to count
     * @return int
     */
    public function getLength($taskName = null)
    {
        $conditions = ['completed IS' => null];
        if ($taskName !== null) {
            $conditions['task'] = $taskName;
        }

        return $this->find('all', ['conditions' => $conditions])->count();
    }
130
131
    /**
     * Builds a `list` query over the distinct task names in the queue.
     *
     * The query selects and groups by `task` and uses `task` as both key and
     * value field.
     *
     * @return \Cake\ORM\Query
     */
    public function getTypes()
    {
        return $this->find('list', [
            'fields' => [
                'task'
            ],
            'group' => [
                'task'
            ],
            'keyField' => 'task',
            'valueField' => 'task'
        ]);
    }
151
152
    /**
     * Return some statistics about finished jobs still in the Database.
     * TO-DO: rewrite as virtual field
     *
     * Per task the query yields the number of completed jobs (`num`) and three
     * averages of second differences: `alltime` (completed - created),
     * `runtime` (completed - fetched) and `fetchdelay` (fetched - not_before,
     * or fetched - created when not_before is NULL).
     *
     * The timestamp arithmetic is driver specific: SQL Server uses DATEDIFF,
     * Postgres uses EXTRACT(EPOCH ...), every other driver gets the
     * UNIX_TIMESTAMP() form.
     *
     * @return \Cake\ORM\Query
     */
    public function getStats()
    {
        $driverName = $this->_getDriverName();
        $options = [
            'fields' => function (Query $query) use ($driverName) {
                switch ($driverName) {
                    case static::DRIVER_SQLSERVER:
                        $alltime = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)";
                        $runtime = "DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)";
                        $fetchdelay = "DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)";
                        break;
                    case static::DRIVER_POSTGRES:
                        // Postgres has neither UNIX_TIMESTAMP() nor IF(); use EXTRACT/CASE instead.
                        $alltime = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM created)';
                        $runtime = 'EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)';
                        $fetchdelay = 'EXTRACT(EPOCH FROM fetched) - (CASE WHEN not_before IS NULL THEN EXTRACT(EPOCH FROM created) ELSE EXTRACT(EPOCH FROM not_before) END)';
                        break;
                    default:
                        $alltime = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)';
                        $runtime = 'UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)';
                        $fetchdelay = 'UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))';
                        break;
                }

                return [
                    'task',
                    'num' => $query->func()->count('*'),
                    'alltime' => $query->func()->avg($alltime),
                    'runtime' => $query->func()->avg($runtime),
                    'fetchdelay' => $query->func()->avg($fetchdelay)
                ];
            },
            'conditions' => [
                'completed IS NOT' => null
            ],
            'group' => [
                'task'
            ]
        ];

        return $this->find('all', $options);
    }
195
196
    /**
     * Returns the average run time (completed - fetched, in whole seconds) of
     * completed jobs per task and creation day:
     *
     * [
     *     'Task' => [
     *         'YYYY-MM-DD' => INT,
     *         ...
     *     ]
     * ]
     *
     * Only the newest STATS_LIMIT completed jobs (by id) are considered. Every
     * task gets an entry for every day seen in the result; days on which a task
     * has no jobs are filled with 0. Days are sorted ascending per task.
     *
     * The run time expression is driver specific: SQL Server uses DATEDIFF,
     * Postgres uses EXTRACT(EPOCH ...), every other driver gets the
     * UNIX_TIMESTAMP() form.
     *
     * @param string|null $taskName The task name
     * @return array
     */
    public function getFullStats($taskName = null)
    {
        $driverName = $this->_getDriverName();
        $fields = function (Query $query) use ($driverName) {
            switch ($driverName) {
                case static::DRIVER_SQLSERVER:
                    $runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
                    break;
                case static::DRIVER_POSTGRES:
                    // Postgres has no UNIX_TIMESTAMP(); EXTRACT(EPOCH ...) yields seconds as well.
                    $runtime = $query->newExpr('EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)');
                    break;
                default:
                    $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
                    break;
            }

            return [
                'task',
                'created',
                'duration' => $runtime
            ];
        };

        $conditions = [
            'completed IS NOT' => null
        ];
        if ($taskName) {
            $conditions['task'] = $taskName;
        }

        $tasks = $this->find()
            ->select($fields)
            ->where($conditions)
            ->enableHydration(false)
            ->orderDesc('id')
            ->limit(static::STATS_LIMIT)
            ->all()
            ->toArray();

        $result = [];
        $days = [];

        // Collect the raw durations per task and creation day.
        foreach ($tasks as $task) {
            /** @var \DateTime $created */
            $created = $task['created'];
            $day = $created->format('Y-m-d');
            $days[$day] = $day;

            $result[$task['task']][$day][] = $task['duration'];
        }

        foreach ($result as $type => $durationsPerDay) {
            // Collapse each day's durations into their truncated average.
            foreach ($durationsPerDay as $day => $durations) {
                $average = array_sum($durations) / count($durations);
                $result[$type][$day] = (int)$average;
            }

            // Days that only occur for other tasks are reported as 0.
            foreach ($days as $day) {
                if (!isset($result[$type][$day])) {
                    $result[$type][$day] = 0;
                }
            }

            ksort($result[$type]);
        }

        return $result;
    }
275
276
    /**
     * Look for a new job that can be processed with the current abilities and
     * from the specified group (or any if null).
     *
     * Candidates are uncompleted jobs matching at least one capability (see
     * buildCapabilityCondition()), optionally narrowed by $types, ordered by
     * `age` and then `id`. The first candidate is selected with `FOR UPDATE`
     * inside a transaction, stamped with a fresh `worker_key` and the current
     * time as `fetched`, and saved.
     *
     * @param array $capabilities Available QueueWorkerTasks; each entry is read for `name`, `timeout` and `retries`.
     * @param array $types Request a job from these types (or exclude certain types with a leading "-"), or any otherwise.
     * @return \Queue\Model\Entity\QueuedTask|null The claimed job, or null when none is available.
     */
    public function requestJob(array $capabilities, array $types = [])
    {
        $now = $this->getDateTime();
        $nowStr = $now->toDateTimeString();

        $query = $this->find();
        $options = [
            'conditions' => [
                'completed IS' => null,
                'OR' => []
            ],
            'fields' => [
                'age' => $this->buildAgeExpression($query, $nowStr)
            ],
            'order' => [
                'age' => 'ASC',
                'id' => 'ASC'
            ]
        ];

        if (!empty($types)) {
            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
        }

        // Generate the task specific conditions.
        foreach ($capabilities as $capability) {
            $options['conditions']['OR'][] = $this->buildCapabilityCondition($capability, $now, $nowStr);
        }

        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
            /** @var \Queue\Model\Entity\QueuedTask|null $task */
            $task = $query->find('all', $options)
                ->enableAutoFields(true)
                ->epilog('FOR UPDATE')
                ->first();

            if (!$task) {
                return null;
            }

            $task = $this->patchEntity($task, [
                'worker_key' => sha1(microtime()),
                'fetched' => $now
            ]);

            return $this->saveOrFail($task);
        });

        return $task ?: null;
    }

    /**
     * Builds the driver specific `age` expression: the number of seconds from
     * "now" to `not_before` (0 when `not_before` is NULL).
     *
     * All drivers use the same sign (not_before - now) so that ordering by
     * `age` ASC means the same thing everywhere. The Postgres expression used to
     * be now - not_before, which reversed the order relative to the other two.
     *
     * @param \Cake\ORM\Query $query Query used to create the expression
     * @param string $nowStr Current time as datetime string (used by the default/MySQL form)
     * @return \Cake\Database\Expression\QueryExpression
     */
    protected function buildAgeExpression(Query $query, $nowStr)
    {
        switch ($this->_getDriverName()) {
            case static::DRIVER_SQLSERVER:
                $sql = 'ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)';
                break;
            case static::DRIVER_POSTGRES:
                $sql = 'COALESCE((EXTRACT(EPOCH FROM not_before) - EXTRACT(EPOCH FROM now())), 0)';
                break;
            default:
                $sql = 'IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)';
                break;
        }

        return $query->newExpr()->add($sql);
    }

    /**
     * Builds the condition set selecting fetchable jobs for one capability.
     *
     * A job matches when its task equals the capability name (without plugin
     * prefix), its `not_before` is NULL or not later than now, its `fetched` is
     * NULL or older than now minus the capability's `timeout` seconds, and its
     * `failed_count` is below `retries` + 1.
     *
     * @param array $capability Capability with the keys `name`, `timeout` and `retries`
     * @param \Cake\I18n\FrozenTime|\Cake\I18n\Time $now Current time
     * @param string $nowStr Current time as datetime string
     * @return array
     */
    protected function buildCapabilityCondition($capability, $now, $nowStr)
    {
        // Only the task name is needed; the plugin part is discarded.
        list(, $name) = pluginSplit($capability['name']);
        // Work on a copy so $now itself is never shifted.
        $timeoutAt = $now->copy();

        return [
            'task' => $name,
            'AND' => [
                [
                    'OR' => [
                        'not_before <=' => $nowStr,
                        'not_before IS' => null
                    ]
                ],
                [
                    'OR' => [
                        'fetched <' => $timeoutAt->subSeconds($capability['timeout']),
                        'fetched IS' => null
                    ]
                ]
            ],
            'failed_count <' => ($capability['retries'] + 1)
        ];
    }
369
370
    /**
     * Marks a task as completed by setting its `completed` field to the
     * current time and saving it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @return bool Success
     */
    public function markJobDone(QueuedTask $task)
    {
        $completedTask = $this->patchEntity($task, [
            'completed' => $this->getDateTime()
        ]);

        return (bool)$this->save($completedTask);
    }
385
386
    /**
     * Marks a job as failed: increments its `failed_count` by one, sets
     * `failure_message` to the given message and saves it.
     *
     * @param \Queue\Model\Entity\QueuedTask $task Task
     * @param string|null $failureMessage Optional message written to the failure_message field.
     * @return bool Success
     */
    public function markJobFailed(QueuedTask $task, $failureMessage = null)
    {
        $failedTask = $this->patchEntity($task, [
            'failed_count' => $task->failed_count + 1,
            'failure_message' => $failureMessage
        ]);

        return (bool)$this->save($failedTask);
    }
403
404
    /**
     * Resets uncompleted jobs: clears `completed`, `fetched`, `worker_key` and
     * `failure_message` and sets `failed_count` back to 0.
     *
     * Only rows whose `completed` is NULL are touched. The id filter is applied
     * for non-empty ids only, so null and 0 both mean "all uncompleted jobs".
     *
     * @param int|null $id ID of a single job, or null for all uncompleted jobs
     *
     * @return int The value returned by updateAll()
     */
    public function reset($id = null)
    {
        $conditions = [
            'completed IS' => null
        ];
        if (!empty($id)) {
            $conditions['id'] = $id;
        }

        return $this->updateAll(
            [
                'completed' => null,
                'fetched' => null,
                'failed_count' => 0,
                'worker_key' => null,
                'failure_message' => null
            ],
            $conditions
        );
    }
429
430
    /**
     * Puts the completed jobs of a task back into the queue: clears
     * `completed`, `fetched`, `worker_key` and `failure_message` and sets
     * `failed_count` to 0 for every row of that task whose `completed` is set.
     *
     * @param string $taskName Task name
     *
     * @return int The value returned by updateAll()
     */
    public function rerun($taskName)
    {
        return $this->updateAll(
            [
                'completed' => null,
                'fetched' => null,
                'failed_count' => 0,
                'worker_key' => null,
                'failure_message' => null
            ],
            [
                'completed IS NOT' => null,
                'task' => $taskName
            ]
        );
    }
452
453
    /**
     * Deletes completed tasks whose `completed` value lies before the current
     * Unix time minus `Queue.cleanuptimeout`.
     *
     * Does nothing when `Queue.cleanuptimeout` is not set or falsy.
     *
     * @return void
     */
    public function cleanOldJobs()
    {
        $cleanupTimeout = Configure::read('Queue.cleanuptimeout');
        if (!$cleanupTimeout) {
            return;
        }

        $threshold = time() - (int)$cleanupTimeout;
        $this->deleteAll([
            'completed <' => $threshold
        ]);
    }
468
469
    /**
     * Tells whether a failed task counts as 'requeued' or 'aborted'.
     *
     * The task's configuration is looked up under 'Queue' . $queuedTask->task.
     * Without such a configuration, or while failed_count does not exceed the
     * configured `retries`, the result is 'requeued'; otherwise 'aborted'.
     *
     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
     * @param array $taskConfiguration Task configuration
     * @return string
     */
    public function getFailedStatus($queuedTask, array $taskConfiguration)
    {
        $configKey = 'Queue' . $queuedTask->task;

        // Short-circuit keeps the `retries` lookup away from a missing configuration.
        $isRequeued = empty($taskConfiguration[$configKey])
            || $queuedTask->failed_count <= $taskConfiguration[$configKey]['retries'];

        return $isRequeued ? 'requeued' : 'aborted';
    }
490
491
    /**
     * Executes every statement the table schema's truncateSql() produces on
     * this table's connection.
     *
     * @return void
     */
    public function truncate()
    {
        $connection = $this->_connection;
        $statements = $this->getSchema()->truncateSql($connection);

        foreach ($statements as $statement) {
            $connection->execute($statement);
        }
    }
503
504
    /**
     * Returns the last namespace segment of the connection's configured
     * `driver` value, i.e. the short name compared against the DRIVER_*
     * constants.
     *
     * @return string
     */
    protected function _getDriverName()
    {
        $driver = $this->getConnection()->config()['driver'];
        $segments = explode('\\', $driver);

        return end($segments);
    }
516
517
    /**
     * Adds IN / NOT IN conditions for a column to a condition set.
     *
     * Values starting with "-" are exclusions (the "-" is stripped) and end up
     * under "$key NOT IN"; all other values end up under "$key IN". A condition
     * is only added when its list is non-empty.
     *
     * @param array $conditions Conditions
     * @param string $key Key
     * @param array $values Values
     * @return array
     */
    protected function addFilter(array $conditions, $key, array $values)
    {
        $included = [];
        $excluded = [];

        foreach ($values as $value) {
            $isExclusion = substr($value, 0, 1) === '-';
            if ($isExclusion) {
                $excluded[] = substr($value, 1);
                continue;
            }

            $included[] = $value;
        }

        if (!empty($included)) {
            $conditions[$key . ' IN'] = $included;
        }
        if (!empty($excluded)) {
            $conditions[$key . ' NOT IN'] = $excluded;
        }

        return $conditions;
    }
545
546
    /**
     * Returns a DateTime object from different input.
     *
     * Objects are handed back untouched; any other value is passed to the
     * FrozenTime constructor. Without argument this will be "now".
     *
     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
     *
     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
     */
    protected function getDateTime($notBefore = null)
    {
        return is_object($notBefore) ? $notBefore : new FrozenTime($notBefore);
    }
563
}
564