Completed
Pull Request — master (#35)
by
unknown
03:07
created
src/Model/Table/QueuedTasksTable.php 2 patches
Indentation   +591 added lines, -591 removed lines patch added patch discarded remove patch
@@ -28,595 +28,595 @@
 block discarded – undo
28 28
 class QueuedTasksTable extends Table
29 29
 {
30 30
 
31
-    const DRIVER_MYSQL = 'Mysql';
32
-
33
-    const DRIVER_POSTGRES = 'Postgres';
34
-
35
-    const DRIVER_SQLSERVER = 'Sqlserver';
36
-
37
-    const STATS_LIMIT = 100000;
38
-
39
-    /**
40
-     *
41
-     * @var string|null
42
-     */
43
-    protected $_key;
44
-
45
-    /**
46
-     * Initialize method
47
-     *
48
-     * @param array $config The configuration for the Table.
49
-     * @return void
50
-     */
51
-    public function initialize(array $config)
52
-    {
53
-        parent::initialize($config);
54
-
55
-        $this->setTable('queued_tasks');
56
-        $this->setDisplayField('id');
57
-        $this->setPrimaryKey('id');
58
-
59
-        $this->addBehavior('Timestamp');
60
-    }
61
-
62
-    /**
63
-     * set connection name
64
-     *
65
-     * @return string
66
-     */
67
-    public static function defaultConnectionName()
68
-    {
69
-        $connection = Configure::read('Queue.connection');
70
-        if (!empty($connection)) {
71
-            return $connection;
72
-        }
73
-
74
-        return parent::defaultConnectionName();
75
-    }
76
-
77
-    /**
78
-     *
79
-     * @param \Cake\Event\Event $event
80
-     * @param \ArrayObject $data
81
-     * @param \ArrayObject $options
82
-     * @return void
83
-     */
84
-    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
85
-    {
86
-        if (isset($data['data']) && $data['data'] === '') {
87
-            $data['data'] = null;
88
-        }
89
-    }
90
-
91
-    /**
92
-     * Adds a new job to the queue.
93
-     *
94
-     * @param string $taskName Task name
95
-     * @param array|null $data Array of data
96
-     * @param string $notBefore A datetime which indicates when the job may be executed
97
-     * @return \Queue\Model\Entity\QueuedTask Saved job entity
98
-     */
99
-    public function createJob($taskName, array $data = null, string $notBefore = null)
100
-    {
101
-        $task = [
102
-            'task' => $taskName,
103
-            'data' => serialize($data),
104
-            'not_before' => $this->getDateTime()
105
-        ];
106
-
107
-        if (!empty($notBefore)) {
108
-            $task['not_before'] = $this->getDateTime(strtotime($notBefore));
109
-        }
110
-
111
-        $queuedTask = $this->newEntity($task);
112
-
113
-        return $this->saveOrFail($queuedTask);
114
-    }
115
-
116
-    /**
117
-     *
118
-     * @param string|null $taskName
119
-     *
120
-     * @return bool
121
-     *
122
-     * @throws \InvalidArgumentException
123
-     */
124
-    public function isQueued($taskName = null)
125
-    {
126
-        $conditions = [
127
-            'completed IS' => null
128
-        ];
129
-        if ($taskName) {
130
-            $conditions['task'] = $taskName;
131
-        }
132
-
133
-        return (bool)$this->find()
134
-            ->where($conditions)
135
-            ->select([
136
-            'id'
137
-        ])
138
-            ->first();
139
-    }
140
-
141
-    /**
142
-     * Returns the number of items in the queue.
143
-     * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
144
-     *
145
-     * @param string|null $taskName Task type to Count
146
-     * @return int
147
-     */
148
-    public function getLength($taskName = null)
149
-    {
150
-        $findConf = [
151
-            'conditions' => [
152
-                'completed IS' => null
153
-            ]
154
-        ];
155
-        if ($taskName !== null) {
156
-            $findConf['conditions']['task'] = $taskName;
157
-        }
158
-
159
-        return $this->find('all', $findConf)->count();
160
-    }
161
-
162
-    /**
163
-     * Return a list of all task types in the Queue.
164
-     *
165
-     * @return \Cake\ORM\Query
166
-     */
167
-    public function getTypes()
168
-    {
169
-        $findCond = [
170
-            'fields' => [
171
-                'task'
172
-            ],
173
-            'group' => [
174
-                'task'
175
-            ],
176
-            'keyField' => 'task',
177
-            'valueField' => 'task'
178
-        ];
179
-        return $this->find('list', $findCond);
180
-    }
181
-
182
-    /**
183
-     * Return some statistics about finished jobs still in the Database.
184
-     * TO-DO: rewrite as virtual field
185
-     *
186
-     * @return \Cake\ORM\Query
187
-     */
188
-    public function getStats()
189
-    {
190
-        $driverName = $this->_getDriverName();
191
-        $options = [
192
-            'fields' => function (Query $query) use ($driverName) {
193
-                $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
194
-                $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
195
-                $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
196
-                switch ($driverName) {
197
-                    case static::DRIVER_SQLSERVER:
198
-                        $alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
199
-                        $runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
200
-                        $fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
201
-                        break;
202
-                }
203
-                /**
204
-                 *
205
-                 * @var \Cake\ORM\Query
206
-                 */
207
-                return [
208
-                    'task',
209
-                    'num' => $query->func()->count('*'),
210
-                    'alltime' => $alltime,
211
-                    'runtime' => $runtime,
212
-                    'fetchdelay' => $fetchdelay
213
-                ];
214
-            },
215
-            'conditions' => [
216
-                'completed IS NOT' => null
217
-            ],
218
-            'group' => [
219
-                'task'
220
-            ]
221
-        ];
222
-        return $this->find('all', $options);
223
-    }
224
-
225
-    /**
226
-     * Returns [
227
-     * 'Task' => [
228
-     * 'YYYY-MM-DD' => INT,
229
-     * ...
230
-     * ]
231
-     * ]
232
-     *
233
-     * @param string|null $task
234
-     * @return array
235
-     */
236
-    public function getFullStats($taskName = null)
237
-    {
238
-        $driverName = $this->_getDriverName();
239
-        $fields = function (Query $query) use ($driverName) {
240
-            $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
241
-            switch ($driverName) {
242
-                case static::DRIVER_SQLSERVER:
243
-                    $runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
244
-                    break;
245
-            }
246
-
247
-            return [
248
-                'task',
249
-                'created',
250
-                'duration' => $runtime
251
-            ];
252
-        };
253
-
254
-        $conditions = [
255
-            'completed IS NOT' => null
256
-        ];
257
-        if ($taskName) {
258
-            $conditions['task'] = $taskName;
259
-        }
260
-
261
-        $tasks = $this->find()
262
-            ->select($fields)
263
-            ->where($conditions)
264
-            ->enableHydration(false)
265
-            ->orderDesc('id')
266
-            ->limit(static::STATS_LIMIT)
267
-            ->all()
268
-            ->toArray();
269
-
270
-        $result = [];
271
-
272
-        $days = [];
273
-
274
-        foreach ($tasks as $task) {
275
-            /** @var \DateTime $created */
276
-            $created = $task['created'];
277
-            $day = $created->format('Y-m-d');
278
-            if (!isset($days[$day])) {
279
-                $days[$day] = $day;
280
-            }
281
-
282
-            $result[$task['task']][$day][] = $task['duration'];
283
-        }
284
-
285
-        foreach ($result as $type => $tasks) {
286
-            foreach ($tasks as $day => $durations) {
287
-                $average = array_sum($durations) / count($durations);
288
-                $result[$type][$day] = (int)$average;
289
-            }
290
-
291
-            foreach ($days as $day) {
292
-                if (isset($result[$type][$day])) {
293
-                    continue;
294
-                }
295
-
296
-                $result[$type][$day] = 0;
297
-            }
298
-
299
-            ksort($result[$type]);
300
-        }
301
-
302
-        return $result;
303
-    }
304
-
305
-    /**
306
-     * Look for a new job that can be processed with the current abilities and
307
-     * from the specified group (or any if null).
308
-     *
309
-     * @param array $capabilities Available QueueWorkerTasks.
310
-     * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
311
-     * @return \Queue\Model\Entity\QueuedTask|null
312
-     */
313
-    public function requestJob(array $capabilities, array $types = [])
314
-    {
315
-        $now = $this->getDateTime();
316
-        $nowStr = $now->toDateTimeString();
317
-        $driverName = $this->_getDriverName();
318
-
319
-        $query = $this->find();
320
-        $age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
321
-        switch ($driverName) {
322
-            case static::DRIVER_SQLSERVER:
323
-                $age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
324
-                break;
325
-            case static::DRIVER_POSTGRES:
326
-                $age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
327
-                break;
328
-        }
329
-        $options = [
330
-            'conditions' => [
331
-                'completed IS' => null,
332
-                'OR' => []
333
-            ],
334
-            'fields' => [
335
-                'age' => $age
336
-            ],
337
-            'order' => [
338
-                'age' => 'ASC',
339
-                'id' => 'ASC'
340
-            ]
341
-        ];
342
-
343
-        if ($types) {
344
-            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
345
-        }
346
-
347
-        // Generate the task specific conditions.
348
-        foreach ($capabilities as $task) {
349
-            list ($plugin, $name) = pluginSplit($task['name']);
350
-            $timeoutAt = $now->copy();
351
-            $tmp = [
352
-                'task' => $name,
353
-                'AND' => [
354
-                    [
355
-                        'OR' => [
356
-                            'not_before <' => $nowStr,
357
-                            'not_before IS' => null
358
-                        ]
359
-                    ],
360
-                    [
361
-                        'OR' => [
362
-                            'fetched <' => $timeoutAt->subSeconds($task['timeout']),
363
-                            'fetched IS' => null
364
-                        ]
365
-                    ]
366
-                ],
367
-                'failed_count <' => ($task['retries'] + 1)
368
-            ];
369
-            $options['conditions']['OR'][] = $tmp;
370
-        }
371
-
372
-        /** @var \Queue\Model\Entity\QueuedTask|null $task */
373
-        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
374
-            $task = $query->find('all', $options)
375
-                ->enableAutoFields(true)
376
-                ->epilog('FOR UPDATE')
377
-                ->first();
378
-
379
-            if (!$task) {
380
-                return null;
381
-            }
382
-
383
-            $key = $this->key();
384
-            $task = $this->patchEntity($task, [
385
-                'worker_key' => $key,
386
-                'fetched' => $now
387
-            ]);
388
-
389
-            return $this->saveOrFail($task);
390
-        });
391
-
392
-        if (!$task) {
393
-            return null;
394
-        }
395
-
396
-        return $task;
397
-    }
398
-
399
-    /**
400
-     * Mark a task as Completed, removing it from the queue.
401
-     *
402
-     * @param \Queue\Model\Entity\QueuedTask $task Task
403
-     * @return bool Success
404
-     */
405
-    public function markJobDone(QueuedTask $task)
406
-    {
407
-        $fields = [
408
-            'completed' => $this->getDateTime()
409
-        ];
410
-        $task = $this->patchEntity($task, $fields);
411
-
412
-        return (bool)$this->save($task);
413
-    }
414
-
415
-    /**
416
-     * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
417
-     *
418
-     * @param \Queue\Model\Entity\QueuedTask $task Task
419
-     * @param string|null $failureMessage Optional message to append to the failure_message field.
420
-     * @return bool Success
421
-     */
422
-    public function markJobFailed(QueuedTask $task, $failureMessage = null)
423
-    {
424
-        $fields = [
425
-            'failed_count' => $task->failed_count + 1,
426
-            'failure_message' => $failureMessage
427
-        ];
428
-        $task = $this->patchEntity($task, $fields);
429
-
430
-        return (bool)$this->save($task);
431
-    }
432
-
433
-    /**
434
-     * Reset current jobs
435
-     *
436
-     * @param int|null $id
437
-     *
438
-     * @return int Success
439
-     */
440
-    public function reset($id = null)
441
-    {
442
-        $fields = [
443
-            'completed' => null,
444
-            'fetched' => null,
445
-            'failed_count' => 0,
446
-            'worker_key' => null,
447
-            'failure_message' => null
448
-        ];
449
-        $conditions = [
450
-            'completed IS' => null
451
-        ];
452
-        if ($id) {
453
-            $conditions['id'] = $id;
454
-        }
455
-
456
-        return $this->updateAll($fields, $conditions);
457
-    }
458
-
459
-    /**
460
-     *
461
-     * @param string $taskName
462
-     *
463
-     * @return int
464
-     */
465
-    public function rerun($taskName)
466
-    {
467
-        $fields = [
468
-            'completed' => null,
469
-            'fetched' => null,
470
-            'failed_count' => 0,
471
-            'worker_key' => null,
472
-            'failure_message' => null
473
-        ];
474
-        $conditions = [
475
-            'completed IS NOT' => null,
476
-            'task' => $taskName
477
-        ];
478
-
479
-        return $this->updateAll($fields, $conditions);
480
-    }
481
-
482
-    /**
483
-     * Cleanup/Delete Completed Tasks.
484
-     *
485
-     * @return void
486
-     */
487
-    public function cleanOldJobs()
488
-    {
489
-        if (!Configure::read('Queue.cleanuptimeout')) {
490
-            return;
491
-        }
492
-
493
-        $this->deleteAll([
494
-            'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
495
-        ]);
496
-    }
497
-
498
-    /**
499
-     *
500
-     * @param \Queue\Model\Entity\QueuedTask $queuedTask
501
-     * @param array $taskConfiguration
502
-     * @return string
503
-     */
504
-    public function getFailedStatus($queuedTask, array $taskConfiguration)
505
-    {
506
-        $failureMessageRequeued = 'requeued';
507
-
508
-        $queuedTaskName = 'Queue' . $queuedTask->task;
509
-        if (empty($taskConfiguration[$queuedTaskName])) {
510
-            return $failureMessageRequeued;
511
-        }
512
-        $retries = $taskConfiguration[$queuedTaskName]['retries'];
513
-        if ($queuedTask->failed_count <= $retries) {
514
-            return $failureMessageRequeued;
515
-        }
516
-
517
-        return 'aborted';
518
-    }
519
-
520
-    /**
521
-     * Generates a unique Identifier for the current worker thread.
522
-     *
523
-     * Useful to identify the currently running processes for this thread.
524
-     *
525
-     * @return string Identifier
526
-     */
527
-    public function key()
528
-    {
529
-        if ($this->_key !== null) {
530
-            return $this->_key;
531
-        }
532
-        $this->_key = sha1(microtime());
533
-        if (!$this->_key) {
534
-            throw new RuntimeException('Invalid key generated');
535
-        }
536
-
537
-        return $this->_key;
538
-    }
539
-
540
-    /**
541
-     * Resets worker Identifier
542
-     *
543
-     * @return void
544
-     */
545
-    public function clearKey()
546
-    {
547
-        $this->_key = null;
548
-    }
549
-
550
-    /**
551
-     * truncate()
552
-     *
553
-     * @return void
554
-     */
555
-    public function truncate()
556
-    {
557
-        $sql = $this->getSchema()->truncateSql($this->_connection);
558
-        foreach ($sql as $snippet) {
559
-            $this->_connection->execute($snippet);
560
-        }
561
-    }
562
-
563
-    /**
564
-     * get the name of the driver
565
-     *
566
-     * @return string
567
-     */
568
-    protected function _getDriverName()
569
-    {
570
-        $className = explode('\\', $this->getConnection()->config()['driver']);
571
-        $name = end($className);
572
-
573
-        return $name;
574
-    }
575
-
576
-    /**
577
-     *
578
-     * @param array $conditions
579
-     * @param string $key
580
-     * @param array $values
581
-     * @return array
582
-     */
583
-    protected function addFilter(array $conditions, $key, array $values)
584
-    {
585
-        $include = [];
586
-        $exclude = [];
587
-        foreach ($values as $value) {
588
-            if (substr($value, 0, 1) === '-') {
589
-                $exclude[] = substr($value, 1);
590
-            } else {
591
-                $include[] = $value;
592
-            }
593
-        }
594
-
595
-        if ($include) {
596
-            $conditions[$key . ' IN'] = $include;
597
-        }
598
-        if ($exclude) {
599
-            $conditions[$key . ' NOT IN'] = $exclude;
600
-        }
601
-
602
-        return $conditions;
603
-    }
604
-
605
-    /**
606
-     * Returns a DateTime object from different input.
607
-     *
608
-     * Without argument this will be "now".
609
-     *
610
-     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore
611
-     *
612
-     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
613
-     */
614
-    protected function getDateTime($notBefore = null)
615
-    {
616
-        if (is_object($notBefore)) {
617
-            return $notBefore;
618
-        }
619
-
620
-        return new FrozenTime($notBefore);
621
-    }
31
+	const DRIVER_MYSQL = 'Mysql';
32
+
33
+	const DRIVER_POSTGRES = 'Postgres';
34
+
35
+	const DRIVER_SQLSERVER = 'Sqlserver';
36
+
37
+	const STATS_LIMIT = 100000;
38
+
39
+	/**
40
+	 *
41
+	 * @var string|null
42
+	 */
43
+	protected $_key;
44
+
45
+	/**
46
+	 * Initialize method
47
+	 *
48
+	 * @param array $config The configuration for the Table.
49
+	 * @return void
50
+	 */
51
+	public function initialize(array $config)
52
+	{
53
+		parent::initialize($config);
54
+
55
+		$this->setTable('queued_tasks');
56
+		$this->setDisplayField('id');
57
+		$this->setPrimaryKey('id');
58
+
59
+		$this->addBehavior('Timestamp');
60
+	}
61
+
62
+	/**
63
+	 * set connection name
64
+	 *
65
+	 * @return string
66
+	 */
67
+	public static function defaultConnectionName()
68
+	{
69
+		$connection = Configure::read('Queue.connection');
70
+		if (!empty($connection)) {
71
+			return $connection;
72
+		}
73
+
74
+		return parent::defaultConnectionName();
75
+	}
76
+
77
+	/**
78
+	 *
79
+	 * @param \Cake\Event\Event $event
80
+	 * @param \ArrayObject $data
81
+	 * @param \ArrayObject $options
82
+	 * @return void
83
+	 */
84
+	public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
85
+	{
86
+		if (isset($data['data']) && $data['data'] === '') {
87
+			$data['data'] = null;
88
+		}
89
+	}
90
+
91
+	/**
92
+	 * Adds a new job to the queue.
93
+	 *
94
+	 * @param string $taskName Task name
95
+	 * @param array|null $data Array of data
96
+	 * @param string $notBefore A datetime which indicates when the job may be executed
97
+	 * @return \Queue\Model\Entity\QueuedTask Saved job entity
98
+	 */
99
+	public function createJob($taskName, array $data = null, string $notBefore = null)
100
+	{
101
+		$task = [
102
+			'task' => $taskName,
103
+			'data' => serialize($data),
104
+			'not_before' => $this->getDateTime()
105
+		];
106
+
107
+		if (!empty($notBefore)) {
108
+			$task['not_before'] = $this->getDateTime(strtotime($notBefore));
109
+		}
110
+
111
+		$queuedTask = $this->newEntity($task);
112
+
113
+		return $this->saveOrFail($queuedTask);
114
+	}
115
+
116
+	/**
117
+	 *
118
+	 * @param string|null $taskName
119
+	 *
120
+	 * @return bool
121
+	 *
122
+	 * @throws \InvalidArgumentException
123
+	 */
124
+	public function isQueued($taskName = null)
125
+	{
126
+		$conditions = [
127
+			'completed IS' => null
128
+		];
129
+		if ($taskName) {
130
+			$conditions['task'] = $taskName;
131
+		}
132
+
133
+		return (bool)$this->find()
134
+			->where($conditions)
135
+			->select([
136
+			'id'
137
+		])
138
+			->first();
139
+	}
140
+
141
+	/**
142
+	 * Returns the number of items in the queue.
143
+	 * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
144
+	 *
145
+	 * @param string|null $taskName Task type to Count
146
+	 * @return int
147
+	 */
148
+	public function getLength($taskName = null)
149
+	{
150
+		$findConf = [
151
+			'conditions' => [
152
+				'completed IS' => null
153
+			]
154
+		];
155
+		if ($taskName !== null) {
156
+			$findConf['conditions']['task'] = $taskName;
157
+		}
158
+
159
+		return $this->find('all', $findConf)->count();
160
+	}
161
+
162
+	/**
163
+	 * Return a list of all task types in the Queue.
164
+	 *
165
+	 * @return \Cake\ORM\Query
166
+	 */
167
+	public function getTypes()
168
+	{
169
+		$findCond = [
170
+			'fields' => [
171
+				'task'
172
+			],
173
+			'group' => [
174
+				'task'
175
+			],
176
+			'keyField' => 'task',
177
+			'valueField' => 'task'
178
+		];
179
+		return $this->find('list', $findCond);
180
+	}
181
+
182
+	/**
183
+	 * Return some statistics about finished jobs still in the Database.
184
+	 * TO-DO: rewrite as virtual field
185
+	 *
186
+	 * @return \Cake\ORM\Query
187
+	 */
188
+	public function getStats()
189
+	{
190
+		$driverName = $this->_getDriverName();
191
+		$options = [
192
+			'fields' => function (Query $query) use ($driverName) {
193
+				$alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
194
+				$runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
195
+				$fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
196
+				switch ($driverName) {
197
+					case static::DRIVER_SQLSERVER:
198
+						$alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
199
+						$runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
200
+						$fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
201
+						break;
202
+				}
203
+				/**
204
+				 *
205
+				 * @var \Cake\ORM\Query
206
+				 */
207
+				return [
208
+					'task',
209
+					'num' => $query->func()->count('*'),
210
+					'alltime' => $alltime,
211
+					'runtime' => $runtime,
212
+					'fetchdelay' => $fetchdelay
213
+				];
214
+			},
215
+			'conditions' => [
216
+				'completed IS NOT' => null
217
+			],
218
+			'group' => [
219
+				'task'
220
+			]
221
+		];
222
+		return $this->find('all', $options);
223
+	}
224
+
225
+	/**
226
+	 * Returns [
227
+	 * 'Task' => [
228
+	 * 'YYYY-MM-DD' => INT,
229
+	 * ...
230
+	 * ]
231
+	 * ]
232
+	 *
233
+	 * @param string|null $task
234
+	 * @return array
235
+	 */
236
+	public function getFullStats($taskName = null)
237
+	{
238
+		$driverName = $this->_getDriverName();
239
+		$fields = function (Query $query) use ($driverName) {
240
+			$runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
241
+			switch ($driverName) {
242
+				case static::DRIVER_SQLSERVER:
243
+					$runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
244
+					break;
245
+			}
246
+
247
+			return [
248
+				'task',
249
+				'created',
250
+				'duration' => $runtime
251
+			];
252
+		};
253
+
254
+		$conditions = [
255
+			'completed IS NOT' => null
256
+		];
257
+		if ($taskName) {
258
+			$conditions['task'] = $taskName;
259
+		}
260
+
261
+		$tasks = $this->find()
262
+			->select($fields)
263
+			->where($conditions)
264
+			->enableHydration(false)
265
+			->orderDesc('id')
266
+			->limit(static::STATS_LIMIT)
267
+			->all()
268
+			->toArray();
269
+
270
+		$result = [];
271
+
272
+		$days = [];
273
+
274
+		foreach ($tasks as $task) {
275
+			/** @var \DateTime $created */
276
+			$created = $task['created'];
277
+			$day = $created->format('Y-m-d');
278
+			if (!isset($days[$day])) {
279
+				$days[$day] = $day;
280
+			}
281
+
282
+			$result[$task['task']][$day][] = $task['duration'];
283
+		}
284
+
285
+		foreach ($result as $type => $tasks) {
286
+			foreach ($tasks as $day => $durations) {
287
+				$average = array_sum($durations) / count($durations);
288
+				$result[$type][$day] = (int)$average;
289
+			}
290
+
291
+			foreach ($days as $day) {
292
+				if (isset($result[$type][$day])) {
293
+					continue;
294
+				}
295
+
296
+				$result[$type][$day] = 0;
297
+			}
298
+
299
+			ksort($result[$type]);
300
+		}
301
+
302
+		return $result;
303
+	}
304
+
305
+	/**
306
+	 * Look for a new job that can be processed with the current abilities and
307
+	 * from the specified group (or any if null).
308
+	 *
309
+	 * @param array $capabilities Available QueueWorkerTasks.
310
+	 * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
311
+	 * @return \Queue\Model\Entity\QueuedTask|null
312
+	 */
313
+	public function requestJob(array $capabilities, array $types = [])
314
+	{
315
+		$now = $this->getDateTime();
316
+		$nowStr = $now->toDateTimeString();
317
+		$driverName = $this->_getDriverName();
318
+
319
+		$query = $this->find();
320
+		$age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
321
+		switch ($driverName) {
322
+			case static::DRIVER_SQLSERVER:
323
+				$age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
324
+				break;
325
+			case static::DRIVER_POSTGRES:
326
+				$age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
327
+				break;
328
+		}
329
+		$options = [
330
+			'conditions' => [
331
+				'completed IS' => null,
332
+				'OR' => []
333
+			],
334
+			'fields' => [
335
+				'age' => $age
336
+			],
337
+			'order' => [
338
+				'age' => 'ASC',
339
+				'id' => 'ASC'
340
+			]
341
+		];
342
+
343
+		if ($types) {
344
+			$options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
345
+		}
346
+
347
+		// Generate the task specific conditions.
348
+		foreach ($capabilities as $task) {
349
+			list ($plugin, $name) = pluginSplit($task['name']);
350
+			$timeoutAt = $now->copy();
351
+			$tmp = [
352
+				'task' => $name,
353
+				'AND' => [
354
+					[
355
+						'OR' => [
356
+							'not_before <' => $nowStr,
357
+							'not_before IS' => null
358
+						]
359
+					],
360
+					[
361
+						'OR' => [
362
+							'fetched <' => $timeoutAt->subSeconds($task['timeout']),
363
+							'fetched IS' => null
364
+						]
365
+					]
366
+				],
367
+				'failed_count <' => ($task['retries'] + 1)
368
+			];
369
+			$options['conditions']['OR'][] = $tmp;
370
+		}
371
+
372
+		/** @var \Queue\Model\Entity\QueuedTask|null $task */
373
+		$task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
374
+			$task = $query->find('all', $options)
375
+				->enableAutoFields(true)
376
+				->epilog('FOR UPDATE')
377
+				->first();
378
+
379
+			if (!$task) {
380
+				return null;
381
+			}
382
+
383
+			$key = $this->key();
384
+			$task = $this->patchEntity($task, [
385
+				'worker_key' => $key,
386
+				'fetched' => $now
387
+			]);
388
+
389
+			return $this->saveOrFail($task);
390
+		});
391
+
392
+		if (!$task) {
393
+			return null;
394
+		}
395
+
396
+		return $task;
397
+	}
398
+
399
+	/**
400
+	 * Mark a task as Completed, removing it from the queue.
401
+	 *
402
+	 * @param \Queue\Model\Entity\QueuedTask $task Task
403
+	 * @return bool Success
404
+	 */
405
+	public function markJobDone(QueuedTask $task)
406
+	{
407
+		$fields = [
408
+			'completed' => $this->getDateTime()
409
+		];
410
+		$task = $this->patchEntity($task, $fields);
411
+
412
+		return (bool)$this->save($task);
413
+	}
414
+
415
+	/**
416
+	 * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
417
+	 *
418
+	 * @param \Queue\Model\Entity\QueuedTask $task Task
419
+	 * @param string|null $failureMessage Optional message to append to the failure_message field.
420
+	 * @return bool Success
421
+	 */
422
+	public function markJobFailed(QueuedTask $task, $failureMessage = null)
423
+	{
424
+		$fields = [
425
+			'failed_count' => $task->failed_count + 1,
426
+			'failure_message' => $failureMessage
427
+		];
428
+		$task = $this->patchEntity($task, $fields);
429
+
430
+		return (bool)$this->save($task);
431
+	}
432
+
433
+	/**
434
+	 * Reset current jobs
435
+	 *
436
+	 * @param int|null $id
437
+	 *
438
+	 * @return int Success
439
+	 */
440
+	public function reset($id = null)
441
+	{
442
+		$fields = [
443
+			'completed' => null,
444
+			'fetched' => null,
445
+			'failed_count' => 0,
446
+			'worker_key' => null,
447
+			'failure_message' => null
448
+		];
449
+		$conditions = [
450
+			'completed IS' => null
451
+		];
452
+		if ($id) {
453
+			$conditions['id'] = $id;
454
+		}
455
+
456
+		return $this->updateAll($fields, $conditions);
457
+	}
458
+
459
+	/**
460
+	 *
461
+	 * @param string $taskName
462
+	 *
463
+	 * @return int
464
+	 */
465
+	public function rerun($taskName)
466
+	{
467
+		$fields = [
468
+			'completed' => null,
469
+			'fetched' => null,
470
+			'failed_count' => 0,
471
+			'worker_key' => null,
472
+			'failure_message' => null
473
+		];
474
+		$conditions = [
475
+			'completed IS NOT' => null,
476
+			'task' => $taskName
477
+		];
478
+
479
+		return $this->updateAll($fields, $conditions);
480
+	}
481
+
482
+	/**
483
+	 * Cleanup/Delete Completed Tasks.
484
+	 *
485
+	 * @return void
486
+	 */
487
+	public function cleanOldJobs()
488
+	{
489
+		if (!Configure::read('Queue.cleanuptimeout')) {
490
+			return;
491
+		}
492
+
493
+		$this->deleteAll([
494
+			'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
495
+		]);
496
+	}
497
+
498
+	/**
499
+	 *
500
+	 * @param \Queue\Model\Entity\QueuedTask $queuedTask
501
+	 * @param array $taskConfiguration
502
+	 * @return string
503
+	 */
504
+	public function getFailedStatus($queuedTask, array $taskConfiguration)
505
+	{
506
+		$failureMessageRequeued = 'requeued';
507
+
508
+		$queuedTaskName = 'Queue' . $queuedTask->task;
509
+		if (empty($taskConfiguration[$queuedTaskName])) {
510
+			return $failureMessageRequeued;
511
+		}
512
+		$retries = $taskConfiguration[$queuedTaskName]['retries'];
513
+		if ($queuedTask->failed_count <= $retries) {
514
+			return $failureMessageRequeued;
515
+		}
516
+
517
+		return 'aborted';
518
+	}
519
+
520
+	/**
521
+	 * Generates a unique Identifier for the current worker thread.
522
+	 *
523
+	 * Useful to identify the currently running processes for this thread.
524
+	 *
525
+	 * @return string Identifier
526
+	 */
527
+	public function key()
528
+	{
529
+		if ($this->_key !== null) {
530
+			return $this->_key;
531
+		}
532
+		$this->_key = sha1(microtime());
533
+		if (!$this->_key) {
534
+			throw new RuntimeException('Invalid key generated');
535
+		}
536
+
537
+		return $this->_key;
538
+	}
539
+
540
+	/**
541
+	 * Resets worker Identifier
542
+	 *
543
+	 * @return void
544
+	 */
545
+	public function clearKey()
546
+	{
547
+		$this->_key = null;
548
+	}
549
+
550
+	/**
551
+	 * truncate()
552
+	 *
553
+	 * @return void
554
+	 */
555
+	public function truncate()
556
+	{
557
+		$sql = $this->getSchema()->truncateSql($this->_connection);
558
+		foreach ($sql as $snippet) {
559
+			$this->_connection->execute($snippet);
560
+		}
561
+	}
562
+
563
+	/**
564
+	 * get the name of the driver
565
+	 *
566
+	 * @return string
567
+	 */
568
+	protected function _getDriverName()
569
+	{
570
+		$className = explode('\\', $this->getConnection()->config()['driver']);
571
+		$name = end($className);
572
+
573
+		return $name;
574
+	}
575
+
576
+	/**
577
+	 *
578
+	 * @param array $conditions
579
+	 * @param string $key
580
+	 * @param array $values
581
+	 * @return array
582
+	 */
583
+	protected function addFilter(array $conditions, $key, array $values)
584
+	{
585
+		$include = [];
586
+		$exclude = [];
587
+		foreach ($values as $value) {
588
+			if (substr($value, 0, 1) === '-') {
589
+				$exclude[] = substr($value, 1);
590
+			} else {
591
+				$include[] = $value;
592
+			}
593
+		}
594
+
595
+		if ($include) {
596
+			$conditions[$key . ' IN'] = $include;
597
+		}
598
+		if ($exclude) {
599
+			$conditions[$key . ' NOT IN'] = $exclude;
600
+		}
601
+
602
+		return $conditions;
603
+	}
604
+
605
+	/**
606
+	 * Returns a DateTime object from different input.
607
+	 *
608
+	 * Without argument this will be "now".
609
+	 *
610
+	 * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore
611
+	 *
612
+	 * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
613
+	 */
614
+	protected function getDateTime($notBefore = null)
615
+	{
616
+		if (is_object($notBefore)) {
617
+			return $notBefore;
618
+		}
619
+
620
+		return new FrozenTime($notBefore);
621
+	}
622 622
 }
