Completed
Pull Request — master (#35)
by
unknown
03:37
created
src/Shell/QueueShell.php 1 patch
Indentation   +562 added lines, -562 removed lines patch added patch discarded remove patch
@@ -31,578 +31,578 @@
 block discarded – undo
31 31
 class QueueShell extends Shell
32 32
 {
33 33
 
34
-    /**
35
-     *
36
-     * @var string
37
-     */
38
-    public $modelClass = 'Queue.QueuedTasks';
39
-
40
-    /**
41
-     *
42
-     * @var array|null
43
-     */
44
-    protected $_taskConf;
45
-
46
-    /**
47
-     *
48
-     * @var int
49
-     */
50
-    protected $_time = 0;
51
-
52
-    /**
53
-     *
54
-     * @var bool
55
-     */
56
-    protected $_exit = false;
57
-
58
-    /**
59
-     * Overwrite shell initialize to dynamically load all Queue Related Tasks.
60
-     *
61
-     * @return void
62
-     */
63
-    public function initialize()
64
-    {
65
-        $taskFinder = new TaskFinder();
66
-        $this->tasks = $taskFinder->allAppAndPluginTasks();
67
-
68
-        parent::initialize();
69
-    }
70
-
71
-    /**
72
-     *
73
-     * @return void
74
-     */
75
-    public function startup()
76
-    {
77
-        if ($this->param('quiet')) {
78
-            $this->interactive = false;
79
-        }
80
-
81
-        parent::startup();
82
-    }
83
-
84
-    /**
85
-     *
86
-     * @return string
87
-     */
88
-    public function getDescription()
89
-    {
90
-        $tasks = [];
91
-        foreach ($this->taskNames as $loadedTask) {
92
-            $tasks[] = "\t" . '* ' . $this->_taskName($loadedTask);
93
-        }
94
-        $tasks = implode(PHP_EOL, $tasks);
95
-
96
-        $text = <<<TEXT
34
+	/**
35
+	 *
36
+	 * @var string
37
+	 */
38
+	public $modelClass = 'Queue.QueuedTasks';
39
+
40
+	/**
41
+	 *
42
+	 * @var array|null
43
+	 */
44
+	protected $_taskConf;
45
+
46
+	/**
47
+	 *
48
+	 * @var int
49
+	 */
50
+	protected $_time = 0;
51
+
52
+	/**
53
+	 *
54
+	 * @var bool
55
+	 */
56
+	protected $_exit = false;
57
+
58
+	/**
59
+	 * Overwrite shell initialize to dynamically load all Queue Related Tasks.
60
+	 *
61
+	 * @return void
62
+	 */
63
+	public function initialize()
64
+	{
65
+		$taskFinder = new TaskFinder();
66
+		$this->tasks = $taskFinder->allAppAndPluginTasks();
67
+
68
+		parent::initialize();
69
+	}
70
+
71
+	/**
72
+	 *
73
+	 * @return void
74
+	 */
75
+	public function startup()
76
+	{
77
+		if ($this->param('quiet')) {
78
+			$this->interactive = false;
79
+		}
80
+
81
+		parent::startup();
82
+	}
83
+
84
+	/**
85
+	 *
86
+	 * @return string
87
+	 */
88
+	public function getDescription()
89
+	{
90
+		$tasks = [];
91
+		foreach ($this->taskNames as $loadedTask) {
92
+			$tasks[] = "\t" . '* ' . $this->_taskName($loadedTask);
93
+		}
94
+		$tasks = implode(PHP_EOL, $tasks);
95
+
96
+		$text = <<<TEXT
97 97
 Simple and minimalistic job queue (or deferred-task) system.
98 98
 
99 99
 Available Tasks:
100 100
 $tasks
101 101
 TEXT;
102 102
 
103
-        return $text;
104
-    }
105
-
106
-    /**
107
-     * Look for a Queue Task of hte passed name and try to call add() on it.
108
-     * A QueueTask may provide an add function to enable the user to create new jobs via commandline.
109
-     *
110
-     * @return void
111
-     */
112
-    public function add()
113
-    {
114
-        if (count($this->args) < 1) {
115
-            $this->out('Please call like this:');
116
-            $this->out('       bin/cake queue add <taskname>');
117
-            $this->_displayAvailableTasks();
118
-
119
-            return;
120
-        }
121
-
122
-        $name = Inflector::camelize($this->args[0]);
123
-        if (in_array('Queue' . $name, $this->taskNames, true)) {
124
-            /** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
125
-            $task = $this->{'Queue' . $name};
126
-            if (!($task instanceof AddInterface)) {
127
-                $this->abort('This task does not support adding via CLI call');
128
-            }
129
-            $task->add();
130
-        } else {
131
-            $this->out('Error: Task not found: ' . $name);
132
-            $this->_displayAvailableTasks();
133
-        }
134
-    }
135
-
136
-    /**
137
-     * Output the task without Queue or Task
138
-     * example: QueueImageTask becomes Image on display
139
-     *
140
-     * @param string $task Task name
141
-     * @return string Cleaned task name
142
-     */
143
-    protected function _taskName($task)
144
-    {
145
-        if (strpos($task, 'Queue') === 0) {
146
-            return substr($task, 5);
147
-        }
148
-
149
-        return $task;
150
-    }
151
-
152
-    /**
153
-     * Run a QueueWorker loop.
154
-     * Runs a Queue Worker process which will try to find unassigned jobs in the queue
155
-     * which it may run and try to fetch and execute them.
156
-     *
157
-     * @return int|null
158
-     */
159
-    public function runworker()
160
-    {
161
-        try {
162
-            $pid = $this->_initPid();
163
-        } catch (PersistenceFailedException $exception) {
164
-            $this->err($exception->getMessage());
165
-            $limit = (int)Configure::read('Queue.maxWorkers');
166
-            if ($limit) {
167
-                $this->out('Cannot start worker: Too many workers already/still running on this server (' . $limit . '/' . $limit . ')');
168
-            }
169
-
170
-            return static::CODE_ERROR;
171
-        }
172
-
173
-        // Enable Garbage Collector (PHP >= 5.3)
174
-        if (function_exists('gc_enable')) {
175
-            gc_enable();
176
-        }
177
-        if (function_exists('pcntl_signal')) {
178
-            pcntl_signal(SIGTERM, [
179
-                &$this,
180
-                '_exit'
181
-            ]);
182
-            pcntl_signal(SIGINT, [
183
-                &$this,
184
-                '_exit'
185
-            ]);
186
-            pcntl_signal(SIGTSTP, [
187
-                &$this,
188
-                '_exit'
189
-            ]);
190
-            pcntl_signal(SIGQUIT, [
191
-                &$this,
192
-                '_exit'
193
-            ]);
194
-        }
195
-        $this->_exit = false;
196
-
197
-        $startTime = time();
198
-        $types = $this->_stringToArray($this->param('type'));
199
-
200
-        while (!$this->_exit) {
201
-            $this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);
202
-
203
-            $QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);
204
-
205
-            if ($QueuedTask) {
206
-                $this->runJob($QueuedTask, $pid);
207
-            } elseif (Configure::read('Queue.exitWhenNothingToDo')) {
208
-                $this->out('nothing to do, exiting.');
209
-                $this->_exit = true;
210
-            } else {
211
-                $this->out('nothing to do, sleeping.');
212
-                sleep(Config::sleepTime());
213
-            }
214
-
215
-            // check if we are over the maximum runtime and end processing if so.
216
-            if (Configure::readOrFail('Queue.workerMaxRuntime') && (time() - $startTime) >= Configure::readOrFail('Queue.workerMaxRuntime')) {
217
-                $this->_exit = true;
218
-                $this->out('queue', 'Reached runtime of ' . (time() - $startTime) . ' Seconds (Max ' . Configure::readOrFail('Queue.workerMaxRuntime') . '), terminating.');
219
-            }
220
-            if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
221
-                $this->out(__d('queue', 'Performing old job cleanup.'));
222
-                $this->QueuedTasks->cleanOldJobs();
223
-            }
224
-            $this->hr();
225
-        }
226
-
227
-        $this->_deletePid($pid);
228
-
229
-        if ($this->param('verbose')) {
230
-            $this->_log('endworker', $pid);
231
-        }
232
-    }
233
-
234
-    /**
235
-     *
236
-     * @param \Queue\Model\Entity\QueuedTask $QueuedTask Queued task
237
-     * @param string $pid PID of the process
238
-     * @return void
239
-     */
240
-    protected function runJob(QueuedTask $QueuedTask, $pid)
241
-    {
242
-        $this->out('Running Job of type "' . $QueuedTask->task . '"');
243
-        $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, $pid, false);
244
-        $taskName = 'Queue' . $QueuedTask->task;
245
-
246
-        try {
247
-            $this->_time = time();
248
-
249
-            $data = unserialize($QueuedTask->data);
250
-            /** @var \Queue\Shell\Task\QueueTask $task */
251
-            $task = $this->{$taskName};
252
-            if (!$task instanceof QueueTaskInterface) {
253
-                throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
254
-            }
255
-
256
-            $return = $task->run((array)$data, $QueuedTask->id);
257
-            if ($return !== null) {
258
-                trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
259
-            }
260
-            $failureMessage = $taskName . ' failed';
261
-        } catch (Throwable $e) {
262
-            $return = false;
263
-
264
-            $failureMessage = get_class($e) . ': ' . $e->getMessage();
265
-            if (!($e instanceof QueueException)) {
266
-                $failureMessage .= "\n" . $e->getTraceAsString();
267
-            }
268
-
269
-            $this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage, $pid);
270
-        } catch (Exception $e) {
271
-            $return = false;
272
-
273
-            $failureMessage = get_class($e) . ': ' . $e->getMessage();
274
-            $this->_logError($taskName . "\n" . $failureMessage, $pid);
275
-        }
276
-
277
-        if ($return === false) {
278
-            $this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
279
-            $failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
280
-            $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus, $pid);
281
-            $this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');
282
-
283
-            return;
284
-        }
285
-
286
-        $this->QueuedTasks->markJobDone($QueuedTask);
287
-        $this->out('Job Finished.');
288
-    }
289
-
290
-    /**
291
-     * Manually trigger a Finished job cleanup.
292
-     *
293
-     * @return void
294
-     */
295
-    public function clean()
296
-    {
297
-        if (!Configure::read('Queue.cleanupTimeout')) {
298
-            $this->abort('You disabled cleanuptimout in config. Aborting.');
299
-        }
300
-
301
-        $this->out('Deleting old jobs, that have finished before ' . date('Y-m-d H:i:s', time() - (int)Configure::read('Queue.cleanupTimeout')));
302
-        $this->QueuedTasks->cleanOldJobs();
303
-    }
304
-
305
-    /**
306
-     * Display current settings
307
-     *
308
-     * @return void
309
-     */
310
-    public function settings()
311
-    {
312
-        $this->out('Current Settings:');
313
-        $conf = (array)Configure::read('Queue');
314
-        foreach ($conf as $key => $val) {
315
-            if ($val === false) {
316
-                $val = 'no';
317
-            }
318
-            if ($val === true) {
319
-                $val = 'yes';
320
-            }
321
-            $this->out('* ' . $key . ': ' . print_r($val, true));
322
-        }
323
-
324
-        $this->out();
325
-    }
326
-
327
-    /**
328
-     * Display some statistics about Finished Jobs.
329
-     *
330
-     * @return void
331
-     */
332
-    public function stats()
333
-    {
334
-        $this->out('Jobs currently in the queue:');
335
-
336
-        $types = $this->QueuedTasks->getTypes()->toArray();
337
-        foreach ($types as $type) {
338
-            $this->out('      ' . str_pad($type, 20, ' ', STR_PAD_RIGHT) . ': ' . $this->QueuedTasks->getLength($type));
339
-        }
340
-        $this->hr();
341
-        $this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
342
-        $this->hr();
343
-        $this->out('Finished job statistics:');
344
-        $data = $this->QueuedTasks->getStats();
345
-        foreach ($data as $item) {
346
-            $this->out(' ' . $item['task'] . ': ');
347
-            $this->out('   Finished Jobs in Database: ' . $item['num']);
348
-            $this->out('   Average Job existence    : ' . str_pad(Number::precision($item['alltime']), 8, ' ', STR_PAD_LEFT) . 's');
349
-            $this->out('   Average Execution delay  : ' . str_pad(Number::precision($item['fetchdelay']), 8, ' ', STR_PAD_LEFT) . 's');
350
-            $this->out('   Average Execution time   : ' . str_pad(Number::precision($item['runtime']), 8, ' ', STR_PAD_LEFT) . 's');
351
-        }
352
-    }
353
-
354
-    /**
355
-     * Get option parser method to parse commandline options
356
-     *
357
-     * @return \Cake\Console\ConsoleOptionParser
358
-     */
359
-    public function getOptionParser()
360
-    {
361
-        $subcommandParser = [
362
-            'options' => [
363
-                /*
103
+		return $text;
104
+	}
105
+
106
+	/**
107
+	 * Look for a Queue Task of hte passed name and try to call add() on it.
108
+	 * A QueueTask may provide an add function to enable the user to create new jobs via commandline.
109
+	 *
110
+	 * @return void
111
+	 */
112
+	public function add()
113
+	{
114
+		if (count($this->args) < 1) {
115
+			$this->out('Please call like this:');
116
+			$this->out('       bin/cake queue add <taskname>');
117
+			$this->_displayAvailableTasks();
118
+
119
+			return;
120
+		}
121
+
122
+		$name = Inflector::camelize($this->args[0]);
123
+		if (in_array('Queue' . $name, $this->taskNames, true)) {
124
+			/** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
125
+			$task = $this->{'Queue' . $name};
126
+			if (!($task instanceof AddInterface)) {
127
+				$this->abort('This task does not support adding via CLI call');
128
+			}
129
+			$task->add();
130
+		} else {
131
+			$this->out('Error: Task not found: ' . $name);
132
+			$this->_displayAvailableTasks();
133
+		}
134
+	}
135
+
136
+	/**
137
+	 * Output the task without Queue or Task
138
+	 * example: QueueImageTask becomes Image on display
139
+	 *
140
+	 * @param string $task Task name
141
+	 * @return string Cleaned task name
142
+	 */
143
+	protected function _taskName($task)
144
+	{
145
+		if (strpos($task, 'Queue') === 0) {
146
+			return substr($task, 5);
147
+		}
148
+
149
+		return $task;
150
+	}
151
+
152
+	/**
153
+	 * Run a QueueWorker loop.
154
+	 * Runs a Queue Worker process which will try to find unassigned jobs in the queue
155
+	 * which it may run and try to fetch and execute them.
156
+	 *
157
+	 * @return int|null
158
+	 */
159
+	public function runworker()
160
+	{
161
+		try {
162
+			$pid = $this->_initPid();
163
+		} catch (PersistenceFailedException $exception) {
164
+			$this->err($exception->getMessage());
165
+			$limit = (int)Configure::read('Queue.maxWorkers');
166
+			if ($limit) {
167
+				$this->out('Cannot start worker: Too many workers already/still running on this server (' . $limit . '/' . $limit . ')');
168
+			}
169
+
170
+			return static::CODE_ERROR;
171
+		}
172
+
173
+		// Enable Garbage Collector (PHP >= 5.3)
174
+		if (function_exists('gc_enable')) {
175
+			gc_enable();
176
+		}
177
+		if (function_exists('pcntl_signal')) {
178
+			pcntl_signal(SIGTERM, [
179
+				&$this,
180
+				'_exit'
181
+			]);
182
+			pcntl_signal(SIGINT, [
183
+				&$this,
184
+				'_exit'
185
+			]);
186
+			pcntl_signal(SIGTSTP, [
187
+				&$this,
188
+				'_exit'
189
+			]);
190
+			pcntl_signal(SIGQUIT, [
191
+				&$this,
192
+				'_exit'
193
+			]);
194
+		}
195
+		$this->_exit = false;
196
+
197
+		$startTime = time();
198
+		$types = $this->_stringToArray($this->param('type'));
199
+
200
+		while (!$this->_exit) {
201
+			$this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);
202
+
203
+			$QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);
204
+
205
+			if ($QueuedTask) {
206
+				$this->runJob($QueuedTask, $pid);
207
+			} elseif (Configure::read('Queue.exitWhenNothingToDo')) {
208
+				$this->out('nothing to do, exiting.');
209
+				$this->_exit = true;
210
+			} else {
211
+				$this->out('nothing to do, sleeping.');
212
+				sleep(Config::sleepTime());
213
+			}
214
+
215
+			// check if we are over the maximum runtime and end processing if so.
216
+			if (Configure::readOrFail('Queue.workerMaxRuntime') && (time() - $startTime) >= Configure::readOrFail('Queue.workerMaxRuntime')) {
217
+				$this->_exit = true;
218
+				$this->out('queue', 'Reached runtime of ' . (time() - $startTime) . ' Seconds (Max ' . Configure::readOrFail('Queue.workerMaxRuntime') . '), terminating.');
219
+			}
220
+			if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
221
+				$this->out(__d('queue', 'Performing old job cleanup.'));
222
+				$this->QueuedTasks->cleanOldJobs();
223
+			}
224
+			$this->hr();
225
+		}
226
+
227
+		$this->_deletePid($pid);
228
+
229
+		if ($this->param('verbose')) {
230
+			$this->_log('endworker', $pid);
231
+		}
232
+	}
233
+
234
+	/**
235
+	 *
236
+	 * @param \Queue\Model\Entity\QueuedTask $QueuedTask Queued task
237
+	 * @param string $pid PID of the process
238
+	 * @return void
239
+	 */
240
+	protected function runJob(QueuedTask $QueuedTask, $pid)
241
+	{
242
+		$this->out('Running Job of type "' . $QueuedTask->task . '"');
243
+		$this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, $pid, false);
244
+		$taskName = 'Queue' . $QueuedTask->task;
245
+
246
+		try {
247
+			$this->_time = time();
248
+
249
+			$data = unserialize($QueuedTask->data);
250
+			/** @var \Queue\Shell\Task\QueueTask $task */
251
+			$task = $this->{$taskName};
252
+			if (!$task instanceof QueueTaskInterface) {
253
+				throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
254
+			}
255
+
256
+			$return = $task->run((array)$data, $QueuedTask->id);
257
+			if ($return !== null) {
258
+				trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
259
+			}
260
+			$failureMessage = $taskName . ' failed';
261
+		} catch (Throwable $e) {
262
+			$return = false;
263
+
264
+			$failureMessage = get_class($e) . ': ' . $e->getMessage();
265
+			if (!($e instanceof QueueException)) {
266
+				$failureMessage .= "\n" . $e->getTraceAsString();
267
+			}
268
+
269
+			$this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage, $pid);
270
+		} catch (Exception $e) {
271
+			$return = false;
272
+
273
+			$failureMessage = get_class($e) . ': ' . $e->getMessage();
274
+			$this->_logError($taskName . "\n" . $failureMessage, $pid);
275
+		}
276
+
277
+		if ($return === false) {
278
+			$this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
279
+			$failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
280
+			$this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus, $pid);
281
+			$this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');
282
+
283
+			return;
284
+		}
285
+
286
+		$this->QueuedTasks->markJobDone($QueuedTask);
287
+		$this->out('Job Finished.');
288
+	}
289
+
290
+	/**
291
+	 * Manually trigger a Finished job cleanup.
292
+	 *
293
+	 * @return void
294
+	 */
295
+	public function clean()
296
+	{
297
+		if (!Configure::read('Queue.cleanupTimeout')) {
298
+			$this->abort('You disabled cleanuptimout in config. Aborting.');
299
+		}
300
+
301
+		$this->out('Deleting old jobs, that have finished before ' . date('Y-m-d H:i:s', time() - (int)Configure::read('Queue.cleanupTimeout')));
302
+		$this->QueuedTasks->cleanOldJobs();
303
+	}
304
+
305
+	/**
306
+	 * Display current settings
307
+	 *
308
+	 * @return void
309
+	 */
310
+	public function settings()
311
+	{
312
+		$this->out('Current Settings:');
313
+		$conf = (array)Configure::read('Queue');
314
+		foreach ($conf as $key => $val) {
315
+			if ($val === false) {
316
+				$val = 'no';
317
+			}
318
+			if ($val === true) {
319
+				$val = 'yes';
320
+			}
321
+			$this->out('* ' . $key . ': ' . print_r($val, true));
322
+		}
323
+
324
+		$this->out();
325
+	}
326
+
327
+	/**
328
+	 * Display some statistics about Finished Jobs.
329
+	 *
330
+	 * @return void
331
+	 */
332
+	public function stats()
333
+	{
334
+		$this->out('Jobs currently in the queue:');
335
+
336
+		$types = $this->QueuedTasks->getTypes()->toArray();
337
+		foreach ($types as $type) {
338
+			$this->out('      ' . str_pad($type, 20, ' ', STR_PAD_RIGHT) . ': ' . $this->QueuedTasks->getLength($type));
339
+		}
340
+		$this->hr();
341
+		$this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
342
+		$this->hr();
343
+		$this->out('Finished job statistics:');
344
+		$data = $this->QueuedTasks->getStats();
345
+		foreach ($data as $item) {
346
+			$this->out(' ' . $item['task'] . ': ');
347
+			$this->out('   Finished Jobs in Database: ' . $item['num']);
348
+			$this->out('   Average Job existence    : ' . str_pad(Number::precision($item['alltime']), 8, ' ', STR_PAD_LEFT) . 's');
349
+			$this->out('   Average Execution delay  : ' . str_pad(Number::precision($item['fetchdelay']), 8, ' ', STR_PAD_LEFT) . 's');
350
+			$this->out('   Average Execution time   : ' . str_pad(Number::precision($item['runtime']), 8, ' ', STR_PAD_LEFT) . 's');
351
+		}
352
+	}
353
+
354
+	/**
355
+	 * Get option parser method to parse commandline options
356
+	 *
357
+	 * @return \Cake\Console\ConsoleOptionParser
358
+	 */
359
+	public function getOptionParser()
360
+	{
361
+		$subcommandParser = [
362
+			'options' => [
363
+				/*
364 364
                  * 'dry-run'=> array(
365 365
                  * 'short' => 'd',
366 366
                  * 'help' => 'Dry run the update, no jobs will actually be added.',
367 367
                  * 'boolean' => true
368 368
                  * ),
369 369
                  */
370
-            ]
371
-        ];
372
-        $subcommandParserFull = $subcommandParser;
373
-        $subcommandParserFull['options']['type'] = [
374
-            'short' => 't',
375
-            'help' => 'Type (comma separated list possible)',
376
-            'default' => null
377
-        ];
378
-
379
-        return parent::getOptionParser()->setDescription($this->getDescription())
380
-            ->addSubcommand('clean', [
381
-                'help' => 'Remove old jobs (cleanup)',
382
-                'parser' => $subcommandParser
383
-            ])
384
-            ->addSubcommand('add', [
385
-                'help' => 'Add Job',
386
-                'parser' => $subcommandParser
387
-            ])
388
-            ->addSubcommand('stats', [
389
-                'help' => 'Stats',
390
-                'parser' => $subcommandParserFull
391
-            ])
392
-            ->addSubcommand('settings', [
393
-                'help' => 'Settings',
394
-                'parser' => $subcommandParserFull
395
-            ])
396
-            ->addSubcommand('runworker', [
397
-                'help' => 'Run Worker',
398
-                'parser' => $subcommandParserFull
399
-            ]);
400
-    }
401
-
402
-    /**
403
-     * Timestamped log.
404
-     *
405
-     * @param string $message Log type
406
-     * @param string|null $pid PID of the process
407
-     * @param bool $addDetails Details
408
-     * @return void
409
-     */
410
-    protected function _log($message, $pid = null, $addDetails = true)
411
-    {
412
-        if (!Configure::read('Queue.log')) {
413
-            return;
414
-        }
415
-
416
-        if ($addDetails) {
417
-            $timeNeeded = $this->_timeNeeded();
418
-            $memoryUsage = $this->_memoryUsage();
419
-            $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
420
-        }
421
-
422
-        if ($pid) {
423
-            $message .= ' (pid ' . $pid . ')';
424
-        }
425
-        Log::write('info', $message, [
426
-            'scope' => 'queue'
427
-        ]);
428
-    }
429
-
430
-    /**
431
-     *
432
-     * @param string $message Message
433
-     * @param string|null $pid PID of the process
434
-     * @return void
435
-     */
436
-    protected function _logError($message, $pid = null)
437
-    {
438
-        $timeNeeded = $this->_timeNeeded();
439
-        $memoryUsage = $this->_memoryUsage();
440
-        $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
441
-
442
-        if ($pid) {
443
-            $message .= ' (pid ' . $pid . ')';
444
-        }
445
-
446
-        Log::write('error', $message);
447
-    }
448
-
449
-    /**
450
-     * Returns a List of available QueueTasks and their individual configurations.
451
-     *
452
-     * @return array
453
-     */
454
-    protected function _getTaskConf()
455
-    {
456
-        if (!is_array($this->_taskConf)) {
457
-            $this->_taskConf = [];
458
-            foreach ($this->tasks as $task) {
459
-                list ($pluginName, $taskName) = pluginSplit($task);
460
-
461
-                $this->_taskConf[$taskName]['name'] = substr($taskName, 5);
462
-                $this->_taskConf[$taskName]['plugin'] = $pluginName;
463
-                if (property_exists($this->{$taskName}, 'timeout')) {
464
-                    $this->_taskConf[$taskName]['timeout'] = $this->{$taskName}->timeout;
465
-                } else {
466
-                    $this->_taskConf[$taskName]['timeout'] = Config::defaultWorkerTimeout();
467
-                }
468
-                if (property_exists($this->{$taskName}, 'retries')) {
469
-                    $this->_taskConf[$taskName]['retries'] = $this->{$taskName}->retries;
470
-                } else {
471
-                    $this->_taskConf[$taskName]['retries'] = Config::defaultWorkerRetries();
472
-                }
473
-                if (property_exists($this->{$taskName}, 'cleanupTimeout')) {
474
-                    $this->_taskConf[$taskName]['cleanupTimeout'] = $this->{$taskName}->cleanupTimeout;
475
-                } else {
476
-                    $this->_taskConf[$taskName]['cleanupTimeout'] = Config::cleanupTimeout();
477
-                }
478
-            }
479
-        }
480
-
481
-        return $this->_taskConf;
482
-    }
483
-
484
-    /**
485
-     * Signal handling to queue worker for clean shutdown
486
-     *
487
-     * @param int $signal The signal
488
-     * @return void
489
-     */
490
-    protected function _exit($signal)
491
-    {
492
-        $this->out(__d('queue', 'Caught %d signal, exiting.', $signal));
493
-        $this->_exit = true;
494
-    }
495
-
496
-    /**
497
-     *
498
-     * @return void
499
-     */
500
-    protected function _displayAvailableTasks()
501
-    {
502
-        $this->out('Available Tasks:');
503
-        foreach ($this->taskNames as $loadedTask) {
504
-            $this->out("\t" . '* ' . $this->_taskName($loadedTask));
505
-        }
506
-    }
507
-
508
-    /**
509
-     *
510
-     * @return string
511
-     */
512
-    protected function _initPid()
513
-    {
514
-        $this->_pid = $this->_retrievePid();
515
-
516
-        return $this->_pid;
517
-    }
518
-
519
-    /**
520
-     *
521
-     * @return string
522
-     */
523
-    protected function _retrievePid()
524
-    {
525
-        if (function_exists('posix_getpid')) {
526
-            $pid = (string)posix_getpid();
527
-        } else {
528
-            $pid = $this->QueuedTasks->key();
529
-        }
530
-
531
-        return $pid;
532
-    }
533
-
534
-    /**
535
-     *
536
-     * @return string Memory usage in MB.
537
-     */
538
-    protected function _memoryUsage()
539
-    {
540
-        $limit = ini_get('memory_limit');
541
-
542
-        $used = number_format(memory_get_peak_usage(true) / (1024 * 1024), 0) . 'MB';
543
-        if ($limit !== '-1') {
544
-            $used .= '/' . $limit;
545
-        }
546
-
547
-        return $used;
548
-    }
549
-
550
-    /**
551
-     *
552
-     * @param string|null $pid PID of the process
553
-     *
554
-     * @return void
555
-     */
556
-    protected function _deletePid($pid)
557
-    {
558
-        if (!$pid) {
559
-            $pid = $this->_pid;
560
-        }
561
-        if (!$pid) {
562
-            return;
563
-        }
564
-    }
565
-
566
-    /**
567
-     *
568
-     * @return string
569
-     */
570
-    protected function _timeNeeded()
571
-    {
572
-        $diff = $this->_time() - $this->_time($this->_time);
573
-        $seconds = max($diff, 1);
574
-
575
-        return $seconds . 's';
576
-    }
577
-
578
-    /**
579
-     *
580
-     * @param int|null $providedTime Provided time
581
-     *
582
-     * @return int
583
-     */
584
-    protected function _time($providedTime = null)
585
-    {
586
-        if ($providedTime) {
587
-            return $providedTime;
588
-        }
589
-
590
-        return time();
591
-    }
592
-
593
-    /**
594
-     *
595
-     * @param string|null $param String to convert
596
-     * @return array
597
-     */
598
-    protected function _stringToArray($param)
599
-    {
600
-        if (!$param) {
601
-            return [];
602
-        }
603
-
604
-        $array = Text::tokenize($param);
605
-
606
-        return array_filter($array);
607
-    }
370
+			]
371
+		];
372
+		$subcommandParserFull = $subcommandParser;
373
+		$subcommandParserFull['options']['type'] = [
374
+			'short' => 't',
375
+			'help' => 'Type (comma separated list possible)',
376
+			'default' => null
377
+		];
378
+
379
+		return parent::getOptionParser()->setDescription($this->getDescription())
380
+			->addSubcommand('clean', [
381
+				'help' => 'Remove old jobs (cleanup)',
382
+				'parser' => $subcommandParser
383
+			])
384
+			->addSubcommand('add', [
385
+				'help' => 'Add Job',
386
+				'parser' => $subcommandParser
387
+			])
388
+			->addSubcommand('stats', [
389
+				'help' => 'Stats',
390
+				'parser' => $subcommandParserFull
391
+			])
392
+			->addSubcommand('settings', [
393
+				'help' => 'Settings',
394
+				'parser' => $subcommandParserFull
395
+			])
396
+			->addSubcommand('runworker', [
397
+				'help' => 'Run Worker',
398
+				'parser' => $subcommandParserFull
399
+			]);
400
+	}
401
+
402
+	/**
403
+	 * Timestamped log.
404
+	 *
405
+	 * @param string $message Log type
406
+	 * @param string|null $pid PID of the process
407
+	 * @param bool $addDetails Details
408
+	 * @return void
409
+	 */
410
+	protected function _log($message, $pid = null, $addDetails = true)
411
+	{
412
+		if (!Configure::read('Queue.log')) {
413
+			return;
414
+		}
415
+
416
+		if ($addDetails) {
417
+			$timeNeeded = $this->_timeNeeded();
418
+			$memoryUsage = $this->_memoryUsage();
419
+			$message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
420
+		}
421
+
422
+		if ($pid) {
423
+			$message .= ' (pid ' . $pid . ')';
424
+		}
425
+		Log::write('info', $message, [
426
+			'scope' => 'queue'
427
+		]);
428
+	}
429
+
430
+	/**
431
+	 *
432
+	 * @param string $message Message
433
+	 * @param string|null $pid PID of the process
434
+	 * @return void
435
+	 */
436
+	protected function _logError($message, $pid = null)
437
+	{
438
+		$timeNeeded = $this->_timeNeeded();
439
+		$memoryUsage = $this->_memoryUsage();
440
+		$message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
441
+
442
+		if ($pid) {
443
+			$message .= ' (pid ' . $pid . ')';
444
+		}
445
+
446
+		Log::write('error', $message);
447
+	}
448
+
449
+	/**
450
+	 * Returns a List of available QueueTasks and their individual configurations.
451
+	 *
452
+	 * @return array
453
+	 */
454
+	protected function _getTaskConf()
455
+	{
456
+		if (!is_array($this->_taskConf)) {
457
+			$this->_taskConf = [];
458
+			foreach ($this->tasks as $task) {
459
+				list ($pluginName, $taskName) = pluginSplit($task);
460
+
461
+				$this->_taskConf[$taskName]['name'] = substr($taskName, 5);
462
+				$this->_taskConf[$taskName]['plugin'] = $pluginName;
463
+				if (property_exists($this->{$taskName}, 'timeout')) {
464
+					$this->_taskConf[$taskName]['timeout'] = $this->{$taskName}->timeout;
465
+				} else {
466
+					$this->_taskConf[$taskName]['timeout'] = Config::defaultWorkerTimeout();
467
+				}
468
+				if (property_exists($this->{$taskName}, 'retries')) {
469
+					$this->_taskConf[$taskName]['retries'] = $this->{$taskName}->retries;
470
+				} else {
471
+					$this->_taskConf[$taskName]['retries'] = Config::defaultWorkerRetries();
472
+				}
473
+				if (property_exists($this->{$taskName}, 'cleanupTimeout')) {
474
+					$this->_taskConf[$taskName]['cleanupTimeout'] = $this->{$taskName}->cleanupTimeout;
475
+				} else {
476
+					$this->_taskConf[$taskName]['cleanupTimeout'] = Config::cleanupTimeout();
477
+				}
478
+			}
479
+		}
480
+
481
+		return $this->_taskConf;
482
+	}
483
+
484
+	/**
485
+	 * Signal handling to queue worker for clean shutdown
486
+	 *
487
+	 * @param int $signal The signal
488
+	 * @return void
489
+	 */
490
+	protected function _exit($signal)
491
+	{
492
+		$this->out(__d('queue', 'Caught %d signal, exiting.', $signal));
493
+		$this->_exit = true;
494
+	}
495
+
496
+	/**
497
+	 *
498
+	 * @return void
499
+	 */
500
+	protected function _displayAvailableTasks()
501
+	{
502
+		$this->out('Available Tasks:');
503
+		foreach ($this->taskNames as $loadedTask) {
504
+			$this->out("\t" . '* ' . $this->_taskName($loadedTask));
505
+		}
506
+	}
507
+
508
+	/**
509
+	 *
510
+	 * @return string
511
+	 */
512
+	protected function _initPid()
513
+	{
514
+		$this->_pid = $this->_retrievePid();
515
+
516
+		return $this->_pid;
517
+	}
518
+
519
+	/**
520
+	 *
521
+	 * @return string
522
+	 */
523
+	protected function _retrievePid()
524
+	{
525
+		if (function_exists('posix_getpid')) {
526
+			$pid = (string)posix_getpid();
527
+		} else {
528
+			$pid = $this->QueuedTasks->key();
529
+		}
530
+
531
+		return $pid;
532
+	}
533
+
534
+	/**
535
+	 *
536
+	 * @return string Memory usage in MB.
537
+	 */
538
+	protected function _memoryUsage()
539
+	{
540
+		$limit = ini_get('memory_limit');
541
+
542
+		$used = number_format(memory_get_peak_usage(true) / (1024 * 1024), 0) . 'MB';
543
+		if ($limit !== '-1') {
544
+			$used .= '/' . $limit;
545
+		}
546
+
547
+		return $used;
548
+	}
549
+
550
+	/**
551
+	 *
552
+	 * @param string|null $pid PID of the process
553
+	 *
554
+	 * @return void
555
+	 */
556
+	protected function _deletePid($pid)
557
+	{
558
+		if (!$pid) {
559
+			$pid = $this->_pid;
560
+		}
561
+		if (!$pid) {
562
+			return;
563
+		}
564
+	}
565
+
566
+	/**
567
+	 *
568
+	 * @return string
569
+	 */
570
+	protected function _timeNeeded()
571
+	{
572
+		$diff = $this->_time() - $this->_time($this->_time);
573
+		$seconds = max($diff, 1);
574
+
575
+		return $seconds . 's';
576
+	}
577
+
578
+	/**
579
+	 *
580
+	 * @param int|null $providedTime Provided time
581
+	 *
582
+	 * @return int
583
+	 */
584
+	protected function _time($providedTime = null)
585
+	{
586
+		if ($providedTime) {
587
+			return $providedTime;
588
+		}
589
+
590
+		return time();
591
+	}
592
+
593
+	/**
594
+	 *
595
+	 * @param string|null $param String to convert
596
+	 * @return array
597
+	 */
598
+	protected function _stringToArray($param)
599
+	{
600
+		if (!$param) {
601
+			return [];
602
+		}
603
+
604
+		$array = Text::tokenize($param);
605
+
606
+		return array_filter($array);
607
+	}
608 608
 }
