Completed
Pull Request — master (#35)
by
unknown
03:37
created
src/Queue/Config.php 1 patch
Indentation   +56 added lines, -56 removed lines patch added patch discarded remove patch
@@ -6,66 +6,66 @@
 block discarded – undo
6 6
 class Config
7 7
 {
8 8
 
9
-    /**
10
-     *
11
-     * @return int
12
-     */
13
-    public static function defaultWorkerTimeout()
14
-    {
15
-        return Configure::read('Queue.defaultWorkerTimeout', 600); // 10min
16
-    }
9
+	/**
10
+	 *
11
+	 * @return int
12
+	 */
13
+	public static function defaultWorkerTimeout()
14
+	{
15
+		return Configure::read('Queue.defaultWorkerTimeout', 600); // 10min
16
+	}
17 17
 
18
-    /**
19
-     *
20
-     * @return int
21
-     */
22
-    public static function workerMaxRuntime()
23
-    {
24
-        return Configure::read('Queue.workerMaxRuntime', 120);
25
-    }
18
+	/**
19
+	 *
20
+	 * @return int
21
+	 */
22
+	public static function workerMaxRuntime()
23
+	{
24
+		return Configure::read('Queue.workerMaxRuntime', 120);
25
+	}
26 26
 
27
-    /**
28
-     *
29
-     * @return int
30
-     */
31
-    public static function cleanupTimeout()
32
-    {
33
-        return Configure::read('Queue.cleanupTimeout', 2592000); // 30 days
34
-    }
27
+	/**
28
+	 *
29
+	 * @return int
30
+	 */
31
+	public static function cleanupTimeout()
32
+	{
33
+		return Configure::read('Queue.cleanupTimeout', 2592000); // 30 days
34
+	}
35 35
 
36
-    /**
37
-     *
38
-     * @return int
39
-     */
40
-    public static function sleepTime()
41
-    {
42
-        return Configure::read('Queue.sleepTime', 10);
43
-    }
36
+	/**
37
+	 *
38
+	 * @return int
39
+	 */
40
+	public static function sleepTime()
41
+	{
42
+		return Configure::read('Queue.sleepTime', 10);
43
+	}
44 44
 
45
-    /**
46
-     *
47
-     * @return int
48
-     */
49
-    public static function gcprob()
50
-    {
51
-        return Configure::read('Queue.gcprob', 10);
52
-    }
45
+	/**
46
+	 *
47
+	 * @return int
48
+	 */
49
+	public static function gcprob()
50
+	{
51
+		return Configure::read('Queue.gcprob', 10);
52
+	}
53 53
 
54
-    /**
55
-     *
56
-     * @return int
57
-     */
58
-    public static function defaultWorkerRetries()
59
-    {
60
-        return Configure::read('Queue.defaultWorkerRetries', 1);
61
-    }
54
+	/**
55
+	 *
56
+	 * @return int
57
+	 */
58
+	public static function defaultWorkerRetries()
59
+	{
60
+		return Configure::read('Queue.defaultWorkerRetries', 1);
61
+	}
62 62
 
63
-    /**
64
-     *
65
-     * @return int
66
-     */
67
-    public static function maxWorkers()
68
-    {
69
-        return Configure::read('Queue.maxWorkers', 1);
70
-    }
63
+	/**
64
+	 *
65
+	 * @return int
66
+	 */
67
+	public static function maxWorkers()
68
+	{
69
+		return Configure::read('Queue.maxWorkers', 1);
70
+	}
71 71
 }
Please login to merge, or discard this patch.
src/Shell/Task/AddInterface.php 1 patch
Indentation   +8 added lines, -8 removed lines patch added patch discarded remove patch
@@ -10,12 +10,12 @@
 block discarded – undo
10 10
 interface AddInterface
11 11
 {
12 12
 
13
-    /**
14
-     * Allows adding a task to the queue.
15
-     *
16
-     * Will create one example task in the queue, which later will be executed using run().
17
-     *
18
-     * @return void
19
-     */
20
-    public function add(): void;
13
+	/**
14
+	 * Allows adding a task to the queue.
15
+	 *
16
+	 * Will create one example task in the queue, which later will be executed using run().
17
+	 *
18
+	 * @return void
19
+	 */
20
+	public function add(): void;
21 21
 }
Please login to merge, or discard this patch.
src/Model/Entity/QueuedTask.php 1 patch
Indentation   +17 added lines, -17 removed lines patch added patch discarded remove patch
@@ -20,21 +20,21 @@
 block discarded – undo
20 20
 class QueuedTask extends Entity
21 21
 {
22 22
 
23
-    /**
24
-     *
25
-     * {@inheritdoc}
26
-     *
27
-     * @var array
28
-     */
29
-    protected $_accessible = [
30
-        'task' => true,
31
-        'data' => true,
32
-        'not_before' => true,
33
-        'fetched' => true,
34
-        'completed' => true,
35
-        'failed_count' => true,
36
-        'failure_message' => true,
37
-        'worker_key' => true,
38
-        'created' => true
39
-    ];
23
+	/**
24
+	 *
25
+	 * {@inheritdoc}
26
+	 *
27
+	 * @var array
28
+	 */
29
+	protected $_accessible = [
30
+		'task' => true,
31
+		'data' => true,
32
+		'not_before' => true,
33
+		'fetched' => true,
34
+		'completed' => true,
35
+		'failed_count' => true,
36
+		'failure_message' => true,
37
+		'worker_key' => true,
38
+		'created' => true
39
+	];
40 40
 }
Please login to merge, or discard this patch.
tests/config/app_queue.php 1 patch
Indentation   +10 added lines, -10 removed lines patch added patch discarded remove patch
@@ -1,18 +1,18 @@
 block discarded – undo
1 1
 <?php
2 2
 
3 3
 return [
4
-    'Queue' => [
5
-        // time (in seconds) after which a job is requeued if the worker doesn't report back
6
-        'defaultWorkerTimeout' => 1800,
4
+	'Queue' => [
5
+		// time (in seconds) after which a job is requeued if the worker doesn't report back
6
+		'defaultWorkerTimeout' => 1800,
7 7
 
8
-        // seconds of running time after which the worker will terminate (0 = unlimited)
9
-        'workerMaxRuntime' => 120,
8
+		// seconds of running time after which the worker will terminate (0 = unlimited)
9
+		'workerMaxRuntime' => 120,
10 10
 
11
-        // minimum time (in seconds) which a task remains in the database before being cleaned up.
12
-        'cleanupTimeout' => 2592000, // 30 days
11
+		// minimum time (in seconds) which a task remains in the database before being cleaned up.
12
+		'cleanupTimeout' => 2592000, // 30 days
13 13
 
14
-        /* Optional */
14
+		/* Optional */
15 15
 
16
-        'isSearchEnabled' => true,
17
-    ],
16
+		'isSearchEnabled' => true,
17
+	],
18 18
 ];
Please login to merge, or discard this patch.
src/Queue/TaskFinder.php 1 patch
Indentation   +68 added lines, -68 removed lines patch added patch discarded remove patch
@@ -8,80 +8,80 @@
 block discarded – undo
8 8
 class TaskFinder
9 9
 {
10 10
 
11
-    /**
12
-     *
13
-     * @var array|null
14
-     */
15
-    protected $tasks;
11
+	/**
12
+	 *
13
+	 * @var array|null
14
+	 */
15
+	protected $tasks;
16 16
 
17
-    /**
18
-     * Returns all possible Queue tasks.
19
-     *
20
-     * Makes sure that app tasks are prioritized over plugin ones.
21
-     *
22
-     * @return array
23
-     */
24
-    public function allAppAndPluginTasks()
25
-    {
26
-        if ($this->tasks !== null) {
27
-            return $this->tasks;
28
-        }
17
+	/**
18
+	 * Returns all possible Queue tasks.
19
+	 *
20
+	 * Makes sure that app tasks are prioritized over plugin ones.
21
+	 *
22
+	 * @return array
23
+	 */
24
+	public function allAppAndPluginTasks()
25
+	{
26
+		if ($this->tasks !== null) {
27
+			return $this->tasks;
28
+		}
29 29
 
30
-        $paths = App::path('Shell/Task');
31
-        $this->tasks = [];
30
+		$paths = App::path('Shell/Task');
31
+		$this->tasks = [];
32 32
 
33
-        foreach ($paths as $path) {
34
-            $Folder = new Folder($path);
35
-            $this->tasks = $this->getAppPaths($Folder);
36
-        }
37
-        $plugins = Plugin::loaded();
38
-        foreach ($plugins as $plugin) {
39
-            $pluginPaths = App::path('Shell/Task', $plugin);
40
-            foreach ($pluginPaths as $pluginPath) {
41
-                $Folder = new Folder($pluginPath);
42
-                $pluginTasks = $this->getPluginPaths($Folder, $plugin);
43
-                $this->tasks = array_merge($this->tasks, $pluginTasks);
44
-            }
45
-        }
33
+		foreach ($paths as $path) {
34
+			$Folder = new Folder($path);
35
+			$this->tasks = $this->getAppPaths($Folder);
36
+		}
37
+		$plugins = Plugin::loaded();
38
+		foreach ($plugins as $plugin) {
39
+			$pluginPaths = App::path('Shell/Task', $plugin);
40
+			foreach ($pluginPaths as $pluginPath) {
41
+				$Folder = new Folder($pluginPath);
42
+				$pluginTasks = $this->getPluginPaths($Folder, $plugin);
43
+				$this->tasks = array_merge($this->tasks, $pluginTasks);
44
+			}
45
+		}
46 46
 
47
-        return $this->tasks;
48
-    }
47
+		return $this->tasks;
48
+	}
49 49
 
50
-    /**
51
-     *
52
-     * @param \Cake\Filesystem\Folder $Folder The directory
53
-     *
54
-     * @return array
55
-     */
56
-    protected function getAppPaths(Folder $Folder)
57
-    {
58
-        $res = array_merge($this->tasks, $Folder->find('Queue.+\.php'));
59
-        foreach ($res as &$r) {
60
-            $r = basename($r, 'Task.php');
61
-        }
50
+	/**
51
+	 *
52
+	 * @param \Cake\Filesystem\Folder $Folder The directory
53
+	 *
54
+	 * @return array
55
+	 */
56
+	protected function getAppPaths(Folder $Folder)
57
+	{
58
+		$res = array_merge($this->tasks, $Folder->find('Queue.+\.php'));
59
+		foreach ($res as &$r) {
60
+			$r = basename($r, 'Task.php');
61
+		}
62 62
 
63
-        return $res;
64
-    }
63
+		return $res;
64
+	}
65 65
 
66
-    /**
67
-     *
68
-     * @param \Cake\Filesystem\Folder $Folder The directory
69
-     * @param string $plugin The plugin name
70
-     *
71
-     * @return array
72
-     */
73
-    protected function getPluginPaths(Folder $Folder, $plugin)
74
-    {
75
-        $res = $Folder->find('Queue.+Task\.php');
76
-        foreach ($res as $key => $r) {
77
-            $name = basename($r, 'Task.php');
78
-            if (in_array($name, $this->tasks)) {
79
-                unset($res[$key]);
80
-                continue;
81
-            }
82
-            $res[$key] = $plugin . '.' . $name;
83
-        }
66
+	/**
67
+	 *
68
+	 * @param \Cake\Filesystem\Folder $Folder The directory
69
+	 * @param string $plugin The plugin name
70
+	 *
71
+	 * @return array
72
+	 */
73
+	protected function getPluginPaths(Folder $Folder, $plugin)
74
+	{
75
+		$res = $Folder->find('Queue.+Task\.php');
76
+		foreach ($res as $key => $r) {
77
+			$name = basename($r, 'Task.php');
78
+			if (in_array($name, $this->tasks)) {
79
+				unset($res[$key]);
80
+				continue;
81
+			}
82
+			$res[$key] = $plugin . '.' . $name;
83
+		}
84 84
 
85
-        return $res;
86
-    }
85
+		return $res;
86
+	}
87 87
 }
Please login to merge, or discard this patch.
src/Shell/Task/QueueTask.php 1 patch
Indentation   +45 added lines, -45 removed lines patch added patch discarded remove patch
@@ -18,57 +18,57 @@
 block discarded – undo
18 18
 abstract class QueueTask extends Shell implements QueueTaskInterface
19 19
 {
20 20
 
21
-    /**
22
-     *
23
-     * @var string
24
-     */
25
-    public $queueModelClass = 'Queue.QueuedTasks';
21
+	/**
22
+	 *
23
+	 * @var string
24
+	 */
25
+	public $queueModelClass = 'Queue.QueuedTasks';
26 26
 
27
-    /**
28
-     *
29
-     * @var \Queue\Model\Table\QueuedTasksTable
30
-     */
31
-    public $QueuedTasks;
27
+	/**
28
+	 *
29
+	 * @var \Queue\Model\Table\QueuedTasksTable
30
+	 */
31
+	public $QueuedTasks;
32 32
 
33
-    /**
34
-     * Timeout for run, after which the Task is reassigned to a new worker.
35
-     *
36
-     * @var int
37
-     */
38
-    public $timeout = 120;
33
+	/**
34
+	 * Timeout for run, after which the Task is reassigned to a new worker.
35
+	 *
36
+	 * @var int
37
+	 */
38
+	public $timeout = 120;
39 39
 
40
-    /**
41
-     * Number of times a failed instance of this task should be restarted before giving up.
42
-     *
43
-     * @var int
44
-     */
45
-    public $retries = 1;
40
+	/**
41
+	 * Number of times a failed instance of this task should be restarted before giving up.
42
+	 *
43
+	 * @var int
44
+	 */
45
+	public $retries = 1;
46 46
 
47
-    /**
48
-     *
49
-     * @param \Cake\Console\ConsoleIo|null $io IO
50
-     */
51
-    public function __construct(ConsoleIo $io = null)
52
-    {
53
-        parent::__construct($io);
47
+	/**
48
+	 *
49
+	 * @param \Cake\Console\ConsoleIo|null $io IO
50
+	 */
51
+	public function __construct(ConsoleIo $io = null)
52
+	{
53
+		parent::__construct($io);
54 54
 
55
-        $this->loadModel($this->queueModelClass);
56
-    }
55
+		$this->loadModel($this->queueModelClass);
56
+	}
57 57
 
58
-    /**
59
-     *
60
-     * @return string
61
-     * @throws \InvalidArgumentException
62
-     */
63
-    protected function queueTaskName()
64
-    {
65
-        $class = get_class($this);
58
+	/**
59
+	 *
60
+	 * @return string
61
+	 * @throws \InvalidArgumentException
62
+	 */
63
+	protected function queueTaskName()
64
+	{
65
+		$class = get_class($this);
66 66
 
67
-        preg_match('#\\\\Queue(.+)Task$#', $class, $matches);
68
-        if (!$matches) {
69
-            throw new InvalidArgumentException('Invalid class name: ' . $class);
70
-        }
67
+		preg_match('#\\\\Queue(.+)Task$#', $class, $matches);
68
+		if (!$matches) {
69
+			throw new InvalidArgumentException('Invalid class name: ' . $class);
70
+		}
71 71
 
72
-        return $matches[1];
73
-    }
72
+		return $matches[1];
73
+	}
74 74
 }
Please login to merge, or discard this patch.
src/Shell/Task/QueueExampleTask.php 1 patch
Indentation   +72 added lines, -72 removed lines patch added patch discarded remove patch
@@ -7,82 +7,82 @@
 block discarded – undo
7 7
 class QueueExampleTask extends QueueTask implements AddInterface
8 8
 {
9 9
 
10
-    /**
11
-     * Timeout for run, after which the task is reassigned to a new worker.
12
-     *
13
-     * @var int
14
-     */
15
-    public $timeout = 10;
10
+	/**
11
+	 * Timeout for run, after which the task is reassigned to a new worker.
12
+	 *
13
+	 * @var int
14
+	 */
15
+	public $timeout = 10;
16 16
 
17
-    /**
18
-     * Timeout for cleanup, after which completed jobs are deleted (in seconds).
19
-     *
20
-     * @var int
21
-     */
22
-    public $cleanupTimeout = 600;
17
+	/**
18
+	 * Timeout for cleanup, after which completed jobs are deleted (in seconds).
19
+	 *
20
+	 * @var int
21
+	 */
22
+	public $cleanupTimeout = 600;
23 23
 
24
-    /**
25
-     * Number of times a failed instance of this task should be restarted before giving up.
26
-     *
27
-     * @var int
28
-     */
29
-    public $retries = 0;
24
+	/**
25
+	 * Number of times a failed instance of this task should be restarted before giving up.
26
+	 *
27
+	 * @var int
28
+	 */
29
+	public $retries = 0;
30 30
 
31
-    /**
32
-     * Stores any failure messages triggered during run().
33
-     *
34
-     * @var string
35
-     */
36
-    public $failureMessage = '';
31
+	/**
32
+	 * Stores any failure messages triggered during run().
33
+	 *
34
+	 * @var string
35
+	 */
36
+	public $failureMessage = '';
37 37
 
38
-    /**
39
-     * Example add functionality.
40
-     * Will create one example job in the queue, which later will be executed using run();
41
-     *
42
-     * To invoke from CLI execute:
43
-     * - bin/cake queue add Example
44
-     *
45
-     * @return void
46
-     */
47
-    public function add(): void
48
-    {
49
-        $this->out(__d('queue', 'CakePHP Queue Example task.'));
50
-        $this->hr();
51
-        $this->out(__d('queue', 'This is a very simple example of a queueTask.'));
52
-        $this->out(__d('queue', 'Now adding an example Task Job into the Queue.'));
53
-        $this->out(__d('queue', 'This task will only produce some console output on the worker that it runs on.'));
54
-        $this->out(' ');
55
-        $this->out(__d('queue', 'To run a Worker use:'));
56
-        $this->out(__d('queue', '	cake queue runworker'));
57
-        $this->out(' ');
58
-        $this->out(__d('queue', 'You can find the sourcecode of this task in: '));
59
-        $this->out(__FILE__);
60
-        $this->out(' ');
38
+	/**
39
+	 * Example add functionality.
40
+	 * Will create one example job in the queue, which later will be executed using run();
41
+	 *
42
+	 * To invoke from CLI execute:
43
+	 * - bin/cake queue add Example
44
+	 *
45
+	 * @return void
46
+	 */
47
+	public function add(): void
48
+	{
49
+		$this->out(__d('queue', 'CakePHP Queue Example task.'));
50
+		$this->hr();
51
+		$this->out(__d('queue', 'This is a very simple example of a queueTask.'));
52
+		$this->out(__d('queue', 'Now adding an example Task Job into the Queue.'));
53
+		$this->out(__d('queue', 'This task will only produce some console output on the worker that it runs on.'));
54
+		$this->out(' ');
55
+		$this->out(__d('queue', 'To run a Worker use:'));
56
+		$this->out(__d('queue', '	cake queue runworker'));
57
+		$this->out(' ');
58
+		$this->out(__d('queue', 'You can find the sourcecode of this task in: '));
59
+		$this->out(__FILE__);
60
+		$this->out(' ');
61 61
 
62
-        // Adding a task of type 'example' with no additionally passed data
63
-        if ($this->QueuedTasks->createJob('Example')) {
64
-            $this->out(__d('queue', 'OK, job created, now run the worker'));
65
-        } else {
66
-            $this->err(__d('queue', 'Could not create Job'));
67
-        }
68
-    }
62
+		// Adding a task of type 'example' with no additionally passed data
63
+		if ($this->QueuedTasks->createJob('Example')) {
64
+			$this->out(__d('queue', 'OK, job created, now run the worker'));
65
+		} else {
66
+			$this->err(__d('queue', 'Could not create Job'));
67
+		}
68
+	}
69 69
 
70
-    /**
71
-     * Example run function.
72
-     * This function is executed, when a worker is executing a task.
73
-     * The return parameter will determine, if the task will be marked completed, or be requeued.
74
-     *
75
-     * @param array $data The array passed to QueuedTasksTable::createJob()
76
-     * @param int $taskId The id of the QueuedTask entity
77
-     * @return void
78
-     */
79
-    public function run(array $data, $taskId): void
80
-    {
81
-        $this->hr();
82
-        $this->out(__d('queue', 'CakePHP Queue Example task.'));
83
-        $this->hr();
84
-        $this->out(__d('queue', ' ->Success, the Example Task was run.<-'));
85
-        $this->out(' ');
86
-        $this->out(' ');
87
-    }
70
+	/**
71
+	 * Example run function.
72
+	 * This function is executed, when a worker is executing a task.
73
+	 * The return parameter will determine, if the task will be marked completed, or be requeued.
74
+	 *
75
+	 * @param array $data The array passed to QueuedTasksTable::createJob()
76
+	 * @param int $taskId The id of the QueuedTask entity
77
+	 * @return void
78
+	 */
79
+	public function run(array $data, $taskId): void
80
+	{
81
+		$this->hr();
82
+		$this->out(__d('queue', 'CakePHP Queue Example task.'));
83
+		$this->hr();
84
+		$this->out(__d('queue', ' ->Success, the Example Task was run.<-'));
85
+		$this->out(' ');
86
+		$this->out(' ');
87
+	}
88 88
 }
Please login to merge, or discard this patch.
src/Shell/Task/QueueTaskInterface.php 1 patch
Indentation   +8 added lines, -8 removed lines patch added patch discarded remove patch
@@ -13,12 +13,12 @@
 block discarded – undo
13 13
 interface QueueTaskInterface
14 14
 {
15 15
 
16
-    /**
17
-     * Main execution of the task.
18
-     *
19
-     * @param array $data The array passed to QueuedTasksTable::createJob()
20
-     * @param int $taskId The id of the QueuedTask entity
21
-     * @return void
22
-     */
23
-    public function run(array $data, $taskId): void;
16
+	/**
17
+	 * Main execution of the task.
18
+	 *
19
+	 * @param array $data The array passed to QueuedTasksTable::createJob()
20
+	 * @param int $taskId The id of the QueuedTask entity
21
+	 * @return void
22
+	 */
23
+	public function run(array $data, $taskId): void;
24 24
 }
Please login to merge, or discard this patch.
src/Model/Table/QueuedTasksTable.php 2 patches
Spacing   +3 added lines, -3 removed lines patch added patch discarded remove patch
@@ -189,7 +189,7 @@  discard block
 block discarded – undo
189 189
     {
190 190
         $driverName = $this->_getDriverName();
191 191
         $options = [
192
-            'fields' => function (Query $query) use ($driverName) {
192
+            'fields' => function(Query $query) use ($driverName) {
193 193
                 $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
194 194
                 $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
195 195
                 $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
@@ -236,7 +236,7 @@  discard block
 block discarded – undo
236 236
     public function getFullStats($taskName = null)
237 237
     {
238 238
         $driverName = $this->_getDriverName();
239
-        $fields = function (Query $query) use ($driverName) {
239
+        $fields = function(Query $query) use ($driverName) {
240 240
             $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
241 241
             switch ($driverName) {
242 242
                 case static::DRIVER_SQLSERVER:
@@ -370,7 +370,7 @@  discard block
 block discarded – undo
370 370
         }
371 371
 
372 372
         /** @var \Queue\Model\Entity\QueuedTask|null $task */
373
-        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
373
+        $task = $this->getConnection()->transactional(function() use ($query, $options, $now) {
374 374
             $task = $query->find('all', $options)
375 375
                 ->enableAutoFields(true)
376 376
                 ->epilog('FOR UPDATE')
Please login to merge, or discard this patch.
Indentation   +590 added lines, -590 removed lines patch added patch discarded remove patch
@@ -28,594 +28,594 @@
 block discarded – undo
28 28
 class QueuedTasksTable extends Table
29 29
 {
30 30
 
31
-    const DRIVER_MYSQL = 'Mysql';
32
-
33
-    const DRIVER_POSTGRES = 'Postgres';
34
-
35
-    const DRIVER_SQLSERVER = 'Sqlserver';
36
-
37
-    const STATS_LIMIT = 100000;
38
-
39
-    /**
40
-     *
41
-     * @var string|null
42
-     */
43
-    protected $_key;
44
-
45
-    /**
46
-     * Initialize method
47
-     *
48
-     * @param array $config The configuration for the Table.
49
-     * @return void
50
-     */
51
-    public function initialize(array $config)
52
-    {
53
-        parent::initialize($config);
54
-
55
-        $this->setTable('queued_tasks');
56
-        $this->setDisplayField('id');
57
-        $this->setPrimaryKey('id');
58
-
59
-        $this->addBehavior('Timestamp');
60
-    }
61
-
62
-    /**
63
-     * set connection name
64
-     *
65
-     * @return string
66
-     */
67
-    public static function defaultConnectionName()
68
-    {
69
-        $connection = Configure::read('Queue.connection');
70
-        if (!empty($connection)) {
71
-            return $connection;
72
-        }
73
-
74
-        return parent::defaultConnectionName();
75
-    }
76
-
77
-    /**
78
-     *
79
-     * @param \Cake\Event\Event $event Model event
80
-     * @param \ArrayObject $data The data
81
-     * @param \ArrayObject $options The options
82
-     * @return void
83
-     */
84
-    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
85
-    {
86
-        if (isset($data['data']) && $data['data'] === '') {
87
-            $data['data'] = null;
88
-        }
89
-    }
90
-
91
-    /**
92
-     * Adds a new job to the queue.
93
-     *
94
-     * @param string $taskName Task name
95
-     * @param array|null $data Array of data
96
-     * @param string $notBefore A datetime which indicates when the job may be executed
97
-     * @return \Queue\Model\Entity\QueuedTask Saved job entity
98
-     */
99
-    public function createJob($taskName, array $data = null, string $notBefore = null)
100
-    {
101
-        $task = [
102
-            'task' => $taskName,
103
-            'data' => serialize($data),
104
-            'not_before' => $this->getDateTime()
105
-        ];
106
-
107
-        if (!empty($notBefore)) {
108
-            $task['not_before'] = $this->getDateTime(strtotime($notBefore));
109
-        }
110
-
111
-        $queuedTask = $this->newEntity($task);
112
-
113
-        return $this->saveOrFail($queuedTask);
114
-    }
115
-
116
-    /**
117
-     *
118
-     * @param string|null $taskName Task name
119
-     * @return bool
120
-     */
121
-    public function isQueued($taskName = null)
122
-    {
123
-        $conditions = [
124
-            'completed IS' => null
125
-        ];
126
-        if ($taskName) {
127
-            $conditions['task'] = $taskName;
128
-        }
129
-
130
-        return (bool)$this->find()
131
-            ->where($conditions)
132
-            ->select([
133
-                'id'
134
-            ])
135
-            ->first();
136
-    }
137
-
138
-    /**
139
-     * Returns the number of items in the queue.
140
-     * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
141
-     *
142
-     * @param string|null $taskName Task type to Count
143
-     * @return int
144
-     */
145
-    public function getLength($taskName = null)
146
-    {
147
-        $findConf = [
148
-            'conditions' => [
149
-                'completed IS' => null
150
-            ]
151
-        ];
152
-        if ($taskName !== null) {
153
-            $findConf['conditions']['task'] = $taskName;
154
-        }
155
-
156
-        return $this->find('all', $findConf)->count();
157
-    }
158
-
159
-    /**
160
-     * Return a list of all task types in the Queue.
161
-     *
162
-     * @return \Cake\ORM\Query
163
-     */
164
-    public function getTypes()
165
-    {
166
-        $findCond = [
167
-            'fields' => [
168
-                'task'
169
-            ],
170
-            'group' => [
171
-                'task'
172
-            ],
173
-            'keyField' => 'task',
174
-            'valueField' => 'task'
175
-        ];
176
-
177
-        return $this->find('list', $findCond);
178
-    }
179
-
180
-    /**
181
-     * Return some statistics about finished jobs still in the Database.
182
-     * TO-DO: rewrite as virtual field
183
-     *
184
-     * @return \Cake\ORM\Query
185
-     */
186
-    public function getStats()
187
-    {
188
-        $driverName = $this->_getDriverName();
189
-        $options = [
190
-            'fields' => function (Query $query) use ($driverName) {
191
-                $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
192
-                $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
193
-                $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
194
-                switch ($driverName) {
195
-                    case static::DRIVER_SQLSERVER:
196
-                        $alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
197
-                        $runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
198
-                        $fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
199
-                        break;
200
-                }
201
-                /**
202
-                 *
203
-                 * @var \Cake\ORM\Query
204
-                 */
205
-                return [
206
-                    'task',
207
-                    'num' => $query->func()->count('*'),
208
-                    'alltime' => $alltime,
209
-                    'runtime' => $runtime,
210
-                    'fetchdelay' => $fetchdelay
211
-                ];
212
-            },
213
-            'conditions' => [
214
-                'completed IS NOT' => null
215
-            ],
216
-            'group' => [
217
-                'task'
218
-            ]
219
-        ];
220
-
221
-        return $this->find('all', $options);
222
-    }
223
-
224
-    /**
225
-     * Returns [
226
-     * 'Task' => [
227
-     * 'YYYY-MM-DD' => INT,
228
-     * ...
229
-     * ]
230
-     * ]
231
-     *
232
-     * @param string|null $taskName The task name
233
-     * @return array
234
-     */
235
-    public function getFullStats($taskName = null)
236
-    {
237
-        $driverName = $this->_getDriverName();
238
-        $fields = function (Query $query) use ($driverName) {
239
-            $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
240
-            switch ($driverName) {
241
-                case static::DRIVER_SQLSERVER:
242
-                    $runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
243
-                    break;
244
-            }
245
-
246
-            return [
247
-                'task',
248
-                'created',
249
-                'duration' => $runtime
250
-            ];
251
-        };
252
-
253
-        $conditions = [
254
-            'completed IS NOT' => null
255
-        ];
256
-        if ($taskName) {
257
-            $conditions['task'] = $taskName;
258
-        }
259
-
260
-        $tasks = $this->find()
261
-            ->select($fields)
262
-            ->where($conditions)
263
-            ->enableHydration(false)
264
-            ->orderDesc('id')
265
-            ->limit(static::STATS_LIMIT)
266
-            ->all()
267
-            ->toArray();
268
-
269
-        $result = [];
270
-
271
-        $days = [];
272
-
273
-        foreach ($tasks as $task) {
274
-            /** @var \DateTime $created */
275
-            $created = $task['created'];
276
-            $day = $created->format('Y-m-d');
277
-            if (!isset($days[$day])) {
278
-                $days[$day] = $day;
279
-            }
280
-
281
-            $result[$task['task']][$day][] = $task['duration'];
282
-        }
283
-
284
-        foreach ($result as $type => $tasks) {
285
-            foreach ($tasks as $day => $durations) {
286
-                $average = array_sum($durations) / count($durations);
287
-                $result[$type][$day] = (int)$average;
288
-            }
289
-
290
-            foreach ($days as $day) {
291
-                if (isset($result[$type][$day])) {
292
-                    continue;
293
-                }
294
-
295
-                $result[$type][$day] = 0;
296
-            }
297
-
298
-            ksort($result[$type]);
299
-        }
300
-
301
-        return $result;
302
-    }
303
-
304
-    /**
305
-     * Look for a new job that can be processed with the current abilities and
306
-     * from the specified group (or any if null).
307
-     *
308
-     * @param array $capabilities Available QueueWorkerTasks.
309
-     * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
310
-     * @return \Queue\Model\Entity\QueuedTask|null
311
-     */
312
-    public function requestJob(array $capabilities, array $types = [])
313
-    {
314
-        $now = $this->getDateTime();
315
-        $nowStr = $now->toDateTimeString();
316
-        $driverName = $this->_getDriverName();
317
-
318
-        $query = $this->find();
319
-        $age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
320
-        switch ($driverName) {
321
-            case static::DRIVER_SQLSERVER:
322
-                $age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
323
-                break;
324
-            case static::DRIVER_POSTGRES:
325
-                $age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
326
-                break;
327
-        }
328
-        $options = [
329
-            'conditions' => [
330
-                'completed IS' => null,
331
-                'OR' => []
332
-            ],
333
-            'fields' => [
334
-                'age' => $age
335
-            ],
336
-            'order' => [
337
-                'age' => 'ASC',
338
-                'id' => 'ASC'
339
-            ]
340
-        ];
341
-
342
-        if ($types) {
343
-            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
344
-        }
345
-
346
-        // Generate the task specific conditions.
347
-        foreach ($capabilities as $task) {
348
-            list ($plugin, $name) = pluginSplit($task['name']);
349
-            $timeoutAt = $now->copy();
350
-            $tmp = [
351
-                'task' => $name,
352
-                'AND' => [
353
-                    [
354
-                        'OR' => [
355
-                            'not_before <' => $nowStr,
356
-                            'not_before IS' => null
357
-                        ]
358
-                    ],
359
-                    [
360
-                        'OR' => [
361
-                            'fetched <' => $timeoutAt->subSeconds($task['timeout']),
362
-                            'fetched IS' => null
363
-                        ]
364
-                    ]
365
-                ],
366
-                'failed_count <' => ($task['retries'] + 1)
367
-            ];
368
-            $options['conditions']['OR'][] = $tmp;
369
-        }
370
-
371
-        /** @var \Queue\Model\Entity\QueuedTask|null $task */
372
-        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
373
-            $task = $query->find('all', $options)
374
-                ->enableAutoFields(true)
375
-                ->epilog('FOR UPDATE')
376
-                ->first();
377
-
378
-            if (!$task) {
379
-                return null;
380
-            }
381
-
382
-            $key = $this->key();
383
-            $task = $this->patchEntity($task, [
384
-                'worker_key' => $key,
385
-                'fetched' => $now
386
-            ]);
387
-
388
-            return $this->saveOrFail($task);
389
-        });
390
-
391
-        if (!$task) {
392
-            return null;
393
-        }
394
-
395
-        return $task;
396
-    }
397
-
398
-    /**
399
-     * Mark a task as Completed, removing it from the queue.
400
-     *
401
-     * @param \Queue\Model\Entity\QueuedTask $task Task
402
-     * @return bool Success
403
-     */
404
-    public function markJobDone(QueuedTask $task)
405
-    {
406
-        $fields = [
407
-            'completed' => $this->getDateTime()
408
-        ];
409
-        $task = $this->patchEntity($task, $fields);
410
-
411
-        return (bool)$this->save($task);
412
-    }
413
-
414
-    /**
415
-     * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
416
-     *
417
-     * @param \Queue\Model\Entity\QueuedTask $task Task
418
-     * @param string|null $failureMessage Optional message to append to the failure_message field.
419
-     * @return bool Success
420
-     */
421
-    public function markJobFailed(QueuedTask $task, $failureMessage = null)
422
-    {
423
-        $fields = [
424
-            'failed_count' => $task->failed_count + 1,
425
-            'failure_message' => $failureMessage
426
-        ];
427
-        $task = $this->patchEntity($task, $fields);
428
-
429
-        return (bool)$this->save($task);
430
-    }
431
-
432
-    /**
433
-     * Reset current jobs
434
-     *
435
-     * @param int|null $id ID
436
-     *
437
-     * @return int Success
438
-     */
439
-    public function reset($id = null)
440
-    {
441
-        $fields = [
442
-            'completed' => null,
443
-            'fetched' => null,
444
-            'failed_count' => 0,
445
-            'worker_key' => null,
446
-            'failure_message' => null
447
-        ];
448
-        $conditions = [
449
-            'completed IS' => null
450
-        ];
451
-        if ($id) {
452
-            $conditions['id'] = $id;
453
-        }
454
-
455
-        return $this->updateAll($fields, $conditions);
456
-    }
457
-
458
-    /**
459
-     *
460
-     * @param string $taskName Task name
461
-     *
462
-     * @return int
463
-     */
464
-    public function rerun($taskName)
465
-    {
466
-        $fields = [
467
-            'completed' => null,
468
-            'fetched' => null,
469
-            'failed_count' => 0,
470
-            'worker_key' => null,
471
-            'failure_message' => null
472
-        ];
473
-        $conditions = [
474
-            'completed IS NOT' => null,
475
-            'task' => $taskName
476
-        ];
477
-
478
-        return $this->updateAll($fields, $conditions);
479
-    }
480
-
481
-    /**
482
-     * Cleanup/Delete Completed Tasks.
483
-     *
484
-     * @return void
485
-     */
486
-    public function cleanOldJobs()
487
-    {
488
-        if (!Configure::read('Queue.cleanuptimeout')) {
489
-            return;
490
-        }
491
-
492
-        $this->deleteAll([
493
-            'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
494
-        ]);
495
-    }
496
-
497
-    /**
498
-     *
499
-     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
500
-     * @param array $taskConfiguration Task configuration
501
-     * @return string
502
-     */
503
-    public function getFailedStatus($queuedTask, array $taskConfiguration)
504
-    {
505
-        $failureMessageRequeued = 'requeued';
506
-
507
-        $queuedTaskName = 'Queue' . $queuedTask->task;
508
-        if (empty($taskConfiguration[$queuedTaskName])) {
509
-            return $failureMessageRequeued;
510
-        }
511
-        $retries = $taskConfiguration[$queuedTaskName]['retries'];
512
-        if ($queuedTask->failed_count <= $retries) {
513
-            return $failureMessageRequeued;
514
-        }
515
-
516
-        return 'aborted';
517
-    }
518
-
519
-    /**
520
-     * Generates a unique Identifier for the current worker thread.
521
-     *
522
-     * Useful to identify the currently running processes for this thread.
523
-     *
524
-     * @return string Identifier
525
-     */
526
-    public function key()
527
-    {
528
-        if ($this->_key !== null) {
529
-            return $this->_key;
530
-        }
531
-        $this->_key = sha1(microtime());
532
-        if (!$this->_key) {
533
-            throw new RuntimeException('Invalid key generated');
534
-        }
535
-
536
-        return $this->_key;
537
-    }
538
-
539
-    /**
540
-     * Resets worker Identifier
541
-     *
542
-     * @return void
543
-     */
544
-    public function clearKey()
545
-    {
546
-        $this->_key = null;
547
-    }
548
-
549
-    /**
550
-     * truncate()
551
-     *
552
-     * @return void
553
-     */
554
-    public function truncate()
555
-    {
556
-        $sql = $this->getSchema()->truncateSql($this->_connection);
557
-        foreach ($sql as $snippet) {
558
-            $this->_connection->execute($snippet);
559
-        }
560
-    }
561
-
562
-    /**
563
-     * get the name of the driver
564
-     *
565
-     * @return string
566
-     */
567
-    protected function _getDriverName()
568
-    {
569
-        $className = explode('\\', $this->getConnection()->config()['driver']);
570
-        $name = end($className);
571
-
572
-        return $name;
573
-    }
574
-
575
-    /**
576
-     *
577
-     * @param array $conditions Conditions
578
-     * @param string $key Key
579
-     * @param array $values Values
580
-     * @return array
581
-     */
582
-    protected function addFilter(array $conditions, $key, array $values)
583
-    {
584
-        $include = [];
585
-        $exclude = [];
586
-        foreach ($values as $value) {
587
-            if (substr($value, 0, 1) === '-') {
588
-                $exclude[] = substr($value, 1);
589
-            } else {
590
-                $include[] = $value;
591
-            }
592
-        }
593
-
594
-        if ($include) {
595
-            $conditions[$key . ' IN'] = $include;
596
-        }
597
-        if ($exclude) {
598
-            $conditions[$key . ' NOT IN'] = $exclude;
599
-        }
600
-
601
-        return $conditions;
602
-    }
603
-
604
-    /**
605
-     * Returns a DateTime object from different input.
606
-     *
607
-     * Without argument this will be "now".
608
-     *
609
-     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
610
-     *
611
-     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
612
-     */
613
-    protected function getDateTime($notBefore = null)
614
-    {
615
-        if (is_object($notBefore)) {
616
-            return $notBefore;
617
-        }
618
-
619
-        return new FrozenTime($notBefore);
620
-    }
31
+	const DRIVER_MYSQL = 'Mysql';
32
+
33
+	const DRIVER_POSTGRES = 'Postgres';
34
+
35
+	const DRIVER_SQLSERVER = 'Sqlserver';
36
+
37
+	const STATS_LIMIT = 100000;
38
+
39
+	/**
40
+	 *
41
+	 * @var string|null
42
+	 */
43
+	protected $_key;
44
+
45
+	/**
46
+	 * Initialize method
47
+	 *
48
+	 * @param array $config The configuration for the Table.
49
+	 * @return void
50
+	 */
51
+	public function initialize(array $config)
52
+	{
53
+		parent::initialize($config);
54
+
55
+		$this->setTable('queued_tasks');
56
+		$this->setDisplayField('id');
57
+		$this->setPrimaryKey('id');
58
+
59
+		$this->addBehavior('Timestamp');
60
+	}
61
+
62
+	/**
63
+	 * set connection name
64
+	 *
65
+	 * @return string
66
+	 */
67
+	public static function defaultConnectionName()
68
+	{
69
+		$connection = Configure::read('Queue.connection');
70
+		if (!empty($connection)) {
71
+			return $connection;
72
+		}
73
+
74
+		return parent::defaultConnectionName();
75
+	}
76
+
77
+	/**
78
+	 *
79
+	 * @param \Cake\Event\Event $event Model event
80
+	 * @param \ArrayObject $data The data
81
+	 * @param \ArrayObject $options The options
82
+	 * @return void
83
+	 */
84
+	public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
85
+	{
86
+		if (isset($data['data']) && $data['data'] === '') {
87
+			$data['data'] = null;
88
+		}
89
+	}
90
+
91
+	/**
92
+	 * Adds a new job to the queue.
93
+	 *
94
+	 * @param string $taskName Task name
95
+	 * @param array|null $data Array of data
96
+	 * @param string $notBefore A datetime which indicates when the job may be executed
97
+	 * @return \Queue\Model\Entity\QueuedTask Saved job entity
98
+	 */
99
+	public function createJob($taskName, array $data = null, string $notBefore = null)
100
+	{
101
+		$task = [
102
+			'task' => $taskName,
103
+			'data' => serialize($data),
104
+			'not_before' => $this->getDateTime()
105
+		];
106
+
107
+		if (!empty($notBefore)) {
108
+			$task['not_before'] = $this->getDateTime(strtotime($notBefore));
109
+		}
110
+
111
+		$queuedTask = $this->newEntity($task);
112
+
113
+		return $this->saveOrFail($queuedTask);
114
+	}
115
+
116
+	/**
117
+	 *
118
+	 * @param string|null $taskName Task name
119
+	 * @return bool
120
+	 */
121
+	public function isQueued($taskName = null)
122
+	{
123
+		$conditions = [
124
+			'completed IS' => null
125
+		];
126
+		if ($taskName) {
127
+			$conditions['task'] = $taskName;
128
+		}
129
+
130
+		return (bool)$this->find()
131
+			->where($conditions)
132
+			->select([
133
+				'id'
134
+			])
135
+			->first();
136
+	}
137
+
138
+	/**
139
+	 * Returns the number of items in the queue.
140
+	 * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
141
+	 *
142
+	 * @param string|null $taskName Task type to Count
143
+	 * @return int
144
+	 */
145
+	public function getLength($taskName = null)
146
+	{
147
+		$findConf = [
148
+			'conditions' => [
149
+				'completed IS' => null
150
+			]
151
+		];
152
+		if ($taskName !== null) {
153
+			$findConf['conditions']['task'] = $taskName;
154
+		}
155
+
156
+		return $this->find('all', $findConf)->count();
157
+	}
158
+
159
+	/**
160
+	 * Return a list of all task types in the Queue.
161
+	 *
162
+	 * @return \Cake\ORM\Query
163
+	 */
164
+	public function getTypes()
165
+	{
166
+		$findCond = [
167
+			'fields' => [
168
+				'task'
169
+			],
170
+			'group' => [
171
+				'task'
172
+			],
173
+			'keyField' => 'task',
174
+			'valueField' => 'task'
175
+		];
176
+
177
+		return $this->find('list', $findCond);
178
+	}
179
+
180
+	/**
181
+	 * Return some statistics about finished jobs still in the Database.
182
+	 * TO-DO: rewrite as virtual field
183
+	 *
184
+	 * @return \Cake\ORM\Query
185
+	 */
186
+	public function getStats()
187
+	{
188
+		$driverName = $this->_getDriverName();
189
+		$options = [
190
+			'fields' => function (Query $query) use ($driverName) {
191
+				$alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
192
+				$runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
193
+				$fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
194
+				switch ($driverName) {
195
+					case static::DRIVER_SQLSERVER:
196
+						$alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
197
+						$runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
198
+						$fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
199
+						break;
200
+				}
201
+				/**
202
+				 *
203
+				 * @var \Cake\ORM\Query
204
+				 */
205
+				return [
206
+					'task',
207
+					'num' => $query->func()->count('*'),
208
+					'alltime' => $alltime,
209
+					'runtime' => $runtime,
210
+					'fetchdelay' => $fetchdelay
211
+				];
212
+			},
213
+			'conditions' => [
214
+				'completed IS NOT' => null
215
+			],
216
+			'group' => [
217
+				'task'
218
+			]
219
+		];
220
+
221
+		return $this->find('all', $options);
222
+	}
223
+
224
+	/**
225
+	 * Returns [
226
+	 * 'Task' => [
227
+	 * 'YYYY-MM-DD' => INT,
228
+	 * ...
229
+	 * ]
230
+	 * ]
231
+	 *
232
+	 * @param string|null $taskName The task name
233
+	 * @return array
234
+	 */
235
+	public function getFullStats($taskName = null)
236
+	{
237
+		$driverName = $this->_getDriverName();
238
+		$fields = function (Query $query) use ($driverName) {
239
+			$runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
240
+			switch ($driverName) {
241
+				case static::DRIVER_SQLSERVER:
242
+					$runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
243
+					break;
244
+			}
245
+
246
+			return [
247
+				'task',
248
+				'created',
249
+				'duration' => $runtime
250
+			];
251
+		};
252
+
253
+		$conditions = [
254
+			'completed IS NOT' => null
255
+		];
256
+		if ($taskName) {
257
+			$conditions['task'] = $taskName;
258
+		}
259
+
260
+		$tasks = $this->find()
261
+			->select($fields)
262
+			->where($conditions)
263
+			->enableHydration(false)
264
+			->orderDesc('id')
265
+			->limit(static::STATS_LIMIT)
266
+			->all()
267
+			->toArray();
268
+
269
+		$result = [];
270
+
271
+		$days = [];
272
+
273
+		foreach ($tasks as $task) {
274
+			/** @var \DateTime $created */
275
+			$created = $task['created'];
276
+			$day = $created->format('Y-m-d');
277
+			if (!isset($days[$day])) {
278
+				$days[$day] = $day;
279
+			}
280
+
281
+			$result[$task['task']][$day][] = $task['duration'];
282
+		}
283
+
284
+		foreach ($result as $type => $tasks) {
285
+			foreach ($tasks as $day => $durations) {
286
+				$average = array_sum($durations) / count($durations);
287
+				$result[$type][$day] = (int)$average;
288
+			}
289
+
290
+			foreach ($days as $day) {
291
+				if (isset($result[$type][$day])) {
292
+					continue;
293
+				}
294
+
295
+				$result[$type][$day] = 0;
296
+			}
297
+
298
+			ksort($result[$type]);
299
+		}
300
+
301
+		return $result;
302
+	}
303
+
304
+	/**
305
+	 * Look for a new job that can be processed with the current abilities and
306
+	 * from the specified group (or any if null).
307
+	 *
308
+	 * @param array $capabilities Available QueueWorkerTasks.
309
+	 * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
310
+	 * @return \Queue\Model\Entity\QueuedTask|null
311
+	 */
312
+	public function requestJob(array $capabilities, array $types = [])
313
+	{
314
+		$now = $this->getDateTime();
315
+		$nowStr = $now->toDateTimeString();
316
+		$driverName = $this->_getDriverName();
317
+
318
+		$query = $this->find();
319
+		$age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
320
+		switch ($driverName) {
321
+			case static::DRIVER_SQLSERVER:
322
+				$age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
323
+				break;
324
+			case static::DRIVER_POSTGRES:
325
+				$age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
326
+				break;
327
+		}
328
+		$options = [
329
+			'conditions' => [
330
+				'completed IS' => null,
331
+				'OR' => []
332
+			],
333
+			'fields' => [
334
+				'age' => $age
335
+			],
336
+			'order' => [
337
+				'age' => 'ASC',
338
+				'id' => 'ASC'
339
+			]
340
+		];
341
+
342
+		if ($types) {
343
+			$options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
344
+		}
345
+
346
+		// Generate the task specific conditions.
347
+		foreach ($capabilities as $task) {
348
+			list ($plugin, $name) = pluginSplit($task['name']);
349
+			$timeoutAt = $now->copy();
350
+			$tmp = [
351
+				'task' => $name,
352
+				'AND' => [
353
+					[
354
+						'OR' => [
355
+							'not_before <' => $nowStr,
356
+							'not_before IS' => null
357
+						]
358
+					],
359
+					[
360
+						'OR' => [
361
+							'fetched <' => $timeoutAt->subSeconds($task['timeout']),
362
+							'fetched IS' => null
363
+						]
364
+					]
365
+				],
366
+				'failed_count <' => ($task['retries'] + 1)
367
+			];
368
+			$options['conditions']['OR'][] = $tmp;
369
+		}
370
+
371
+		/** @var \Queue\Model\Entity\QueuedTask|null $task */
372
+		$task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
373
+			$task = $query->find('all', $options)
374
+				->enableAutoFields(true)
375
+				->epilog('FOR UPDATE')
376
+				->first();
377
+
378
+			if (!$task) {
379
+				return null;
380
+			}
381
+
382
+			$key = $this->key();
383
+			$task = $this->patchEntity($task, [
384
+				'worker_key' => $key,
385
+				'fetched' => $now
386
+			]);
387
+
388
+			return $this->saveOrFail($task);
389
+		});
390
+
391
+		if (!$task) {
392
+			return null;
393
+		}
394
+
395
+		return $task;
396
+	}
397
+
398
+	/**
399
+	 * Mark a task as Completed, removing it from the queue.
400
+	 *
401
+	 * @param \Queue\Model\Entity\QueuedTask $task Task
402
+	 * @return bool Success
403
+	 */
404
+	public function markJobDone(QueuedTask $task)
405
+	{
406
+		$fields = [
407
+			'completed' => $this->getDateTime()
408
+		];
409
+		$task = $this->patchEntity($task, $fields);
410
+
411
+		return (bool)$this->save($task);
412
+	}
413
+
414
+	/**
415
+	 * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
416
+	 *
417
+	 * @param \Queue\Model\Entity\QueuedTask $task Task
418
+	 * @param string|null $failureMessage Optional message to append to the failure_message field.
419
+	 * @return bool Success
420
+	 */
421
+	public function markJobFailed(QueuedTask $task, $failureMessage = null)
422
+	{
423
+		$fields = [
424
+			'failed_count' => $task->failed_count + 1,
425
+			'failure_message' => $failureMessage
426
+		];
427
+		$task = $this->patchEntity($task, $fields);
428
+
429
+		return (bool)$this->save($task);
430
+	}
431
+
432
+	/**
433
+	 * Reset current jobs
434
+	 *
435
+	 * @param int|null $id ID
436
+	 *
437
+	 * @return int Success
438
+	 */
439
+	public function reset($id = null)
440
+	{
441
+		$fields = [
442
+			'completed' => null,
443
+			'fetched' => null,
444
+			'failed_count' => 0,
445
+			'worker_key' => null,
446
+			'failure_message' => null
447
+		];
448
+		$conditions = [
449
+			'completed IS' => null
450
+		];
451
+		if ($id) {
452
+			$conditions['id'] = $id;
453
+		}
454
+
455
+		return $this->updateAll($fields, $conditions);
456
+	}
457
+
458
+	/**
459
+	 *
460
+	 * @param string $taskName Task name
461
+	 *
462
+	 * @return int
463
+	 */
464
+	public function rerun($taskName)
465
+	{
466
+		$fields = [
467
+			'completed' => null,
468
+			'fetched' => null,
469
+			'failed_count' => 0,
470
+			'worker_key' => null,
471
+			'failure_message' => null
472
+		];
473
+		$conditions = [
474
+			'completed IS NOT' => null,
475
+			'task' => $taskName
476
+		];
477
+
478
+		return $this->updateAll($fields, $conditions);
479
+	}
480
+
481
+	/**
482
+	 * Cleanup/Delete Completed Tasks.
483
+	 *
484
+	 * @return void
485
+	 */
486
+	public function cleanOldJobs()
487
+	{
488
+		if (!Configure::read('Queue.cleanuptimeout')) {
489
+			return;
490
+		}
491
+
492
+		$this->deleteAll([
493
+			'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
494
+		]);
495
+	}
496
+
497
+	/**
498
+	 *
499
+	 * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
500
+	 * @param array $taskConfiguration Task configuration
501
+	 * @return string
502
+	 */
503
+	public function getFailedStatus($queuedTask, array $taskConfiguration)
504
+	{
505
+		$failureMessageRequeued = 'requeued';
506
+
507
+		$queuedTaskName = 'Queue' . $queuedTask->task;
508
+		if (empty($taskConfiguration[$queuedTaskName])) {
509
+			return $failureMessageRequeued;
510
+		}
511
+		$retries = $taskConfiguration[$queuedTaskName]['retries'];
512
+		if ($queuedTask->failed_count <= $retries) {
513
+			return $failureMessageRequeued;
514
+		}
515
+
516
+		return 'aborted';
517
+	}
518
+
519
+	/**
520
+	 * Generates a unique Identifier for the current worker thread.
521
+	 *
522
+	 * Useful to identify the currently running processes for this thread.
523
+	 *
524
+	 * @return string Identifier
525
+	 */
526
+	public function key()
527
+	{
528
+		if ($this->_key !== null) {
529
+			return $this->_key;
530
+		}
531
+		$this->_key = sha1(microtime());
532
+		if (!$this->_key) {
533
+			throw new RuntimeException('Invalid key generated');
534
+		}
535
+
536
+		return $this->_key;
537
+	}
538
+
539
+	/**
540
+	 * Resets worker Identifier
541
+	 *
542
+	 * @return void
543
+	 */
544
+	public function clearKey()
545
+	{
546
+		$this->_key = null;
547
+	}
548
+
549
+	/**
550
+	 * truncate()
551
+	 *
552
+	 * @return void
553
+	 */
554
+	public function truncate()
555
+	{
556
+		$sql = $this->getSchema()->truncateSql($this->_connection);
557
+		foreach ($sql as $snippet) {
558
+			$this->_connection->execute($snippet);
559
+		}
560
+	}
561
+
562
+	/**
563
+	 * get the name of the driver
564
+	 *
565
+	 * @return string
566
+	 */
567
+	protected function _getDriverName()
568
+	{
569
+		$className = explode('\\', $this->getConnection()->config()['driver']);
570
+		$name = end($className);
571
+
572
+		return $name;
573
+	}
574
+
575
+	/**
576
+	 *
577
+	 * @param array $conditions Conditions
578
+	 * @param string $key Key
579
+	 * @param array $values Values
580
+	 * @return array
581
+	 */
582
+	protected function addFilter(array $conditions, $key, array $values)
583
+	{
584
+		$include = [];
585
+		$exclude = [];
586
+		foreach ($values as $value) {
587
+			if (substr($value, 0, 1) === '-') {
588
+				$exclude[] = substr($value, 1);
589
+			} else {
590
+				$include[] = $value;
591
+			}
592
+		}
593
+
594
+		if ($include) {
595
+			$conditions[$key . ' IN'] = $include;
596
+		}
597
+		if ($exclude) {
598
+			$conditions[$key . ' NOT IN'] = $exclude;
599
+		}
600
+
601
+		return $conditions;
602
+	}
603
+
604
+	/**
605
+	 * Returns a DateTime object from different input.
606
+	 *
607
+	 * Without argument this will be "now".
608
+	 *
609
+	 * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
610
+	 *
611
+	 * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
612
+	 */
613
+	protected function getDateTime($notBefore = null)
614
+	{
615
+		if (is_object($notBefore)) {
616
+			return $notBefore;
617
+		}
618
+
619
+		return new FrozenTime($notBefore);
620
+	}
621 621
 }
Please login to merge, or discard this patch.