Passed
Pull Request — master (#35)
by
unknown
01:59
created
src/Model/Table/QueuedTasksTable.php 2 patches
Indentation   +601 added lines, -601 removed lines patch added patch discarded remove patch
@@ -28,605 +28,605 @@
 block discarded – undo
28 28
 class QueuedTasksTable extends Table
29 29
 {
30 30
 
31
-    const DRIVER_MYSQL = 'Mysql';
32
-
33
-    const DRIVER_POSTGRES = 'Postgres';
34
-
35
-    const DRIVER_SQLSERVER = 'Sqlserver';
36
-
37
-    const STATS_LIMIT = 100000;
38
-
39
-    /**
40
-     *
41
-     * @var string|null
42
-     */
43
-    protected $_key;
44
-
45
-    /**
46
-     * Initialize method
47
-     *
48
-     * @param array $config
49
-     *            The configuration for the Table.
50
-     * @return void
51
-     */
52
-    public function initialize(array $config)
53
-    {
54
-        parent::initialize($config);
55
-
56
-        $this->setTable('queued_tasks');
57
-        $this->setDisplayField('id');
58
-        $this->setPrimaryKey('id');
59
-
60
-        $this->addBehavior('Timestamp');
61
-    }
62
-
63
-    /**
64
-     * set connection name
65
-     *
66
-     * @return string
67
-     */
68
-    public static function defaultConnectionName()
69
-    {
70
-        $connection = Configure::read('Queue.connection');
71
-        if (! empty($connection)) {
72
-            return $connection;
73
-        }
74
-
75
-        return parent::defaultConnectionName();
76
-    }
77
-
78
-    /**
79
-     *
80
-     * @param \Cake\Event\Event $event
81
-     * @param \ArrayObject $data
82
-     * @param \ArrayObject $options
83
-     * @return void
84
-     */
85
-    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
86
-    {
87
-        if (isset($data['data']) && $data['data'] === '') {
88
-            $data['data'] = null;
89
-        }
90
-    }
91
-
92
-    /**
93
-     * Adds a new job to the queue.
94
-     *
95
-     * @param string $taskName
96
-     *            Task name
97
-     * @param array|null $data
98
-     *            Array of data
99
-     * @param string $notBefore
100
-     *            A datetime which indicates when the job may be executed
101
-     * @return \Queue\Model\Entity\QueuedTask Saved job entity
102
-     */
103
-    public function createJob($taskName, array $data = null, string $notBefore = null)
104
-    {
105
-        $task = [
106
-            'task' => $taskName,
107
-            'data' => serialize($data),
108
-            'not_before' => $this->getDateTime()
109
-        ];
110
-
111
-        if (! empty($notBefore)) {
112
-            $task['not_before'] = $this->getDateTime(strtotime($notBefore));
113
-        }
114
-
115
-        $queuedTask = $this->newEntity($task);
116
-
117
-        return $this->saveOrFail($queuedTask);
118
-    }
119
-
120
-    /**
121
-     *
122
-     * @param string|null $taskName
123
-     *
124
-     * @return bool
125
-     *
126
-     * @throws \InvalidArgumentException
127
-     */
128
-    public function isQueued($taskName = null)
129
-    {
130
-        $conditions = [
131
-            'completed IS' => null
132
-        ];
133
-        if ($taskName) {
134
-            $conditions['task'] = $taskName;
135
-        }
136
-
137
-        return (bool) $this->find()
138
-            ->where($conditions)
139
-            ->select([
140
-            'id'
141
-        ])
142
-            ->first();
143
-    }
144
-
145
-    /**
146
-     * Returns the number of items in the queue.
147
-     * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
148
-     *
149
-     * @param string|null $taskName
150
-     *            Task type to Count
151
-     * @return int
152
-     */
153
-    public function getLength($taskName = null)
154
-    {
155
-        $findConf = [
156
-            'conditions' => [
157
-                'completed IS' => null
158
-            ]
159
-        ];
160
-        if ($taskName !== null) {
161
-            $findConf['conditions']['task'] = $taskName;
162
-        }
163
-
164
-        return $this->find('all', $findConf)->count();
165
-    }
166
-
167
-    /**
168
-     * Return a list of all task types in the Queue.
169
-     *
170
-     * @return \Cake\ORM\Query
171
-     */
172
-    public function getTypes()
173
-    {
174
-        $findCond = [
175
-            'fields' => [
176
-                'task'
177
-            ],
178
-            'group' => [
179
-                'task'
180
-            ],
181
-            'keyField' => 'task',
182
-            'valueField' => 'task'
183
-        ];
184
-        return $this->find('list', $findCond);
185
-    }
186
-
187
-    /**
188
-     * Return some statistics about finished jobs still in the Database.
189
-     * TO-DO: rewrite as virtual field
190
-     *
191
-     * @return \Cake\ORM\Query
192
-     */
193
-    public function getStats()
194
-    {
195
-        $driverName = $this->_getDriverName();
196
-        $options = [
197
-            'fields' => function (Query $query) use ($driverName) {
198
-                $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
199
-                $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
200
-                $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
201
-                switch ($driverName) {
202
-                    case static::DRIVER_SQLSERVER:
203
-                        $alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
204
-                        $runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
205
-                        $fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
206
-                        break;
207
-                }
208
-                /**
209
-                 *
210
-                 * @var \Cake\ORM\Query
211
-                 */
212
-                return [
213
-                    'task',
214
-                    'num' => $query->func()->count('*'),
215
-                    'alltime' => $alltime,
216
-                    'runtime' => $runtime,
217
-                    'fetchdelay' => $fetchdelay
218
-                ];
219
-            },
220
-            'conditions' => [
221
-                'completed IS NOT' => null
222
-            ],
223
-            'group' => [
224
-                'task'
225
-            ]
226
-        ];
227
-        return $this->find('all', $options);
228
-    }
229
-
230
-    /**
231
-     * Returns [
232
-     * 'Task' => [
233
-     * 'YYYY-MM-DD' => INT,
234
-     * ...
235
-     * ]
236
-     * ]
237
-     *
238
-     * @param string|null $task
239
-     * @return array
240
-     */
241
-    public function getFullStats($taskName = null)
242
-    {
243
-        $driverName = $this->_getDriverName();
244
-        $fields = function (Query $query) use ($driverName) {
245
-            $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
246
-            switch ($driverName) {
247
-                case static::DRIVER_SQLSERVER:
248
-                    $runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
249
-                    break;
250
-            }
251
-
252
-            return [
253
-                'task',
254
-                'created',
255
-                'duration' => $runtime
256
-            ];
257
-        };
258
-
259
-        $conditions = [
260
-            'completed IS NOT' => null
261
-        ];
262
-        if ($taskName) {
263
-            $conditions['task'] = $taskName;
264
-        }
265
-
266
-        $tasks = $this->find()
267
-            ->select($fields)
268
-            ->where($conditions)
269
-            ->enableHydration(false)
270
-            ->orderDesc('id')
271
-            ->limit(static::STATS_LIMIT)
272
-            ->all()
273
-            ->toArray();
274
-
275
-        $result = [];
276
-
277
-        $days = [];
278
-
279
-        foreach ($tasks as $task) {
280
-            /** @var \DateTime $created */
281
-            $created = $task['created'];
282
-            $day = $created->format('Y-m-d');
283
-            if (! isset($days[$day])) {
284
-                $days[$day] = $day;
285
-            }
286
-
287
-            $result[$task['task']][$day][] = $task['duration'];
288
-        }
289
-
290
-        foreach ($result as $type => $tasks) {
291
-            foreach ($tasks as $day => $durations) {
292
-                $average = array_sum($durations) / count($durations);
293
-                $result[$type][$day] = (int) $average;
294
-            }
295
-
296
-            foreach ($days as $day) {
297
-                if (isset($result[$type][$day])) {
298
-                    continue;
299
-                }
300
-
301
-                $result[$type][$day] = 0;
302
-            }
303
-
304
-            ksort($result[$type]);
305
-        }
306
-
307
-        return $result;
308
-    }
309
-
310
-    /**
311
-     * Look for a new job that can be processed with the current abilities and
312
-     * from the specified group (or any if null).
313
-     *
314
-     * @param array $capabilities
315
-     *            Available QueueWorkerTasks.
316
-     * @param array $types
317
-     *            Request a job from these types (or exclude certain types), or any otherwise.
318
-     * @return \Queue\Model\Entity\QueuedTask|null
319
-     */
320
-    public function requestJob(array $capabilities, array $types = [])
321
-    {
322
-        $now = $this->getDateTime();
323
-        $nowStr = $now->toDateTimeString();
324
-        $driverName = $this->_getDriverName();
325
-
326
-        $query = $this->find();
327
-        $age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
328
-        switch ($driverName) {
329
-            case static::DRIVER_SQLSERVER:
330
-                $age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
331
-                break;
332
-            case static::DRIVER_POSTGRES:
333
-                $age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
334
-                break;
335
-        }
336
-        $options = [
337
-            'conditions' => [
338
-                'completed IS' => null,
339
-                'OR' => []
340
-            ],
341
-            'fields' => [
342
-                'age' => $age
343
-            ],
344
-            'order' => [
345
-                'age' => 'ASC',
346
-                'id' => 'ASC'
347
-            ]
348
-        ];
349
-
350
-        if ($types) {
351
-            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
352
-        }
353
-
354
-        // Generate the task specific conditions.
355
-        foreach ($capabilities as $task) {
356
-            list ($plugin, $name) = pluginSplit($task['name']);
357
-            $timeoutAt = $now->copy();
358
-            $tmp = [
359
-                'task' => $name,
360
-                'AND' => [
361
-                    [
362
-                        'OR' => [
363
-                            'not_before <' => $nowStr,
364
-                            'not_before IS' => null
365
-                        ]
366
-                    ],
367
-                    [
368
-                        'OR' => [
369
-                            'fetched <' => $timeoutAt->subSeconds($task['timeout']),
370
-                            'fetched IS' => null
371
-                        ]
372
-                    ]
373
-                ],
374
-                'failed_count <' => ($task['retries'] + 1)
375
-            ];
376
-            $options['conditions']['OR'][] = $tmp;
377
-        }
378
-
379
-        /** @var \Queue\Model\Entity\QueuedTask|null $task */
380
-        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
381
-            $task = $query->find('all', $options)
382
-                ->enableAutoFields(true)
383
-                ->epilog('FOR UPDATE')
384
-                ->first();
385
-
386
-            if (! $task) {
387
-                return null;
388
-            }
389
-
390
-            $key = $this->key();
391
-            $task = $this->patchEntity($task, [
392
-                'worker_key' => $key,
393
-                'fetched' => $now
394
-            ]);
395
-
396
-            return $this->saveOrFail($task);
397
-        });
398
-
399
-        if (! $task) {
400
-            return null;
401
-        }
402
-
403
-        return $task;
404
-    }
405
-
406
-    /**
407
-     * Mark a task as Completed, removing it from the queue.
408
-     *
409
-     * @param \Queue\Model\Entity\QueuedTask $task
410
-     *            Task
411
-     * @return bool Success
412
-     */
413
-    public function markJobDone(QueuedTask $task)
414
-    {
415
-        $fields = [
416
-            'completed' => $this->getDateTime()
417
-        ];
418
-        $task = $this->patchEntity($task, $fields);
419
-
420
-        return (bool) $this->save($task);
421
-    }
422
-
423
-    /**
424
-     * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
425
-     *
426
-     * @param \Queue\Model\Entity\QueuedTask $task
427
-     *            Task
428
-     * @param string|null $failureMessage
429
-     *            Optional message to append to the failure_message field.
430
-     * @return bool Success
431
-     */
432
-    public function markJobFailed(QueuedTask $task, $failureMessage = null)
433
-    {
434
-        $fields = [
435
-            'failed_count' => $task->failed_count + 1,
436
-            'failure_message' => $failureMessage
437
-        ];
438
-        $task = $this->patchEntity($task, $fields);
439
-
440
-        return (bool) $this->save($task);
441
-    }
442
-
443
-    /**
444
-     * Reset current jobs
445
-     *
446
-     * @param int|null $id
447
-     *
448
-     * @return int Success
449
-     */
450
-    public function reset($id = null)
451
-    {
452
-        $fields = [
453
-            'completed' => null,
454
-            'fetched' => null,
455
-            'failed_count' => 0,
456
-            'worker_key' => null,
457
-            'failure_message' => null
458
-        ];
459
-        $conditions = [
460
-            'completed IS' => null
461
-        ];
462
-        if ($id) {
463
-            $conditions['id'] = $id;
464
-        }
465
-
466
-        return $this->updateAll($fields, $conditions);
467
-    }
468
-
469
-    /**
470
-     *
471
-     * @param string $taskName
472
-     *
473
-     * @return int
474
-     */
475
-    public function rerun($taskName)
476
-    {
477
-        $fields = [
478
-            'completed' => null,
479
-            'fetched' => null,
480
-            'failed_count' => 0,
481
-            'worker_key' => null,
482
-            'failure_message' => null
483
-        ];
484
-        $conditions = [
485
-            'completed IS NOT' => null,
486
-            'task' => $taskName
487
-        ];
488
-
489
-        return $this->updateAll($fields, $conditions);
490
-    }
491
-
492
-    /**
493
-     * Cleanup/Delete Completed Tasks.
494
-     *
495
-     * @return void
496
-     */
497
-    public function cleanOldJobs()
498
-    {
499
-        if (! Configure::read('Queue.cleanuptimeout')) {
500
-            return;
501
-        }
502
-
503
-        $this->deleteAll([
504
-            'completed <' => time() - (int) Configure::read('Queue.cleanuptimeout')
505
-        ]);
506
-    }
507
-
508
-    /**
509
-     *
510
-     * @param \Queue\Model\Entity\QueuedTask $queuedTask
511
-     * @param array $taskConfiguration
512
-     * @return string
513
-     */
514
-    public function getFailedStatus($queuedTask, array $taskConfiguration)
515
-    {
516
-        $failureMessageRequeued = 'requeued';
517
-
518
-        $queuedTaskName = 'Queue' . $queuedTask->task;
519
-        if (empty($taskConfiguration[$queuedTaskName])) {
520
-            return $failureMessageRequeued;
521
-        }
522
-        $retries = $taskConfiguration[$queuedTaskName]['retries'];
523
-        if ($queuedTask->failed_count <= $retries) {
524
-            return $failureMessageRequeued;
525
-        }
526
-
527
-        return 'aborted';
528
-    }
529
-
530
-    /**
531
-     * Generates a unique Identifier for the current worker thread.
532
-     *
533
-     * Useful to identify the currently running processes for this thread.
534
-     *
535
-     * @return string Identifier
536
-     */
537
-    public function key()
538
-    {
539
-        if ($this->_key !== null) {
540
-            return $this->_key;
541
-        }
542
-        $this->_key = sha1(microtime());
543
-        if (! $this->_key) {
544
-            throw new RuntimeException('Invalid key generated');
545
-        }
546
-
547
-        return $this->_key;
548
-    }
549
-
550
-    /**
551
-     * Resets worker Identifier
552
-     *
553
-     * @return void
554
-     */
555
-    public function clearKey()
556
-    {
557
-        $this->_key = null;
558
-    }
559
-
560
-    /**
561
-     * truncate()
562
-     *
563
-     * @return void
564
-     */
565
-    public function truncate()
566
-    {
567
-        $sql = $this->getSchema()->truncateSql($this->_connection);
568
-        foreach ($sql as $snippet) {
569
-            $this->_connection->execute($snippet);
570
-        }
571
-    }
572
-
573
-    /**
574
-     * get the name of the driver
575
-     *
576
-     * @return string
577
-     */
578
-    protected function _getDriverName()
579
-    {
580
-        $className = explode('\\', $this->getConnection()->config()['driver']);
581
-        $name = end($className);
582
-
583
-        return $name;
584
-    }
585
-
586
-    /**
587
-     *
588
-     * @param array $conditions
589
-     * @param string $key
590
-     * @param array $values
591
-     * @return array
592
-     */
593
-    protected function addFilter(array $conditions, $key, array $values)
594
-    {
595
-        $include = [];
596
-        $exclude = [];
597
-        foreach ($values as $value) {
598
-            if (substr($value, 0, 1) === '-') {
599
-                $exclude[] = substr($value, 1);
600
-            } else {
601
-                $include[] = $value;
602
-            }
603
-        }
604
-
605
-        if ($include) {
606
-            $conditions[$key . ' IN'] = $include;
607
-        }
608
-        if ($exclude) {
609
-            $conditions[$key . ' NOT IN'] = $exclude;
610
-        }
611
-
612
-        return $conditions;
613
-    }
614
-
615
-    /**
616
-     * Returns a DateTime object from different input.
617
-     *
618
-     * Without argument this will be "now".
619
-     *
620
-     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore
621
-     *
622
-     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
623
-     */
624
-    protected function getDateTime($notBefore = null)
625
-    {
626
-        if (is_object($notBefore)) {
627
-            return $notBefore;
628
-        }
629
-
630
-        return new FrozenTime($notBefore);
631
-    }
31
+	const DRIVER_MYSQL = 'Mysql';
32
+
33
+	const DRIVER_POSTGRES = 'Postgres';
34
+
35
+	const DRIVER_SQLSERVER = 'Sqlserver';
36
+
37
+	const STATS_LIMIT = 100000;
38
+
39
+	/**
40
+	 *
41
+	 * @var string|null
42
+	 */
43
+	protected $_key;
44
+
45
+	/**
46
+	 * Initialize method
47
+	 *
48
+	 * @param array $config
49
+	 *            The configuration for the Table.
50
+	 * @return void
51
+	 */
52
+	public function initialize(array $config)
53
+	{
54
+		parent::initialize($config);
55
+
56
+		$this->setTable('queued_tasks');
57
+		$this->setDisplayField('id');
58
+		$this->setPrimaryKey('id');
59
+
60
+		$this->addBehavior('Timestamp');
61
+	}
62
+
63
+	/**
64
+	 * set connection name
65
+	 *
66
+	 * @return string
67
+	 */
68
+	public static function defaultConnectionName()
69
+	{
70
+		$connection = Configure::read('Queue.connection');
71
+		if (! empty($connection)) {
72
+			return $connection;
73
+		}
74
+
75
+		return parent::defaultConnectionName();
76
+	}
77
+
78
+	/**
79
+	 *
80
+	 * @param \Cake\Event\Event $event
81
+	 * @param \ArrayObject $data
82
+	 * @param \ArrayObject $options
83
+	 * @return void
84
+	 */
85
+	public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
86
+	{
87
+		if (isset($data['data']) && $data['data'] === '') {
88
+			$data['data'] = null;
89
+		}
90
+	}
91
+
92
+	/**
93
+	 * Adds a new job to the queue.
94
+	 *
95
+	 * @param string $taskName
96
+	 *            Task name
97
+	 * @param array|null $data
98
+	 *            Array of data
99
+	 * @param string $notBefore
100
+	 *            A datetime which indicates when the job may be executed
101
+	 * @return \Queue\Model\Entity\QueuedTask Saved job entity
102
+	 */
103
+	public function createJob($taskName, array $data = null, string $notBefore = null)
104
+	{
105
+		$task = [
106
+			'task' => $taskName,
107
+			'data' => serialize($data),
108
+			'not_before' => $this->getDateTime()
109
+		];
110
+
111
+		if (! empty($notBefore)) {
112
+			$task['not_before'] = $this->getDateTime(strtotime($notBefore));
113
+		}
114
+
115
+		$queuedTask = $this->newEntity($task);
116
+
117
+		return $this->saveOrFail($queuedTask);
118
+	}
119
+
120
+	/**
121
+	 *
122
+	 * @param string|null $taskName
123
+	 *
124
+	 * @return bool
125
+	 *
126
+	 * @throws \InvalidArgumentException
127
+	 */
128
+	public function isQueued($taskName = null)
129
+	{
130
+		$conditions = [
131
+			'completed IS' => null
132
+		];
133
+		if ($taskName) {
134
+			$conditions['task'] = $taskName;
135
+		}
136
+
137
+		return (bool) $this->find()
138
+			->where($conditions)
139
+			->select([
140
+			'id'
141
+		])
142
+			->first();
143
+	}
144
+
145
+	/**
146
+	 * Returns the number of items in the queue.
147
+	 * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
148
+	 *
149
+	 * @param string|null $taskName
150
+	 *            Task type to Count
151
+	 * @return int
152
+	 */
153
+	public function getLength($taskName = null)
154
+	{
155
+		$findConf = [
156
+			'conditions' => [
157
+				'completed IS' => null
158
+			]
159
+		];
160
+		if ($taskName !== null) {
161
+			$findConf['conditions']['task'] = $taskName;
162
+		}
163
+
164
+		return $this->find('all', $findConf)->count();
165
+	}
166
+
167
+	/**
168
+	 * Return a list of all task types in the Queue.
169
+	 *
170
+	 * @return \Cake\ORM\Query
171
+	 */
172
+	public function getTypes()
173
+	{
174
+		$findCond = [
175
+			'fields' => [
176
+				'task'
177
+			],
178
+			'group' => [
179
+				'task'
180
+			],
181
+			'keyField' => 'task',
182
+			'valueField' => 'task'
183
+		];
184
+		return $this->find('list', $findCond);
185
+	}
186
+
187
+	/**
188
+	 * Return some statistics about finished jobs still in the Database.
189
+	 * TO-DO: rewrite as virtual field
190
+	 *
191
+	 * @return \Cake\ORM\Query
192
+	 */
193
+	public function getStats()
194
+	{
195
+		$driverName = $this->_getDriverName();
196
+		$options = [
197
+			'fields' => function (Query $query) use ($driverName) {
198
+				$alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
199
+				$runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
200
+				$fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
201
+				switch ($driverName) {
202
+					case static::DRIVER_SQLSERVER:
203
+						$alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
204
+						$runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
205
+						$fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
206
+						break;
207
+				}
208
+				/**
209
+				 *
210
+				 * @var \Cake\ORM\Query
211
+				 */
212
+				return [
213
+					'task',
214
+					'num' => $query->func()->count('*'),
215
+					'alltime' => $alltime,
216
+					'runtime' => $runtime,
217
+					'fetchdelay' => $fetchdelay
218
+				];
219
+			},
220
+			'conditions' => [
221
+				'completed IS NOT' => null
222
+			],
223
+			'group' => [
224
+				'task'
225
+			]
226
+		];
227
+		return $this->find('all', $options);
228
+	}
229
+
230
+	/**
231
+	 * Returns [
232
+	 * 'Task' => [
233
+	 * 'YYYY-MM-DD' => INT,
234
+	 * ...
235
+	 * ]
236
+	 * ]
237
+	 *
238
+	 * @param string|null $task
239
+	 * @return array
240
+	 */
241
+	public function getFullStats($taskName = null)
242
+	{
243
+		$driverName = $this->_getDriverName();
244
+		$fields = function (Query $query) use ($driverName) {
245
+			$runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
246
+			switch ($driverName) {
247
+				case static::DRIVER_SQLSERVER:
248
+					$runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
249
+					break;
250
+			}
251
+
252
+			return [
253
+				'task',
254
+				'created',
255
+				'duration' => $runtime
256
+			];
257
+		};
258
+
259
+		$conditions = [
260
+			'completed IS NOT' => null
261
+		];
262
+		if ($taskName) {
263
+			$conditions['task'] = $taskName;
264
+		}
265
+
266
+		$tasks = $this->find()
267
+			->select($fields)
268
+			->where($conditions)
269
+			->enableHydration(false)
270
+			->orderDesc('id')
271
+			->limit(static::STATS_LIMIT)
272
+			->all()
273
+			->toArray();
274
+
275
+		$result = [];
276
+
277
+		$days = [];
278
+
279
+		foreach ($tasks as $task) {
280
+			/** @var \DateTime $created */
281
+			$created = $task['created'];
282
+			$day = $created->format('Y-m-d');
283
+			if (! isset($days[$day])) {
284
+				$days[$day] = $day;
285
+			}
286
+
287
+			$result[$task['task']][$day][] = $task['duration'];
288
+		}
289
+
290
+		foreach ($result as $type => $tasks) {
291
+			foreach ($tasks as $day => $durations) {
292
+				$average = array_sum($durations) / count($durations);
293
+				$result[$type][$day] = (int) $average;
294
+			}
295
+
296
+			foreach ($days as $day) {
297
+				if (isset($result[$type][$day])) {
298
+					continue;
299
+				}
300
+
301
+				$result[$type][$day] = 0;
302
+			}
303
+
304
+			ksort($result[$type]);
305
+		}
306
+
307
+		return $result;
308
+	}
309
+
310
+	/**
311
+	 * Look for a new job that can be processed with the current abilities and
312
+	 * from the specified group (or any if null).
313
+	 *
314
+	 * @param array $capabilities
315
+	 *            Available QueueWorkerTasks.
316
+	 * @param array $types
317
+	 *            Request a job from these types (or exclude certain types), or any otherwise.
318
+	 * @return \Queue\Model\Entity\QueuedTask|null
319
+	 */
320
+	public function requestJob(array $capabilities, array $types = [])
321
+	{
322
+		$now = $this->getDateTime();
323
+		$nowStr = $now->toDateTimeString();
324
+		$driverName = $this->_getDriverName();
325
+
326
+		$query = $this->find();
327
+		$age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
328
+		switch ($driverName) {
329
+			case static::DRIVER_SQLSERVER:
330
+				$age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
331
+				break;
332
+			case static::DRIVER_POSTGRES:
333
+				$age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
334
+				break;
335
+		}
336
+		$options = [
337
+			'conditions' => [
338
+				'completed IS' => null,
339
+				'OR' => []
340
+			],
341
+			'fields' => [
342
+				'age' => $age
343
+			],
344
+			'order' => [
345
+				'age' => 'ASC',
346
+				'id' => 'ASC'
347
+			]
348
+		];
349
+
350
+		if ($types) {
351
+			$options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
352
+		}
353
+
354
+		// Generate the task specific conditions.
355
+		foreach ($capabilities as $task) {
356
+			list ($plugin, $name) = pluginSplit($task['name']);
357
+			$timeoutAt = $now->copy();
358
+			$tmp = [
359
+				'task' => $name,
360
+				'AND' => [
361
+					[
362
+						'OR' => [
363
+							'not_before <' => $nowStr,
364
+							'not_before IS' => null
365
+						]
366
+					],
367
+					[
368
+						'OR' => [
369
+							'fetched <' => $timeoutAt->subSeconds($task['timeout']),
370
+							'fetched IS' => null
371
+						]
372
+					]
373
+				],
374
+				'failed_count <' => ($task['retries'] + 1)
375
+			];
376
+			$options['conditions']['OR'][] = $tmp;
377
+		}
378
+
379
+		/** @var \Queue\Model\Entity\QueuedTask|null $task */
380
+		$task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
381
+			$task = $query->find('all', $options)
382
+				->enableAutoFields(true)
383
+				->epilog('FOR UPDATE')
384
+				->first();
385
+
386
+			if (! $task) {
387
+				return null;
388
+			}
389
+
390
+			$key = $this->key();
391
+			$task = $this->patchEntity($task, [
392
+				'worker_key' => $key,
393
+				'fetched' => $now
394
+			]);
395
+
396
+			return $this->saveOrFail($task);
397
+		});
398
+
399
+		if (! $task) {
400
+			return null;
401
+		}
402
+
403
+		return $task;
404
+	}
405
+
406
+	/**
407
+	 * Mark a task as Completed, removing it from the queue.
408
+	 *
409
+	 * @param \Queue\Model\Entity\QueuedTask $task
410
+	 *            Task
411
+	 * @return bool Success
412
+	 */
413
+	public function markJobDone(QueuedTask $task)
414
+	{
415
+		$fields = [
416
+			'completed' => $this->getDateTime()
417
+		];
418
+		$task = $this->patchEntity($task, $fields);
419
+
420
+		return (bool) $this->save($task);
421
+	}
422
+
423
+	/**
424
+	 * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
425
+	 *
426
+	 * @param \Queue\Model\Entity\QueuedTask $task
427
+	 *            Task
428
+	 * @param string|null $failureMessage
429
+	 *            Optional message to append to the failure_message field.
430
+	 * @return bool Success
431
+	 */
432
+	public function markJobFailed(QueuedTask $task, $failureMessage = null)
433
+	{
434
+		$fields = [
435
+			'failed_count' => $task->failed_count + 1,
436
+			'failure_message' => $failureMessage
437
+		];
438
+		$task = $this->patchEntity($task, $fields);
439
+
440
+		return (bool) $this->save($task);
441
+	}
442
+
443
+	/**
444
+	 * Reset current jobs
445
+	 *
446
+	 * @param int|null $id
447
+	 *
448
+	 * @return int Success
449
+	 */
450
+	public function reset($id = null)
451
+	{
452
+		$fields = [
453
+			'completed' => null,
454
+			'fetched' => null,
455
+			'failed_count' => 0,
456
+			'worker_key' => null,
457
+			'failure_message' => null
458
+		];
459
+		$conditions = [
460
+			'completed IS' => null
461
+		];
462
+		if ($id) {
463
+			$conditions['id'] = $id;
464
+		}
465
+
466
+		return $this->updateAll($fields, $conditions);
467
+	}
468
+
469
+	/**
470
+	 *
471
+	 * @param string $taskName
472
+	 *
473
+	 * @return int
474
+	 */
475
+	public function rerun($taskName)
476
+	{
477
+		$fields = [
478
+			'completed' => null,
479
+			'fetched' => null,
480
+			'failed_count' => 0,
481
+			'worker_key' => null,
482
+			'failure_message' => null
483
+		];
484
+		$conditions = [
485
+			'completed IS NOT' => null,
486
+			'task' => $taskName
487
+		];
488
+
489
+		return $this->updateAll($fields, $conditions);
490
+	}
491
+
492
+	/**
493
+	 * Cleanup/Delete Completed Tasks.
494
+	 *
495
+	 * @return void
496
+	 */
497
+	public function cleanOldJobs()
498
+	{
499
+		if (! Configure::read('Queue.cleanuptimeout')) {
500
+			return;
501
+		}
502
+
503
+		$this->deleteAll([
504
+			'completed <' => time() - (int) Configure::read('Queue.cleanuptimeout')
505
+		]);
506
+	}
507
+
508
+	/**
509
+	 *
510
+	 * @param \Queue\Model\Entity\QueuedTask $queuedTask
511
+	 * @param array $taskConfiguration
512
+	 * @return string
513
+	 */
514
+	public function getFailedStatus($queuedTask, array $taskConfiguration)
515
+	{
516
+		$failureMessageRequeued = 'requeued';
517
+
518
+		$queuedTaskName = 'Queue' . $queuedTask->task;
519
+		if (empty($taskConfiguration[$queuedTaskName])) {
520
+			return $failureMessageRequeued;
521
+		}
522
+		$retries = $taskConfiguration[$queuedTaskName]['retries'];
523
+		if ($queuedTask->failed_count <= $retries) {
524
+			return $failureMessageRequeued;
525
+		}
526
+
527
+		return 'aborted';
528
+	}
529
+
530
+	/**
531
+	 * Generates a unique Identifier for the current worker thread.
532
+	 *
533
+	 * Useful to identify the currently running processes for this thread.
534
+	 *
535
+	 * @return string Identifier
536
+	 */
537
+	public function key()
538
+	{
539
+		if ($this->_key !== null) {
540
+			return $this->_key;
541
+		}
542
+		$this->_key = sha1(microtime());
543
+		if (! $this->_key) {
544
+			throw new RuntimeException('Invalid key generated');
545
+		}
546
+
547
+		return $this->_key;
548
+	}
549
+
550
+	/**
551
+	 * Resets worker Identifier
552
+	 *
553
+	 * @return void
554
+	 */
555
+	public function clearKey()
556
+	{
557
+		$this->_key = null;
558
+	}
559
+
560
+	/**
561
+	 * truncate()
562
+	 *
563
+	 * @return void
564
+	 */
565
+	public function truncate()
566
+	{
567
+		$sql = $this->getSchema()->truncateSql($this->_connection);
568
+		foreach ($sql as $snippet) {
569
+			$this->_connection->execute($snippet);
570
+		}
571
+	}
572
+
573
+	/**
574
+	 * get the name of the driver
575
+	 *
576
+	 * @return string
577
+	 */
578
+	protected function _getDriverName()
579
+	{
580
+		$className = explode('\\', $this->getConnection()->config()['driver']);
581
+		$name = end($className);
582
+
583
+		return $name;
584
+	}
585
+
586
+	/**
587
+	 *
588
+	 * @param array $conditions
589
+	 * @param string $key
590
+	 * @param array $values
591
+	 * @return array
592
+	 */
593
+	protected function addFilter(array $conditions, $key, array $values)
594
+	{
595
+		$include = [];
596
+		$exclude = [];
597
+		foreach ($values as $value) {
598
+			if (substr($value, 0, 1) === '-') {
599
+				$exclude[] = substr($value, 1);
600
+			} else {
601
+				$include[] = $value;
602
+			}
603
+		}
604
+
605
+		if ($include) {
606
+			$conditions[$key . ' IN'] = $include;
607
+		}
608
+		if ($exclude) {
609
+			$conditions[$key . ' NOT IN'] = $exclude;
610
+		}
611
+
612
+		return $conditions;
613
+	}
614
+
615
+	/**
616
+	 * Returns a DateTime object from different input.
617
+	 *
618
+	 * Without argument this will be "now".
619
+	 *
620
+	 * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore
621
+	 *
622
+	 * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
623
+	 */
624
+	protected function getDateTime($notBefore = null)
625
+	{
626
+		if (is_object($notBefore)) {
627
+			return $notBefore;
628
+		}
629
+
630
+		return new FrozenTime($notBefore);
631
+	}
632 632
 }