Please login to merge, or discard this patch.
tests/TestCase/Queue/TaskFinderTest.php 1 patch
Indentation   +46 added lines, -46 removed lines patch added patch discarded remove patch
@@ -7,50 +7,50 @@
 block discarded – undo
7 7
 class TaskFinderTest extends TestCase
8 8
 {
9 9
 
10
-    /**
11
-     *
12
-     * @var \Queue\Shell\QueueShell|\PHPUnit_Framework_MockObject_MockObject
13
-     */
14
-    public $QueueShell;
15
-
16
-    /**
17
-     *
18
-     * @var \Queue\Queue\TaskFinder
19
-     */
20
-    protected $taskFinder;
21
-
22
-    /**
23
-     * Fixtures to load
24
-     *
25
-     * @var array
26
-     */
27
-    public $fixtures = [
28
-        'plugin.Queue.QueuedTasks',
29
-    ];
30
-
31
-    /**
32
-     * Setup Defaults
33
-     *
34
-     * @return void
35
-     */
36
-    public function setUp()
37
-    {
38
-        parent::setUp();
39
-    }
40
-
41
-    /**
42
-     *
43
-     * @return void
44
-     */
45
-    public function testAllAppAndPluginTasks()
46
-    {
47
-        $this->taskFinder = new TaskFinder();
48
-
49
-        $result = $this->taskFinder->allAppAndPluginTasks();
50
-        $this->assertCount(9, $result);
51
-        $this->assertArraySubset([
52
-            'QueueFoo'
53
-        ], $result);
54
-        $this->assertTrue(!in_array('Foo.QueueFoo', $result));
55
-    }
10
+	/**
11
+	 *
12
+	 * @var \Queue\Shell\QueueShell|\PHPUnit_Framework_MockObject_MockObject
13
+	 */
14
+	public $QueueShell;
15
+
16
+	/**
17
+	 *
18
+	 * @var \Queue\Queue\TaskFinder
19
+	 */
20
+	protected $taskFinder;
21
+
22
+	/**
23
+	 * Fixtures to load
24
+	 *
25
+	 * @var array
26
+	 */
27
+	public $fixtures = [
28
+		'plugin.Queue.QueuedTasks',
29
+	];
30
+
31
+	/**
32
+	 * Setup Defaults
33
+	 *
34
+	 * @return void
35
+	 */
36
+	public function setUp()
37
+	{
38
+		parent::setUp();
39
+	}
40
+
41
+	/**
42
+	 *
43
+	 * @return void
44
+	 */
45
+	public function testAllAppAndPluginTasks()
46
+	{
47
+		$this->taskFinder = new TaskFinder();
48
+
49
+		$result = $this->taskFinder->allAppAndPluginTasks();
50
+		$this->assertCount(9, $result);
51
+		$this->assertArraySubset([
52
+			'QueueFoo'
53
+		], $result);
54
+		$this->assertTrue(!in_array('Foo.QueueFoo', $result));
55
+	}
56 56
 }
