Completed
Pull Request — master (#35)
by
unknown
03:07
created
src/Queue/Config.php 1 patch
Indentation   +56 added lines, -56 removed lines patch added patch discarded remove patch
@@ -6,66 +6,66 @@
 block discarded – undo
6 6
 class Config
7 7
 {
8 8
 
9
-    /**
10
-     *
11
-     * @return int
12
-     */
13
-    public static function defaultWorkerTimeout()
14
-    {
15
-        return Configure::read('Queue.defaultWorkerTimeout', 600); // 10min
16
-    }
9
+	/**
10
+	 *
11
+	 * @return int
12
+	 */
13
+	public static function defaultWorkerTimeout()
14
+	{
15
+		return Configure::read('Queue.defaultWorkerTimeout', 600); // 10min
16
+	}
17 17
 
18
-    /**
19
-     *
20
-     * @return int
21
-     */
22
-    public static function workerMaxRuntime()
23
-    {
24
-        return Configure::read('Queue.workerMaxRuntime', 120);
25
-    }
18
+	/**
19
+	 *
20
+	 * @return int
21
+	 */
22
+	public static function workerMaxRuntime()
23
+	{
24
+		return Configure::read('Queue.workerMaxRuntime', 120);
25
+	}
26 26
 
27
-    /**
28
-     *
29
-     * @return int
30
-     */
31
-    public static function cleanupTimeout()
32
-    {
33
-        return Configure::read('Queue.cleanupTimeout', 2592000); // 30 days
34
-    }
27
+	/**
28
+	 *
29
+	 * @return int
30
+	 */
31
+	public static function cleanupTimeout()
32
+	{
33
+		return Configure::read('Queue.cleanupTimeout', 2592000); // 30 days
34
+	}
35 35
 
36
-    /**
37
-     *
38
-     * @return int
39
-     */
40
-    public static function sleepTime()
41
-    {
42
-        return Configure::read('Queue.sleepTime', 10);
43
-    }
36
+	/**
37
+	 *
38
+	 * @return int
39
+	 */
40
+	public static function sleepTime()
41
+	{
42
+		return Configure::read('Queue.sleepTime', 10);
43
+	}
44 44
 
45
-    /**
46
-     *
47
-     * @return int
48
-     */
49
-    public static function gcprob()
50
-    {
51
-        return Configure::read('Queue.gcprob', 10);
52
-    }
45
+	/**
46
+	 *
47
+	 * @return int
48
+	 */
49
+	public static function gcprob()
50
+	{
51
+		return Configure::read('Queue.gcprob', 10);
52
+	}
53 53
 
54
-    /**
55
-     *
56
-     * @return int
57
-     */
58
-    public static function defaultWorkerRetries()
59
-    {
60
-        return Configure::read('Queue.defaultWorkerRetries', 1);
61
-    }
54
+	/**
55
+	 *
56
+	 * @return int
57
+	 */
58
+	public static function defaultWorkerRetries()
59
+	{
60
+		return Configure::read('Queue.defaultWorkerRetries', 1);
61
+	}
62 62
 
63
-    /**
64
-     *
65
-     * @return int
66
-     */
67
-    public static function maxWorkers()
68
-    {
69
-        return Configure::read('Queue.maxWorkers', 1);
70
-    }
63
+	/**
64
+	 *
65
+	 * @return int
66
+	 */
67
+	public static function maxWorkers()
68
+	{
69
+		return Configure::read('Queue.maxWorkers', 1);
70
+	}
71 71
 }
Please login to merge, or discard this patch.
src/Shell/Task/AddInterface.php 1 patch
Indentation   +8 added lines, -8 removed lines patch added patch discarded remove patch
@@ -10,12 +10,12 @@
 block discarded – undo
10 10
 interface AddInterface
11 11
 {
12 12
 
13
-    /**
14
-     * Allows adding a task to the queue.
15
-     *
16
-     * Will create one example task in the queue, which later will be executed using run().
17
-     *
18
-     * @return void
19
-     */
20
-    public function add(): void;
13
+	/**
14
+	 * Allows adding a task to the queue.
15
+	 *
16
+	 * Will create one example task in the queue, which later will be executed using run().
17
+	 *
18
+	 * @return void
19
+	 */
20
+	public function add(): void;
21 21
 }
Please login to merge, or discard this patch.
src/Model/Entity/QueuedTask.php 1 patch
Indentation   +17 added lines, -17 removed lines patch added patch discarded remove patch
@@ -20,21 +20,21 @@
 block discarded – undo
20 20
 class QueuedTask extends Entity
21 21
 {
22 22
 
23
-    /**
24
-     *
25
-     * {@inheritdoc}
26
-     *
27
-     * @var array
28
-     */
29
-    protected $_accessible = [
30
-        'task' => true,
31
-        'data' => true,
32
-        'not_before' => true,
33
-        'fetched' => true,
34
-        'completed' => true,
35
-        'failed_count' => true,
36
-        'failure_message' => true,
37
-        'worker_key' => true,
38
-        'created' => true
39
-    ];
23
+	/**
24
+	 *
25
+	 * {@inheritdoc}
26
+	 *
27
+	 * @var array
28
+	 */
29
+	protected $_accessible = [
30
+		'task' => true,
31
+		'data' => true,
32
+		'not_before' => true,
33
+		'fetched' => true,
34
+		'completed' => true,
35
+		'failed_count' => true,
36
+		'failure_message' => true,
37
+		'worker_key' => true,
38
+		'created' => true
39
+	];
40 40
 }
Please login to merge, or discard this patch.
tests/Fixture/QueuedTasksFixture.php 1 patch
Indentation   +42 added lines, -42 removed lines patch added patch discarded remove patch
@@ -8,47 +8,47 @@
 block discarded – undo
8 8
  */
9 9
 class QueuedTasksFixture extends TestFixture
10 10
 {
11
-    /**
12
-     * Fields
13
-     *
14
-     * @var array
15
-     */
16
-    // @codingStandardsIgnoreStart
17
-    public $fields = [
18
-        'id' => ['type' => 'integer', 'length' => 10, 'unsigned' => true, 'null' => false, 'default' => null, 'comment' => '', 'autoIncrement' => true, 'precision' => null],
19
-        'task' => ['type' => 'string', 'length' => 255, 'null' => false, 'default' => null, 'collate' => 'utf8_general_ci', 'comment' => '', 'precision' => null, 'fixed' => null],
20
-        'data' => ['type' => 'text', 'length' => 16777215, 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'comment' => '', 'precision' => null],
21
-        'not_before' => ['type' => 'timestamp', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
22
-        'fetched' => ['type' => 'timestamp', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
23
-        'completed' => ['type' => 'timestamp', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
24
-        'failed_count' => ['type' => 'integer', 'length' => 10, 'unsigned' => true, 'null' => false, 'default' => '0', 'comment' => '', 'precision' => null, 'autoIncrement' => null],
25
-        'failure_message' => ['type' => 'text', 'length' => null, 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'comment' => '', 'precision' => null],
26
-        'worker_key' => ['type' => 'string', 'fixed' => true, 'length' => 40, 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'comment' => '', 'precision' => null],
27
-        'created' => ['type' => 'timestamp', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
28
-        '_indexes' => [
29
-            'completed' => ['type' => 'index', 'columns' => ['completed'], 'length' => []],
30
-            'worker_key' => ['type' => 'index', 'columns' => ['worker_key'], 'length' => []],
31
-            'task' => ['type' => 'index', 'columns' => ['task'], 'length' => []],
32
-        ],
33
-        '_constraints' => [
34
-            'primary' => ['type' => 'primary', 'columns' => ['id'], 'length' => []],
35
-        ],
36
-        '_options' => [
37
-            'engine' => 'InnoDB',
38
-            'collation' => 'utf8_general_ci'
39
-        ],
40
-    ];
41
-    // @codingStandardsIgnoreEnd
11
+	/**
12
+	 * Fields
13
+	 *
14
+	 * @var array
15
+	 */
16
+	// @codingStandardsIgnoreStart
17
+	public $fields = [
18
+		'id' => ['type' => 'integer', 'length' => 10, 'unsigned' => true, 'null' => false, 'default' => null, 'comment' => '', 'autoIncrement' => true, 'precision' => null],
19
+		'task' => ['type' => 'string', 'length' => 255, 'null' => false, 'default' => null, 'collate' => 'utf8_general_ci', 'comment' => '', 'precision' => null, 'fixed' => null],
20
+		'data' => ['type' => 'text', 'length' => 16777215, 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'comment' => '', 'precision' => null],
21
+		'not_before' => ['type' => 'timestamp', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
22
+		'fetched' => ['type' => 'timestamp', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
23
+		'completed' => ['type' => 'timestamp', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
24
+		'failed_count' => ['type' => 'integer', 'length' => 10, 'unsigned' => true, 'null' => false, 'default' => '0', 'comment' => '', 'precision' => null, 'autoIncrement' => null],
25
+		'failure_message' => ['type' => 'text', 'length' => null, 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'comment' => '', 'precision' => null],
26
+		'worker_key' => ['type' => 'string', 'fixed' => true, 'length' => 40, 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'comment' => '', 'precision' => null],
27
+		'created' => ['type' => 'timestamp', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
28
+		'_indexes' => [
29
+			'completed' => ['type' => 'index', 'columns' => ['completed'], 'length' => []],
30
+			'worker_key' => ['type' => 'index', 'columns' => ['worker_key'], 'length' => []],
31
+			'task' => ['type' => 'index', 'columns' => ['task'], 'length' => []],
32
+		],
33
+		'_constraints' => [
34
+			'primary' => ['type' => 'primary', 'columns' => ['id'], 'length' => []],
35
+		],
36
+		'_options' => [
37
+			'engine' => 'InnoDB',
38
+			'collation' => 'utf8_general_ci'
39
+		],
40
+	];
41
+	// @codingStandardsIgnoreEnd
42 42
     
43
-    /**
44
-     * Init method
45
-     *
46
-     * @return void
47
-     */
48
-    public function init()
49
-    {
50
-        $this->records = [
51
-        ];
52
-        parent::init();
53
-    }
43
+	/**
44
+	 * Init method
45
+	 *
46
+	 * @return void
47
+	 */
48
+	public function init()
49
+	{
50
+		$this->records = [
51
+		];
52
+		parent::init();
53
+	}
54 54
 }
Please login to merge, or discard this patch.
tests/config/app_queue.php 1 patch
Indentation   +10 added lines, -10 removed lines patch added patch discarded remove patch
@@ -1,18 +1,18 @@
 block discarded – undo
1 1
 <?php
2 2
 
3 3
 return [
4
-    'Queue' => [
5
-        // time (in seconds) after which a job is requeued if the worker doesn't report back
6
-        'defaultWorkerTimeout' => 1800,
4
+	'Queue' => [
5
+		// time (in seconds) after which a job is requeued if the worker doesn't report back
6
+		'defaultWorkerTimeout' => 1800,
7 7
 
8
-        // seconds of running time after which the worker will terminate (0 = unlimited)
9
-        'workerMaxRuntime' => 120,
8
+		// seconds of running time after which the worker will terminate (0 = unlimited)
9
+		'workerMaxRuntime' => 120,
10 10
 
11
-        // minimum time (in seconds) which a task remains in the database before being cleaned up.
12
-        'cleanupTimeout' => 2592000, // 30 days
11
+		// minimum time (in seconds) which a task remains in the database before being cleaned up.
12
+		'cleanupTimeout' => 2592000, // 30 days
13 13
 
14
-        /* Optional */
14
+		/* Optional */
15 15
 
16
-        'isSearchEnabled' => true,
17
-    ],
16
+		'isSearchEnabled' => true,
17
+	],
18 18
 ];
Please login to merge, or discard this patch.
src/Queue/TaskFinder.php 1 patch
Indentation   +68 added lines, -68 removed lines patch added patch discarded remove patch
@@ -8,80 +8,80 @@
 block discarded – undo
8 8
 class TaskFinder
9 9
 {
10 10
 
11
-    /**
12
-     *
13
-     * @var array|null
14
-     */
15
-    protected $tasks;
11
+	/**
12
+	 *
13
+	 * @var array|null
14
+	 */
15
+	protected $tasks;
16 16
 
17
-    /**
18
-     * Returns all possible Queue tasks.
19
-     *
20
-     * Makes sure that app tasks are prioritized over plugin ones.
21
-     *
22
-     * @return array
23
-     */
24
-    public function allAppAndPluginTasks()
25
-    {
26
-        if ($this->tasks !== null) {
27
-            return $this->tasks;
28
-        }
17
+	/**
18
+	 * Returns all possible Queue tasks.
19
+	 *
20
+	 * Makes sure that app tasks are prioritized over plugin ones.
21
+	 *
22
+	 * @return array
23
+	 */
24
+	public function allAppAndPluginTasks()
25
+	{
26
+		if ($this->tasks !== null) {
27
+			return $this->tasks;
28
+		}
29 29
 
30
-        $paths = App::path('Shell/Task');
31
-        $this->tasks = [];
30
+		$paths = App::path('Shell/Task');
31
+		$this->tasks = [];
32 32
 
33
-        foreach ($paths as $path) {
34
-            $Folder = new Folder($path);
35
-            $this->tasks = $this->getAppPaths($Folder);
36
-        }
37
-        $plugins = Plugin::loaded();
38
-        foreach ($plugins as $plugin) {
39
-            $pluginPaths = App::path('Shell/Task', $plugin);
40
-            foreach ($pluginPaths as $pluginPath) {
41
-                $Folder = new Folder($pluginPath);
42
-                $pluginTasks = $this->getPluginPaths($Folder, $plugin);
43
-                $this->tasks = array_merge($this->tasks, $pluginTasks);
44
-            }
45
-        }
33
+		foreach ($paths as $path) {
34
+			$Folder = new Folder($path);
35
+			$this->tasks = $this->getAppPaths($Folder);
36
+		}
37
+		$plugins = Plugin::loaded();
38
+		foreach ($plugins as $plugin) {
39
+			$pluginPaths = App::path('Shell/Task', $plugin);
40
+			foreach ($pluginPaths as $pluginPath) {
41
+				$Folder = new Folder($pluginPath);
42
+				$pluginTasks = $this->getPluginPaths($Folder, $plugin);
43
+				$this->tasks = array_merge($this->tasks, $pluginTasks);
44
+			}
45
+		}
46 46
 
47
-        return $this->tasks;
48
-    }
47
+		return $this->tasks;
48
+	}
49 49
 
50
-    /**
51
-     *
52
-     * @param \Cake\Filesystem\Folder $Folder The directory
53
-     *
54
-     * @return array
55
-     */
56
-    protected function getAppPaths(Folder $Folder)
57
-    {
58
-        $res = array_merge($this->tasks, $Folder->find('Queue.+\.php'));
59
-        foreach ($res as &$r) {
60
-            $r = basename($r, 'Task.php');
61
-        }
50
+	/**
51
+	 *
52
+	 * @param \Cake\Filesystem\Folder $Folder The directory
53
+	 *
54
+	 * @return array
55
+	 */
56
+	protected function getAppPaths(Folder $Folder)
57
+	{
58
+		$res = array_merge($this->tasks, $Folder->find('Queue.+\.php'));
59
+		foreach ($res as &$r) {
60
+			$r = basename($r, 'Task.php');
61
+		}
62 62
 
63
-        return $res;
64
-    }
63
+		return $res;
64
+	}
65 65
 
66
-    /**
67
-     *
68
-     * @param \Cake\Filesystem\Folder $Folder The directory
69
-     * @param string $plugin The plugin name
70
-     *
71
-     * @return array
72
-     */
73
-    protected function getPluginPaths(Folder $Folder, $plugin)
74
-    {
75
-        $res = $Folder->find('Queue.+Task\.php');
76
-        foreach ($res as $key => $r) {
77
-            $name = basename($r, 'Task.php');
78
-            if (in_array($name, $this->tasks)) {
79
-                unset($res[$key]);
80
-                continue;
81
-            }
82
-            $res[$key] = $plugin . '.' . $name;
83
-        }
66
+	/**
67
+	 *
68
+	 * @param \Cake\Filesystem\Folder $Folder The directory
69
+	 * @param string $plugin The plugin name
70
+	 *
71
+	 * @return array
72
+	 */
73
+	protected function getPluginPaths(Folder $Folder, $plugin)
74
+	{
75
+		$res = $Folder->find('Queue.+Task\.php');
76
+		foreach ($res as $key => $r) {
77
+			$name = basename($r, 'Task.php');
78
+			if (in_array($name, $this->tasks)) {
79
+				unset($res[$key]);
80
+				continue;
81
+			}
82
+			$res[$key] = $plugin . '.' . $name;
83
+		}
84 84
 
85
-        return $res;
86
-    }
85
+		return $res;
86
+	}
87 87
 }
Please login to merge, or discard this patch.
src/Shell/QueueShell.php 2 patches
Indentation   +555 added lines, -555 removed lines patch added patch discarded remove patch
@@ -30,570 +30,570 @@
 block discarded – undo
30 30
 class QueueShell extends Shell
31 31
 {
32 32
 
33
-    /**
34
-     *
35
-     * @var string
36
-     */
37
-    public $modelClass = 'Queue.QueuedTasks';
38
-
39
-    /**
40
-     *
41
-     * @var array|null
42
-     */
43
-    protected $_taskConf;
44
-
45
-    /**
46
-     *
47
-     * @var int
48
-     */
49
-    protected $_time = 0;
50
-
51
-    /**
52
-     *
53
-     * @var bool
54
-     */
55
-    protected $_exit = false;
56
-
57
-    /**
58
-     * Overwrite shell initialize to dynamically load all Queue Related Tasks.
59
-     *
60
-     * @return void
61
-     */
62
-    public function initialize()
63
-    {
64
-        $taskFinder = new TaskFinder();
65
-        $this->tasks = $taskFinder->allAppAndPluginTasks();
66
-
67
-        parent::initialize();
68
-    }
69
-
70
-    /**
71
-     *
72
-     * @return void
73
-     */
74
-    public function startup()
75
-    {
76
-        if ($this->param('quiet')) {
77
-            $this->interactive = false;
78
-        }
79
-
80
-        parent::startup();
81
-    }
82
-
83
-    /**
84
-     *
85
-     * @return string
86
-     */
87
-    public function _getDescription()
88
-    {
89
-        $tasks = [];
90
-        foreach ($this->taskNames as $loadedTask) {
91
-            $tasks[] = "\t" . '* ' . $this->_taskName($loadedTask);
92
-        }
93
-        $tasks = implode(PHP_EOL, $tasks);
94
-
95
-        $text = <<<TEXT
33
+	/**
34
+	 *
35
+	 * @var string
36
+	 */
37
+	public $modelClass = 'Queue.QueuedTasks';
38
+
39
+	/**
40
+	 *
41
+	 * @var array|null
42
+	 */
43
+	protected $_taskConf;
44
+
45
+	/**
46
+	 *
47
+	 * @var int
48
+	 */
49
+	protected $_time = 0;
50
+
51
+	/**
52
+	 *
53
+	 * @var bool
54
+	 */
55
+	protected $_exit = false;
56
+
57
+	/**
58
+	 * Overwrite shell initialize to dynamically load all Queue Related Tasks.
59
+	 *
60
+	 * @return void
61
+	 */
62
+	public function initialize()
63
+	{
64
+		$taskFinder = new TaskFinder();
65
+		$this->tasks = $taskFinder->allAppAndPluginTasks();
66
+
67
+		parent::initialize();
68
+	}
69
+
70
+	/**
71
+	 *
72
+	 * @return void
73
+	 */
74
+	public function startup()
75
+	{
76
+		if ($this->param('quiet')) {
77
+			$this->interactive = false;
78
+		}
79
+
80
+		parent::startup();
81
+	}
82
+
83
+	/**
84
+	 *
85
+	 * @return string
86
+	 */
87
+	public function _getDescription()
88
+	{
89
+		$tasks = [];
90
+		foreach ($this->taskNames as $loadedTask) {
91
+			$tasks[] = "\t" . '* ' . $this->_taskName($loadedTask);
92
+		}
93
+		$tasks = implode(PHP_EOL, $tasks);
94
+
95
+		$text = <<<TEXT
96 96
 Simple and minimalistic job queue (or deferred-task) system.
97 97
 
98 98
 Available Tasks:
99 99
 $tasks
100 100
 TEXT;
101
-        return $text;
102
-    }
103
-
104
-    /**
105
-     * Look for a Queue Task of hte passed name and try to call add() on it.
106
-     * A QueueTask may provide an add function to enable the user to create new jobs via commandline.
107
-     *
108
-     * @return void
109
-     */
110
-    public function add()
111
-    {
112
-        if (count($this->args) < 1) {
113
-            $this->out('Please call like this:');
114
-            $this->out('       bin/cake queue add <taskname>');
115
-            $this->_displayAvailableTasks();
116
-
117
-            return;
118
-        }
119
-
120
-        $name = Inflector::camelize($this->args[0]);
121
-        if (in_array('Queue' . $name, $this->taskNames, true)) {
122
-            /** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
123
-            $task = $this->{'Queue' . $name};
124
-            if (!($task instanceof AddInterface)) {
125
-                $this->abort('This task does not support adding via CLI call');
126
-            }
127
-            $task->add();
128
-        } else {
129
-            $this->out('Error: Task not found: ' . $name);
130
-            $this->_displayAvailableTasks();
131
-        }
132
-    }
133
-
134
-    /**
135
-     * Output the task without Queue or Task
136
-     * example: QueueImageTask becomes Image on display
137
-     *
138
-     * @param string $task Task name
139
-     * @return string Cleaned task name
140
-     */
141
-    protected function _taskName($task)
142
-    {
143
-        if (strpos($task, 'Queue') === 0) {
144
-            return substr($task, 5);
145
-        }
146
-        return $task;
147
-    }
148
-
149
-    /**
150
-     * Run a QueueWorker loop.
151
-     * Runs a Queue Worker process which will try to find unassigned jobs in the queue
152
-     * which it may run and try to fetch and execute them.
153
-     *
154
-     * @return int|null
155
-     */
156
-    public function runworker()
157
-    {
158
-        try {
159
-            $pid = $this->_initPid();
160
-        } catch (PersistenceFailedException $exception) {
161
-            $this->err($exception->getMessage());
162
-            $limit = (int)Configure::read('Queue.maxWorkers');
163
-            if ($limit) {
164
-                $this->out('Cannot start worker: Too many workers already/still running on this server (' . $limit . '/' . $limit . ')');
165
-            }
166
-            return static::CODE_ERROR;
167
-        }
168
-
169
-        // Enable Garbage Collector (PHP >= 5.3)
170
-        if (function_exists('gc_enable')) {
171
-            gc_enable();
172
-        }
173
-        if (function_exists('pcntl_signal')) {
174
-            pcntl_signal(SIGTERM, [
175
-                &$this,
176
-                '_exit'
177
-            ]);
178
-            pcntl_signal(SIGINT, [
179
-                &$this,
180
-                '_exit'
181
-            ]);
182
-            pcntl_signal(SIGTSTP, [
183
-                &$this,
184
-                '_exit'
185
-            ]);
186
-            pcntl_signal(SIGQUIT, [
187
-                &$this,
188
-                '_exit'
189
-            ]);
190
-        }
191
-        $this->_exit = false;
192
-
193
-        $startTime = time();
194
-        $types = $this->_stringToArray($this->param('type'));
195
-
196
-        while (!$this->_exit) {
197
-            $this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);
198
-
199
-            $QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);
200
-
201
-            if ($QueuedTask) {
202
-                $this->runJob($QueuedTask, $pid);
203
-            } elseif (Configure::read('Queue.exitWhenNothingToDo')) {
204
-                $this->out('nothing to do, exiting.');
205
-                $this->_exit = true;
206
-            } else {
207
-                $this->out('nothing to do, sleeping.');
208
-                sleep(Config::sleepTime());
209
-            }
210
-
211
-            // check if we are over the maximum runtime and end processing if so.
212
-            if (Configure::readOrFail('Queue.workerMaxRuntime') && (time() - $startTime) >= Configure::readOrFail('Queue.workerMaxRuntime')) {
213
-                $this->_exit = true;
214
-                $this->out('queue', 'Reached runtime of ' . (time() - $startTime) . ' Seconds (Max ' . Configure::readOrFail('Queue.workerMaxRuntime') . '), terminating.');
215
-            }
216
-            if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
217
-                $this->out(__d('queue', 'Performing old job cleanup.'));
218
-                $this->QueuedTasks->cleanOldJobs();
219
-            }
220
-            $this->hr();
221
-        }
222
-
223
-        $this->_deletePid($pid);
224
-
225
-        if ($this->param('verbose')) {
226
-            $this->_log('endworker', $pid);
227
-        }
228
-    }
229
-
230
-    /**
231
-     *
232
-     * @param \Queue\Model\Entity\QueuedTask $QueuedTask
233
-     * @param string $pid
234
-     * @return void
235
-     */
236
-    protected function runJob(QueuedTask $QueuedTask, $pid)
237
-    {
238
-        $this->out('Running Job of type "' . $QueuedTask->task . '"');
239
-        $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, $pid, false);
240
-        $taskName = 'Queue' . $QueuedTask->task;
241
-
242
-        try {
243
-            $this->_time = time();
244
-
245
-            $data = unserialize($QueuedTask->data);
246
-            /** @var \Queue\Shell\Task\QueueTask $task */
247
-            $task = $this->{$taskName};
248
-            if (!$task instanceof QueueTaskInterface) {
249
-                throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
250
-            }
251
-
252
-            $return = $task->run((array)$data, $QueuedTask->id);
253
-            if ($return !== null) {
254
-                trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
255
-            }
256
-            $failureMessage = $taskName . ' failed';
257
-        } catch (Throwable $e) {
258
-            $return = false;
259
-
260
-            $failureMessage = get_class($e) . ': ' . $e->getMessage();
261
-            if (!($e instanceof QueueException)) {
262
-                $failureMessage .= "\n" . $e->getTraceAsString();
263
-            }
264
-
265
-            $this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage, $pid);
266
-        } catch (Exception $e) {
267
-            $return = false;
268
-
269
-            $failureMessage = get_class($e) . ': ' . $e->getMessage();
270
-            $this->_logError($taskName . "\n" . $failureMessage, $pid);
271
-        }
272
-
273
-        if ($return === false) {
274
-            $this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
275
-            $failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
276
-            $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus, $pid);
277
-            $this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');
278
-            return;
279
-        }
280
-
281
-        $this->QueuedTasks->markJobDone($QueuedTask);
282
-        $this->out('Job Finished.');
283
-    }
284
-
285
-    /**
286
-     * Manually trigger a Finished job cleanup.
287
-     *
288
-     * @return void
289
-     */
290
-    public function clean()
291
-    {
292
-        if (!Configure::read('Queue.cleanupTimeout')) {
293
-            $this->abort('You disabled cleanuptimout in config. Aborting.');
294
-        }
295
-
296
-        $this->out('Deleting old jobs, that have finished before ' . date('Y-m-d H:i:s', time() - (int)Configure::read('Queue.cleanupTimeout')));
297
-        $this->QueuedTasks->cleanOldJobs();
298
-    }
299
-
300
-    /**
301
-     * Display current settings
302
-     *
303
-     * @return void
304
-     */
305
-    public function settings()
306
-    {
307
-        $this->out('Current Settings:');
308
-        $conf = (array)Configure::read('Queue');
309
-        foreach ($conf as $key => $val) {
310
-            if ($val === false) {
311
-                $val = 'no';
312
-            }
313
-            if ($val === true) {
314
-                $val = 'yes';
315
-            }
316
-            $this->out('* ' . $key . ': ' . print_r($val, true));
317
-        }
318
-
319
-        $this->out();
320
-    }
321
-
322
-    /**
323
-     * Display some statistics about Finished Jobs.
324
-     *
325
-     * @return void
326
-     */
327
-    public function stats()
328
-    {
329
-        $this->out('Jobs currently in the queue:');
330
-
331
-        $types = $this->QueuedTasks->getTypes()->toArray();
332
-        foreach ($types as $type) {
333
-            $this->out('      ' . str_pad($type, 20, ' ', STR_PAD_RIGHT) . ': ' . $this->QueuedTasks->getLength($type));
334
-        }
335
-        $this->hr();
336
-        $this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
337
-        $this->hr();
338
-        $this->out('Finished job statistics:');
339
-        $data = $this->QueuedTasks->getStats();
340
-        foreach ($data as $item) {
341
-            $this->out(' ' . $item['task'] . ': ');
342
-            $this->out('   Finished Jobs in Database: ' . $item['num']);
343
-            $this->out('   Average Job existence    : ' . str_pad(Number::precision($item['alltime']), 8, ' ', STR_PAD_LEFT) . 's');
344
-            $this->out('   Average Execution delay  : ' . str_pad(Number::precision($item['fetchdelay']), 8, ' ', STR_PAD_LEFT) . 's');
345
-            $this->out('   Average Execution time   : ' . str_pad(Number::precision($item['runtime']), 8, ' ', STR_PAD_LEFT) . 's');
346
-        }
347
-    }
348
-
349
-    /**
350
-     * Get option parser method to parse commandline options
351
-     *
352
-     * @return \Cake\Console\ConsoleOptionParser
353
-     */
354
-    public function getOptionParser()
355
-    {
356
-        $subcommandParser = [
357
-            'options' => [ /*
101
+		return $text;
102
+	}
103
+
104
+	/**
105
+	 * Look for a Queue Task of hte passed name and try to call add() on it.
106
+	 * A QueueTask may provide an add function to enable the user to create new jobs via commandline.
107
+	 *
108
+	 * @return void
109
+	 */
110
+	public function add()
111
+	{
112
+		if (count($this->args) < 1) {
113
+			$this->out('Please call like this:');
114
+			$this->out('       bin/cake queue add <taskname>');
115
+			$this->_displayAvailableTasks();
116
+
117
+			return;
118
+		}
119
+
120
+		$name = Inflector::camelize($this->args[0]);
121
+		if (in_array('Queue' . $name, $this->taskNames, true)) {
122
+			/** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
123
+			$task = $this->{'Queue' . $name};
124
+			if (!($task instanceof AddInterface)) {
125
+				$this->abort('This task does not support adding via CLI call');
126
+			}
127
+			$task->add();
128
+		} else {
129
+			$this->out('Error: Task not found: ' . $name);
130
+			$this->_displayAvailableTasks();
131
+		}
132
+	}
133
+
134
+	/**
135
+	 * Output the task without Queue or Task
136
+	 * example: QueueImageTask becomes Image on display
137
+	 *
138
+	 * @param string $task Task name
139
+	 * @return string Cleaned task name
140
+	 */
141
+	protected function _taskName($task)
142
+	{
143
+		if (strpos($task, 'Queue') === 0) {
144
+			return substr($task, 5);
145
+		}
146
+		return $task;
147
+	}
148
+
149
+	/**
150
+	 * Run a QueueWorker loop.
151
+	 * Runs a Queue Worker process which will try to find unassigned jobs in the queue
152
+	 * which it may run and try to fetch and execute them.
153
+	 *
154
+	 * @return int|null
155
+	 */
156
+	public function runworker()
157
+	{
158
+		try {
159
+			$pid = $this->_initPid();
160
+		} catch (PersistenceFailedException $exception) {
161
+			$this->err($exception->getMessage());
162
+			$limit = (int)Configure::read('Queue.maxWorkers');
163
+			if ($limit) {
164
+				$this->out('Cannot start worker: Too many workers already/still running on this server (' . $limit . '/' . $limit . ')');
165
+			}
166
+			return static::CODE_ERROR;
167
+		}
168
+
169
+		// Enable Garbage Collector (PHP >= 5.3)
170
+		if (function_exists('gc_enable')) {
171
+			gc_enable();
172
+		}
173
+		if (function_exists('pcntl_signal')) {
174
+			pcntl_signal(SIGTERM, [
175
+				&$this,
176
+				'_exit'
177
+			]);
178
+			pcntl_signal(SIGINT, [
179
+				&$this,
180
+				'_exit'
181
+			]);
182
+			pcntl_signal(SIGTSTP, [
183
+				&$this,
184
+				'_exit'
185
+			]);
186
+			pcntl_signal(SIGQUIT, [
187
+				&$this,
188
+				'_exit'
189
+			]);
190
+		}
191
+		$this->_exit = false;
192
+
193
+		$startTime = time();
194
+		$types = $this->_stringToArray($this->param('type'));
195
+
196
+		while (!$this->_exit) {
197
+			$this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);
198
+
199
+			$QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);
200
+
201
+			if ($QueuedTask) {
202
+				$this->runJob($QueuedTask, $pid);
203
+			} elseif (Configure::read('Queue.exitWhenNothingToDo')) {
204
+				$this->out('nothing to do, exiting.');
205
+				$this->_exit = true;
206
+			} else {
207
+				$this->out('nothing to do, sleeping.');
208
+				sleep(Config::sleepTime());
209
+			}
210
+
211
+			// check if we are over the maximum runtime and end processing if so.
212
+			if (Configure::readOrFail('Queue.workerMaxRuntime') && (time() - $startTime) >= Configure::readOrFail('Queue.workerMaxRuntime')) {
213
+				$this->_exit = true;
214
+				$this->out('queue', 'Reached runtime of ' . (time() - $startTime) . ' Seconds (Max ' . Configure::readOrFail('Queue.workerMaxRuntime') . '), terminating.');
215
+			}
216
+			if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
217
+				$this->out(__d('queue', 'Performing old job cleanup.'));
218
+				$this->QueuedTasks->cleanOldJobs();
219
+			}
220
+			$this->hr();
221
+		}
222
+
223
+		$this->_deletePid($pid);
224
+
225
+		if ($this->param('verbose')) {
226
+			$this->_log('endworker', $pid);
227
+		}
228
+	}
229
+
230
+	/**
231
+	 *
232
+	 * @param \Queue\Model\Entity\QueuedTask $QueuedTask
233
+	 * @param string $pid
234
+	 * @return void
235
+	 */
236
+	protected function runJob(QueuedTask $QueuedTask, $pid)
237
+	{
238
+		$this->out('Running Job of type "' . $QueuedTask->task . '"');
239
+		$this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, $pid, false);
240
+		$taskName = 'Queue' . $QueuedTask->task;
241
+
242
+		try {
243
+			$this->_time = time();
244
+
245
+			$data = unserialize($QueuedTask->data);
246
+			/** @var \Queue\Shell\Task\QueueTask $task */
247
+			$task = $this->{$taskName};
248
+			if (!$task instanceof QueueTaskInterface) {
249
+				throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
250
+			}
251
+
252
+			$return = $task->run((array)$data, $QueuedTask->id);
253
+			if ($return !== null) {
254
+				trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
255
+			}
256
+			$failureMessage = $taskName . ' failed';
257
+		} catch (Throwable $e) {
258
+			$return = false;
259
+
260
+			$failureMessage = get_class($e) . ': ' . $e->getMessage();
261
+			if (!($e instanceof QueueException)) {
262
+				$failureMessage .= "\n" . $e->getTraceAsString();
263
+			}
264
+
265
+			$this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage, $pid);
266
+		} catch (Exception $e) {
267
+			$return = false;
268
+
269
+			$failureMessage = get_class($e) . ': ' . $e->getMessage();
270
+			$this->_logError($taskName . "\n" . $failureMessage, $pid);
271
+		}
272
+
273
+		if ($return === false) {
274
+			$this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
275
+			$failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
276
+			$this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus, $pid);
277
+			$this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');
278
+			return;
279
+		}
280
+
281
+		$this->QueuedTasks->markJobDone($QueuedTask);
282
+		$this->out('Job Finished.');
283
+	}
284
+
285
+	/**
286
+	 * Manually trigger a Finished job cleanup.
287
+	 *
288
+	 * @return void
289
+	 */
290
+	public function clean()
291
+	{
292
+		if (!Configure::read('Queue.cleanupTimeout')) {
293
+			$this->abort('You disabled cleanuptimout in config. Aborting.');
294
+		}
295
+
296
+		$this->out('Deleting old jobs, that have finished before ' . date('Y-m-d H:i:s', time() - (int)Configure::read('Queue.cleanupTimeout')));
297
+		$this->QueuedTasks->cleanOldJobs();
298
+	}
299
+
300
+	/**
301
+	 * Display current settings
302
+	 *
303
+	 * @return void
304
+	 */
305
+	public function settings()
306
+	{
307
+		$this->out('Current Settings:');
308
+		$conf = (array)Configure::read('Queue');
309
+		foreach ($conf as $key => $val) {
310
+			if ($val === false) {
311
+				$val = 'no';
312
+			}
313
+			if ($val === true) {
314
+				$val = 'yes';
315
+			}
316
+			$this->out('* ' . $key . ': ' . print_r($val, true));
317
+		}
318
+
319
+		$this->out();
320
+	}
321
+
322
+	/**
323
+	 * Display some statistics about Finished Jobs.
324
+	 *
325
+	 * @return void
326
+	 */
327
+	public function stats()
328
+	{
329
+		$this->out('Jobs currently in the queue:');
330
+
331
+		$types = $this->QueuedTasks->getTypes()->toArray();
332
+		foreach ($types as $type) {
333
+			$this->out('      ' . str_pad($type, 20, ' ', STR_PAD_RIGHT) . ': ' . $this->QueuedTasks->getLength($type));
334
+		}
335
+		$this->hr();
336
+		$this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
337
+		$this->hr();
338
+		$this->out('Finished job statistics:');
339
+		$data = $this->QueuedTasks->getStats();
340
+		foreach ($data as $item) {
341
+			$this->out(' ' . $item['task'] . ': ');
342
+			$this->out('   Finished Jobs in Database: ' . $item['num']);
343
+			$this->out('   Average Job existence    : ' . str_pad(Number::precision($item['alltime']), 8, ' ', STR_PAD_LEFT) . 's');
344
+			$this->out('   Average Execution delay  : ' . str_pad(Number::precision($item['fetchdelay']), 8, ' ', STR_PAD_LEFT) . 's');
345
+			$this->out('   Average Execution time   : ' . str_pad(Number::precision($item['runtime']), 8, ' ', STR_PAD_LEFT) . 's');
346
+		}
347
+	}
348
+
349
+	/**
350
+	 * Get option parser method to parse commandline options
351
+	 *
352
+	 * @return \Cake\Console\ConsoleOptionParser
353
+	 */
354
+	public function getOptionParser()
355
+	{
356
+		$subcommandParser = [
357
+			'options' => [ /*
358 358
                              * 'dry-run'=> array(
359 359
                              * 'short' => 'd',
360 360
                              * 'help' => 'Dry run the update, no jobs will actually be added.',
361 361
                              * 'boolean' => true
362 362
                              * ),
363 363
                              */
364
-            ]
365
-        ];
366
-        $subcommandParserFull = $subcommandParser;
367
-        $subcommandParserFull['options']['type'] = [
368
-            'short' => 't',
369
-            'help' => 'Type (comma separated list possible)',
370
-            'default' => null
371
-        ];
372
-
373
-        return parent::getOptionParser()->setDescription($this->_getDescription())
374
-            ->addSubcommand('clean', [
375
-            'help' => 'Remove old jobs (cleanup)',
376
-            'parser' => $subcommandParser
377
-        ])
378
-            ->addSubcommand('add', [
379
-            'help' => 'Add Job',
380
-            'parser' => $subcommandParser
381
-        ])
382
-            ->addSubcommand('stats', [
383
-            'help' => 'Stats',
384
-            'parser' => $subcommandParserFull
385
-        ])
386
-            ->addSubcommand('settings', [
387
-            'help' => 'Settings',
388
-            'parser' => $subcommandParserFull
389
-        ])
390
-            ->addSubcommand('runworker', [
391
-            'help' => 'Run Worker',
392
-            'parser' => $subcommandParserFull
393
-        ]);
394
-    }
395
-
396
-    /**
397
-     * Timestamped log.
398
-     *
399
-     * @param string $message Log type
400
-     * @param string|null $pid PID of the process
401
-     * @param bool $addDetails
402
-     * @return void
403
-     */
404
-    protected function _log($message, $pid = null, $addDetails = true)
405
-    {
406
-        if (!Configure::read('Queue.log')) {
407
-            return;
408
-        }
409
-
410
-        if ($addDetails) {
411
-            $timeNeeded = $this->_timeNeeded();
412
-            $memoryUsage = $this->_memoryUsage();
413
-            $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
414
-        }
415
-
416
-        if ($pid) {
417
-            $message .= ' (pid ' . $pid . ')';
418
-        }
419
-        Log::write('info', $message, [
420
-            'scope' => 'queue'
421
-        ]);
422
-    }
423
-
424
-    /**
425
-     *
426
-     * @param string $message
427
-     * @param string|null $pid PID of the process
428
-     * @return void
429
-     */
430
-    protected function _logError($message, $pid = null)
431
-    {
432
-        $timeNeeded = $this->_timeNeeded();
433
-        $memoryUsage = $this->_memoryUsage();
434
-        $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
435
-
436
-        if ($pid) {
437
-            $message .= ' (pid ' . $pid . ')';
438
-        }
439
-
440
-        Log::write('error', $message);
441
-    }
442
-
443
-    /**
444
-     * Returns a List of available QueueTasks and their individual configurations.
445
-     *
446
-     * @return array
447
-     */
448
-    protected function _getTaskConf()
449
-    {
450
-        if (!is_array($this->_taskConf)) {
451
-            $this->_taskConf = [];
452
-            foreach ($this->tasks as $task) {
453
-                list ($pluginName, $taskName) = pluginSplit($task);
454
-
455
-                $this->_taskConf[$taskName]['name'] = substr($taskName, 5);
456
-                $this->_taskConf[$taskName]['plugin'] = $pluginName;
457
-                if (property_exists($this->{$taskName}, 'timeout')) {
458
-                    $this->_taskConf[$taskName]['timeout'] = $this->{$taskName}->timeout;
459
-                } else {
460
-                    $this->_taskConf[$taskName]['timeout'] = Config::defaultWorkerTimeout();
461
-                }
462
-                if (property_exists($this->{$taskName}, 'retries')) {
463
-                    $this->_taskConf[$taskName]['retries'] = $this->{$taskName}->retries;
464
-                } else {
465
-                    $this->_taskConf[$taskName]['retries'] = Config::defaultWorkerRetries();
466
-                }
467
-                if (property_exists($this->{$taskName}, 'cleanupTimeout')) {
468
-                    $this->_taskConf[$taskName]['cleanupTimeout'] = $this->{$taskName}->cleanupTimeout;
469
-                } else {
470
-                    $this->_taskConf[$taskName]['cleanupTimeout'] = Config::cleanupTimeout();
471
-                }
472
-            }
473
-        }
474
-        return $this->_taskConf;
475
-    }
476
-
477
-    /**
478
-     * Signal handling to queue worker for clean shutdown
479
-     *
480
-     * @param int $signal
481
-     * @return void
482
-     */
483
-    protected function _exit($signal)
484
-    {
485
-        $this->out(__d('queue', 'Caught %d signal, exiting.', $signal));
486
-        $this->_exit = true;
487
-    }
488
-
489
-    /**
490
-     *
491
-     * @return void
492
-     */
493
-    protected function _displayAvailableTasks()
494
-    {
495
-        $this->out('Available Tasks:');
496
-        foreach ($this->taskNames as $loadedTask) {
497
-            $this->out("\t" . '* ' . $this->_taskName($loadedTask));
498
-        }
499
-    }
500
-
501
-    /**
502
-     *
503
-     * @return string
504
-     */
505
-    protected function _initPid()
506
-    {
507
-        $this->_pid = $this->_retrievePid();
508
-        return $this->_pid;
509
-    }
510
-
511
-    /**
512
-     *
513
-     * @return string
514
-     */
515
-    protected function _retrievePid()
516
-    {
517
-        if (function_exists('posix_getpid')) {
518
-            $pid = (string)posix_getpid();
519
-        } else {
520
-            $pid = $this->QueuedTasks->key();
521
-        }
522
-        return $pid;
523
-    }
524
-
525
-    /**
526
-     *
527
-     * @return string Memory usage in MB.
528
-     */
529
-    protected function _memoryUsage()
530
-    {
531
-        $limit = ini_get('memory_limit');
532
-
533
-        $used = number_format(memory_get_peak_usage(true) / (1024 * 1024), 0) . 'MB';
534
-        if ($limit !== '-1') {
535
-            $used .= '/' . $limit;
536
-        }
537
-
538
-        return $used;
539
-    }
540
-
541
-    /**
542
-     *
543
-     * @param string|null $pid
544
-     *
545
-     * @return void
546
-     */
547
-    protected function _deletePid($pid)
548
-    {
549
-        if (!$pid) {
550
-            $pid = $this->_pid;
551
-        }
552
-        if (!$pid) {
553
-            return;
554
-        }
555
-    }
556
-
557
-    /**
558
-     *
559
-     * @return string
560
-     */
561
-    protected function _timeNeeded()
562
-    {
563
-        $diff = $this->_time() - $this->_time($this->_time);
564
-        $seconds = max($diff, 1);
565
-
566
-        return $seconds . 's';
567
-    }
568
-
569
-    /**
570
-     *
571
-     * @param int|null $providedTime
572
-     *
573
-     * @return int
574
-     */
575
-    protected function _time($providedTime = null)
576
-    {
577
-        if ($providedTime) {
578
-            return $providedTime;
579
-        }
580
-
581
-        return time();
582
-    }
583
-
584
-    /**
585
-     *
586
-     * @param string|null $param
587
-     * @return array
588
-     */
589
-    protected function _stringToArray($param)
590
-    {
591
-        if (!$param) {
592
-            return [];
593
-        }
594
-
595
-        $array = Text::tokenize($param);
596
-
597
-        return array_filter($array);
598
-    }
364
+			]
365
+		];
366
+		$subcommandParserFull = $subcommandParser;
367
+		$subcommandParserFull['options']['type'] = [
368
+			'short' => 't',
369
+			'help' => 'Type (comma separated list possible)',
370
+			'default' => null
371
+		];
372
+
373
+		return parent::getOptionParser()->setDescription($this->_getDescription())
374
+			->addSubcommand('clean', [
375
+			'help' => 'Remove old jobs (cleanup)',
376
+			'parser' => $subcommandParser
377
+		])
378
+			->addSubcommand('add', [
379
+			'help' => 'Add Job',
380
+			'parser' => $subcommandParser
381
+		])
382
+			->addSubcommand('stats', [
383
+			'help' => 'Stats',
384
+			'parser' => $subcommandParserFull
385
+		])
386
+			->addSubcommand('settings', [
387
+			'help' => 'Settings',
388
+			'parser' => $subcommandParserFull
389
+		])
390
+			->addSubcommand('runworker', [
391
+			'help' => 'Run Worker',
392
+			'parser' => $subcommandParserFull
393
+		]);
394
+	}
395
+
396
+	/**
397
+	 * Timestamped log.
398
+	 *
399
+	 * @param string $message Log type
400
+	 * @param string|null $pid PID of the process
401
+	 * @param bool $addDetails
402
+	 * @return void
403
+	 */
404
+	protected function _log($message, $pid = null, $addDetails = true)
405
+	{
406
+		if (!Configure::read('Queue.log')) {
407
+			return;
408
+		}
409
+
410
+		if ($addDetails) {
411
+			$timeNeeded = $this->_timeNeeded();
412
+			$memoryUsage = $this->_memoryUsage();
413
+			$message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
414
+		}
415
+
416
+		if ($pid) {
417
+			$message .= ' (pid ' . $pid . ')';
418
+		}
419
+		Log::write('info', $message, [
420
+			'scope' => 'queue'
421
+		]);
422
+	}
423
+
424
+	/**
425
+	 *
426
+	 * @param string $message
427
+	 * @param string|null $pid PID of the process
428
+	 * @return void
429
+	 */
430
+	protected function _logError($message, $pid = null)
431
+	{
432
+		$timeNeeded = $this->_timeNeeded();
433
+		$memoryUsage = $this->_memoryUsage();
434
+		$message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
435
+
436
+		if ($pid) {
437
+			$message .= ' (pid ' . $pid . ')';
438
+		}
439
+
440
+		Log::write('error', $message);
441
+	}
442
+
443
+	/**
444
+	 * Returns a List of available QueueTasks and their individual configurations.
445
+	 *
446
+	 * @return array
447
+	 */
448
+	protected function _getTaskConf()
449
+	{
450
+		if (!is_array($this->_taskConf)) {
451
+			$this->_taskConf = [];
452
+			foreach ($this->tasks as $task) {
453
+				list ($pluginName, $taskName) = pluginSplit($task);
454
+
455
+				$this->_taskConf[$taskName]['name'] = substr($taskName, 5);
456
+				$this->_taskConf[$taskName]['plugin'] = $pluginName;
457
+				if (property_exists($this->{$taskName}, 'timeout')) {
458
+					$this->_taskConf[$taskName]['timeout'] = $this->{$taskName}->timeout;
459
+				} else {
460
+					$this->_taskConf[$taskName]['timeout'] = Config::defaultWorkerTimeout();
461
+				}
462
+				if (property_exists($this->{$taskName}, 'retries')) {
463
+					$this->_taskConf[$taskName]['retries'] = $this->{$taskName}->retries;
464
+				} else {
465
+					$this->_taskConf[$taskName]['retries'] = Config::defaultWorkerRetries();
466
+				}
467
+				if (property_exists($this->{$taskName}, 'cleanupTimeout')) {
468
+					$this->_taskConf[$taskName]['cleanupTimeout'] = $this->{$taskName}->cleanupTimeout;
469
+				} else {
470
+					$this->_taskConf[$taskName]['cleanupTimeout'] = Config::cleanupTimeout();
471
+				}
472
+			}
473
+		}
474
+		return $this->_taskConf;
475
+	}
476
+
477
+	/**
478
+	 * Signal handling to queue worker for clean shutdown
479
+	 *
480
+	 * @param int $signal
481
+	 * @return void
482
+	 */
483
+	protected function _exit($signal)
484
+	{
485
+		$this->out(__d('queue', 'Caught %d signal, exiting.', $signal));
486
+		$this->_exit = true;
487
+	}
488
+
489
+	/**
490
+	 *
491
+	 * @return void
492
+	 */
493
+	protected function _displayAvailableTasks()
494
+	{
495
+		$this->out('Available Tasks:');
496
+		foreach ($this->taskNames as $loadedTask) {
497
+			$this->out("\t" . '* ' . $this->_taskName($loadedTask));
498
+		}
499
+	}
500
+
501
+	/**
502
+	 *
503
+	 * @return string
504
+	 */
505
+	protected function _initPid()
506
+	{
507
+		$this->_pid = $this->_retrievePid();
508
+		return $this->_pid;
509
+	}
510
+
511
+	/**
512
+	 *
513
+	 * @return string
514
+	 */
515
+	protected function _retrievePid()
516
+	{
517
+		if (function_exists('posix_getpid')) {
518
+			$pid = (string)posix_getpid();
519
+		} else {
520
+			$pid = $this->QueuedTasks->key();
521
+		}
522
+		return $pid;
523
+	}
524
+
525
+	/**
526
+	 *
527
+	 * @return string Memory usage in MB.
528
+	 */
529
+	protected function _memoryUsage()
530
+	{
531
+		$limit = ini_get('memory_limit');
532
+
533
+		$used = number_format(memory_get_peak_usage(true) / (1024 * 1024), 0) . 'MB';
534
+		if ($limit !== '-1') {
535
+			$used .= '/' . $limit;
536
+		}
537
+
538
+		return $used;
539
+	}
540
+
541
+	/**
542
+	 *
543
+	 * @param string|null $pid
544
+	 *
545
+	 * @return void
546
+	 */
547
+	protected function _deletePid($pid)
548
+	{
549
+		if (!$pid) {
550
+			$pid = $this->_pid;
551
+		}
552
+		if (!$pid) {
553
+			return;
554
+		}
555
+	}
556
+
557
+	/**
558
+	 *
559
+	 * @return string
560
+	 */
561
+	protected function _timeNeeded()
562
+	{
563
+		$diff = $this->_time() - $this->_time($this->_time);
564
+		$seconds = max($diff, 1);
565
+
566
+		return $seconds . 's';
567
+	}
568
+
569
+	/**
570
+	 *
571
+	 * @param int|null $providedTime
572
+	 *
573
+	 * @return int
574
+	 */
575
+	protected function _time($providedTime = null)
576
+	{
577
+		if ($providedTime) {
578
+			return $providedTime;
579
+		}
580
+
581
+		return time();
582
+	}
583
+
584
+	/**
585
+	 *
586
+	 * @param string|null $param
587
+	 * @return array
588
+	 */
589
+	protected function _stringToArray($param)
590
+	{
591
+		if (!$param) {
592
+			return [];
593
+		}
594
+
595
+		$array = Text::tokenize($param);
596
+
597
+		return array_filter($array);
598
+	}
599 599
 }
Please login to merge, or discard this patch.
Spacing   +2 added lines, -2 removed lines patch added patch discarded remove patch
@@ -17,7 +17,7 @@  discard block
 block discarded – undo
17 17
 use Queue\Shell\Task\QueueTaskInterface;
18 18
 use RuntimeException;
19 19
 use Throwable;
20
-declare(ticks = 1);
20
+declare(ticks=1);
21 21
 
22 22
 /**
23 23
  * Main shell to init and run queue workers.
@@ -354,7 +354,7 @@  discard block
 block discarded – undo
354 354
     public function getOptionParser()
355 355
     {
356 356
         $subcommandParser = [
357
-            'options' => [ /*
357
+            'options' => [/*
358 358
                              * 'dry-run'=> array(
359 359
                              * 'short' => 'd',
360 360
                              * 'help' => 'Dry run the update, no jobs will actually be added.',
Please login to merge, or discard this patch.
src/Shell/Task/QueueTask.php 1 patch
Indentation   +45 added lines, -45 removed lines patch added patch discarded remove patch
@@ -18,57 +18,57 @@
 block discarded – undo
18 18
 abstract class QueueTask extends Shell implements QueueTaskInterface
19 19
 {
20 20
 
21
-    /**
22
-     *
23
-     * @var string
24
-     */
25
-    public $queueModelClass = 'Queue.QueuedTasks';
21
+	/**
22
+	 *
23
+	 * @var string
24
+	 */
25
+	public $queueModelClass = 'Queue.QueuedTasks';
26 26
 
27
-    /**
28
-     *
29
-     * @var \Queue\Model\Table\QueuedTasksTable
30
-     */
31
-    public $QueuedTasks;
27
+	/**
28
+	 *
29
+	 * @var \Queue\Model\Table\QueuedTasksTable
30
+	 */
31
+	public $QueuedTasks;
32 32
 
33
-    /**
34
-     * Timeout for run, after which the Task is reassigned to a new worker.
35
-     *
36
-     * @var int
37
-     */
38
-    public $timeout = 120;
33
+	/**
34
+	 * Timeout for run, after which the Task is reassigned to a new worker.
35
+	 *
36
+	 * @var int
37
+	 */
38
+	public $timeout = 120;
39 39
 
40
-    /**
41
-     * Number of times a failed instance of this task should be restarted before giving up.
42
-     *
43
-     * @var int
44
-     */
45
-    public $retries = 1;
40
+	/**
41
+	 * Number of times a failed instance of this task should be restarted before giving up.
42
+	 *
43
+	 * @var int
44
+	 */
45
+	public $retries = 1;
46 46
 
47
-    /**
48
-     *
49
-     * @param \Cake\Console\ConsoleIo|null $io IO
50
-     */
51
-    public function __construct(ConsoleIo $io = null)
52
-    {
53
-        parent::__construct($io);
47
+	/**
48
+	 *
49
+	 * @param \Cake\Console\ConsoleIo|null $io IO
50
+	 */
51
+	public function __construct(ConsoleIo $io = null)
52
+	{
53
+		parent::__construct($io);
54 54
 
55
-        $this->loadModel($this->queueModelClass);
56
-    }
55
+		$this->loadModel($this->queueModelClass);
56
+	}
57 57
 
58
-    /**
59
-     *
60
-     * @return string
61
-     * @throws \InvalidArgumentException
62
-     */
63
-    protected function queueTaskName()
64
-    {
65
-        $class = get_class($this);
58
+	/**
59
+	 *
60
+	 * @return string
61
+	 * @throws \InvalidArgumentException
62
+	 */
63
+	protected function queueTaskName()
64
+	{
65
+		$class = get_class($this);
66 66
 
67
-        preg_match('#\\\\Queue(.+)Task$#', $class, $matches);
68
-        if (!$matches) {
69
-            throw new InvalidArgumentException('Invalid class name: ' . $class);
70
-        }
67
+		preg_match('#\\\\Queue(.+)Task$#', $class, $matches);
68
+		if (!$matches) {
69
+			throw new InvalidArgumentException('Invalid class name: ' . $class);
70
+		}
71 71
 
72
-        return $matches[1];
73
-    }
72
+		return $matches[1];
73
+	}
74 74
 }
Please login to merge, or discard this patch.
src/Shell/Task/QueueExampleTask.php 1 patch
Indentation   +72 added lines, -72 removed lines patch added patch discarded remove patch
@@ -7,82 +7,82 @@
 block discarded – undo
7 7
 class QueueExampleTask extends QueueTask implements AddInterface
8 8
 {
9 9
 
10
-    /**
11
-     * Timeout for run, after which the task is reassigned to a new worker.
12
-     *
13
-     * @var int
14
-     */
15
-    public $timeout = 10;
10
+	/**
11
+	 * Timeout for run, after which the task is reassigned to a new worker.
12
+	 *
13
+	 * @var int
14
+	 */
15
+	public $timeout = 10;
16 16
 
17
-    /**
18
-     * Timeout for cleanup, after which completed jobs are deleted (in seconds).
19
-     *
20
-     * @var int
21
-     */
22
-    public $cleanupTimeout = 600;
17
+	/**
18
+	 * Timeout for cleanup, after which completed jobs are deleted (in seconds).
19
+	 *
20
+	 * @var int
21
+	 */
22
+	public $cleanupTimeout = 600;
23 23
 
24
-    /**
25
-     * Number of times a failed instance of this task should be restarted before giving up.
26
-     *
27
-     * @var int
28
-     */
29
-    public $retries = 0;
24
+	/**
25
+	 * Number of times a failed instance of this task should be restarted before giving up.
26
+	 *
27
+	 * @var int
28
+	 */
29
+	public $retries = 0;
30 30
 
31
-    /**
32
-     * Stores any failure messages triggered during run().
33
-     *
34
-     * @var string
35
-     */
36
-    public $failureMessage = '';
31
+	/**
32
+	 * Stores any failure messages triggered during run().
33
+	 *
34
+	 * @var string
35
+	 */
36
+	public $failureMessage = '';
37 37
 
38
-    /**
39
-     * Example add functionality.
40
-     * Will create one example job in the queue, which later will be executed using run();
41
-     *
42
-     * To invoke from CLI execute:
43
-     * - bin/cake queue add Example
44
-     *
45
-     * @return void
46
-     */
47
-    public function add(): void
48
-    {
49
-        $this->out(__d('queue', 'CakePHP Queue Example task.'));
50
-        $this->hr();
51
-        $this->out(__d('queue', 'This is a very simple example of a queueTask.'));
52
-        $this->out(__d('queue', 'Now adding an example Task Job into the Queue.'));
53
-        $this->out(__d('queue', 'This task will only produce some console output on the worker that it runs on.'));
54
-        $this->out(' ');
55
-        $this->out(__d('queue', 'To run a Worker use:'));
56
-        $this->out(__d('queue', '	cake queue runworker'));
57
-        $this->out(' ');
58
-        $this->out(__d('queue', 'You can find the sourcecode of this task in: '));
59
-        $this->out(__FILE__);
60
-        $this->out(' ');
38
+	/**
39
+	 * Example add functionality.
40
+	 * Will create one example job in the queue, which later will be executed using run();
41
+	 *
42
+	 * To invoke from CLI execute:
43
+	 * - bin/cake queue add Example
44
+	 *
45
+	 * @return void
46
+	 */
47
+	public function add(): void
48
+	{
49
+		$this->out(__d('queue', 'CakePHP Queue Example task.'));
50
+		$this->hr();
51
+		$this->out(__d('queue', 'This is a very simple example of a queueTask.'));
52
+		$this->out(__d('queue', 'Now adding an example Task Job into the Queue.'));
53
+		$this->out(__d('queue', 'This task will only produce some console output on the worker that it runs on.'));
54
+		$this->out(' ');
55
+		$this->out(__d('queue', 'To run a Worker use:'));
56
+		$this->out(__d('queue', '	cake queue runworker'));
57
+		$this->out(' ');
58
+		$this->out(__d('queue', 'You can find the sourcecode of this task in: '));
59
+		$this->out(__FILE__);
60
+		$this->out(' ');
61 61
 
62
-        // Adding a task of type 'example' with no additionally passed data
63
-        if ($this->QueuedTasks->createJob('Example')) {
64
-            $this->out(__d('queue', 'OK, job created, now run the worker'));
65
-        } else {
66
-            $this->err(__d('queue', 'Could not create Job'));
67
-        }
68
-    }
62
+		// Adding a task of type 'example' with no additionally passed data
63
+		if ($this->QueuedTasks->createJob('Example')) {
64
+			$this->out(__d('queue', 'OK, job created, now run the worker'));
65
+		} else {
66
+			$this->err(__d('queue', 'Could not create Job'));
67
+		}
68
+	}
69 69
 
70
-    /**
71
-     * Example run function.
72
-     * This function is executed, when a worker is executing a task.
73
-     * The return parameter will determine, if the task will be marked completed, or be requeued.
74
-     *
75
-     * @param array $data The array passed to QueuedTasksTable::createJob()
76
-     * @param int $taskId The id of the QueuedTask entity
77
-     * @return void
78
-     */
79
-    public function run(array $data, $taskId): void
80
-    {
81
-        $this->hr();
82
-        $this->out(__d('queue', 'CakePHP Queue Example task.'));
83
-        $this->hr();
84
-        $this->out(__d('queue', ' ->Success, the Example Task was run.<-'));
85
-        $this->out(' ');
86
-        $this->out(' ');
87
-    }
70
+	/**
71
+	 * Example run function.
72
+	 * This function is executed, when a worker is executing a task.
73
+	 * The return parameter will determine, if the task will be marked completed, or be requeued.
74
+	 *
75
+	 * @param array $data The array passed to QueuedTasksTable::createJob()
76
+	 * @param int $taskId The id of the QueuedTask entity
77
+	 * @return void
78
+	 */
79
+	public function run(array $data, $taskId): void
80
+	{
81
+		$this->hr();
82
+		$this->out(__d('queue', 'CakePHP Queue Example task.'));
83
+		$this->hr();
84
+		$this->out(__d('queue', ' ->Success, the Example Task was run.<-'));
85
+		$this->out(' ');
86
+		$this->out(' ');
87
+	}
88 88
 }
Please login to merge, or discard this patch.