Please login to merge, or discard this patch.
Spacing   +3 added lines, -3 removed lines patch added patch discarded remove patch
@@ -189,7 +189,7 @@  discard block
 block discarded – undo
189 189
     {
190 190
         $driverName = $this->_getDriverName();
191 191
         $options = [
192
-            'fields' => function (Query $query) use ($driverName) {
192
+            'fields' => function(Query $query) use ($driverName) {
193 193
                 $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
194 194
                 $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
195 195
                 $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
@@ -236,7 +236,7 @@  discard block
 block discarded – undo
236 236
     public function getFullStats($taskName = null)
237 237
     {
238 238
         $driverName = $this->_getDriverName();
239
-        $fields = function (Query $query) use ($driverName) {
239
+        $fields = function(Query $query) use ($driverName) {
240 240
             $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
241 241
             switch ($driverName) {
242 242
                 case static::DRIVER_SQLSERVER:
@@ -370,7 +370,7 @@  discard block
 block discarded – undo
370 370
         }
371 371
 
372 372
         /** @var \Queue\Model\Entity\QueuedTask|null $task */
373
-        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
373
+        $task = $this->getConnection()->transactional(function() use ($query, $options, $now) {
374 374
             $task = $query->find('all', $options)
375 375
                 ->enableAutoFields(true)
376 376
                 ->epilog('FOR UPDATE')
Please login to merge, or discard this patch.
tests/bootstrap.php 1 patch
Indentation   +52 added lines, -52 removed lines patch added patch discarded remove patch
@@ -2,14 +2,14 @@  discard block
 block discarded – undo
2 2
 use Cake\Datasource\ConnectionManager;
3 3
 
4 4
 if (!defined('DS')) {
5
-    define('DS', DIRECTORY_SEPARATOR);
5
+	define('DS', DIRECTORY_SEPARATOR);
6 6
 }
7 7
 if (!defined('WINDOWS')) {
8
-    if (DS === '\\' || substr(PHP_OS, 0, 3) === 'WIN') {
9
-        define('WINDOWS', true);
10
-    } else {
11
-        define('WINDOWS', false);
12
-    }
8
+	if (DS === '\\' || substr(PHP_OS, 0, 3) === 'WIN') {
9
+		define('WINDOWS', true);
10
+	} else {
11
+		define('WINDOWS', false);
12
+	}
13 13
 }
14 14
 
15 15
 define('ROOT', dirname(__DIR__));
@@ -32,25 +32,25 @@  discard block
 block discarded – undo
32 32
 require CORE_PATH . 'config/bootstrap.php';
33 33
 
34 34
 Cake\Core\Configure::write('App', [
35
-    'namespace' => 'App',
36
-    'encoding' => 'UTF-8',
37
-    'paths' => [
38
-        'templates' => [ROOT . DS . 'tests' . DS . 'test_app' . DS . 'src' . DS . 'Template' . DS],
39
-    ]
35
+	'namespace' => 'App',
36
+	'encoding' => 'UTF-8',
37
+	'paths' => [
38
+		'templates' => [ROOT . DS . 'tests' . DS . 'test_app' . DS . 'src' . DS . 'Template' . DS],
39
+	]
40 40
 ]);
41 41
 
42 42
 Cake\Core\Configure::write('debug', true);
43 43
 
44 44
 Cake\Core\Configure::write('EmailTransport', [
45
-    'default' => [
46
-        'className' => 'Debug',
47
-    ],
45
+	'default' => [
46
+		'className' => 'Debug',
47
+	],
48 48
 ]);
49 49
 Cake\Core\Configure::write('Email', [
50
-    'default' => [
51
-        'transport' => 'default',
52
-        'from' => 'you@localhost',
53
-    ],
50
+	'default' => [
51
+		'transport' => 'default',
52
+		'from' => 'you@localhost',
53
+	],
54 54
 ]);
55 55
 
56 56
 mb_internal_encoding('UTF-8');
@@ -61,24 +61,24 @@  discard block
 block discarded – undo
61 61
 $Tmp->create(TMP . 'cache/views', 0770);
62 62
 
63 63
 $cache = [
64
-    'default' => [
65
-        'engine' => 'File',
66
-        'path' => CACHE,
67
-    ],
68
-    '_cake_core_' => [
69
-        'className' => 'File',
70
-        'prefix' => 'crud_myapp_cake_core_',
71
-        'path' => CACHE . 'persistent/',
72
-        'serialize' => true,
73
-        'duration' => '+10 seconds',
74
-    ],
75
-    '_cake_model_' => [
76
-        'className' => 'File',
77
-        'prefix' => 'crud_my_app_cake_model_',
78
-        'path' => CACHE . 'models/',
79
-        'serialize' => 'File',
80
-        'duration' => '+10 seconds',
81
-    ],
64
+	'default' => [
65
+		'engine' => 'File',
66
+		'path' => CACHE,
67
+	],
68
+	'_cake_core_' => [
69
+		'className' => 'File',
70
+		'prefix' => 'crud_myapp_cake_core_',
71
+		'path' => CACHE . 'persistent/',
72
+		'serialize' => true,
73
+		'duration' => '+10 seconds',
74
+	],
75
+	'_cake_model_' => [
76
+		'className' => 'File',
77
+		'prefix' => 'crud_my_app_cake_model_',
78
+		'path' => CACHE . 'models/',
79
+		'serialize' => 'File',
80
+		'duration' => '+10 seconds',
81
+	],
82 82
 ];
83 83
 
84 84
 Cake\Cache\Cache::setConfig($cache);
@@ -86,37 +86,37 @@  discard block
 block discarded – undo
86 86
 Cake\Core\Plugin::getCollection()->add(new \Queue\Plugin());
87 87
 
88 88
 Cake\Mailer\TransportFactory::setConfig('default', [
89
-    'className' => 'Debug',
89
+	'className' => 'Debug',
90 90
 ]);
91 91
 Cake\Mailer\TransportFactory::setConfig('queue', [
92
-    'className' => 'Queue.Queue',
92
+	'className' => 'Queue.Queue',
93 93
 ]);
94 94
 Cake\Mailer\Email::setConfig('default', [
95
-    'transport' => 'default',
95
+	'transport' => 'default',
96 96
 ]);
97 97
 
98 98
 // Allow local overwrite
99 99
 // E.g. in your console: export db_dsn="mysql://root:[email protected]/cake_test"
100 100
 if (!getenv('db_class') && getenv('db_dsn')) {
101
-    ConnectionManager::setConfig('test', ['url' => getenv('db_dsn')]);
101
+	ConnectionManager::setConfig('test', ['url' => getenv('db_dsn')]);
102 102
 
103
-    return;
103
+	return;
104 104
 }
105 105
 if (!getenv('db_class')) {
106
-    putenv('db_class=Cake\Database\Driver\Sqlite');
107
-    putenv('db_dsn=sqlite::memory:');
106
+	putenv('db_class=Cake\Database\Driver\Sqlite');
107
+	putenv('db_dsn=sqlite::memory:');
108 108
 }
109 109
 
110 110
 // Uses Travis config then (MySQL, Postgres, ...)
111 111
 ConnectionManager::setConfig('test', [
112
-    'className' => 'Cake\Database\Connection',
113
-    'driver' => getenv('db_class'),
114
-    'dsn' => getenv('db_dsn'),
115
-    'database' => getenv('db_database'),
116
-    'username' => getenv('db_username'),
117
-    'password' => getenv('db_password'),
118
-    'timezone' => 'UTC',
119
-    'quoteIdentifiers' => true,
120
-    'cacheMetadata' => true,
112
+	'className' => 'Cake\Database\Connection',
113
+	'driver' => getenv('db_class'),
114
+	'dsn' => getenv('db_dsn'),
115
+	'database' => getenv('db_database'),
116
+	'username' => getenv('db_username'),
117
+	'password' => getenv('db_password'),
118
+	'timezone' => 'UTC',
119
+	'quoteIdentifiers' => true,
120
+	'cacheMetadata' => true,
121 121
 
122 122
 ]);
Please login to merge, or discard this patch.
tests/TestCase/Model/Table/QueuedTasksTableTest.php 1 patch
Indentation   +414 added lines, -414 removed lines patch added patch discarded remove patch
@@ -19,418 +19,418 @@
 block discarded – undo
19 19
 class QueuedTasksTableTest extends TestCase
20 20
 {
21 21
 
22
-    /**
23
-     *
24
-     * @var \Queue\Model\Table\QueuedTasksTable
25
-     */
26
-    protected $QueuedTasks;
27
-
28
-    /**
29
-     * Fixtures
30
-     *
31
-     * @var array
32
-     */
33
-    public $fixtures = [
34
-        'plugin.Queue.QueuedTasks'
35
-    ];
36
-
37
-    /**
38
-     * setUp method
39
-     *
40
-     * @return void
41
-     */
42
-    public function setUp()
43
-    {
44
-        parent::setUp();
45
-        $config = TableRegistry::getTableLocator()->exists('QueuedTasks') ? [] : [
46
-            'className' => QueuedTasksTable::class
47
-        ];
48
-        $this->QueuedTasks = TableRegistry::getTableLocator()->get('QueuedTasks', $config);
49
-    }
50
-
51
-    /**
52
-     * Basic Instance test
53
-     *
54
-     * @return void
55
-     */
56
-    public function testQueueInstance()
57
-    {
58
-        $this->assertInstanceOf(QueuedTasksTable::class, $this->QueuedTasks);
59
-    }
60
-
61
-    /**
62
-     * Test the basic create and length evaluation functions.
63
-     *
64
-     * @return void
65
-     */
66
-    public function testCreateAndCount()
67
-    {
68
-        // at first, the queue should contain 0 items.
69
-        $this->assertSame(0, $this->QueuedTasks->getLength());
70
-
71
-        // create a job
72
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test1', [
73
-            'some' => 'random',
74
-            'test' => 'data'
75
-        ]));
76
-
77
-        // test if queue Length is 1 now.
78
-        $this->assertSame(1, $this->QueuedTasks->getLength());
79
-
80
-        // create some more jobs
81
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
82
-            'some' => 'random',
83
-            'test' => 'data2'
84
-        ]));
85
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
86
-            'some' => 'random',
87
-            'test' => 'data3'
88
-        ]));
89
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test3', [
90
-            'some' => 'random',
91
-            'test' => 'data4'
92
-        ]));
93
-
94
-        // overall queueLength shpould now be 4
95
-        $this->assertSame(4, $this->QueuedTasks->getLength());
96
-
97
-        // there should be 1 task of type 'test1', one of type 'test3' and 2 of type 'test2'
98
-        $this->assertSame(1, $this->QueuedTasks->getLength('test1'));
99
-        $this->assertSame(2, $this->QueuedTasks->getLength('test2'));
100
-        $this->assertSame(1, $this->QueuedTasks->getLength('test3'));
101
-    }
102
-
103
-    /**
104
-     * Test the basic create and fetch functions.
105
-     *
106
-     * @return void
107
-     */
108
-    public function testCreateAndFetch()
109
-    {
110
-        $this->_needsConnection();
111
-
112
-        // $capabilities is a list of tasks the worker can run.
113
-        $capabilities = [
114
-            'task1' => [
115
-                'name' => 'task1',
116
-                'timeout' => 100,
117
-                'retries' => 2
118
-            ]
119
-        ];
120
-        $testData = [
121
-            'x1' => 'y1',
122
-            'x2' => 'y2',
123
-            'x3' => 'y3',
124
-            'x4' => 'y4'
125
-        ];
126
-
127
-        // start off empty.
128
-        $this->assertSame([], $this->QueuedTasks->find()
129
-            ->toArray());
130
-        // at first, the queue should contain 0 items.
131
-        $this->assertSame(0, $this->QueuedTasks->getLength());
132
-        // there are no jobs, so we cant fetch any.
133
-        $this->assertNull($this->QueuedTasks->requestJob($capabilities));
134
-        // insert one job.
135
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $testData));
136
-
137
-        // fetch and check the first job.
138
-        $job = $this->QueuedTasks->requestJob($capabilities);
139
-        $this->assertSame(1, $job->id);
140
-        $this->assertSame('task1', $job->task);
141
-        $this->assertSame(0, $job->failed);
142
-        $this->assertNull($job->completed);
143
-        $this->assertSame($testData, unserialize($job->data));
144
-
145
-        // after this job has been fetched, it may not be reassigned.
146
-        $result = $this->QueuedTasks->requestJob($capabilities);
147
-        $this->assertNull($result);
148
-
149
-        // queue length is still 1 since the first job did not finish.
150
-        $this->assertSame(1, $this->QueuedTasks->getLength());
151
-
152
-        // Now mark Task1 as done
153
-        $this->assertTrue($this->QueuedTasks->markJobDone($job));
154
-
155
-        // Should be 0 again.
156
-        $this->assertSame(0, $this->QueuedTasks->getLength());
157
-    }
158
-
159
-    /**
160
-     * Test the delivery of jobs in sequence, skipping fetched but not completed tasks.
161
-     *
162
-     * @return void
163
-     */
164
-    public function testSequence()
165
-    {
166
-        $this->_needsConnection();
167
-
168
-        // $capabilities is a list of tasks the worker can run.
169
-        $capabilities = [
170
-            'task1' => [
171
-                'name' => 'task1',
172
-                'timeout' => 100,
173
-                'retries' => 2
174
-            ]
175
-        ];
176
-        // at first, the queue should contain 0 items.
177
-        $this->assertSame(0, $this->QueuedTasks->getLength());
178
-        // create some more jobs
179
-        foreach (range(0, 9) as $num) {
180
-            $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
181
-                'tasknum' => $num
182
-            ]));
183
-        }
184
-        // 10 jobs in the queue.
185
-        $this->assertSame(10, $this->QueuedTasks->getLength());
186
-
187
-        // jobs should be fetched in the original sequence.
188
-        $array = [];
189
-        foreach (range(0, 4) as $num) {
190
-            $this->QueuedTasks->clearKey();
191
-            $array[$num] = $this->QueuedTasks->requestJob($capabilities);
192
-            $jobData = unserialize($array[$num]['data']);
193
-            $this->assertSame($num, $jobData['tasknum']);
194
-        }
195
-        // now mark them as done
196
-        foreach (range(0, 4) as $num) {
197
-            $this->assertTrue($this->QueuedTasks->markJobDone($array[$num]));
198
-            $this->assertSame(9 - $num, $this->QueuedTasks->getLength());
199
-        }
200
-
201
-        // jobs should be fetched in the original sequence.
202
-        foreach (range(5, 9) as $num) {
203
-            $job = $this->QueuedTasks->requestJob($capabilities);
204
-            $jobData = unserialize($job->data);
205
-            $this->assertSame($num, $jobData['tasknum']);
206
-            $this->assertTrue($this->QueuedTasks->markJobDone($job));
207
-            $this->assertSame(9 - $num, $this->QueuedTasks->getLength());
208
-        }
209
-    }
210
-
211
-    /**
212
-     * Test creating Jobs to run close to a specified time, and strtotime parsing.
213
-     * Using toUnixString() function to convert Time object to timestamp, instead of strtotime
214
-     *
215
-     * @return null
216
-     */
217
-    public function testNotBefore()
218
-    {
219
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Min'));
220
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Day'));
221
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '2009-07-01 12:00:00'));
222
-        $data = $this->QueuedTasks->find('all')->toArray();
223
-        $this->assertWithinRange((new Time('+ 1 Min'))->toUnixString(), $data[0]['not_before']->toUnixString(), 60);
224
-        $this->assertWithinRange((new Time('+ 1 Day'))->toUnixString(), $data[1]['not_before']->toUnixString(), 60);
225
-        $this->assertWithinRange((new Time('2009-07-01 12:00:00'))->toUnixString(), $data[2]['not_before']->toUnixString(), 60);
226
-    }
227
-
228
-    /**
229
-     * Test Job reordering depending on 'notBefore' field.
230
-     * Jobs with an expired not_before field should be executed before any other job without specific timing info.
231
-     *
232
-     * @return void
233
-     */
234
-    public function testNotBeforeOrder()
235
-    {
236
-        $this->_needsConnection();
237
-
238
-        $capabilities = [
239
-            'task1' => [
240
-                'name' => 'task1',
241
-                'timeout' => 100,
242
-                'retries' => 2
243
-            ],
244
-            'dummytask' => [
245
-                'name' => 'dummytask',
246
-                'timeout' => 100,
247
-                'retries' => 2
248
-            ]
249
-        ];
250
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
251
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
252
-        // create a task with it's execution target some seconds in the past, so it should jump to the top of the testCreateAndFetchlist.
253
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
254
-            'three'
255
-        ], '- 3 Seconds'));
256
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
257
-            'two'
258
-        ], '- 5 Seconds'));
259
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
260
-            'one'
261
-        ], '- 7 Seconds'));
262
-
263
-        // when using requestJob, the jobs we just created should be delivered in this order, NOT the order in which they where created.
264
-        $expected = [
265
-            [
266
-                'name' => 'task1',
267
-                'data' => [
268
-                    'one'
269
-                ]
270
-            ],
271
-            [
272
-                'name' => 'task1',
273
-                'data' => [
274
-                    'two'
275
-                ]
276
-            ],
277
-            [
278
-                'name' => 'task1',
279
-                'data' => [
280
-                    'three'
281
-                ]
282
-            ],
283
-            [
284
-                'name' => 'dummytask',
285
-                'data' => null
286
-            ],
287
-            [
288
-                'name' => 'dummytask',
289
-                'data' => null
290
-            ]
291
-        ];
292
-
293
-        foreach ($expected as $item) {
294
-            $this->QueuedTasks->clearKey();
295
-            $tmp = $this->QueuedTasks->requestJob($capabilities);
296
-
297
-            $this->assertSame($item['name'], $tmp['task']);
298
-            $this->assertEquals($item['data'], unserialize($tmp['data']));
299
-        }
300
-    }
301
-
302
-    /**
303
-     * Job Rate limiting.
304
-     * Do not execute jobs of a certain type more often than once every X seconds.
305
-     *
306
-     * @return void
307
-     */
308
-    public function testRateLimit()
309
-    {
310
-        $this->_needsConnection();
311
-
312
-        $capabilities = [
313
-            'task1' => [
314
-                'name' => 'task1',
315
-                'timeout' => 101,
316
-                'retries' => 2,
317
-                'rate' => 2
318
-            ],
319
-            'dummytask' => [
320
-                'name' => 'dummytask',
321
-                'timeout' => 101,
322
-                'retries' => 2
323
-            ]
324
-        ];
325
-
326
-        // clear out the rate history
327
-        $this->QueuedTasks->rateHistory = [];
328
-
329
-        $data1 = [
330
-            'key' => 1
331
-        ];
332
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data1));
333
-        $data2 = [
334
-            'key' => 2
335
-        ];
336
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data2));
337
-        $data3 = [
338
-            'key' => 3
339
-        ];
340
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data3));
341
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
342
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
343
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
344
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
345
-
346
-        // At first we get task1-1.
347
-        $this->QueuedTasks->clearKey();
348
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
349
-        $this->assertSame('task1', $tmp['task']);
350
-        $this->assertSame($data1, unserialize($tmp['data']));
351
-
352
-        // The rate limit should now skip over task1-2 and fetch a dummytask.
353
-        $this->QueuedTasks->clearKey();
354
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
355
-        $this->assertSame('dummytask', $tmp['task']);
356
-        $this->assertFalse(unserialize($tmp['data']));
357
-
358
-        usleep(100000);
359
-        // and again.
360
-        $this->QueuedTasks->clearKey();
361
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
362
-        $this->assertSame('dummytask', $tmp['task']);
363
-        $this->assertFalse(unserialize($tmp['data']));
364
-
365
-        // Then some time passes
366
-        sleep(2);
367
-
368
-        // Now we should get task1-2
369
-        $this->QueuedTasks->clearKey();
370
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
371
-        $this->assertSame('task1', $tmp['task']);
372
-        $this->assertSame($data2, unserialize($tmp['data']));
373
-
374
-        // and again rate limit to dummytask.
375
-        $this->QueuedTasks->clearKey();
376
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
377
-        $this->assertSame('dummytask', $tmp['task']);
378
-        $this->assertFalse(unserialize($tmp['data']));
379
-
380
-        // Then some more time passes
381
-        sleep(2);
382
-
383
-        // Now we should get task1-3
384
-        $this->QueuedTasks->clearKey();
385
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
386
-        $this->assertSame('task1', $tmp['task']);
387
-        $this->assertSame($data3, unserialize($tmp['data']));
388
-
389
-        // and again rate limit to dummytask.
390
-        $this->QueuedTasks->clearKey();
391
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
392
-        $this->assertSame('dummytask', $tmp['task']);
393
-        $this->assertFalse(unserialize($tmp['data']));
394
-
395
-        // and now the queue is empty
396
-        $this->QueuedTasks->clearKey();
397
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
398
-        $this->assertNull($tmp);
399
-    }
400
-
401
-    /**
402
-     *
403
-     * @return void
404
-     */
405
-    public function testIsQueued()
406
-    {
407
-        $result = $this->QueuedTasks->isQueued('foo-bar');
408
-        $this->assertFalse($result);
409
-
410
-        $queuedJob = $this->QueuedTasks->newEntity([
411
-            'key' => 'key',
412
-            'task' => 'FooBar'
413
-        ]);
414
-        $this->QueuedTasks->saveOrFail($queuedJob);
415
-
416
-        $result = $this->QueuedTasks->isQueued('foo-bar');
417
-        $this->assertTrue($result);
418
-
419
-        $queuedJob->completed = new FrozenTime();
420
-        $this->QueuedTasks->saveOrFail($queuedJob);
421
-
422
-        $result = $this->QueuedTasks->isQueued('foo-bar');
423
-        $this->assertFalse($result);
424
-    }
425
-
426
-    /**
427
-     * Helper method for skipping tests that need a real connection.
428
-     *
429
-     * @return void
430
-     */
431
-    protected function _needsConnection()
432
-    {
433
-        $config = ConnectionManager::getConfig('test');
434
-        $this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
435
-    }
22
+	/**
23
+	 *
24
+	 * @var \Queue\Model\Table\QueuedTasksTable
25
+	 */
26
+	protected $QueuedTasks;
27
+
28
+	/**
29
+	 * Fixtures
30
+	 *
31
+	 * @var array
32
+	 */
33
+	public $fixtures = [
34
+		'plugin.Queue.QueuedTasks'
35
+	];
36
+
37
+	/**
38
+	 * setUp method
39
+	 *
40
+	 * @return void
41
+	 */
42
+	public function setUp()
43
+	{
44
+		parent::setUp();
45
+		$config = TableRegistry::getTableLocator()->exists('QueuedTasks') ? [] : [
46
+			'className' => QueuedTasksTable::class
47
+		];
48
+		$this->QueuedTasks = TableRegistry::getTableLocator()->get('QueuedTasks', $config);
49
+	}
50
+
51
+	/**
52
+	 * Basic Instance test
53
+	 *
54
+	 * @return void
55
+	 */
56
+	public function testQueueInstance()
57
+	{
58
+		$this->assertInstanceOf(QueuedTasksTable::class, $this->QueuedTasks);
59
+	}
60
+
61
+	/**
62
+	 * Test the basic create and length evaluation functions.
63
+	 *
64
+	 * @return void
65
+	 */
66
+	public function testCreateAndCount()
67
+	{
68
+		// at first, the queue should contain 0 items.
69
+		$this->assertSame(0, $this->QueuedTasks->getLength());
70
+
71
+		// create a job
72
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test1', [
73
+			'some' => 'random',
74
+			'test' => 'data'
75
+		]));
76
+
77
+		// test if queue Length is 1 now.
78
+		$this->assertSame(1, $this->QueuedTasks->getLength());
79
+
80
+		// create some more jobs
81
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
82
+			'some' => 'random',
83
+			'test' => 'data2'
84
+		]));
85
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
86
+			'some' => 'random',
87
+			'test' => 'data3'
88
+		]));
89
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test3', [
90
+			'some' => 'random',
91
+			'test' => 'data4'
92
+		]));
93
+
94
+		// overall queueLength shpould now be 4
95
+		$this->assertSame(4, $this->QueuedTasks->getLength());
96
+
97
+		// there should be 1 task of type 'test1', one of type 'test3' and 2 of type 'test2'
98
+		$this->assertSame(1, $this->QueuedTasks->getLength('test1'));
99
+		$this->assertSame(2, $this->QueuedTasks->getLength('test2'));
100
+		$this->assertSame(1, $this->QueuedTasks->getLength('test3'));
101
+	}
102
+
103
+	/**
104
+	 * Test the basic create and fetch functions.
105
+	 *
106
+	 * @return void
107
+	 */
108
+	public function testCreateAndFetch()
109
+	{
110
+		$this->_needsConnection();
111
+
112
+		// $capabilities is a list of tasks the worker can run.
113
+		$capabilities = [
114
+			'task1' => [
115
+				'name' => 'task1',
116
+				'timeout' => 100,
117
+				'retries' => 2
118
+			]
119
+		];
120
+		$testData = [
121
+			'x1' => 'y1',
122
+			'x2' => 'y2',
123
+			'x3' => 'y3',
124
+			'x4' => 'y4'
125
+		];
126
+
127
+		// start off empty.
128
+		$this->assertSame([], $this->QueuedTasks->find()
129
+			->toArray());
130
+		// at first, the queue should contain 0 items.
131
+		$this->assertSame(0, $this->QueuedTasks->getLength());
132
+		// there are no jobs, so we cant fetch any.
133
+		$this->assertNull($this->QueuedTasks->requestJob($capabilities));
134
+		// insert one job.
135
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $testData));
136
+
137
+		// fetch and check the first job.
138
+		$job = $this->QueuedTasks->requestJob($capabilities);
139
+		$this->assertSame(1, $job->id);
140
+		$this->assertSame('task1', $job->task);
141
+		$this->assertSame(0, $job->failed);
142
+		$this->assertNull($job->completed);
143
+		$this->assertSame($testData, unserialize($job->data));
144
+
145
+		// after this job has been fetched, it may not be reassigned.
146
+		$result = $this->QueuedTasks->requestJob($capabilities);
147
+		$this->assertNull($result);
148
+
149
+		// queue length is still 1 since the first job did not finish.
150
+		$this->assertSame(1, $this->QueuedTasks->getLength());
151
+
152
+		// Now mark Task1 as done
153
+		$this->assertTrue($this->QueuedTasks->markJobDone($job));
154
+
155
+		// Should be 0 again.
156
+		$this->assertSame(0, $this->QueuedTasks->getLength());
157
+	}
158
+
159
+	/**
160
+	 * Test the delivery of jobs in sequence, skipping fetched but not completed tasks.
161
+	 *
162
+	 * @return void
163
+	 */
164
+	public function testSequence()
165
+	{
166
+		$this->_needsConnection();
167
+
168
+		// $capabilities is a list of tasks the worker can run.
169
+		$capabilities = [
170
+			'task1' => [
171
+				'name' => 'task1',
172
+				'timeout' => 100,
173
+				'retries' => 2
174
+			]
175
+		];
176
+		// at first, the queue should contain 0 items.
177
+		$this->assertSame(0, $this->QueuedTasks->getLength());
178
+		// create some more jobs
179
+		foreach (range(0, 9) as $num) {
180
+			$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
181
+				'tasknum' => $num
182
+			]));
183
+		}
184
+		// 10 jobs in the queue.
185
+		$this->assertSame(10, $this->QueuedTasks->getLength());
186
+
187
+		// jobs should be fetched in the original sequence.
188
+		$array = [];
189
+		foreach (range(0, 4) as $num) {
190
+			$this->QueuedTasks->clearKey();
191
+			$array[$num] = $this->QueuedTasks->requestJob($capabilities);
192
+			$jobData = unserialize($array[$num]['data']);
193
+			$this->assertSame($num, $jobData['tasknum']);
194
+		}
195
+		// now mark them as done
196
+		foreach (range(0, 4) as $num) {
197
+			$this->assertTrue($this->QueuedTasks->markJobDone($array[$num]));
198
+			$this->assertSame(9 - $num, $this->QueuedTasks->getLength());
199
+		}
200
+
201
+		// jobs should be fetched in the original sequence.
202
+		foreach (range(5, 9) as $num) {
203
+			$job = $this->QueuedTasks->requestJob($capabilities);
204
+			$jobData = unserialize($job->data);
205
+			$this->assertSame($num, $jobData['tasknum']);
206
+			$this->assertTrue($this->QueuedTasks->markJobDone($job));
207
+			$this->assertSame(9 - $num, $this->QueuedTasks->getLength());
208
+		}
209
+	}
210
+
211
+	/**
212
+	 * Test creating Jobs to run close to a specified time, and strtotime parsing.
213
+	 * Using toUnixString() function to convert Time object to timestamp, instead of strtotime
214
+	 *
215
+	 * @return null
216
+	 */
217
+	public function testNotBefore()
218
+	{
219
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Min'));
220
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Day'));
221
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '2009-07-01 12:00:00'));
222
+		$data = $this->QueuedTasks->find('all')->toArray();
223
+		$this->assertWithinRange((new Time('+ 1 Min'))->toUnixString(), $data[0]['not_before']->toUnixString(), 60);
224
+		$this->assertWithinRange((new Time('+ 1 Day'))->toUnixString(), $data[1]['not_before']->toUnixString(), 60);
225
+		$this->assertWithinRange((new Time('2009-07-01 12:00:00'))->toUnixString(), $data[2]['not_before']->toUnixString(), 60);
226
+	}
227
+
228
+	/**
229
+	 * Test Job reordering depending on 'notBefore' field.
230
+	 * Jobs with an expired not_before field should be executed before any other job without specific timing info.
231
+	 *
232
+	 * @return void
233
+	 */
234
+	public function testNotBeforeOrder()
235
+	{
236
+		$this->_needsConnection();
237
+
238
+		$capabilities = [
239
+			'task1' => [
240
+				'name' => 'task1',
241
+				'timeout' => 100,
242
+				'retries' => 2
243
+			],
244
+			'dummytask' => [
245
+				'name' => 'dummytask',
246
+				'timeout' => 100,
247
+				'retries' => 2
248
+			]
249
+		];
250
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
251
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
252
+		// create a task with it's execution target some seconds in the past, so it should jump to the top of the testCreateAndFetchlist.
253
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
254
+			'three'
255
+		], '- 3 Seconds'));
256
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
257
+			'two'
258
+		], '- 5 Seconds'));
259
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
260
+			'one'
261
+		], '- 7 Seconds'));
262
+
263
+		// when using requestJob, the jobs we just created should be delivered in this order, NOT the order in which they where created.
264
+		$expected = [
265
+			[
266
+				'name' => 'task1',
267
+				'data' => [
268
+					'one'
269
+				]
270
+			],
271
+			[
272
+				'name' => 'task1',
273
+				'data' => [
274
+					'two'
275
+				]
276
+			],
277
+			[
278
+				'name' => 'task1',
279
+				'data' => [
280
+					'three'
281
+				]
282
+			],
283
+			[
284
+				'name' => 'dummytask',
285
+				'data' => null
286
+			],
287
+			[
288
+				'name' => 'dummytask',
289
+				'data' => null
290
+			]
291
+		];
292
+
293
+		foreach ($expected as $item) {
294
+			$this->QueuedTasks->clearKey();
295
+			$tmp = $this->QueuedTasks->requestJob($capabilities);
296
+
297
+			$this->assertSame($item['name'], $tmp['task']);
298
+			$this->assertEquals($item['data'], unserialize($tmp['data']));
299
+		}
300
+	}
301
+
302
+	/**
303
+	 * Job Rate limiting.
304
+	 * Do not execute jobs of a certain type more often than once every X seconds.
305
+	 *
306
+	 * @return void
307
+	 */
308
+	public function testRateLimit()
309
+	{
310
+		$this->_needsConnection();
311
+
312
+		$capabilities = [
313
+			'task1' => [
314
+				'name' => 'task1',
315
+				'timeout' => 101,
316
+				'retries' => 2,
317
+				'rate' => 2
318
+			],
319
+			'dummytask' => [
320
+				'name' => 'dummytask',
321
+				'timeout' => 101,
322
+				'retries' => 2
323
+			]
324
+		];
325
+
326
+		// clear out the rate history
327
+		$this->QueuedTasks->rateHistory = [];
328
+
329
+		$data1 = [
330
+			'key' => 1
331
+		];
332
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data1));
333
+		$data2 = [
334
+			'key' => 2
335
+		];
336
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data2));
337
+		$data3 = [
338
+			'key' => 3
339
+		];
340
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data3));
341
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
342
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
343
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
344
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
345
+
346
+		// At first we get task1-1.
347
+		$this->QueuedTasks->clearKey();
348
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
349
+		$this->assertSame('task1', $tmp['task']);
350
+		$this->assertSame($data1, unserialize($tmp['data']));
351
+
352
+		// The rate limit should now skip over task1-2 and fetch a dummytask.
353
+		$this->QueuedTasks->clearKey();
354
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
355
+		$this->assertSame('dummytask', $tmp['task']);
356
+		$this->assertFalse(unserialize($tmp['data']));
357
+
358
+		usleep(100000);
359
+		// and again.
360
+		$this->QueuedTasks->clearKey();
361
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
362
+		$this->assertSame('dummytask', $tmp['task']);
363
+		$this->assertFalse(unserialize($tmp['data']));
364
+
365
+		// Then some time passes
366
+		sleep(2);
367
+
368
+		// Now we should get task1-2
369
+		$this->QueuedTasks->clearKey();
370
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
371
+		$this->assertSame('task1', $tmp['task']);
372
+		$this->assertSame($data2, unserialize($tmp['data']));
373
+
374
+		// and again rate limit to dummytask.
375
+		$this->QueuedTasks->clearKey();
376
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
377
+		$this->assertSame('dummytask', $tmp['task']);
378
+		$this->assertFalse(unserialize($tmp['data']));
379
+
380
+		// Then some more time passes
381
+		sleep(2);
382
+
383
+		// Now we should get task1-3
384
+		$this->QueuedTasks->clearKey();
385
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
386
+		$this->assertSame('task1', $tmp['task']);
387
+		$this->assertSame($data3, unserialize($tmp['data']));
388
+
389
+		// and again rate limit to dummytask.
390
+		$this->QueuedTasks->clearKey();
391
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
392
+		$this->assertSame('dummytask', $tmp['task']);
393
+		$this->assertFalse(unserialize($tmp['data']));
394
+
395
+		// and now the queue is empty
396
+		$this->QueuedTasks->clearKey();
397
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
398
+		$this->assertNull($tmp);
399
+	}
400
+
401
+	/**
402
+	 *
403
+	 * @return void
404
+	 */
405
+	public function testIsQueued()
406
+	{
407
+		$result = $this->QueuedTasks->isQueued('foo-bar');
408
+		$this->assertFalse($result);
409
+
410
+		$queuedJob = $this->QueuedTasks->newEntity([
411
+			'key' => 'key',
412
+			'task' => 'FooBar'
413
+		]);
414
+		$this->QueuedTasks->saveOrFail($queuedJob);
415
+
416
+		$result = $this->QueuedTasks->isQueued('foo-bar');
417
+		$this->assertTrue($result);
418
+
419
+		$queuedJob->completed = new FrozenTime();
420
+		$this->QueuedTasks->saveOrFail($queuedJob);
421
+
422
+		$result = $this->QueuedTasks->isQueued('foo-bar');
423
+		$this->assertFalse($result);
424
+	}
425
+
426
+	/**
427
+	 * Helper method for skipping tests that need a real connection.
428
+	 *
429
+	 * @return void
430
+	 */
431
+	protected function _needsConnection()
432
+	{
433
+		$config = ConnectionManager::getConfig('test');
434
+		$this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
435
+	}
436 436
 }
Please login to merge, or discard this patch.