Please login to merge, or discard this patch.
tests/TestCase/Shell/QueueShellTest.php 1 patch
Indentation   +232 added lines, -232 removed lines patch added patch discarded remove patch
@@ -13,241 +13,241 @@
 block discarded – undo
13 13
 class QueueShellWrapper extends QueueShell
14 14
 {
15 15
 
16
-    /**
17
-     * A list with error messages.
18
-     *
19
-     * @var array
20
-     */
21
-    protected $_err = [];
22
-
23
-    /**
24
-     * A list with out messages.
25
-     *
26
-     * @var array
27
-     */
28
-    protected $_out = [];
29
-
30
-    /**
31
-     * Test double of `parent::err`.
32
-     *
33
-     * @return void
34
-     */
35
-    public function err($message = null, $newlines = 1)
36
-    {
37
-        $this->_err[] = $message;
38
-    }
39
-
40
-    /**
41
-     * Test double of `parent::out`.
42
-     *
43
-     * @return void
44
-     */
45
-    public function out($message = null, $newlines = 1, $level = Shell::NORMAL)
46
-    {
47
-        $this->_out[] = $message;
48
-    }
49
-
50
-    /**
51
-     * Test double of `parent::_stop`.
52
-     *
53
-     * @return int
54
-     */
55
-    protected function _stop($status = 0)
56
-    {
57
-        return $status;
58
-    }
16
+	/**
17
+	 * A list with error messages.
18
+	 *
19
+	 * @var array
20
+	 */
21
+	protected $_err = [];
22
+
23
+	/**
24
+	 * A list with out messages.
25
+	 *
26
+	 * @var array
27
+	 */
28
+	protected $_out = [];
29
+
30
+	/**
31
+	 * Test double of `parent::err`.
32
+	 *
33
+	 * @return void
34
+	 */
35
+	public function err($message = null, $newlines = 1)
36
+	{
37
+		$this->_err[] = $message;
38
+	}
39
+
40
+	/**
41
+	 * Test double of `parent::out`.
42
+	 *
43
+	 * @return void
44
+	 */
45
+	public function out($message = null, $newlines = 1, $level = Shell::NORMAL)
46
+	{
47
+		$this->_out[] = $message;
48
+	}
49
+
50
+	/**
51
+	 * Test double of `parent::_stop`.
52
+	 *
53
+	 * @return int
54
+	 */
55
+	protected function _stop($status = 0)
56
+	{
57
+		return $status;
58
+	}
59 59
 }
60 60
 
61 61
 class QueueShellTest extends TestCase