Please login to merge, or discard this patch.
Spacing   +15 added lines, -15 removed lines patch added patch discarded remove patch
@@ -68,7 +68,7 @@  discard block
 block discarded – undo
68 68
     public static function defaultConnectionName()
69 69
     {
70 70
         $connection = Configure::read('Queue.connection');
71
-        if (! empty($connection)) {
71
+        if (!empty($connection)) {
72 72
             return $connection;
73 73
         }
74 74
 
@@ -108,7 +108,7 @@  discard block
 block discarded – undo
108 108
             'not_before' => $this->getDateTime()
109 109
         ];
110 110
 
111
-        if (! empty($notBefore)) {
111
+        if (!empty($notBefore)) {
112 112
             $task['not_before'] = $this->getDateTime(strtotime($notBefore));
113 113
         }
114 114
 
@@ -134,7 +134,7 @@  discard block
 block discarded – undo
134 134
             $conditions['task'] = $taskName;
135 135
         }
136 136
 
137
-        return (bool) $this->find()
137
+        return (bool)$this->find()
138 138
             ->where($conditions)
139 139
             ->select([
140 140
             'id'
@@ -194,7 +194,7 @@  discard block
 block discarded – undo
194 194
     {
195 195
         $driverName = $this->_getDriverName();
196 196
         $options = [
197
-            'fields' => function (Query $query) use ($driverName) {
197
+            'fields' => function(Query $query) use ($driverName) {
198 198
                 $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
199 199
                 $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
200 200
                 $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
@@ -241,7 +241,7 @@  discard block
 block discarded – undo
241 241
     public function getFullStats($taskName = null)
242 242
     {
243 243
         $driverName = $this->_getDriverName();
244
-        $fields = function (Query $query) use ($driverName) {
244
+        $fields = function(Query $query) use ($driverName) {
245 245
             $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
246 246
             switch ($driverName) {
247 247
                 case static::DRIVER_SQLSERVER:
@@ -280,7 +280,7 @@  discard block
 block discarded – undo
280 280
             /** @var \DateTime $created */
281 281
             $created = $task['created'];
282 282
             $day = $created->format('Y-m-d');
283
-            if (! isset($days[$day])) {
283
+            if (!isset($days[$day])) {
284 284
                 $days[$day] = $day;
285 285
             }
286 286
 
@@ -290,7 +290,7 @@  discard block
 block discarded – undo
290 290
         foreach ($result as $type => $tasks) {
291 291
             foreach ($tasks as $day => $durations) {
292 292
                 $average = array_sum($durations) / count($durations);
293
-                $result[$type][$day] = (int) $average;
293
+                $result[$type][$day] = (int)$average;
294 294
             }
295 295
 
296 296
             foreach ($days as $day) {
@@ -377,13 +377,13 @@  discard block
 block discarded – undo
377 377
         }
378 378
 
379 379
         /** @var \Queue\Model\Entity\QueuedTask|null $task */
380
-        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
380
+        $task = $this->getConnection()->transactional(function() use ($query, $options, $now) {
381 381
             $task = $query->find('all', $options)
382 382
                 ->enableAutoFields(true)
383 383
                 ->epilog('FOR UPDATE')
384 384
                 ->first();
385 385
 
386
-            if (! $task) {
386
+            if (!$task) {
387 387
                 return null;
388 388
             }
389 389
 
@@ -396,7 +396,7 @@  discard block
 block discarded – undo
396 396
             return $this->saveOrFail($task);
397 397
         });
398 398
 
399
-        if (! $task) {
399
+        if (!$task) {
400 400
             return null;
401 401
         }
402 402
 
@@ -417,7 +417,7 @@  discard block
 block discarded – undo
417 417
         ];
418 418
         $task = $this->patchEntity($task, $fields);
419 419
 
420
-        return (bool) $this->save($task);
420
+        return (bool)$this->save($task);
421 421
     }
422 422
 
423 423
     /**
@@ -437,7 +437,7 @@  discard block
 block discarded – undo
437 437
         ];
438 438
         $task = $this->patchEntity($task, $fields);
439 439
 
440
-        return (bool) $this->save($task);
440
+        return (bool)$this->save($task);
441 441
     }
442 442
 
443 443
     /**
@@ -496,12 +496,12 @@  discard block
 block discarded – undo
496 496
      */
497 497
     public function cleanOldJobs()
498 498
     {
499
-        if (! Configure::read('Queue.cleanuptimeout')) {
499
+        if (!Configure::read('Queue.cleanuptimeout')) {
500 500
             return;
501 501
         }
502 502
 
503 503
         $this->deleteAll([
504
-            'completed <' => time() - (int) Configure::read('Queue.cleanuptimeout')
504
+            'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
505 505
         ]);
506 506
     }
507 507
 
@@ -540,7 +540,7 @@  discard block
 block discarded – undo
540 540
             return $this->_key;
541 541
         }
542 542
         $this->_key = sha1(microtime());
543
-        if (! $this->_key) {
543
+        if (!$this->_key) {
544 544
             throw new RuntimeException('Invalid key generated');
545 545
         }
546 546
 
Please login to merge, or discard this patch.