62 62
 {
63 63
 
64
-    /**
65
-     *
66
-     * @var QueueShellWrapper
67
-     */
68
-    public $QueueShell;
69
-
70
-    /**
71
-     * Fixtures to load
72
-     *
73
-     * @var array
74
-     */
75
-    public $fixtures = [
76
-        'plugin.Queue.QueuedTasks'
77
-    ];
78
-
79
-    /**
80
-     * Setup Defaults
81
-     *
82
-     * @return void
83
-     */
84
-    public function setUp()
85
-    {
86
-        parent::setUp();
87
-
88
-        $this->QueueShell = $this->getMockBuilder(QueueShellWrapper::class)
89
-            ->setMethods([
90
-                'in',
91
-                'err',
92
-                '_stop'
93
-            ])
94
-            ->getMock();
95
-
96
-        $this->QueueShell->initialize();
97
-
98
-        Configure::write('Queue', [
99
-            'sleepTime' => 2,
100
-            'defaultWorkerTimeout' => 3,
101
-            'workerMaxRuntime' => 5,
102
-            'cleanupTimeout' => 10,
103
-            'exitWhenNothingToDo' => false,
104
-            'log' => false
105
-        ]);
106
-    }
107
-
108
-    /**
109
-     *
110
-     * @return void
111
-     */
112
-    public function testObject()
113
-    {
114
-        $this->assertTrue(is_object($this->QueueShell));
115
-        $this->assertInstanceOf(QueueShell::class, $this->QueueShell);
116
-    }
117
-
118
-    /**
119
-     *
120
-     * @return void
121
-     */
122
-    public function testStats()
123
-    {
124
-        $this->_needsConnection();
125
-
126
-        $this->QueueShell->stats();
127
-        $this->assertContains('Total unfinished jobs: 0', $this->out);
128
-    }
129
-
130
-    /**
131
-     *
132
-     * @return void
133
-     */
134
-    public function testSettings()
135
-    {
136
-        $this->QueueShell->settings();
137
-        $this->assertContains('* cleanuptimeout: 10', $this->out);
138
-    }
139
-
140
-    /**
141
-     *
142
-     * @return void
143
-     */
144
-    public function testAddInexistent()
145
-    {
146
-        $this->QueueShell->args[] = 'FooBar';
147
-        $this->QueueShell->add();
148
-        $this->assertContains('Error: Task not found: FooBar', $this->out);
149
-    }
150
-
151
-    /**
152
-     *
153
-     * @return void
154
-     */
155
-    public function testAdd()
156
-    {
157
-        $this->QueueShell->args[] = 'Example';
158
-        $this->QueueShell->add();
159
-
160
-        $this->assertContains('OK, job created, now run the worker', $this->out, print_r($this->out->output, true));
161
-    }
162
-
163
-    /**
164
-     *
165
-     * @return void
166
-     */
167
-    public function testRetry()
168
-    {
169
-        $file = TMP . 'task_retry.txt';
170
-        if (file_exists($file)) {
171
-            unlink($file);
172
-        }
173
-
174
-        $this->_needsConnection();
175
-
176
-        $this->QueueShell->args[] = 'RetryExample';
177
-        $this->QueueShell->add();
178
-
179
-        $expected = 'This is a very simple example of a QueueTask and how retries work';
180
-        $this->assertContains($expected, $this->out);
181
-
182
-        $this->QueueShell->runworker();
183
-
184
-        $this->assertContains('Job did not finish, requeued after try 1.', $this->out);
185
-    }
186
-
187
-    /**
188
-     *
189
-     * @return void
190
-     */
191
-    public function testTimeNeeded()
192
-    {
193
-        $this->QueueShell = $this->getMockBuilder(QueueShell::class)
194
-            ->setMethods([
195
-                '_time'
196
-            ])
197
-            ->getMock();
198
-
199
-        $first = time();
200
-        $second = $first - HOUR + MINUTE;
201
-        $this->QueueShell->expects($this->at(0))
202
-            ->method('_time')
203
-            ->will($this->returnValue($first));
204
-        $this->QueueShell->expects($this->at(1))
205
-            ->method('_time')
206
-            ->will($this->returnValue($second));
207
-        $this->QueueShell->expects($this->exactly(2))
208
-            ->method('_time')
209
-            ->withAnyParameters();
210
-
211
-        $result = $this->invokeMethod($this->QueueShell, '_timeNeeded');
212
-        $this->assertSame('3540s', $result);
213
-    }
214
-
215
-    /**
216
-     *
217
-     * @return void
218
-     */
219
-    public function testMemoryUsage()
220
-    {
221
-        $result = $this->invokeMethod($this->QueueShell, '_memoryUsage');
222
-        $this->assertRegExp('/^\d+MB/', $result, 'Should be e.g. `17MB` or `17MB/1GB` etc.');
223
-    }
224
-
225
-    /**
226
-     *
227
-     * @return void
228
-     */
229
-    public function testStringToArray()
230
-    {
231
-        $string = 'Foo,Bar,';
232
-        $result = $this->invokeMethod($this->QueueShell, '_stringToArray', [
233
-            $string
234
-        ]);
235
-
236
-        $expected = [
237
-            'Foo',
238
-            'Bar'
239
-        ];
240
-        $this->assertSame($expected, $result);
241
-    }
242
-
243
-    /**
244
-     * Helper method for skipping tests that need a real connection.
245
-     *
246
-     * @return void
247
-     */
248
-    protected function _needsConnection()
249
-    {
250
-        $config = ConnectionManager::getConfig('test');
251
-        $this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
252
-    }
64
+	/**
65
+	 *
66
+	 * @var QueueShellWrapper
67
+	 */
68
+	public $QueueShell;
69
+
70
+	/**
71
+	 * Fixtures to load
72
+	 *
73
+	 * @var array
74
+	 */
75
+	public $fixtures = [
76
+		'plugin.Queue.QueuedTasks'
77
+	];
78
+
79
+	/**
80
+	 * Setup Defaults
81
+	 *
82
+	 * @return void
83
+	 */
84
+	public function setUp()
85
+	{
86
+		parent::setUp();
87
+
88
+		$this->QueueShell = $this->getMockBuilder(QueueShellWrapper::class)
89
+			->setMethods([
90
+				'in',
91
+				'err',
92
+				'_stop'
93
+			])
94
+			->getMock();
95
+
96
+		$this->QueueShell->initialize();
97
+
98
+		Configure::write('Queue', [
99
+			'sleepTime' => 2,
100
+			'defaultWorkerTimeout' => 3,
101
+			'workerMaxRuntime' => 5,
102
+			'cleanupTimeout' => 10,
103
+			'exitWhenNothingToDo' => false,
104
+			'log' => false
105
+		]);
106
+	}
107
+
108
+	/**
109
+	 *
110
+	 * @return void
111
+	 */
112
+	public function testObject()
113
+	{
114
+		$this->assertTrue(is_object($this->QueueShell));
115
+		$this->assertInstanceOf(QueueShell::class, $this->QueueShell);
116
+	}
117
+
118
+	/**
119
+	 *
120
+	 * @return void
121
+	 */
122
+	public function testStats()
123
+	{
124
+		$this->_needsConnection();
125
+
126
+		$this->QueueShell->stats();
127
+		$this->assertContains('Total unfinished jobs: 0', $this->out);
128
+	}
129
+
130
+	/**
131
+	 *
132
+	 * @return void
133
+	 */
134
+	public function testSettings()
135
+	{
136
+		$this->QueueShell->settings();
137
+		$this->assertContains('* cleanuptimeout: 10', $this->out);
138
+	}
139
+
140
+	/**
141
+	 *
142
+	 * @return void
143
+	 */
144
+	public function testAddInexistent()
145
+	{
146
+		$this->QueueShell->args[] = 'FooBar';
147
+		$this->QueueShell->add();
148
+		$this->assertContains('Error: Task not found: FooBar', $this->out);
149
+	}
150
+
151
+	/**
152
+	 *
153
+	 * @return void
154
+	 */
155
+	public function testAdd()
156
+	{
157
+		$this->QueueShell->args[] = 'Example';
158
+		$this->QueueShell->add();
159
+
160
+		$this->assertContains('OK, job created, now run the worker', $this->out, print_r($this->out->output, true));
161
+	}
162
+
163
+	/**
164
+	 *
165
+	 * @return void
166
+	 */
167
+	public function testRetry()
168
+	{
169
+		$file = TMP . 'task_retry.txt';
170
+		if (file_exists($file)) {
171
+			unlink($file);
172
+		}
173
+
174
+		$this->_needsConnection();
175
+
176
+		$this->QueueShell->args[] = 'RetryExample';
177
+		$this->QueueShell->add();
178
+
179
+		$expected = 'This is a very simple example of a QueueTask and how retries work';
180
+		$this->assertContains($expected, $this->out);
181
+
182
+		$this->QueueShell->runworker();
183
+
184
+		$this->assertContains('Job did not finish, requeued after try 1.', $this->out);
185
+	}
186
+
187
+	/**
188
+	 *
189
+	 * @return void
190
+	 */
191
+	public function testTimeNeeded()
192
+	{
193
+		$this->QueueShell = $this->getMockBuilder(QueueShell::class)
194
+			->setMethods([
195
+				'_time'
196
+			])
197
+			->getMock();
198
+
199
+		$first = time();
200
+		$second = $first - HOUR + MINUTE;
201
+		$this->QueueShell->expects($this->at(0))
202
+			->method('_time')
203
+			->will($this->returnValue($first));
204
+		$this->QueueShell->expects($this->at(1))
205
+			->method('_time')
206
+			->will($this->returnValue($second));
207
+		$this->QueueShell->expects($this->exactly(2))
208
+			->method('_time')
209
+			->withAnyParameters();
210
+
211
+		$result = $this->invokeMethod($this->QueueShell, '_timeNeeded');
212
+		$this->assertSame('3540s', $result);
213
+	}
214
+
215
+	/**
216
+	 *
217
+	 * @return void
218
+	 */
219
+	public function testMemoryUsage()
220
+	{
221
+		$result = $this->invokeMethod($this->QueueShell, '_memoryUsage');
222
+		$this->assertRegExp('/^\d+MB/', $result, 'Should be e.g. `17MB` or `17MB/1GB` etc.');
223
+	}
224
+
225
+	/**
226
+	 *
227
+	 * @return void
228
+	 */
229
+	public function testStringToArray()
230
+	{
231
+		$string = 'Foo,Bar,';
232
+		$result = $this->invokeMethod($this->QueueShell, '_stringToArray', [
233
+			$string
234
+		]);
235
+
236
+		$expected = [
237
+			'Foo',
238
+			'Bar'
239
+		];
240
+		$this->assertSame($expected, $result);
241
+	}
242
+
243
+	/**
244
+	 * Helper method for skipping tests that need a real connection.
245
+	 *
246
+	 * @return void
247
+	 */
248
+	protected function _needsConnection()
249
+	{
250
+		$config = ConnectionManager::getConfig('test');
251
+		$this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
252
+	}
253 253
 }
Please login to merge, or discard this patch.
tests/TestCase/Model/Table/QueuedTasksTableTest.php 1 patch
Indentation   +413 added lines, -413 removed lines patch added patch discarded remove patch
@@ -19,418 +19,418 @@
 block discarded – undo
19 19
 class QueuedTasksTableTest extends TestCase
20 20
 {
21 21
 
22
-    /**
23
-     *
24
-     * @var \Queue\Model\Table\QueuedTasksTable
25
-     */
26
-    protected $QueuedTasks;
27
-
28
-    /**
29
-     * Fixtures
30
-     *
31
-     * @var array
32
-     */
33
-    public $fixtures = [
34
-        'plugin.Queue.QueuedTasks'
35
-    ];
36
-
37
-    /**
38
-     * setUp method
39
-     *
40
-     * @return void
41
-     */
42
-    public function setUp()
43
-    {
44
-        parent::setUp();
45
-        $config = TableRegistry::getTableLocator()->exists('QueuedTasks') ? [] : [
46
-            'className' => QueuedTasksTable::class
47
-        ];
48
-        $this->QueuedTasks = TableRegistry::getTableLocator()->get('QueuedTasks', $config);
49
-    }
50
-
51
-    /**
52
-     * Basic Instance test
53
-     *
54
-     * @return void
55
-     */
56
-    public function testQueueInstance()
57
-    {
58
-        $this->assertInstanceOf(QueuedTasksTable::class, $this->QueuedTasks);
59
-    }
60
-
61
-    /**
62
-     * Test the basic create and length evaluation functions.
63
-     *
64
-     * @return void
65
-     */
66
-    public function testCreateAndCount()
67
-    {
68
-        // at first, the queue should contain 0 items.
69
-        $this->assertSame(0, $this->QueuedTasks->getLength());
70
-
71
-        // create a job
72
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test1', [
73
-            'some' => 'random',
74
-            'test' => 'data'
75
-        ]));
76
-
77
-        // test if queue Length is 1 now.
78
-        $this->assertSame(1, $this->QueuedTasks->getLength());
79
-
80
-        // create some more jobs
81
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
82
-            'some' => 'random',
83
-            'test' => 'data2'
84
-        ]));
85
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
86
-            'some' => 'random',
87
-            'test' => 'data3'
88
-        ]));
89
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test3', [
90
-            'some' => 'random',
91
-            'test' => 'data4'
92
-        ]));
93
-
94
-        // overall queueLength shpould now be 4
95
-        $this->assertSame(4, $this->QueuedTasks->getLength());
96
-
97
-        // there should be 1 task of type 'test1', one of type 'test3' and 2 of type 'test2'
98
-        $this->assertSame(1, $this->QueuedTasks->getLength('test1'));
99
-        $this->assertSame(2, $this->QueuedTasks->getLength('test2'));
100
-        $this->assertSame(1, $this->QueuedTasks->getLength('test3'));
101
-    }
102
-
103
-    /**
104
-     * Test the basic create and fetch functions.
105
-     *
106
-     * @return void
107
-     */
108
-    public function testCreateAndFetch()
109
-    {
110
-        $this->_needsConnection();
111
-
112
-        // $capabilities is a list of tasks the worker can run.
113
-        $capabilities = [
114
-            'task1' => [
115
-                'name' => 'task1',
116
-                'timeout' => 100,
117
-                'retries' => 2
118
-            ]
119
-        ];
120
-        $testData = [
121
-            'x1' => 'y1',
122
-            'x2' => 'y2',
123
-            'x3' => 'y3',
124
-            'x4' => 'y4'
125
-        ];
126
-
127
-        // start off empty.
128
-        $this->assertSame([], $this->QueuedTasks->find()
129
-            ->toArray());
130
-        // at first, the queue should contain 0 items.
131
-        $this->assertSame(0, $this->QueuedTasks->getLength());
132
-        // there are no jobs, so we cant fetch any.
133
-        $this->assertNull($this->QueuedTasks->requestJob($capabilities));
134
-        // insert one job.
135
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $testData));
136
-
137
-        // fetch and check the first job.
138
-        $job = $this->QueuedTasks->requestJob($capabilities);
139
-        $this->assertSame(1, $job->id);
140
-        $this->assertSame('task1', $job->task);
141
-        $this->assertSame(0, $job->failed);
142
-        $this->assertNull($job->completed);
143
-        $this->assertSame($testData, unserialize($job->data));
144
-
145
-        // after this job has been fetched, it may not be reassigned.
146
-        $result = $this->QueuedTasks->requestJob($capabilities);
147
-        $this->assertNull($result);
148
-
149
-        // queue length is still 1 since the first job did not finish.
150
-        $this->assertSame(1, $this->QueuedTasks->getLength());
151
-
152
-        // Now mark Task1 as done
153
-        $this->assertTrue($this->QueuedTasks->markJobDone($job));
154
-
155
-        // Should be 0 again.
156
-        $this->assertSame(0, $this->QueuedTasks->getLength());
157
-    }
158
-
159
-    /**
160
-     * Test the delivery of jobs in sequence, skipping fetched but not completed tasks.
161
-     *
162
-     * @return void
163
-     */
164
-    public function testSequence()
165
-    {
166
-        $this->_needsConnection();
167
-
168
-        // $capabilities is a list of tasks the worker can run.
169
-        $capabilities = [
170
-            'task1' => [
171
-                'name' => 'task1',
172
-                'timeout' => 100,
173
-                'retries' => 2
174
-            ]
175
-        ];
176
-        // at first, the queue should contain 0 items.
177
-        $this->assertSame(0, $this->QueuedTasks->getLength());
178
-        // create some more jobs
179
-        foreach (range(0, 9) as $num) {
180
-            $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
181
-                'tasknum' => $num
182
-            ]));
183
-        }
184
-        // 10 jobs in the queue.
185
-        $this->assertSame(10, $this->QueuedTasks->getLength());
186
-
187
-        // jobs should be fetched in the original sequence.
188
-        $array = [];
189
-        foreach (range(0, 4) as $num) {
190
-            $this->QueuedTasks->clearKey();
191
-            $array[$num] = $this->QueuedTasks->requestJob($capabilities);
192
-            $jobData = unserialize($array[$num]['data']);
193
-            $this->assertSame($num, $jobData['tasknum']);
194
-        }
195
-        // now mark them as done
196
-        foreach (range(0, 4) as $num) {
197
-            $this->assertTrue($this->QueuedTasks->markJobDone($array[$num]));
198
-            $this->assertSame(9 - $num, $this->QueuedTasks->getLength());
199
-        }
200
-
201
-        // jobs should be fetched in the original sequence.
202
-        foreach (range(5, 9) as $num) {
203
-            $job = $this->QueuedTasks->requestJob($capabilities);
204
-            $jobData = unserialize($job->data);
205
-            $this->assertSame($num, $jobData['tasknum']);
206
-            $this->assertTrue($this->QueuedTasks->markJobDone($job));
207
-            $this->assertSame(9 - $num, $this->QueuedTasks->getLength());
208
-        }
209
-    }
210
-
211
-    /**
212
-     * Test creating Jobs to run close to a specified time, and strtotime parsing.
213
-     * Using toUnixString() function to convert Time object to timestamp, instead of strtotime
214
-     *
215
-     * @return null
216
-     */
217
-    public function testNotBefore()
218
-    {
219
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Min'));
220
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Day'));
221
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '2009-07-01 12:00:00'));
222
-        $data = $this->QueuedTasks->find('all')->toArray();
223
-        $this->assertWithinRange((new Time('+ 1 Min'))->toUnixString(), $data[0]['not_before']->toUnixString(), 60);
224
-        $this->assertWithinRange((new Time('+ 1 Day'))->toUnixString(), $data[1]['not_before']->toUnixString(), 60);
225
-        $this->assertWithinRange((new Time('2009-07-01 12:00:00'))->toUnixString(), $data[2]['not_before']->toUnixString(), 60);
226
-    }
227
-
228
-    /**
229
-     * Test Job reordering depending on 'notBefore' field.
230
-     * Jobs with an expired not_before field should be executed before any other job without specific timing info.
231
-     *
232
-     * @return void
233
-     */
234
-    public function testNotBeforeOrder()
235
-    {
236
-        $this->_needsConnection();
237
-
238
-        $capabilities = [
239
-            'task1' => [
240
-                'name' => 'task1',
241
-                'timeout' => 100,
242
-                'retries' => 2
243
-            ],
244
-            'dummytask' => [
245
-                'name' => 'dummytask',
246
-                'timeout' => 100,
247
-                'retries' => 2
248
-            ]
249
-        ];
250
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
251
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
252
-        // create a task with it's execution target some seconds in the past, so it should jump to the top of the testCreateAndFetchlist.
253
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
254
-            'three'
255
-        ], '- 3 Seconds'));
256
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
257
-            'two'
258
-        ], '- 5 Seconds'));
259
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
260
-            'one'
261
-        ], '- 7 Seconds'));
262
-
263
-        // when using requestJob, the jobs we just created should be delivered in this order, NOT the order in which they where created.
264
-        $expected = [
265
-            [
266
-                'name' => 'task1',
267
-                'data' => [
268
-                    'one'
269
-                ]
270
-            ],
271
-            [
272
-                'name' => 'task1',
273
-                'data' => [
274
-                    'two'
275
-                ]
276
-            ],
277
-            [
278
-                'name' => 'task1',
279
-                'data' => [
280
-                    'three'
281
-                ]
282
-            ],
283
-            [
284
-                'name' => 'dummytask',
285
-                'data' => null
286
-            ],
287
-            [
288
-                'name' => 'dummytask',
289
-                'data' => null
290
-            ]
291
-        ];
292
-
293
-        foreach ($expected as $item) {
294
-            $this->QueuedTasks->clearKey();
295
-            $tmp = $this->QueuedTasks->requestJob($capabilities);
22
+	/**
23
+	 *
24
+	 * @var \Queue\Model\Table\QueuedTasksTable
25
+	 */
26
+	protected $QueuedTasks;
27
+
28
+	/**
29
+	 * Fixtures
30
+	 *
31
+	 * @var array
32
+	 */
33
+	public $fixtures = [
34
+		'plugin.Queue.QueuedTasks'
35
+	];
36
+
37
+	/**
38
+	 * setUp method
39
+	 *
40
+	 * @return void
41
+	 */
42
+	public function setUp()
43
+	{
44
+		parent::setUp();
45
+		$config = TableRegistry::getTableLocator()->exists('QueuedTasks') ? [] : [
46
+			'className' => QueuedTasksTable::class
47
+		];
48
+		$this->QueuedTasks = TableRegistry::getTableLocator()->get('QueuedTasks', $config);
49
+	}
50
+
51
+	/**
52
+	 * Basic Instance test
53
+	 *
54
+	 * @return void
55
+	 */
56
+	public function testQueueInstance()
57
+	{
58
+		$this->assertInstanceOf(QueuedTasksTable::class, $this->QueuedTasks);
59
+	}
60
+
61
+	/**
62
+	 * Test the basic create and length evaluation functions.
63
+	 *
64
+	 * @return void
65
+	 */
66
+	public function testCreateAndCount()
67
+	{
68
+		// at first, the queue should contain 0 items.
69
+		$this->assertSame(0, $this->QueuedTasks->getLength());
70
+
71
+		// create a job
72
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test1', [
73
+			'some' => 'random',
74
+			'test' => 'data'
75
+		]));
76
+
77
+		// test if queue Length is 1 now.
78
+		$this->assertSame(1, $this->QueuedTasks->getLength());
79
+
80
+		// create some more jobs
81
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
82
+			'some' => 'random',
83
+			'test' => 'data2'
84
+		]));
85
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
86
+			'some' => 'random',
87
+			'test' => 'data3'
88
+		]));
89
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test3', [
90
+			'some' => 'random',
91
+			'test' => 'data4'
92
+		]));
93
+
94
+		// overall queueLength shpould now be 4
95
+		$this->assertSame(4, $this->QueuedTasks->getLength());
96
+
97
+		// there should be 1 task of type 'test1', one of type 'test3' and 2 of type 'test2'
98
+		$this->assertSame(1, $this->QueuedTasks->getLength('test1'));
99
+		$this->assertSame(2, $this->QueuedTasks->getLength('test2'));
100
+		$this->assertSame(1, $this->QueuedTasks->getLength('test3'));
101
+	}
102
+
103
+	/**
104
+	 * Test the basic create and fetch functions.
105
+	 *
106
+	 * @return void
107
+	 */
108
+	public function testCreateAndFetch()
109
+	{
110
+		$this->_needsConnection();
111
+
112
+		// $capabilities is a list of tasks the worker can run.
113
+		$capabilities = [
114
+			'task1' => [
115
+				'name' => 'task1',
116
+				'timeout' => 100,
117
+				'retries' => 2
118
+			]
119
+		];
120
+		$testData = [
121
+			'x1' => 'y1',
122
+			'x2' => 'y2',
123
+			'x3' => 'y3',
124
+			'x4' => 'y4'
125
+		];
126
+
127
+		// start off empty.
128
+		$this->assertSame([], $this->QueuedTasks->find()
129
+			->toArray());
130
+		// at first, the queue should contain 0 items.
131
+		$this->assertSame(0, $this->QueuedTasks->getLength());
132
+		// there are no jobs, so we cant fetch any.
133
+		$this->assertNull($this->QueuedTasks->requestJob($capabilities));
134
+		// insert one job.
135
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $testData));
136
+
137
+		// fetch and check the first job.
138
+		$job = $this->QueuedTasks->requestJob($capabilities);
139
+		$this->assertSame(1, $job->id);
140
+		$this->assertSame('task1', $job->task);
141
+		$this->assertSame(0, $job->failed);
142
+		$this->assertNull($job->completed);
143
+		$this->assertSame($testData, unserialize($job->data));
144
+
145
+		// after this job has been fetched, it may not be reassigned.
146
+		$result = $this->QueuedTasks->requestJob($capabilities);
147
+		$this->assertNull($result);
148
+
149
+		// queue length is still 1 since the first job did not finish.
150
+		$this->assertSame(1, $this->QueuedTasks->getLength());
151
+
152
+		// Now mark Task1 as done
153
+		$this->assertTrue($this->QueuedTasks->markJobDone($job));
154
+
155
+		// Should be 0 again.
156
+		$this->assertSame(0, $this->QueuedTasks->getLength());
157
+	}
158
+
159
+	/**
160
+	 * Test the delivery of jobs in sequence, skipping fetched but not completed tasks.
161
+	 *
162
+	 * @return void
163
+	 */
164
+	public function testSequence()
165
+	{
166
+		$this->_needsConnection();
167
+
168
+		// $capabilities is a list of tasks the worker can run.
169
+		$capabilities = [
170
+			'task1' => [
171
+				'name' => 'task1',
172
+				'timeout' => 100,
173
+				'retries' => 2
174
+			]
175
+		];
176
+		// at first, the queue should contain 0 items.
177
+		$this->assertSame(0, $this->QueuedTasks->getLength());
178
+		// create some more jobs
179
+		foreach (range(0, 9) as $num) {
180
+			$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
181
+				'tasknum' => $num
182
+			]));
183
+		}
184
+		// 10 jobs in the queue.
185
+		$this->assertSame(10, $this->QueuedTasks->getLength());
186
+
187
+		// jobs should be fetched in the original sequence.
188
+		$array = [];
189
+		foreach (range(0, 4) as $num) {
190
+			$this->QueuedTasks->clearKey();
191
+			$array[$num] = $this->QueuedTasks->requestJob($capabilities);
192
+			$jobData = unserialize($array[$num]['data']);
193
+			$this->assertSame($num, $jobData['tasknum']);
194
+		}
195
+		// now mark them as done
196
+		foreach (range(0, 4) as $num) {
197
+			$this->assertTrue($this->QueuedTasks->markJobDone($array[$num]));
198
+			$this->assertSame(9 - $num, $this->QueuedTasks->getLength());
199
+		}
200
+
201
+		// jobs should be fetched in the original sequence.
202
+		foreach (range(5, 9) as $num) {
203
+			$job = $this->QueuedTasks->requestJob($capabilities);
204
+			$jobData = unserialize($job->data);
205
+			$this->assertSame($num, $jobData['tasknum']);
206
+			$this->assertTrue($this->QueuedTasks->markJobDone($job));
207
+			$this->assertSame(9 - $num, $this->QueuedTasks->getLength());
208
+		}
209
+	}
210
+
211
+	/**
212
+	 * Test creating Jobs to run close to a specified time, and strtotime parsing.
213
+	 * Using toUnixString() function to convert Time object to timestamp, instead of strtotime
214
+	 *
215
+	 * @return null
216
+	 */
217
+	public function testNotBefore()
218
+	{
219
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Min'));
220
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Day'));
221
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '2009-07-01 12:00:00'));
222
+		$data = $this->QueuedTasks->find('all')->toArray();
223
+		$this->assertWithinRange((new Time('+ 1 Min'))->toUnixString(), $data[0]['not_before']->toUnixString(), 60);
224
+		$this->assertWithinRange((new Time('+ 1 Day'))->toUnixString(), $data[1]['not_before']->toUnixString(), 60);
225
+		$this->assertWithinRange((new Time('2009-07-01 12:00:00'))->toUnixString(), $data[2]['not_before']->toUnixString(), 60);
226
+	}
227
+
228
+	/**
229
+	 * Test Job reordering depending on 'notBefore' field.
230
+	 * Jobs with an expired not_before field should be executed before any other job without specific timing info.
231
+	 *
232
+	 * @return void
233
+	 */
234
+	public function testNotBeforeOrder()
235
+	{
236
+		$this->_needsConnection();
237
+
238
+		$capabilities = [
239
+			'task1' => [
240
+				'name' => 'task1',
241
+				'timeout' => 100,
242
+				'retries' => 2
243
+			],
244
+			'dummytask' => [
245
+				'name' => 'dummytask',
246
+				'timeout' => 100,
247
+				'retries' => 2
248
+			]
249
+		];
250
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
251
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
252
+		// create a task with it's execution target some seconds in the past, so it should jump to the top of the testCreateAndFetchlist.
253
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
254
+			'three'
255
+		], '- 3 Seconds'));
256
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
257
+			'two'
258
+		], '- 5 Seconds'));
259
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
260
+			'one'
261
+		], '- 7 Seconds'));
262
+
263
+		// when using requestJob, the jobs we just created should be delivered in this order, NOT the order in which they where created.
264
+		$expected = [
265
+			[
266
+				'name' => 'task1',
267
+				'data' => [
268
+					'one'
269
+				]
270
+			],
271
+			[
272
+				'name' => 'task1',
273
+				'data' => [
274
+					'two'
275
+				]
276
+			],
277
+			[
278
+				'name' => 'task1',
279
+				'data' => [
280
+					'three'
281
+				]
282
+			],
283
+			[
284
+				'name' => 'dummytask',
285
+				'data' => null
286
+			],
287
+			[
288
+				'name' => 'dummytask',
289
+				'data' => null
290
+			]
291
+		];
292
+
293
+		foreach ($expected as $item) {
294
+			$this->QueuedTasks->clearKey();
295
+			$tmp = $this->QueuedTasks->requestJob($capabilities);
296 296
 var_dump($tmp);
297
-            $this->assertSame($item['name'], $tmp['task']);
298
-            $this->assertEquals($item['data'], unserialize($tmp['data']));
299
-        }
300
-    }
301
-
302
-    /**
303
-     * Job Rate limiting.
304
-     * Do not execute jobs of a certain type more often than once every X seconds.
305
-     *
306
-     * @return void
307
-     */
308
-    public function testRateLimit()
309
-    {
310
-        $this->_needsConnection();
311
-
312
-        $capabilities = [
313
-            'task1' => [
314
-                'name' => 'task1',
315
-                'timeout' => 101,
316
-                'retries' => 2,
317
-                'rate' => 2
318
-            ],
319
-            'dummytask' => [
320
-                'name' => 'dummytask',
321
-                'timeout' => 101,
322
-                'retries' => 2
323
-            ]
324
-        ];
325
-
326
-        // clear out the rate history
327
-        $this->QueuedTasks->rateHistory = [];
328
-
329
-        $data1 = [
330
-            'key' => 1
331
-        ];
332
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data1));
333
-        $data2 = [
334
-            'key' => 2
335
-        ];
336
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data2));
337
-        $data3 = [
338
-            'key' => 3
339
-        ];
340
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data3));
341
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
342
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
343
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
344
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
345
-
346
-        // At first we get task1-1.
347
-        $this->QueuedTasks->clearKey();
348
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
349
-        $this->assertSame('task1', $tmp['task']);
350
-        $this->assertSame($data1, unserialize($tmp['data']));
351
-
352
-        // The rate limit should now skip over task1-2 and fetch a dummytask.
353
-        $this->QueuedTasks->clearKey();
354
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
355
-        $this->assertSame('dummytask', $tmp['task']);
356
-        $this->assertFalse(unserialize($tmp['data']));
357
-
358
-        usleep(100000);
359
-        // and again.
360
-        $this->QueuedTasks->clearKey();
361
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
362
-        $this->assertSame('dummytask', $tmp['task']);
363
-        $this->assertFalse(unserialize($tmp['data']));
364
-
365
-        // Then some time passes
366
-        sleep(2);
367
-
368
-        // Now we should get task1-2
369
-        $this->QueuedTasks->clearKey();
370
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
371
-        $this->assertSame('task1', $tmp['task']);
372
-        $this->assertSame($data2, unserialize($tmp['data']));
373
-
374
-        // and again rate limit to dummytask.
375
-        $this->QueuedTasks->clearKey();
376
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
377
-        $this->assertSame('dummytask', $tmp['task']);
378
-        $this->assertFalse(unserialize($tmp['data']));
379
-
380
-        // Then some more time passes
381
-        sleep(2);
382
-
383
-        // Now we should get task1-3
384
-        $this->QueuedTasks->clearKey();
385
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
386
-        $this->assertSame('task1', $tmp['task']);
387
-        $this->assertSame($data3, unserialize($tmp['data']));
388
-
389
-        // and again rate limit to dummytask.
390
-        $this->QueuedTasks->clearKey();
391
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
392
-        $this->assertSame('dummytask', $tmp['task']);
393
-        $this->assertFalse(unserialize($tmp['data']));
394
-
395
-        // and now the queue is empty
396
-        $this->QueuedTasks->clearKey();
397
-        $tmp = $this->QueuedTasks->requestJob($capabilities);
398
-        $this->assertNull($tmp);
399
-    }
400
-
401
-    /**
402
-     *
403
-     * @return void
404
-     */
405
-    public function testIsQueued()
406
-    {
407
-        $result = $this->QueuedTasks->isQueued('foo-bar');
408
-        $this->assertFalse($result);
409
-
410
-        $queuedJob = $this->QueuedTasks->newEntity([
411
-            'key' => 'key',
412
-            'task' => 'FooBar'
413
-        ]);
414
-        $this->QueuedTasks->saveOrFail($queuedJob);
415
-
416
-        $result = $this->QueuedTasks->isQueued('foo-bar');
417
-        $this->assertTrue($result);
418
-
419
-        $queuedJob->completed = new FrozenTime();
420
-        $this->QueuedTasks->saveOrFail($queuedJob);
421
-
422
-        $result = $this->QueuedTasks->isQueued('foo-bar');
423
-        $this->assertFalse($result);
424
-    }
425
-
426
-    /**
427
-     * Helper method for skipping tests that need a real connection.
428
-     *
429
-     * @return void
430
-     */
431
-    protected function _needsConnection()
432
-    {
433
-        $config = ConnectionManager::getConfig('test');
434
-        $this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
435
-    }
297
+			$this->assertSame($item['name'], $tmp['task']);
298
+			$this->assertEquals($item['data'], unserialize($tmp['data']));
299
+		}
300
+	}
301
+
302
+	/**
303
+	 * Job Rate limiting.
304
+	 * Do not execute jobs of a certain type more often than once every X seconds.
305
+	 *
306
+	 * @return void
307
+	 */
308
+	public function testRateLimit()
309
+	{
310
+		$this->_needsConnection();
311
+
312
+		$capabilities = [
313
+			'task1' => [
314
+				'name' => 'task1',
315
+				'timeout' => 101,
316
+				'retries' => 2,
317
+				'rate' => 2
318
+			],
319
+			'dummytask' => [
320
+				'name' => 'dummytask',
321
+				'timeout' => 101,
322
+				'retries' => 2
323
+			]
324
+		];
325
+
326
+		// clear out the rate history
327
+		$this->QueuedTasks->rateHistory = [];
328
+
329
+		$data1 = [
330
+			'key' => 1
331
+		];
332
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data1));
333
+		$data2 = [
334
+			'key' => 2
335
+		];
336
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data2));
337
+		$data3 = [
338
+			'key' => 3
339
+		];
340
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $data3));
341
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
342
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
343
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
344
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
345
+
346
+		// At first we get task1-1.
347
+		$this->QueuedTasks->clearKey();
348
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
349
+		$this->assertSame('task1', $tmp['task']);
350
+		$this->assertSame($data1, unserialize($tmp['data']));
351
+
352
+		// The rate limit should now skip over task1-2 and fetch a dummytask.
353
+		$this->QueuedTasks->clearKey();
354
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
355
+		$this->assertSame('dummytask', $tmp['task']);
356
+		$this->assertFalse(unserialize($tmp['data']));
357
+
358
+		usleep(100000);
359
+		// and again.
360
+		$this->QueuedTasks->clearKey();
361
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
362
+		$this->assertSame('dummytask', $tmp['task']);
363
+		$this->assertFalse(unserialize($tmp['data']));
364
+
365
+		// Then some time passes
366
+		sleep(2);
367
+
368
+		// Now we should get task1-2
369
+		$this->QueuedTasks->clearKey();
370
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
371
+		$this->assertSame('task1', $tmp['task']);
372
+		$this->assertSame($data2, unserialize($tmp['data']));
373
+
374
+		// and again rate limit to dummytask.
375
+		$this->QueuedTasks->clearKey();
376
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
377
+		$this->assertSame('dummytask', $tmp['task']);
378
+		$this->assertFalse(unserialize($tmp['data']));
379
+
380
+		// Then some more time passes
381
+		sleep(2);
382
+
383
+		// Now we should get task1-3
384
+		$this->QueuedTasks->clearKey();
385
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
386
+		$this->assertSame('task1', $tmp['task']);
387
+		$this->assertSame($data3, unserialize($tmp['data']));
388
+
389
+		// and again rate limit to dummytask.
390
+		$this->QueuedTasks->clearKey();
391
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
392
+		$this->assertSame('dummytask', $tmp['task']);
393
+		$this->assertFalse(unserialize($tmp['data']));
394
+
395
+		// and now the queue is empty
396
+		$this->QueuedTasks->clearKey();
397
+		$tmp = $this->QueuedTasks->requestJob($capabilities);
398
+		$this->assertNull($tmp);
399
+	}
400
+
401
+	/**
402
+	 *
403
+	 * @return void
404
+	 */
405
+	public function testIsQueued()
406
+	{
407
+		$result = $this->QueuedTasks->isQueued('foo-bar');
408
+		$this->assertFalse($result);
409
+
410
+		$queuedJob = $this->QueuedTasks->newEntity([
411
+			'key' => 'key',
412
+			'task' => 'FooBar'
413
+		]);
414
+		$this->QueuedTasks->saveOrFail($queuedJob);
415
+
416
+		$result = $this->QueuedTasks->isQueued('foo-bar');
417
+		$this->assertTrue($result);
418
+
419
+		$queuedJob->completed = new FrozenTime();
420
+		$this->QueuedTasks->saveOrFail($queuedJob);
421
+
422
+		$result = $this->QueuedTasks->isQueued('foo-bar');
423
+		$this->assertFalse($result);
424
+	}
425
+
426
+	/**
427
+	 * Helper method for skipping tests that need a real connection.
428
+	 *
429
+	 * @return void
430
+	 */
431
+	protected function _needsConnection()
432
+	{
433
+		$config = ConnectionManager::getConfig('test');
434
+		$this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
435
+	}
436 436
 }
Please login to merge, or discard this patch.
src/Model/Table/QueuedTasksTable.php 1 patch
Indentation   +590 added lines, -590 removed lines patch added patch discarded remove patch
@@ -28,594 +28,594 @@
 block discarded – undo
28 28
 class QueuedTasksTable extends Table
29 29
 {
30 30
 
31
-    const DRIVER_MYSQL = 'Mysql';
32
-
33
-    const DRIVER_POSTGRES = 'Postgres';
34
-
35
-    const DRIVER_SQLSERVER = 'Sqlserver';
36
-
37
-    const STATS_LIMIT = 100000;
38
-
39
-    /**
40
-     *
41
-     * @var string|null
42
-     */
43
-    protected $_key;
44
-
45
-    /**
46
-     * Initialize method
47
-     *
48
-     * @param array $config The configuration for the Table.
49
-     * @return void
50
-     */
51
-    public function initialize(array $config)
52
-    {
53
-        parent::initialize($config);
54
-
55
-        $this->setTable('queued_tasks');
56
-        $this->setDisplayField('id');
57
-        $this->setPrimaryKey('id');
58
-
59
-        $this->addBehavior('Timestamp');
60
-    }
61
-
62
-    /**
63
-     * set connection name
64
-     *
65
-     * @return string
66
-     */
67
-    public static function defaultConnectionName()
68
-    {
69
-        $connection = Configure::read('Queue.connection');
70
-        if (!empty($connection)) {
71
-            return $connection;
72
-        }
73
-
74
-        return parent::defaultConnectionName();
75
-    }
76
-
77
-    /**
78
-     *
79
-     * @param \Cake\Event\Event $event Model event
80
-     * @param \ArrayObject $data The data
81
-     * @param \ArrayObject $options The options
82
-     * @return void
83
-     */
84
-    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
85
-    {
86
-        if (isset($data['data']) && $data['data'] === '') {
87
-            $data['data'] = null;
88
-        }
89
-    }
90
-
91
-    /**
92
-     * Adds a new job to the queue.
93
-     *
94
-     * @param string $taskName Task name
95
-     * @param array|null $data Array of data
96
-     * @param string $notBefore A datetime which indicates when the job may be executed
97
-     * @return \Queue\Model\Entity\QueuedTask Saved job entity
98
-     */
99
-    public function createJob($taskName, array $data = null, string $notBefore = null)
100
-    {
101
-        $task = [
102
-            'task' => $taskName,
103
-            'data' => serialize($data),
104
-            'not_before' => $this->getDateTime()
105
-        ];
106
-
107
-        if (!empty($notBefore)) {
108
-            $task['not_before'] = $this->getDateTime(strtotime($notBefore));
109
-        }
110
-
111
-        $queuedTask = $this->newEntity($task);
112
-
113
-        return $this->saveOrFail($queuedTask);
114
-    }
115
-
116
-    /**
117
-     *
118
-     * @param string|null $taskName Task name
119
-     * @return bool
120
-     */
121
-    public function isQueued($taskName = null)
122
-    {
123
-        $conditions = [
124
-            'completed IS' => null
125
-        ];
126
-        if ($taskName) {
127
-            $conditions['task'] = $taskName;
128
-        }
129
-
130
-        return (bool)$this->find()
131
-            ->where($conditions)
132
-            ->select([
133
-                'id'
134
-            ])
135
-            ->first();
136
-    }
137
-
138
-    /**
139
-     * Returns the number of items in the queue.
140
-     * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
141
-     *
142
-     * @param string|null $taskName Task type to Count
143
-     * @return int
144
-     */
145
-    public function getLength($taskName = null)
146
-    {
147
-        $findConf = [
148
-            'conditions' => [
149
-                'completed IS' => null
150
-            ]
151
-        ];
152
-        if ($taskName !== null) {
153
-            $findConf['conditions']['task'] = $taskName;
154
-        }
155
-
156
-        return $this->find('all', $findConf)->count();
157
-    }
158
-
159
-    /**
160
-     * Return a list of all task types in the Queue.
161
-     *
162
-     * @return \Cake\ORM\Query
163
-     */
164
-    public function getTypes()
165
-    {
166
-        $findCond = [
167
-            'fields' => [
168
-                'task'
169
-            ],
170
-            'group' => [
171
-                'task'
172
-            ],
173
-            'keyField' => 'task',
174
-            'valueField' => 'task'
175
-        ];
176
-
177
-        return $this->find('list', $findCond);
178
-    }
179
-
180
-    /**
181
-     * Return some statistics about finished jobs still in the Database.
182
-     * TO-DO: rewrite as virtual field
183
-     *
184
-     * @return \Cake\ORM\Query
185
-     */
186
-    public function getStats()
187
-    {
188
-        $driverName = $this->_getDriverName();
189
-        $options = [
190
-            'fields' => function (Query $query) use ($driverName) {
191
-                $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
192
-                $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
193
-                $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
194
-                switch ($driverName) {
195
-                    case static::DRIVER_SQLSERVER:
196
-                        $alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
197
-                        $runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
198
-                        $fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
199
-                        break;
200
-                }
201
-                /**
202
-                 *
203
-                 * @var \Cake\ORM\Query
204
-                 */
205
-                return [
206
-                    'task',
207
-                    'num' => $query->func()->count('*'),
208
-                    'alltime' => $alltime,
209
-                    'runtime' => $runtime,
210
-                    'fetchdelay' => $fetchdelay
211
-                ];
212
-            },
213
-            'conditions' => [
214
-                'completed IS NOT' => null
215
-            ],
216
-            'group' => [
217
-                'task'
218
-            ]
219
-        ];
220
-
221
-        return $this->find('all', $options);
222
-    }
223
-
224
-    /**
225
-     * Returns [
226
-     * 'Task' => [
227
-     * 'YYYY-MM-DD' => INT,
228
-     * ...
229
-     * ]
230
-     * ]
231
-     *
232
-     * @param string|null $taskName The task name
233
-     * @return array
234
-     */
235
-    public function getFullStats($taskName = null)
236
-    {
237
-        $driverName = $this->_getDriverName();
238
-        $fields = function (Query $query) use ($driverName) {
239
-            $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
240
-            switch ($driverName) {
241
-                case static::DRIVER_SQLSERVER:
242
-                    $runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
243
-                    break;
244
-            }
245
-
246
-            return [
247
-                'task',
248
-                'created',
249
-                'duration' => $runtime
250
-            ];
251
-        };
252
-
253
-        $conditions = [
254
-            'completed IS NOT' => null
255
-        ];
256
-        if ($taskName) {
257
-            $conditions['task'] = $taskName;
258
-        }
259
-
260
-        $tasks = $this->find()
261
-            ->select($fields)
262
-            ->where($conditions)
263
-            ->enableHydration(false)
264
-            ->orderDesc('id')
265
-            ->limit(static::STATS_LIMIT)
266
-            ->all()
267
-            ->toArray();
268
-
269
-        $result = [];
270
-
271
-        $days = [];
272
-
273
-        foreach ($tasks as $task) {
274
-            /** @var \DateTime $created */
275
-            $created = $task['created'];
276
-            $day = $created->format('Y-m-d');
277
-            if (!isset($days[$day])) {
278
-                $days[$day] = $day;
279
-            }
280
-
281
-            $result[$task['task']][$day][] = $task['duration'];
282
-        }
283
-
284
-        foreach ($result as $type => $tasks) {
285
-            foreach ($tasks as $day => $durations) {
286
-                $average = array_sum($durations) / count($durations);
287
-                $result[$type][$day] = (int)$average;
288
-            }
289
-
290
-            foreach ($days as $day) {
291
-                if (isset($result[$type][$day])) {
292
-                    continue;
293
-                }
294
-
295
-                $result[$type][$day] = 0;
296
-            }
297
-
298
-            ksort($result[$type]);
299
-        }
300
-
301
-        return $result;
302
-    }
303
-
304
-    /**
305
-     * Look for a new job that can be processed with the current abilities and
306
-     * from the specified group (or any if null).
307
-     *
308
-     * @param array $capabilities Available QueueWorkerTasks.
309
-     * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
310
-     * @return \Queue\Model\Entity\QueuedTask|null
311
-     */
312
-    public function requestJob(array $capabilities, array $types = [])
313
-    {
314
-        $now = $this->getDateTime();
315
-        $nowStr = $now->toDateTimeString();
316
-        $driverName = $this->_getDriverName();
317
-
318
-        $query = $this->find();
319
-        $age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
320
-        switch ($driverName) {
321
-            case static::DRIVER_SQLSERVER:
322
-                $age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
323
-                break;
324
-            case static::DRIVER_POSTGRES:
325
-                $age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
326
-                break;
327
-        }
328
-        $options = [
329
-            'conditions' => [
330
-                'completed IS' => null,
331
-                'OR' => []
332
-            ],
333
-            'fields' => [
334
-                'age' => $age
335
-            ],
336
-            'order' => [
337
-                'age' => 'ASC',
338
-                'id' => 'ASC'
339
-            ]
340
-        ];
341
-
342
-        if ($types) {
343
-            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
344
-        }
345
-
346
-        // Generate the task specific conditions.
347
-        foreach ($capabilities as $task) {
348
-            list ($plugin, $name) = pluginSplit($task['name']);
349
-            $timeoutAt = $now->copy();
350
-            $tmp = [
351
-                'task' => $name,
352
-                'AND' => [
353
-                    [
354
-                        'OR' => [
355
-                            'not_before <' => $nowStr,
356
-                            'not_before IS' => null
357
-                        ]
358
-                    ],
359
-                    [
360
-                        'OR' => [
361
-                            'fetched <' => $timeoutAt->subSeconds($task['timeout']),
362
-                            'fetched IS' => null
363
-                        ]
364
-                    ]
365
-                ],
366
-                'failed_count <' => ($task['retries'] + 1)
367
-            ];
368
-            $options['conditions']['OR'][] = $tmp;
369
-        }
370
-
371
-        /** @var \Queue\Model\Entity\QueuedTask|null $task */
372
-        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
373
-            $task = $query->find('all', $options)
374
-                ->enableAutoFields(true)
375
-                ->epilog('FOR UPDATE')
376
-                ->first();
377
-
378
-            if (!$task) {
379
-                return null;
380
-            }
381
-
382
-            $key = $this->key();
383
-            $task = $this->patchEntity($task, [
384
-                'worker_key' => $key,
385
-                'fetched' => $now
386
-            ]);
387
-
388
-            return $this->saveOrFail($task);
389
-        });
390
-
391
-        if (!$task) {
392
-            return null;
393
-        }
394
-
395
-        return $task;
396
-    }
397
-
398
-    /**
399
-     * Mark a task as Completed, removing it from the queue.
400
-     *
401
-     * @param \Queue\Model\Entity\QueuedTask $task Task
402
-     * @return bool Success
403
-     */
404
-    public function markJobDone(QueuedTask $task)
405
-    {
406
-        $fields = [
407
-            'completed' => $this->getDateTime()
408
-        ];
409
-        $task = $this->patchEntity($task, $fields);
410
-
411
-        return (bool)$this->save($task);
412
-    }
413
-
414
-    /**
415
-     * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
416
-     *
417
-     * @param \Queue\Model\Entity\QueuedTask $task Task
418
-     * @param string|null $failureMessage Optional message to append to the failure_message field.
419
-     * @return bool Success
420
-     */
421
-    public function markJobFailed(QueuedTask $task, $failureMessage = null)
422
-    {
423
-        $fields = [
424
-            'failed_count' => $task->failed_count + 1,
425
-            'failure_message' => $failureMessage
426
-        ];
427
-        $task = $this->patchEntity($task, $fields);
428
-
429
-        return (bool)$this->save($task);
430
-    }
431
-
432
-    /**
433
-     * Reset current jobs
434
-     *
435
-     * @param int|null $id ID
436
-     *
437
-     * @return int Success
438
-     */
439
-    public function reset($id = null)
440
-    {
441
-        $fields = [
442
-            'completed' => null,
443
-            'fetched' => null,
444
-            'failed_count' => 0,
445
-            'worker_key' => null,
446
-            'failure_message' => null
447
-        ];
448
-        $conditions = [
449
-            'completed IS' => null
450
-        ];
451
-        if ($id) {
452
-            $conditions['id'] = $id;
453
-        }
454
-
455
-        return $this->updateAll($fields, $conditions);
456
-    }
457
-
458
-    /**
459
-     *
460
-     * @param string $taskName Task name
461
-     *
462
-     * @return int
463
-     */
464
-    public function rerun($taskName)
465
-    {
466
-        $fields = [
467
-            'completed' => null,
468
-            'fetched' => null,
469
-            'failed_count' => 0,
470
-            'worker_key' => null,
471
-            'failure_message' => null
472
-        ];
473
-        $conditions = [
474
-            'completed IS NOT' => null,
475
-            'task' => $taskName
476
-        ];
477
-
478
-        return $this->updateAll($fields, $conditions);
479
-    }
480
-
481
-    /**
482
-     * Cleanup/Delete Completed Tasks.
483
-     *
484
-     * @return void
485
-     */
486
-    public function cleanOldJobs()
487
-    {
488
-        if (!Configure::read('Queue.cleanuptimeout')) {
489
-            return;
490
-        }
491
-
492
-        $this->deleteAll([
493
-            'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
494
-        ]);
495
-    }
496
-
497
-    /**
498
-     *
499
-     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
500
-     * @param array $taskConfiguration Task configuration
501
-     * @return string
502
-     */
503
-    public function getFailedStatus($queuedTask, array $taskConfiguration)
504
-    {
505
-        $failureMessageRequeued = 'requeued';
506
-
507
-        $queuedTaskName = 'Queue' . $queuedTask->task;
508
-        if (empty($taskConfiguration[$queuedTaskName])) {
509
-            return $failureMessageRequeued;
510
-        }
511
-        $retries = $taskConfiguration[$queuedTaskName]['retries'];
512
-        if ($queuedTask->failed_count <= $retries) {
513
-            return $failureMessageRequeued;
514
-        }
515
-
516
-        return 'aborted';
517
-    }
518
-
519
-    /**
520
-     * Generates a unique Identifier for the current worker thread.
521
-     *
522
-     * Useful to identify the currently running processes for this thread.
523
-     *
524
-     * @return string Identifier
525
-     */
526
-    public function key()
527
-    {
528
-        if ($this->_key !== null) {
529
-            return $this->_key;
530
-        }
531
-        $this->_key = sha1(microtime());
532
-        if (!$this->_key) {
533
-            throw new RuntimeException('Invalid key generated');
534
-        }
535
-
536
-        return $this->_key;
537
-    }
538
-
539
-    /**
540
-     * Resets worker Identifier
541
-     *
542
-     * @return void
543
-     */
544
-    public function clearKey()
545
-    {
546
-        $this->_key = null;
547
-    }
548
-
549
-    /**
550
-     * truncate()
551
-     *
552
-     * @return void
553
-     */
554
-    public function truncate()
555
-    {
556
-        $sql = $this->getSchema()->truncateSql($this->_connection);
557
-        foreach ($sql as $snippet) {
558
-            $this->_connection->execute($snippet);
559
-        }
560
-    }
561
-
562
-    /**
563
-     * get the name of the driver
564
-     *
565
-     * @return string
566
-     */
567
-    protected function _getDriverName()
568
-    {
569
-        $className = explode('\\', $this->getConnection()->config()['driver']);
570
-        $name = end($className);
571
-
572
-        return $name;
573
-    }
574
-
575
-    /**
576
-     *
577
-     * @param array $conditions Conditions
578
-     * @param string $key Key
579
-     * @param array $values Values
580
-     * @return array
581
-     */
582
-    protected function addFilter(array $conditions, $key, array $values)
583
-    {
584
-        $include = [];
585
-        $exclude = [];
586
-        foreach ($values as $value) {
587
-            if (substr($value, 0, 1) === '-') {
588
-                $exclude[] = substr($value, 1);
589
-            } else {
590
-                $include[] = $value;
591
-            }
592
-        }
593
-
594
-        if ($include) {
595
-            $conditions[$key . ' IN'] = $include;
596
-        }
597
-        if ($exclude) {
598
-            $conditions[$key . ' NOT IN'] = $exclude;
599
-        }
600
-
601
-        return $conditions;
602
-    }
603
-
604
-    /**
605
-     * Returns a DateTime object from different input.
606
-     *
607
-     * Without argument this will be "now".
608
-     *
609
-     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
610
-     *
611
-     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
612
-     */
613
-    protected function getDateTime($notBefore = null)
614
-    {
615
-        if (is_object($notBefore)) {
616
-            return $notBefore;
617
-        }
618
-
619
-        return new FrozenTime($notBefore);
620
-    }
31
+	const DRIVER_MYSQL = 'Mysql';
32
+
33
+	const DRIVER_POSTGRES = 'Postgres';
34
+
35
+	const DRIVER_SQLSERVER = 'Sqlserver';
36
+
37
+	const STATS_LIMIT = 100000;
38
+
39
+	/**
40
+	 *
41
+	 * @var string|null
42
+	 */
43
+	protected $_key;
44
+
45
+	/**
46
+	 * Initialize method
47
+	 *
48
+	 * @param array $config The configuration for the Table.
49
+	 * @return void
50
+	 */
51
+	public function initialize(array $config)
52
+	{
53
+		parent::initialize($config);
54
+
55
+		$this->setTable('queued_tasks');
56
+		$this->setDisplayField('id');
57
+		$this->setPrimaryKey('id');
58
+
59
+		$this->addBehavior('Timestamp');
60
+	}
61
+
62
+	/**
63
+	 * set connection name
64
+	 *
65
+	 * @return string
66
+	 */
67
+	public static function defaultConnectionName()
68
+	{
69
+		$connection = Configure::read('Queue.connection');
70
+		if (!empty($connection)) {
71
+			return $connection;
72
+		}
73
+
74
+		return parent::defaultConnectionName();
75
+	}
76
+
77
+	/**
78
+	 *
79
+	 * @param \Cake\Event\Event $event Model event
80
+	 * @param \ArrayObject $data The data
81
+	 * @param \ArrayObject $options The options
82
+	 * @return void
83
+	 */
84
+	public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
85
+	{
86
+		if (isset($data['data']) && $data['data'] === '') {
87
+			$data['data'] = null;
88
+		}
89
+	}
90
+
91
+	/**
92
+	 * Adds a new job to the queue.
93
+	 *
94
+	 * @param string $taskName Task name
95
+	 * @param array|null $data Array of data
96
+	 * @param string $notBefore A datetime which indicates when the job may be executed
97
+	 * @return \Queue\Model\Entity\QueuedTask Saved job entity
98
+	 */
99
+	public function createJob($taskName, array $data = null, string $notBefore = null)
100
+	{
101
+		$task = [
102
+			'task' => $taskName,
103
+			'data' => serialize($data),
104
+			'not_before' => $this->getDateTime()
105
+		];
106
+
107
+		if (!empty($notBefore)) {
108
+			$task['not_before'] = $this->getDateTime(strtotime($notBefore));
109
+		}
110
+
111
+		$queuedTask = $this->newEntity($task);
112
+
113
+		return $this->saveOrFail($queuedTask);
114
+	}
115
+
116
+	/**
117
+	 *
118
+	 * @param string|null $taskName Task name
119
+	 * @return bool
120
+	 */
121
+	public function isQueued($taskName = null)
122
+	{
123
+		$conditions = [
124
+			'completed IS' => null
125
+		];
126
+		if ($taskName) {
127
+			$conditions['task'] = $taskName;
128
+		}
129
+
130
+		return (bool)$this->find()
131
+			->where($conditions)
132
+			->select([
133
+				'id'
134
+			])
135
+			->first();
136
+	}
137
+
138
+	/**
139
+	 * Returns the number of items in the queue.
140
+	 * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
141
+	 *
142
+	 * @param string|null $taskName Task type to Count
143
+	 * @return int
144
+	 */
145
+	public function getLength($taskName = null)
146
+	{
147
+		$findConf = [
148
+			'conditions' => [
149
+				'completed IS' => null
150
+			]
151
+		];
152
+		if ($taskName !== null) {
153
+			$findConf['conditions']['task'] = $taskName;
154
+		}
155
+
156
+		return $this->find('all', $findConf)->count();
157
+	}
158
+
159
+	/**
160
+	 * Return a list of all task types in the Queue.
161
+	 *
162
+	 * @return \Cake\ORM\Query
163
+	 */
164
+	public function getTypes()
165
+	{
166
+		$findCond = [
167
+			'fields' => [
168
+				'task'
169
+			],
170
+			'group' => [
171
+				'task'
172
+			],
173
+			'keyField' => 'task',
174
+			'valueField' => 'task'
175
+		];
176
+
177
+		return $this->find('list', $findCond);
178
+	}
179
+
180
+	/**
181
+	 * Return some statistics about finished jobs still in the Database.
182
+	 * TO-DO: rewrite as virtual field
183
+	 *
184
+	 * @return \Cake\ORM\Query
185
+	 */
186
+	public function getStats()
187
+	{
188
+		$driverName = $this->_getDriverName();
189
+		$options = [
190
+			'fields' => function (Query $query) use ($driverName) {
191
+				$alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
192
+				$runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
193
+				$fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
194
+				switch ($driverName) {
195
+					case static::DRIVER_SQLSERVER:
196
+						$alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
197
+						$runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
198
+						$fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
199
+						break;
200
+				}
201
+				/**
202
+				 *
203
+				 * @var \Cake\ORM\Query
204
+				 */
205
+				return [
206
+					'task',
207
+					'num' => $query->func()->count('*'),
208
+					'alltime' => $alltime,
209
+					'runtime' => $runtime,
210
+					'fetchdelay' => $fetchdelay
211
+				];
212
+			},
213
+			'conditions' => [
214
+				'completed IS NOT' => null
215
+			],
216
+			'group' => [
217
+				'task'
218
+			]
219
+		];
220
+
221
+		return $this->find('all', $options);
222
+	}
223
+
224
+	/**
225
+	 * Returns [
226
+	 * 'Task' => [
227
+	 * 'YYYY-MM-DD' => INT,
228
+	 * ...
229
+	 * ]
230
+	 * ]
231
+	 *
232
+	 * @param string|null $taskName The task name
233
+	 * @return array
234
+	 */
235
+	public function getFullStats($taskName = null)
236
+	{
237
+		$driverName = $this->_getDriverName();
238
+		$fields = function (Query $query) use ($driverName) {
239
+			$runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
240
+			switch ($driverName) {
241
+				case static::DRIVER_SQLSERVER:
242
+					$runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
243
+					break;
244
+			}
245
+
246
+			return [
247
+				'task',
248
+				'created',
249
+				'duration' => $runtime
250
+			];
251
+		};
252
+
253
+		$conditions = [
254
+			'completed IS NOT' => null
255
+		];
256
+		if ($taskName) {
257
+			$conditions['task'] = $taskName;
258
+		}
259
+
260
+		$tasks = $this->find()
261
+			->select($fields)
262
+			->where($conditions)
263
+			->enableHydration(false)
264
+			->orderDesc('id')
265
+			->limit(static::STATS_LIMIT)
266
+			->all()
267
+			->toArray();
268
+
269
+		$result = [];
270
+
271
+		$days = [];
272
+
273
+		foreach ($tasks as $task) {
274
+			/** @var \DateTime $created */
275
+			$created = $task['created'];
276
+			$day = $created->format('Y-m-d');
277
+			if (!isset($days[$day])) {
278
+				$days[$day] = $day;
279
+			}
280
+
281
+			$result[$task['task']][$day][] = $task['duration'];
282
+		}
283
+
284
+		foreach ($result as $type => $tasks) {
285
+			foreach ($tasks as $day => $durations) {
286
+				$average = array_sum($durations) / count($durations);
287
+				$result[$type][$day] = (int)$average;
288
+			}
289
+
290
+			foreach ($days as $day) {
291
+				if (isset($result[$type][$day])) {
292
+					continue;
293
+				}
294
+
295
+				$result[$type][$day] = 0;
296
+			}
297
+
298
+			ksort($result[$type]);
299
+		}
300
+
301
+		return $result;
302
+	}
303
+
304
+	/**
305
+	 * Look for a new job that can be processed with the current abilities and
306
+	 * from the specified group (or any if null).
307
+	 *
308
+	 * @param array $capabilities Available QueueWorkerTasks.
309
+	 * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
310
+	 * @return \Queue\Model\Entity\QueuedTask|null
311
+	 */
312
+	public function requestJob(array $capabilities, array $types = [])
313
+	{
314
+		$now = $this->getDateTime();
315
+		$nowStr = $now->toDateTimeString();
316
+		$driverName = $this->_getDriverName();
317
+
318
+		$query = $this->find();
319
+		$age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
320
+		switch ($driverName) {
321
+			case static::DRIVER_SQLSERVER:
322
+				$age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
323
+				break;
324
+			case static::DRIVER_POSTGRES:
325
+				$age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
326
+				break;
327
+		}
328
+		$options = [
329
+			'conditions' => [
330
+				'completed IS' => null,
331
+				'OR' => []
332
+			],
333
+			'fields' => [
334
+				'age' => $age
335
+			],
336
+			'order' => [
337
+				'age' => 'ASC',
338
+				'id' => 'ASC'
339
+			]
340
+		];
341
+
342
+		if ($types) {
343
+			$options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
344
+		}
345
+
346
+		// Generate the task specific conditions.
347
+		foreach ($capabilities as $task) {
348
+			list ($plugin, $name) = pluginSplit($task['name']);
349
+			$timeoutAt = $now->copy();
350
+			$tmp = [
351
+				'task' => $name,
352
+				'AND' => [
353
+					[
354
+						'OR' => [
355
+							'not_before <' => $nowStr,
356
+							'not_before IS' => null
357
+						]
358
+					],
359
+					[
360
+						'OR' => [
361
+							'fetched <' => $timeoutAt->subSeconds($task['timeout']),
362
+							'fetched IS' => null
363
+						]
364
+					]
365
+				],
366
+				'failed_count <' => ($task['retries'] + 1)
367
+			];
368
+			$options['conditions']['OR'][] = $tmp;
369
+		}
370
+
371
+		/** @var \Queue\Model\Entity\QueuedTask|null $task */
372
+		$task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
373
+			$task = $query->find('all', $options)
374
+				->enableAutoFields(true)
375
+				->epilog('FOR UPDATE')
376
+				->first();
377
+
378
+			if (!$task) {
379
+				return null;
380
+			}
381
+
382
+			$key = $this->key();
383
+			$task = $this->patchEntity($task, [
384
+				'worker_key' => $key,
385
+				'fetched' => $now
386
+			]);
387
+
388
+			return $this->saveOrFail($task);
389
+		});
390
+
391
+		if (!$task) {
392
+			return null;
393
+		}
394
+
395
+		return $task;
396
+	}
397
+
398
+	/**
399
+	 * Mark a task as Completed, removing it from the queue.
400
+	 *
401
+	 * @param \Queue\Model\Entity\QueuedTask $task Task
402
+	 * @return bool Success
403
+	 */
404
+	public function markJobDone(QueuedTask $task)
405
+	{
406
+		$fields = [
407
+			'completed' => $this->getDateTime()
408
+		];
409
+		$task = $this->patchEntity($task, $fields);
410
+
411
+		return (bool)$this->save($task);
412
+	}
413
+
414
+	/**
415
+	 * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
416
+	 *
417
+	 * @param \Queue\Model\Entity\QueuedTask $task Task
418
+	 * @param string|null $failureMessage Optional message to append to the failure_message field.
419
+	 * @return bool Success
420
+	 */
421
+	public function markJobFailed(QueuedTask $task, $failureMessage = null)
422
+	{
423
+		$fields = [
424
+			'failed_count' => $task->failed_count + 1,
425
+			'failure_message' => $failureMessage
426
+		];
427
+		$task = $this->patchEntity($task, $fields);
428
+
429
+		return (bool)$this->save($task);
430
+	}
431
+
432
+	/**
433
+	 * Reset current jobs
434
+	 *
435
+	 * @param int|null $id ID
436
+	 *
437
+	 * @return int Success
438
+	 */
439
+	public function reset($id = null)
440
+	{
441
+		$fields = [
442
+			'completed' => null,
443
+			'fetched' => null,
444
+			'failed_count' => 0,
445
+			'worker_key' => null,
446
+			'failure_message' => null
447
+		];
448
+		$conditions = [
449
+			'completed IS' => null
450
+		];
451
+		if ($id) {
452
+			$conditions['id'] = $id;
453
+		}
454
+
455
+		return $this->updateAll($fields, $conditions);
456
+	}
457
+
458
+	/**
459
+	 *
460
+	 * @param string $taskName Task name
461
+	 *
462
+	 * @return int
463
+	 */
464
+	public function rerun($taskName)
465
+	{
466
+		$fields = [
467
+			'completed' => null,
468
+			'fetched' => null,
469
+			'failed_count' => 0,
470
+			'worker_key' => null,
471
+			'failure_message' => null
472
+		];
473
+		$conditions = [
474
+			'completed IS NOT' => null,
475
+			'task' => $taskName
476
+		];
477
+
478
+		return $this->updateAll($fields, $conditions);
479
+	}
480
+
481
+	/**
482
+	 * Cleanup/Delete Completed Tasks.
483
+	 *
484
+	 * @return void
485
+	 */
486
+	public function cleanOldJobs()
487
+	{
488
+		if (!Configure::read('Queue.cleanuptimeout')) {
489
+			return;
490
+		}
491
+
492
+		$this->deleteAll([
493
+			'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
494
+		]);
495
+	}
496
+
497
+	/**
498
+	 *
499
+	 * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
500
+	 * @param array $taskConfiguration Task configuration
501
+	 * @return string
502
+	 */
503
+	public function getFailedStatus($queuedTask, array $taskConfiguration)
504
+	{
505
+		$failureMessageRequeued = 'requeued';
506
+
507
+		$queuedTaskName = 'Queue' . $queuedTask->task;
508
+		if (empty($taskConfiguration[$queuedTaskName])) {
509
+			return $failureMessageRequeued;
510
+		}
511
+		$retries = $taskConfiguration[$queuedTaskName]['retries'];
512
+		if ($queuedTask->failed_count <= $retries) {
513
+			return $failureMessageRequeued;
514
+		}
515
+
516
+		return 'aborted';
517
+	}
518
+
519
+	/**
520
+	 * Generates a unique Identifier for the current worker thread.
521
+	 *
522
+	 * Useful to identify the currently running processes for this thread.
523
+	 *
524
+	 * @return string Identifier
525
+	 */
526
+	public function key()
527
+	{
528
+		if ($this->_key !== null) {
529
+			return $this->_key;
530
+		}
531
+		$this->_key = sha1(microtime());
532
+		if (!$this->_key) {
533
+			throw new RuntimeException('Invalid key generated');
534
+		}
535
+
536
+		return $this->_key;
537
+	}
538
+
539
+	/**
540
+	 * Resets worker Identifier
541
+	 *
542
+	 * @return void
543
+	 */
544
+	public function clearKey()
545
+	{
546
+		$this->_key = null;
547
+	}
548
+
549
+	/**
550
+	 * truncate()
551
+	 *
552
+	 * @return void
553
+	 */
554
+	public function truncate()
555
+	{
556
+		$sql = $this->getSchema()->truncateSql($this->_connection);
557
+		foreach ($sql as $snippet) {
558
+			$this->_connection->execute($snippet);
559
+		}
560
+	}
561
+
562
+	/**
563
+	 * get the name of the driver
564
+	 *
565
+	 * @return string
566
+	 */
567
+	protected function _getDriverName()
568
+	{
569
+		$className = explode('\\', $this->getConnection()->config()['driver']);
570
+		$name = end($className);
571
+
572
+		return $name;
573
+	}
574
+
575
+	/**
576
+	 *
577
+	 * @param array $conditions Conditions
578
+	 * @param string $key Key
579
+	 * @param array $values Values
580
+	 * @return array
581
+	 */
582
+	protected function addFilter(array $conditions, $key, array $values)
583
+	{
584
+		$include = [];
585
+		$exclude = [];
586
+		foreach ($values as $value) {
587
+			if (substr($value, 0, 1) === '-') {
588
+				$exclude[] = substr($value, 1);
589
+			} else {
590
+				$include[] = $value;
591
+			}
592
+		}
593
+
594
+		if ($include) {
595
+			$conditions[$key . ' IN'] = $include;
596
+		}
597
+		if ($exclude) {
598
+			$conditions[$key . ' NOT IN'] = $exclude;
599
+		}
600
+
601
+		return $conditions;
602
+	}
603
+
604
+	/**
605
+	 * Returns a DateTime object from different input.
606
+	 *
607
+	 * Without argument this will be "now".
608
+	 *
609
+	 * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
610
+	 *
611
+	 * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
612
+	 */
613
+	protected function getDateTime($notBefore = null)
614
+	{
615
+		if (is_object($notBefore)) {
616
+			return $notBefore;
617
+		}
618
+
619
+		return new FrozenTime($notBefore);
620
+	}
621 621
 }
Please login to merge, or discard this patch.