Completed
Pull Request — master (#35)
by
unknown
03:22
created
tests/TestCase/Queue/TaskFinderTest.php 1 patch
Indentation   +46 added lines, -46 removed lines patch added patch discarded remove patch
@@ -7,50 +7,50 @@
 block discarded – undo
7 7
 class TaskFinderTest extends TestCase
8 8
 {
9 9
 
10
-    /**
11
-     *
12
-     * @var \Queue\Shell\QueueShell|\PHPUnit_Framework_MockObject_MockObject
13
-     */
14
-    public $QueueShell;
15
-
16
-    /**
17
-     *
18
-     * @var \Queue\Queue\TaskFinder
19
-     */
20
-    protected $taskFinder;
21
-
22
-    /**
23
-     * Fixtures to load
24
-     *
25
-     * @var array
26
-     */
27
-    public $fixtures = [
28
-        'plugin.Queue.QueuedTasks',
29
-    ];
30
-
31
-    /**
32
-     * Setup Defaults
33
-     *
34
-     * @return void
35
-     */
36
-    public function setUp()
37
-    {
38
-        parent::setUp();
39
-    }
40
-
41
-    /**
42
-     *
43
-     * @return void
44
-     */
45
-    public function testAllAppAndPluginTasks()
46
-    {
47
-        $this->taskFinder = new TaskFinder();
48
-
49
-        $result = $this->taskFinder->allAppAndPluginTasks();
50
-        $this->assertCount(1, $result);
51
-        $this->assertArraySubset([
52
-            'Queue.QueueExample'
53
-        ], $result);
54
-        $this->assertTrue(!in_array('Foo.QueueFoo', $result));
55
-    }
10
+	/**
11
+	 *
12
+	 * @var \Queue\Shell\QueueShell|\PHPUnit_Framework_MockObject_MockObject
13
+	 */
14
+	public $QueueShell;
15
+
16
+	/**
17
+	 *
18
+	 * @var \Queue\Queue\TaskFinder
19
+	 */
20
+	protected $taskFinder;
21
+
22
+	/**
23
+	 * Fixtures to load
24
+	 *
25
+	 * @var array
26
+	 */
27
+	public $fixtures = [
28
+		'plugin.Queue.QueuedTasks',
29
+	];
30
+
31
+	/**
32
+	 * Setup Defaults
33
+	 *
34
+	 * @return void
35
+	 */
36
+	public function setUp()
37
+	{
38
+		parent::setUp();
39
+	}
40
+
41
+	/**
42
+	 *
43
+	 * @return void
44
+	 */
45
+	public function testAllAppAndPluginTasks()
46
+	{
47
+		$this->taskFinder = new TaskFinder();
48
+
49
+		$result = $this->taskFinder->allAppAndPluginTasks();
50
+		$this->assertCount(1, $result);
51
+		$this->assertArraySubset([
52
+			'Queue.QueueExample'
53
+		], $result);
54
+		$this->assertTrue(!in_array('Foo.QueueFoo', $result));
55
+	}
56 56
 }
Please login to merge, or discard this patch.
tests/TestCase/Model/Table/QueuedTasksTableTest.php 1 patch
Indentation   +260 added lines, -260 removed lines patch added patch discarded remove patch
@@ -19,292 +19,292 @@
 block discarded – undo
19 19
 class QueuedTasksTableTest extends TestCase
20 20
 {
21 21
 
22
-    /**
23
-     *
24
-     * @var \Queue\Model\Table\QueuedTasksTable
25
-     */
26
-    protected $QueuedTasks;
22
+	/**
23
+	 *
24
+	 * @var \Queue\Model\Table\QueuedTasksTable
25
+	 */
26
+	protected $QueuedTasks;
27 27
 
28
-    /**
29
-     * Fixtures
30
-     *
31
-     * @var array
32
-     */
33
-    public $fixtures = [
34
-        'plugin.Queue.QueuedTasks'
35
-    ];
28
+	/**
29
+	 * Fixtures
30
+	 *
31
+	 * @var array
32
+	 */
33
+	public $fixtures = [
34
+		'plugin.Queue.QueuedTasks'
35
+	];
36 36
 
37
-    /**
38
-     * setUp method
39
-     *
40
-     * @return void
41
-     */
42
-    public function setUp()
43
-    {
44
-        parent::setUp();
45
-        $config = TableRegistry::getTableLocator()->exists('QueuedTasks') ? [] : [
46
-            'className' => QueuedTasksTable::class
47
-        ];
48
-        $this->QueuedTasks = TableRegistry::getTableLocator()->get('QueuedTasks', $config);
49
-    }
37
+	/**
38
+	 * setUp method
39
+	 *
40
+	 * @return void
41
+	 */
42
+	public function setUp()
43
+	{
44
+		parent::setUp();
45
+		$config = TableRegistry::getTableLocator()->exists('QueuedTasks') ? [] : [
46
+			'className' => QueuedTasksTable::class
47
+		];
48
+		$this->QueuedTasks = TableRegistry::getTableLocator()->get('QueuedTasks', $config);
49
+	}
50 50
 
51
-    /**
52
-     * Basic Instance test
53
-     *
54
-     * @return void
55
-     */
56
-    public function testQueueInstance()
57
-    {
58
-        $this->assertInstanceOf(QueuedTasksTable::class, $this->QueuedTasks);
59
-    }
51
+	/**
52
+	 * Basic Instance test
53
+	 *
54
+	 * @return void
55
+	 */
56
+	public function testQueueInstance()
57
+	{
58
+		$this->assertInstanceOf(QueuedTasksTable::class, $this->QueuedTasks);
59
+	}
60 60
 
61
-    /**
62
-     * Test the basic create and length evaluation functions.
63
-     *
64
-     * @return void
65
-     */
66
-    public function testCreateAndCount()
67
-    {
68
-        // at first, the queue should contain 0 items.
69
-        $this->assertSame(0, $this->QueuedTasks->getLength());
61
+	/**
62
+	 * Test the basic create and length evaluation functions.
63
+	 *
64
+	 * @return void
65
+	 */
66
+	public function testCreateAndCount()
67
+	{
68
+		// at first, the queue should contain 0 items.
69
+		$this->assertSame(0, $this->QueuedTasks->getLength());
70 70
 
71
-        // create a job
72
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test1', [
73
-            'some' => 'random',
74
-            'test' => 'data'
75
-        ]));
71
+		// create a job
72
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test1', [
73
+			'some' => 'random',
74
+			'test' => 'data'
75
+		]));
76 76
 
77
-        // test if queue Length is 1 now.
78
-        $this->assertSame(1, $this->QueuedTasks->getLength());
77
+		// test if queue Length is 1 now.
78
+		$this->assertSame(1, $this->QueuedTasks->getLength());
79 79
 
80
-        // create some more jobs
81
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
82
-            'some' => 'random',
83
-            'test' => 'data2'
84
-        ]));
85
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
86
-            'some' => 'random',
87
-            'test' => 'data3'
88
-        ]));
89
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('test3', [
90
-            'some' => 'random',
91
-            'test' => 'data4'
92
-        ]));
80
+		// create some more jobs
81
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
82
+			'some' => 'random',
83
+			'test' => 'data2'
84
+		]));
85
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test2', [
86
+			'some' => 'random',
87
+			'test' => 'data3'
88
+		]));
89
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('test3', [
90
+			'some' => 'random',
91
+			'test' => 'data4'
92
+		]));
93 93
 
94
-        // overall queueLength shpould now be 4
95
-        $this->assertSame(4, $this->QueuedTasks->getLength());
94
+		// overall queueLength shpould now be 4
95
+		$this->assertSame(4, $this->QueuedTasks->getLength());
96 96
 
97
-        // there should be 1 task of type 'test1', one of type 'test3' and 2 of type 'test2'
98
-        $this->assertSame(1, $this->QueuedTasks->getLength('test1'));
99
-        $this->assertSame(2, $this->QueuedTasks->getLength('test2'));
100
-        $this->assertSame(1, $this->QueuedTasks->getLength('test3'));
101
-    }
97
+		// there should be 1 task of type 'test1', one of type 'test3' and 2 of type 'test2'
98
+		$this->assertSame(1, $this->QueuedTasks->getLength('test1'));
99
+		$this->assertSame(2, $this->QueuedTasks->getLength('test2'));
100
+		$this->assertSame(1, $this->QueuedTasks->getLength('test3'));
101
+	}
102 102
 
103
-    /**
104
-     * Test the basic create and fetch functions.
105
-     *
106
-     * @return void
107
-     */
108
-    public function testCreateAndFetch()
109
-    {
110
-        $this->_needsConnection();
103
+	/**
104
+	 * Test the basic create and fetch functions.
105
+	 *
106
+	 * @return void
107
+	 */
108
+	public function testCreateAndFetch()
109
+	{
110
+		$this->_needsConnection();
111 111
 
112
-        // $capabilities is a list of tasks the worker can run.
113
-        $capabilities = [
114
-            'task1' => [
115
-                'name' => 'task1',
116
-                'timeout' => 100,
117
-                'retries' => 2
118
-            ]
119
-        ];
120
-        $testData = [
121
-            'x1' => 'y1',
122
-            'x2' => 'y2',
123
-            'x3' => 'y3',
124
-            'x4' => 'y4'
125
-        ];
112
+		// $capabilities is a list of tasks the worker can run.
113
+		$capabilities = [
114
+			'task1' => [
115
+				'name' => 'task1',
116
+				'timeout' => 100,
117
+				'retries' => 2
118
+			]
119
+		];
120
+		$testData = [
121
+			'x1' => 'y1',
122
+			'x2' => 'y2',
123
+			'x3' => 'y3',
124
+			'x4' => 'y4'
125
+		];
126 126
 
127
-        // start off empty.
128
-        $this->assertSame([], $this->QueuedTasks->find()
129
-            ->toArray());
130
-        // at first, the queue should contain 0 items.
131
-        $this->assertSame(0, $this->QueuedTasks->getLength());
132
-        // there are no jobs, so we cant fetch any.
133
-        $this->assertNull($this->QueuedTasks->requestJob($capabilities));
134
-        // insert one job.
135
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $testData));
127
+		// start off empty.
128
+		$this->assertSame([], $this->QueuedTasks->find()
129
+			->toArray());
130
+		// at first, the queue should contain 0 items.
131
+		$this->assertSame(0, $this->QueuedTasks->getLength());
132
+		// there are no jobs, so we cant fetch any.
133
+		$this->assertNull($this->QueuedTasks->requestJob($capabilities));
134
+		// insert one job.
135
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', $testData));
136 136
 
137
-        // fetch and check the first job.
138
-        $job = $this->QueuedTasks->requestJob($capabilities);
139
-        $this->assertSame(1, $job->id);
140
-        $this->assertSame('task1', $job->task);
141
-        $this->assertSame(0, $job->failed_count);
142
-        $this->assertNull($job->completed);
143
-        $this->assertSame($testData, unserialize($job->data));
137
+		// fetch and check the first job.
138
+		$job = $this->QueuedTasks->requestJob($capabilities);
139
+		$this->assertSame(1, $job->id);
140
+		$this->assertSame('task1', $job->task);
141
+		$this->assertSame(0, $job->failed_count);
142
+		$this->assertNull($job->completed);
143
+		$this->assertSame($testData, unserialize($job->data));
144 144
 
145
-        // after this job has been fetched, it may not be reassigned.
146
-        $result = $this->QueuedTasks->requestJob($capabilities);
147
-        $this->assertNull($result);
145
+		// after this job has been fetched, it may not be reassigned.
146
+		$result = $this->QueuedTasks->requestJob($capabilities);
147
+		$this->assertNull($result);
148 148
 
149
-        // queue length is still 1 since the first job did not finish.
150
-        $this->assertSame(1, $this->QueuedTasks->getLength());
149
+		// queue length is still 1 since the first job did not finish.
150
+		$this->assertSame(1, $this->QueuedTasks->getLength());
151 151
 
152
-        // Now mark Task1 as done
153
-        $this->assertTrue($this->QueuedTasks->markJobDone($job));
152
+		// Now mark Task1 as done
153
+		$this->assertTrue($this->QueuedTasks->markJobDone($job));
154 154
 
155
-        // Should be 0 again.
156
-        $this->assertSame(0, $this->QueuedTasks->getLength());
157
-    }
155
+		// Should be 0 again.
156
+		$this->assertSame(0, $this->QueuedTasks->getLength());
157
+	}
158 158
 
159
-    /**
160
-     * Test the delivery of jobs in sequence, skipping fetched but not completed tasks.
161
-     *
162
-     * @return void
163
-     */
164
-    public function testSequence()
165
-    {
166
-        $this->_needsConnection();
159
+	/**
160
+	 * Test the delivery of jobs in sequence, skipping fetched but not completed tasks.
161
+	 *
162
+	 * @return void
163
+	 */
164
+	public function testSequence()
165
+	{
166
+		$this->_needsConnection();
167 167
 
168
-        // $capabilities is a list of tasks the worker can run.
169
-        $capabilities = [
170
-            'task1' => [
171
-                'name' => 'task1',
172
-                'timeout' => 100,
173
-                'retries' => 2
174
-            ]
175
-        ];
176
-        // at first, the queue should contain 0 items.
177
-        $this->assertSame(0, $this->QueuedTasks->getLength());
178
-        // create some more jobs
179
-        foreach (range(0, 9) as $num) {
180
-            $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
181
-                'tasknum' => $num
182
-            ]));
183
-        }
184
-        // 10 jobs in the queue.
185
-        $this->assertSame(10, $this->QueuedTasks->getLength());
168
+		// $capabilities is a list of tasks the worker can run.
169
+		$capabilities = [
170
+			'task1' => [
171
+				'name' => 'task1',
172
+				'timeout' => 100,
173
+				'retries' => 2
174
+			]
175
+		];
176
+		// at first, the queue should contain 0 items.
177
+		$this->assertSame(0, $this->QueuedTasks->getLength());
178
+		// create some more jobs
179
+		foreach (range(0, 9) as $num) {
180
+			$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
181
+				'tasknum' => $num
182
+			]));
183
+		}
184
+		// 10 jobs in the queue.
185
+		$this->assertSame(10, $this->QueuedTasks->getLength());
186 186
 
187
-        // jobs should be fetched in the original sequence.
188
-        $array = [];
189
-        foreach (range(0, 4) as $num) {
190
-            $array[$num] = $this->QueuedTasks->requestJob($capabilities);
191
-            $jobData = unserialize($array[$num]['data']);
192
-            $this->assertSame($num, $jobData['tasknum']);
193
-        }
194
-        // now mark them as done
195
-        foreach (range(0, 4) as $num) {
196
-            $this->assertTrue($this->QueuedTasks->markJobDone($array[$num]));
197
-            $this->assertSame(9 - $num, $this->QueuedTasks->getLength());
198
-        }
187
+		// jobs should be fetched in the original sequence.
188
+		$array = [];
189
+		foreach (range(0, 4) as $num) {
190
+			$array[$num] = $this->QueuedTasks->requestJob($capabilities);
191
+			$jobData = unserialize($array[$num]['data']);
192
+			$this->assertSame($num, $jobData['tasknum']);
193
+		}
194
+		// now mark them as done
195
+		foreach (range(0, 4) as $num) {
196
+			$this->assertTrue($this->QueuedTasks->markJobDone($array[$num]));
197
+			$this->assertSame(9 - $num, $this->QueuedTasks->getLength());
198
+		}
199 199
 
200
-        // jobs should be fetched in the original sequence.
201
-        foreach (range(5, 9) as $num) {
202
-            $job = $this->QueuedTasks->requestJob($capabilities);
203
-            $jobData = unserialize($job->data);
204
-            $this->assertSame($num, $jobData['tasknum']);
205
-            $this->assertTrue($this->QueuedTasks->markJobDone($job));
206
-            $this->assertSame(9 - $num, $this->QueuedTasks->getLength());
207
-        }
208
-    }
200
+		// jobs should be fetched in the original sequence.
201
+		foreach (range(5, 9) as $num) {
202
+			$job = $this->QueuedTasks->requestJob($capabilities);
203
+			$jobData = unserialize($job->data);
204
+			$this->assertSame($num, $jobData['tasknum']);
205
+			$this->assertTrue($this->QueuedTasks->markJobDone($job));
206
+			$this->assertSame(9 - $num, $this->QueuedTasks->getLength());
207
+		}
208
+	}
209 209
 
210
-    /**
211
-     * Test creating Jobs to run close to a specified time, and strtotime parsing.
212
-     * Using toUnixString() function to convert Time object to timestamp, instead of strtotime
213
-     *
214
-     * @return null
215
-     */
216
-    public function testNotBefore()
217
-    {
218
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Min'));
219
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Day'));
220
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '2009-07-01 12:00:00'));
221
-        $data = $this->QueuedTasks->find('all')->toArray();
222
-        $this->assertWithinRange((new Time('+ 1 Min'))->toUnixString(), $data[0]['not_before']->toUnixString(), 60);
223
-        $this->assertWithinRange((new Time('+ 1 Day'))->toUnixString(), $data[1]['not_before']->toUnixString(), 60);
224
-        $this->assertWithinRange((new Time('2009-07-01 12:00:00'))->toUnixString(), $data[2]['not_before']->toUnixString(), 60);
225
-    }
210
+	/**
211
+	 * Test creating Jobs to run close to a specified time, and strtotime parsing.
212
+	 * Using toUnixString() function to convert Time object to timestamp, instead of strtotime
213
+	 *
214
+	 * @return null
215
+	 */
216
+	public function testNotBefore()
217
+	{
218
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Min'));
219
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '+ 1 Day'));
220
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', null, '2009-07-01 12:00:00'));
221
+		$data = $this->QueuedTasks->find('all')->toArray();
222
+		$this->assertWithinRange((new Time('+ 1 Min'))->toUnixString(), $data[0]['not_before']->toUnixString(), 60);
223
+		$this->assertWithinRange((new Time('+ 1 Day'))->toUnixString(), $data[1]['not_before']->toUnixString(), 60);
224
+		$this->assertWithinRange((new Time('2009-07-01 12:00:00'))->toUnixString(), $data[2]['not_before']->toUnixString(), 60);
225
+	}
226 226
 
227
-    /**
228
-     * Test Job reordering depending on 'notBefore' field.
229
-     * Jobs with an expired not_before field should be executed before any other job without specific timing info.
230
-     *
231
-     * @return void
232
-     */
233
-    public function testNotBeforeOrder()
234
-    {
235
-        $this->_needsConnection();
227
+	/**
228
+	 * Test Job reordering depending on 'notBefore' field.
229
+	 * Jobs with an expired not_before field should be executed before any other job without specific timing info.
230
+	 *
231
+	 * @return void
232
+	 */
233
+	public function testNotBeforeOrder()
234
+	{
235
+		$this->_needsConnection();
236 236
 
237
-        $capabilities = [
238
-            'task1' => [
239
-                'name' => 'task1',
240
-                'timeout' => 100,
241
-                'retries' => 2
242
-            ],
243
-            'dummytask' => [
244
-                'name' => 'dummytask',
245
-                'timeout' => 100,
246
-                'retries' => 2
247
-            ]
248
-        ];
249
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
250
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
251
-        // create a task with it's execution target some seconds in the past, so it should jump to the top of the testCreateAndFetchlist.
252
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
253
-            'three'
254
-        ], '- 3 Seconds'));
255
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
256
-            'two'
257
-        ], '- 5 Seconds'));
258
-        $this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
259
-            'one'
260
-        ], '- 7 Seconds'));
237
+		$capabilities = [
238
+			'task1' => [
239
+				'name' => 'task1',
240
+				'timeout' => 100,
241
+				'retries' => 2
242
+			],
243
+			'dummytask' => [
244
+				'name' => 'dummytask',
245
+				'timeout' => 100,
246
+				'retries' => 2
247
+			]
248
+		];
249
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
250
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('dummytask'));
251
+		// create a task with it's execution target some seconds in the past, so it should jump to the top of the testCreateAndFetchlist.
252
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
253
+			'three'
254
+		], '- 3 Seconds'));
255
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
256
+			'two'
257
+		], '- 5 Seconds'));
258
+		$this->assertTrue((bool)$this->QueuedTasks->createJob('task1', [
259
+			'one'
260
+		], '- 7 Seconds'));
261 261
 
262
-        // when using requestJob, the jobs we just created should be delivered in this order, NOT the order in which they where created.
263
-        $expected = [
264
-            [
265
-                'name' => 'task1',
266
-                'data' => [
267
-                    'one'
268
-                ]
269
-            ],
270
-            [
271
-                'name' => 'task1',
272
-                'data' => [
273
-                    'two'
274
-                ]
275
-            ],
276
-            [
277
-                'name' => 'task1',
278
-                'data' => [
279
-                    'three'
280
-                ]
281
-            ],
282
-            [
283
-                'name' => 'dummytask',
284
-                'data' => null
285
-            ],
286
-            [
287
-                'name' => 'dummytask',
288
-                'data' => null
289
-            ]
290
-        ];
262
+		// when using requestJob, the jobs we just created should be delivered in this order, NOT the order in which they where created.
263
+		$expected = [
264
+			[
265
+				'name' => 'task1',
266
+				'data' => [
267
+					'one'
268
+				]
269
+			],
270
+			[
271
+				'name' => 'task1',
272
+				'data' => [
273
+					'two'
274
+				]
275
+			],
276
+			[
277
+				'name' => 'task1',
278
+				'data' => [
279
+					'three'
280
+				]
281
+			],
282
+			[
283
+				'name' => 'dummytask',
284
+				'data' => null
285
+			],
286
+			[
287
+				'name' => 'dummytask',
288
+				'data' => null
289
+			]
290
+		];
291 291
 
292
-        foreach ($expected as $item) {
293
-            $tmp = $this->QueuedTasks->requestJob($capabilities);
292
+		foreach ($expected as $item) {
293
+			$tmp = $this->QueuedTasks->requestJob($capabilities);
294 294
 
295
-            $this->assertSame($item['name'], $tmp['task']);
296
-            $this->assertEquals($item['data'], unserialize($tmp['data']));
297
-        }
298
-    }
295
+			$this->assertSame($item['name'], $tmp['task']);
296
+			$this->assertEquals($item['data'], unserialize($tmp['data']));
297
+		}
298
+	}
299 299
 
300
-    /**
301
-     * Helper method for skipping tests that need a real connection.
302
-     *
303
-     * @return void
304
-     */
305
-    protected function _needsConnection()
306
-    {
307
-        $config = ConnectionManager::getConfig('test');
308
-        $this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
309
-    }
300
+	/**
301
+	 * Helper method for skipping tests that need a real connection.
302
+	 *
303
+	 * @return void
304
+	 */
305
+	protected function _needsConnection()
306
+	{
307
+		$config = ConnectionManager::getConfig('test');
308
+		$this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
309
+	}
310 310
 }
Please login to merge, or discard this patch.
tests/TestCase/Shell/QueueShellTest.php 1 patch
Indentation   +183 added lines, -183 removed lines patch added patch discarded remove patch
@@ -11,187 +11,187 @@
 block discarded – undo
11 11
 
12 12
 class QueueShellTest extends TestCase
13 13
 {
14
-    use ToolsTestTrait;
15
-
16
-    /**
17
-     *
18
-     * @var \Queue\Shell\QueueShell|\PHPUnit_Framework_MockObject_MockObject
19
-     */
20
-    public $QueueShell;
21
-
22
-    /**
23
-     *
24
-     * @var \Tools\TestSuite\ConsoleOutput
25
-     */
26
-    public $out;
27
-
28
-    /**
29
-     *
30
-     * @var \Tools\TestSuite\ConsoleOutput
31
-     */
32
-    public $err;
33
-
34
-    /**
35
-     * Fixtures to load
36
-     *
37
-     * @var array
38
-     */
39
-    public $fixtures = [
40
-        'plugin.Queue.QueuedTasks'
41
-    ];
42
-
43
-    /**
44
-     * Setup Defaults
45
-     *
46
-     * @return void
47
-     */
48
-    public function setUp()
49
-    {
50
-        parent::setUp();
51
-
52
-        $this->out = new ConsoleOutput();
53
-        $this->err = new ConsoleOutput();
54
-        $io = new ConsoleIo($this->out, $this->err);
55
-
56
-        $this->QueueShell = $this->getMockBuilder(QueueShell::class)
57
-            ->setMethods([
58
-                'in',
59
-                'err',
60
-                '_stop'
61
-            ])
62
-            ->setConstructorArgs([
63
-                $io
64
-            ])
65
-            ->getMock();
66
-        $this->QueueShell->initialize();
67
-
68
-        Configure::write('Queue', [
69
-            'sleepTime' => 2,
70
-            'defaultWorkerTimeout' => 3,
71
-            'workerMaxRuntime' => 5,
72
-            'cleanupTimeout' => 10,
73
-            'exitWhenNothingToDo' => false,
74
-            'log' => false
75
-        ]);
76
-    }
77
-
78
-    /**
79
-     *
80
-     * @return void
81
-     */
82
-    public function testObject()
83
-    {
84
-        $this->assertTrue(is_object($this->QueueShell));
85
-        $this->assertInstanceOf(QueueShell::class, $this->QueueShell);
86
-    }
87
-
88
-    /**
89
-     *
90
-     * @return void
91
-     */
92
-    public function testStats()
93
-    {
94
-        $this->_needsConnection();
95
-
96
-        $this->QueueShell->stats();
97
-        $this->assertContains('Total unfinished jobs: 0', $this->out->output());
98
-    }
99
-
100
-    /**
101
-     *
102
-     * @return void
103
-     */
104
-    public function testSettings()
105
-    {
106
-        $this->QueueShell->settings();
107
-        $this->assertContains('* cleanupTimeout: 10', $this->out->output());
108
-    }
109
-
110
-    /**
111
-     *
112
-     * @return void
113
-     */
114
-    public function testAddInexistent()
115
-    {
116
-        $this->QueueShell->args[] = 'FooBar';
117
-        $this->QueueShell->add();
118
-        $this->assertContains('Error: Task not found: FooBar', $this->out->output());
119
-    }
120
-
121
-    /**
122
-     *
123
-     * @return void
124
-     */
125
-    public function testAdd()
126
-    {
127
-        $this->QueueShell->args[] = 'Example';
128
-        $this->QueueShell->add();
129
-
130
-        $this->assertContains('OK, job created, now run the worker', $this->out->output(), print_r($this->out->output(), true));
131
-    }
132
-
133
-    /**
134
-     *
135
-     * @return void
136
-     */
137
-    public function testTimeNeeded()
138
-    {
139
-        $this->QueueShell = $this->getMockBuilder(QueueShell::class)
140
-            ->setMethods([
141
-                '_time'
142
-            ])
143
-            ->getMock();
144
-
145
-        $first = time();
146
-        $second = $first - HOUR + MINUTE;
147
-        $this->QueueShell->expects($this->at(0))
148
-            ->method('_time')
149
-            ->will($this->returnValue($first));
150
-        $this->QueueShell->expects($this->at(1))
151
-            ->method('_time')
152
-            ->will($this->returnValue($second));
153
-        $this->QueueShell->expects($this->exactly(2))
154
-            ->method('_time')
155
-            ->withAnyParameters();
156
-
157
-        $result = $this->invokeMethod($this->QueueShell, '_timeNeeded');
158
-        $this->assertSame('3540s', $result);
159
-    }
160
-
161
-    /**
162
-     *
163
-     * @return void
164
-     */
165
-    public function testMemoryUsage()
166
-    {
167
-        $result = $this->invokeMethod($this->QueueShell, '_memoryUsage');
168
-        $this->assertRegExp('/^\d+MB/', $result, 'Should be e.g. `17MB` or `17MB/1GB` etc.');
169
-    }
170
-
171
-    /**
172
-     *
173
-     * @return void
174
-     */
175
-    public function testStringToArray()
176
-    {
177
-        $string = 'Foo,Bar,';
178
-        $result = $this->invokeMethod($this->QueueShell, '_stringToArray', [$string]);
179
-
180
-        $expected = [
181
-            'Foo',
182
-            'Bar'
183
-        ];
184
-        $this->assertSame($expected, $result);
185
-    }
186
-
187
-    /**
188
-     * Helper method for skipping tests that need a real connection.
189
-     *
190
-     * @return void
191
-     */
192
-    protected function _needsConnection()
193
-    {
194
-        $config = ConnectionManager::getConfig('test');
195
-        $this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
196
-    }
14
+	use ToolsTestTrait;
15
+
16
+	/**
17
+	 *
18
+	 * @var \Queue\Shell\QueueShell|\PHPUnit_Framework_MockObject_MockObject
19
+	 */
20
+	public $QueueShell;
21
+
22
+	/**
23
+	 *
24
+	 * @var \Tools\TestSuite\ConsoleOutput
25
+	 */
26
+	public $out;
27
+
28
+	/**
29
+	 *
30
+	 * @var \Tools\TestSuite\ConsoleOutput
31
+	 */
32
+	public $err;
33
+
34
+	/**
35
+	 * Fixtures to load
36
+	 *
37
+	 * @var array
38
+	 */
39
+	public $fixtures = [
40
+		'plugin.Queue.QueuedTasks'
41
+	];
42
+
43
+	/**
44
+	 * Setup Defaults
45
+	 *
46
+	 * @return void
47
+	 */
48
+	public function setUp()
49
+	{
50
+		parent::setUp();
51
+
52
+		$this->out = new ConsoleOutput();
53
+		$this->err = new ConsoleOutput();
54
+		$io = new ConsoleIo($this->out, $this->err);
55
+
56
+		$this->QueueShell = $this->getMockBuilder(QueueShell::class)
57
+			->setMethods([
58
+				'in',
59
+				'err',
60
+				'_stop'
61
+			])
62
+			->setConstructorArgs([
63
+				$io
64
+			])
65
+			->getMock();
66
+		$this->QueueShell->initialize();
67
+
68
+		Configure::write('Queue', [
69
+			'sleepTime' => 2,
70
+			'defaultWorkerTimeout' => 3,
71
+			'workerMaxRuntime' => 5,
72
+			'cleanupTimeout' => 10,
73
+			'exitWhenNothingToDo' => false,
74
+			'log' => false
75
+		]);
76
+	}
77
+
78
+	/**
79
+	 *
80
+	 * @return void
81
+	 */
82
+	public function testObject()
83
+	{
84
+		$this->assertTrue(is_object($this->QueueShell));
85
+		$this->assertInstanceOf(QueueShell::class, $this->QueueShell);
86
+	}
87
+
88
+	/**
89
+	 *
90
+	 * @return void
91
+	 */
92
+	public function testStats()
93
+	{
94
+		$this->_needsConnection();
95
+
96
+		$this->QueueShell->stats();
97
+		$this->assertContains('Total unfinished jobs: 0', $this->out->output());
98
+	}
99
+
100
+	/**
101
+	 *
102
+	 * @return void
103
+	 */
104
+	public function testSettings()
105
+	{
106
+		$this->QueueShell->settings();
107
+		$this->assertContains('* cleanupTimeout: 10', $this->out->output());
108
+	}
109
+
110
+	/**
111
+	 *
112
+	 * @return void
113
+	 */
114
+	public function testAddInexistent()
115
+	{
116
+		$this->QueueShell->args[] = 'FooBar';
117
+		$this->QueueShell->add();
118
+		$this->assertContains('Error: Task not found: FooBar', $this->out->output());
119
+	}
120
+
121
+	/**
122
+	 *
123
+	 * @return void
124
+	 */
125
+	public function testAdd()
126
+	{
127
+		$this->QueueShell->args[] = 'Example';
128
+		$this->QueueShell->add();
129
+
130
+		$this->assertContains('OK, job created, now run the worker', $this->out->output(), print_r($this->out->output(), true));
131
+	}
132
+
133
+	/**
134
+	 *
135
+	 * @return void
136
+	 */
137
+	public function testTimeNeeded()
138
+	{
139
+		$this->QueueShell = $this->getMockBuilder(QueueShell::class)
140
+			->setMethods([
141
+				'_time'
142
+			])
143
+			->getMock();
144
+
145
+		$first = time();
146
+		$second = $first - HOUR + MINUTE;
147
+		$this->QueueShell->expects($this->at(0))
148
+			->method('_time')
149
+			->will($this->returnValue($first));
150
+		$this->QueueShell->expects($this->at(1))
151
+			->method('_time')
152
+			->will($this->returnValue($second));
153
+		$this->QueueShell->expects($this->exactly(2))
154
+			->method('_time')
155
+			->withAnyParameters();
156
+
157
+		$result = $this->invokeMethod($this->QueueShell, '_timeNeeded');
158
+		$this->assertSame('3540s', $result);
159
+	}
160
+
161
+	/**
162
+	 *
163
+	 * @return void
164
+	 */
165
+	public function testMemoryUsage()
166
+	{
167
+		$result = $this->invokeMethod($this->QueueShell, '_memoryUsage');
168
+		$this->assertRegExp('/^\d+MB/', $result, 'Should be e.g. `17MB` or `17MB/1GB` etc.');
169
+	}
170
+
171
+	/**
172
+	 *
173
+	 * @return void
174
+	 */
175
+	public function testStringToArray()
176
+	{
177
+		$string = 'Foo,Bar,';
178
+		$result = $this->invokeMethod($this->QueueShell, '_stringToArray', [$string]);
179
+
180
+		$expected = [
181
+			'Foo',
182
+			'Bar'
183
+		];
184
+		$this->assertSame($expected, $result);
185
+	}
186
+
187
+	/**
188
+	 * Helper method for skipping tests that need a real connection.
189
+	 *
190
+	 * @return void
191
+	 */
192
+	protected function _needsConnection()
193
+	{
194
+		$config = ConnectionManager::getConfig('test');
195
+		$this->skipIf(strpos($config['driver'], 'Mysql') === false, 'Only Mysql is working yet for this.');
196
+	}
197 197
 }
Please login to merge, or discard this patch.
src/Model/Table/QueuedTasksTable.php 1 patch
Indentation   +532 added lines, -532 removed lines patch added patch discarded remove patch
@@ -28,536 +28,536 @@
 block discarded – undo
28 28
 class QueuedTasksTable extends Table
29 29
 {
30 30
 
31
-    const DRIVER_MYSQL = 'Mysql';
32
-
33
-    const DRIVER_POSTGRES = 'Postgres';
34
-
35
-    const DRIVER_SQLSERVER = 'Sqlserver';
36
-
37
-    const STATS_LIMIT = 100000;
38
-
39
-    /**
40
-     * Initialize method
41
-     *
42
-     * @param array $config The configuration for the Table.
43
-     * @return void
44
-     */
45
-    public function initialize(array $config)
46
-    {
47
-        parent::initialize($config);
48
-
49
-        $this->setTable('queued_tasks');
50
-        $this->setDisplayField('id');
51
-        $this->setPrimaryKey('id');
52
-
53
-        $this->addBehavior('Timestamp');
54
-    }
55
-
56
-    /**
57
-     * set connection name
58
-     *
59
-     * @return string
60
-     */
61
-    public static function defaultConnectionName()
62
-    {
63
-        $connection = Configure::read('Queue.connection');
64
-        if (!empty($connection)) {
65
-            return $connection;
66
-        }
67
-
68
-        return parent::defaultConnectionName();
69
-    }
70
-
71
-    /**
72
-     *
73
-     * @param \Cake\Event\Event $event Model event
74
-     * @param \ArrayObject $data The data
75
-     * @param \ArrayObject $options The options
76
-     * @return void
77
-     */
78
-    public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
79
-    {
80
-        if (isset($data['data']) && $data['data'] === '') {
81
-            $data['data'] = null;
82
-        }
83
-    }
84
-
85
-    /**
86
-     * Adds a new job to the queue.
87
-     *
88
-     * @param string $taskName Task name
89
-     * @param array|null $data Array of data
90
-     * @param string $notBefore A datetime which indicates when the job may be executed
91
-     * @return \Queue\Model\Entity\QueuedTask Saved job entity
92
-     */
93
-    public function createJob($taskName, array $data = null, string $notBefore = null)
94
-    {
95
-        $task = [
96
-            'task' => $taskName,
97
-            'data' => serialize($data),
98
-            'not_before' => $this->getDateTime()
99
-        ];
100
-
101
-        if (!empty($notBefore)) {
102
-            $task['not_before'] = $this->getDateTime(strtotime($notBefore));
103
-        }
104
-
105
-        $queuedTask = $this->newEntity($task);
106
-
107
-        return $this->saveOrFail($queuedTask);
108
-    }
109
-
110
-    /**
111
-     * Returns the number of items in the queue.
112
-     * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
113
-     *
114
-     * @param string|null $taskName Task type to Count
115
-     * @return int
116
-     */
117
-    public function getLength($taskName = null)
118
-    {
119
-        $findConf = [
120
-            'conditions' => [
121
-                'completed IS' => null
122
-            ]
123
-        ];
124
-        if ($taskName !== null) {
125
-            $findConf['conditions']['task'] = $taskName;
126
-        }
127
-
128
-        return $this->find('all', $findConf)->count();
129
-    }
130
-
131
-    /**
132
-     * Return a list of all task types in the Queue.
133
-     *
134
-     * @return \Cake\ORM\Query
135
-     */
136
-    public function getTypes()
137
-    {
138
-        $findCond = [
139
-            'fields' => [
140
-                'task'
141
-            ],
142
-            'group' => [
143
-                'task'
144
-            ],
145
-            'keyField' => 'task',
146
-            'valueField' => 'task'
147
-        ];
148
-
149
-        return $this->find('list', $findCond);
150
-    }
151
-
152
-    /**
153
-     * Return some statistics about finished jobs still in the Database.
154
-     * TO-DO: rewrite as virtual field
155
-     *
156
-     * @return \Cake\ORM\Query
157
-     */
158
-    public function getStats()
159
-    {
160
-        $driverName = $this->_getDriverName();
161
-        $options = [
162
-            'fields' => function (Query $query) use ($driverName) {
163
-                $alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
164
-                $runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
165
-                $fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
166
-                switch ($driverName) {
167
-                    case static::DRIVER_SQLSERVER:
168
-                        $alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
169
-                        $runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
170
-                        $fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
171
-                        break;
172
-                }
173
-                /**
174
-                 *
175
-                 * @var \Cake\ORM\Query
176
-                 */
177
-                return [
178
-                    'task',
179
-                    'num' => $query->func()->count('*'),
180
-                    'alltime' => $alltime,
181
-                    'runtime' => $runtime,
182
-                    'fetchdelay' => $fetchdelay
183
-                ];
184
-            },
185
-            'conditions' => [
186
-                'completed IS NOT' => null
187
-            ],
188
-            'group' => [
189
-                'task'
190
-            ]
191
-        ];
192
-
193
-        return $this->find('all', $options);
194
-    }
195
-
196
-    /**
197
-     * Returns [
198
-     * 'Task' => [
199
-     * 'YYYY-MM-DD' => INT,
200
-     * ...
201
-     * ]
202
-     * ]
203
-     *
204
-     * @param string|null $taskName The task name
205
-     * @return array
206
-     */
207
-    public function getFullStats($taskName = null)
208
-    {
209
-        $driverName = $this->_getDriverName();
210
-        $fields = function (Query $query) use ($driverName) {
211
-            $runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
212
-            switch ($driverName) {
213
-                case static::DRIVER_SQLSERVER:
214
-                    $runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
215
-                    break;
216
-            }
217
-
218
-            return [
219
-                'task',
220
-                'created',
221
-                'duration' => $runtime
222
-            ];
223
-        };
224
-
225
-        $conditions = [
226
-            'completed IS NOT' => null
227
-        ];
228
-        if ($taskName) {
229
-            $conditions['task'] = $taskName;
230
-        }
231
-
232
-        $tasks = $this->find()
233
-            ->select($fields)
234
-            ->where($conditions)
235
-            ->enableHydration(false)
236
-            ->orderDesc('id')
237
-            ->limit(static::STATS_LIMIT)
238
-            ->all()
239
-            ->toArray();
240
-
241
-        $result = [];
242
-
243
-        $days = [];
244
-
245
-        foreach ($tasks as $task) {
246
-            /** @var \DateTime $created */
247
-            $created = $task['created'];
248
-            $day = $created->format('Y-m-d');
249
-            if (!isset($days[$day])) {
250
-                $days[$day] = $day;
251
-            }
252
-
253
-            $result[$task['task']][$day][] = $task['duration'];
254
-        }
255
-
256
-        foreach ($result as $type => $tasks) {
257
-            foreach ($tasks as $day => $durations) {
258
-                $average = array_sum($durations) / count($durations);
259
-                $result[$type][$day] = (int)$average;
260
-            }
261
-
262
-            foreach ($days as $day) {
263
-                if (isset($result[$type][$day])) {
264
-                    continue;
265
-                }
266
-
267
-                $result[$type][$day] = 0;
268
-            }
269
-
270
-            ksort($result[$type]);
271
-        }
272
-
273
-        return $result;
274
-    }
275
-
276
-    /**
277
-     * Look for a new job that can be processed with the current abilities and
278
-     * from the specified group (or any if null).
279
-     *
280
-     * @param array $capabilities Available QueueWorkerTasks.
281
-     * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
282
-     * @return \Queue\Model\Entity\QueuedTask|null
283
-     */
284
-    public function requestJob(array $capabilities, array $types = [])
285
-    {
286
-        $now = $this->getDateTime();
287
-        $nowStr = $now->toDateTimeString();
288
-        $driverName = $this->_getDriverName();
289
-
290
-        $query = $this->find();
291
-        $age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
292
-        switch ($driverName) {
293
-            case static::DRIVER_SQLSERVER:
294
-                $age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
295
-                break;
296
-            case static::DRIVER_POSTGRES:
297
-                $age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
298
-                break;
299
-        }
300
-        $options = [
301
-            'conditions' => [
302
-                'completed IS' => null,
303
-                'OR' => []
304
-            ],
305
-            'fields' => [
306
-                'age' => $age
307
-            ],
308
-            'order' => [
309
-                'age' => 'ASC',
310
-                'id' => 'ASC'
311
-            ]
312
-        ];
313
-
314
-        if ($types) {
315
-            $options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
316
-        }
317
-
318
-        // Generate the task specific conditions.
319
-        foreach ($capabilities as $task) {
320
-            list ($plugin, $name) = pluginSplit($task['name']);
321
-            $timeoutAt = $now->copy();
322
-            $tmp = [
323
-                'task' => $name,
324
-                'AND' => [
325
-                    [
326
-                        'OR' => [
327
-                            'not_before <=' => $nowStr,
328
-                            'not_before IS' => null
329
-                        ]
330
-                    ],
331
-                    [
332
-                        'OR' => [
333
-                            'fetched <' => $timeoutAt->subSeconds($task['timeout']),
334
-                            'fetched IS' => null
335
-                        ]
336
-                    ]
337
-                ],
338
-                'failed_count <' => ($task['retries'] + 1)
339
-            ];
340
-            $options['conditions']['OR'][] = $tmp;
341
-        }
342
-
343
-        /** @var \Queue\Model\Entity\QueuedTask|null $task */
344
-        $task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
345
-            $task = $query->find('all', $options)
346
-                ->enableAutoFields(true)
347
-                ->epilog('FOR UPDATE')
348
-                ->first();
349
-
350
-            if (!$task) {
351
-                return null;
352
-            }
353
-
354
-            $key = sha1(microtime());
355
-            $task = $this->patchEntity($task, [
356
-                'worker_key' => $key,
357
-                'fetched' => $now
358
-            ]);
359
-
360
-            return $this->saveOrFail($task);
361
-        });
362
-
363
-        if (!$task) {
364
-            return null;
365
-        }
366
-
367
-        return $task;
368
-    }
369
-
370
-    /**
371
-     * Mark a task as Completed, removing it from the queue.
372
-     *
373
-     * @param \Queue\Model\Entity\QueuedTask $task Task
374
-     * @return bool Success
375
-     */
376
-    public function markJobDone(QueuedTask $task)
377
-    {
378
-        $fields = [
379
-            'completed' => $this->getDateTime()
380
-        ];
381
-        $task = $this->patchEntity($task, $fields);
382
-
383
-        return (bool)$this->save($task);
384
-    }
385
-
386
-    /**
387
-     * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
388
-     *
389
-     * @param \Queue\Model\Entity\QueuedTask $task Task
390
-     * @param string|null $failureMessage Optional message to append to the failure_message field.
391
-     * @return bool Success
392
-     */
393
-    public function markJobFailed(QueuedTask $task, $failureMessage = null)
394
-    {
395
-        $fields = [
396
-            'failed_count' => $task->failed_count + 1,
397
-            'failure_message' => $failureMessage
398
-        ];
399
-        $task = $this->patchEntity($task, $fields);
400
-
401
-        return (bool)$this->save($task);
402
-    }
403
-
404
-    /**
405
-     * Reset current jobs
406
-     *
407
-     * @param int|null $id ID
408
-     *
409
-     * @return int Success
410
-     */
411
-    public function reset($id = null)
412
-    {
413
-        $fields = [
414
-            'completed' => null,
415
-            'fetched' => null,
416
-            'failed_count' => 0,
417
-            'worker_key' => null,
418
-            'failure_message' => null
419
-        ];
420
-        $conditions = [
421
-            'completed IS' => null
422
-        ];
423
-        if ($id) {
424
-            $conditions['id'] = $id;
425
-        }
426
-
427
-        return $this->updateAll($fields, $conditions);
428
-    }
429
-
430
-    /**
431
-     *
432
-     * @param string $taskName Task name
433
-     *
434
-     * @return int
435
-     */
436
-    public function rerun($taskName)
437
-    {
438
-        $fields = [
439
-            'completed' => null,
440
-            'fetched' => null,
441
-            'failed_count' => 0,
442
-            'worker_key' => null,
443
-            'failure_message' => null
444
-        ];
445
-        $conditions = [
446
-            'completed IS NOT' => null,
447
-            'task' => $taskName
448
-        ];
449
-
450
-        return $this->updateAll($fields, $conditions);
451
-    }
452
-
453
-    /**
454
-     * Cleanup/Delete Completed Tasks.
455
-     *
456
-     * @return void
457
-     */
458
-    public function cleanOldJobs()
459
-    {
460
-        if (!Configure::read('Queue.cleanuptimeout')) {
461
-            return;
462
-        }
463
-
464
-        $this->deleteAll([
465
-            'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
466
-        ]);
467
-    }
468
-
469
-    /**
470
-     *
471
-     * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
472
-     * @param array $taskConfiguration Task configuration
473
-     * @return string
474
-     */
475
-    public function getFailedStatus($queuedTask, array $taskConfiguration)
476
-    {
477
-        $failureMessageRequeued = 'requeued';
478
-
479
-        $queuedTaskName = 'Queue' . $queuedTask->task;
480
-        if (empty($taskConfiguration[$queuedTaskName])) {
481
-            return $failureMessageRequeued;
482
-        }
483
-        $retries = $taskConfiguration[$queuedTaskName]['retries'];
484
-        if ($queuedTask->failed_count <= $retries) {
485
-            return $failureMessageRequeued;
486
-        }
487
-
488
-        return 'aborted';
489
-    }
490
-
491
-    /**
492
-     * truncate()
493
-     *
494
-     * @return void
495
-     */
496
-    public function truncate()
497
-    {
498
-        $sql = $this->getSchema()->truncateSql($this->_connection);
499
-        foreach ($sql as $snippet) {
500
-            $this->_connection->execute($snippet);
501
-        }
502
-    }
503
-
504
-    /**
505
-     * get the name of the driver
506
-     *
507
-     * @return string
508
-     */
509
-    protected function _getDriverName()
510
-    {
511
-        $className = explode('\\', $this->getConnection()->config()['driver']);
512
-        $name = end($className);
513
-
514
-        return $name;
515
-    }
516
-
517
-    /**
518
-     *
519
-     * @param array $conditions Conditions
520
-     * @param string $key Key
521
-     * @param array $values Values
522
-     * @return array
523
-     */
524
-    protected function addFilter(array $conditions, $key, array $values)
525
-    {
526
-        $include = [];
527
-        $exclude = [];
528
-        foreach ($values as $value) {
529
-            if (substr($value, 0, 1) === '-') {
530
-                $exclude[] = substr($value, 1);
531
-            } else {
532
-                $include[] = $value;
533
-            }
534
-        }
535
-
536
-        if ($include) {
537
-            $conditions[$key . ' IN'] = $include;
538
-        }
539
-        if ($exclude) {
540
-            $conditions[$key . ' NOT IN'] = $exclude;
541
-        }
542
-
543
-        return $conditions;
544
-    }
545
-
546
-    /**
547
-     * Returns a DateTime object from different input.
548
-     *
549
-     * Without argument this will be "now".
550
-     *
551
-     * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
552
-     *
553
-     * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
554
-     */
555
-    protected function getDateTime($notBefore = null)
556
-    {
557
-        if (is_object($notBefore)) {
558
-            return $notBefore;
559
-        }
560
-
561
-        return new FrozenTime($notBefore);
562
-    }
31
+	const DRIVER_MYSQL = 'Mysql';
32
+
33
+	const DRIVER_POSTGRES = 'Postgres';
34
+
35
+	const DRIVER_SQLSERVER = 'Sqlserver';
36
+
37
+	const STATS_LIMIT = 100000;
38
+
39
+	/**
40
+	 * Initialize method
41
+	 *
42
+	 * @param array $config The configuration for the Table.
43
+	 * @return void
44
+	 */
45
+	public function initialize(array $config)
46
+	{
47
+		parent::initialize($config);
48
+
49
+		$this->setTable('queued_tasks');
50
+		$this->setDisplayField('id');
51
+		$this->setPrimaryKey('id');
52
+
53
+		$this->addBehavior('Timestamp');
54
+	}
55
+
56
+	/**
57
+	 * set connection name
58
+	 *
59
+	 * @return string
60
+	 */
61
+	public static function defaultConnectionName()
62
+	{
63
+		$connection = Configure::read('Queue.connection');
64
+		if (!empty($connection)) {
65
+			return $connection;
66
+		}
67
+
68
+		return parent::defaultConnectionName();
69
+	}
70
+
71
+	/**
72
+	 *
73
+	 * @param \Cake\Event\Event $event Model event
74
+	 * @param \ArrayObject $data The data
75
+	 * @param \ArrayObject $options The options
76
+	 * @return void
77
+	 */
78
+	public function beforeMarshal(Event $event, ArrayObject $data, ArrayObject $options)
79
+	{
80
+		if (isset($data['data']) && $data['data'] === '') {
81
+			$data['data'] = null;
82
+		}
83
+	}
84
+
85
+	/**
86
+	 * Adds a new job to the queue.
87
+	 *
88
+	 * @param string $taskName Task name
89
+	 * @param array|null $data Array of data
90
+	 * @param string $notBefore A datetime which indicates when the job may be executed
91
+	 * @return \Queue\Model\Entity\QueuedTask Saved job entity
92
+	 */
93
+	public function createJob($taskName, array $data = null, string $notBefore = null)
94
+	{
95
+		$task = [
96
+			'task' => $taskName,
97
+			'data' => serialize($data),
98
+			'not_before' => $this->getDateTime()
99
+		];
100
+
101
+		if (!empty($notBefore)) {
102
+			$task['not_before'] = $this->getDateTime(strtotime($notBefore));
103
+		}
104
+
105
+		$queuedTask = $this->newEntity($task);
106
+
107
+		return $this->saveOrFail($queuedTask);
108
+	}
109
+
110
+	/**
111
+	 * Returns the number of items in the queue.
112
+	 * Either returns the number of ALL pending jobs, or the number of pending jobs of the passed type.
113
+	 *
114
+	 * @param string|null $taskName Task type to Count
115
+	 * @return int
116
+	 */
117
+	public function getLength($taskName = null)
118
+	{
119
+		$findConf = [
120
+			'conditions' => [
121
+				'completed IS' => null
122
+			]
123
+		];
124
+		if ($taskName !== null) {
125
+			$findConf['conditions']['task'] = $taskName;
126
+		}
127
+
128
+		return $this->find('all', $findConf)->count();
129
+	}
130
+
131
+	/**
132
+	 * Return a list of all task types in the Queue.
133
+	 *
134
+	 * @return \Cake\ORM\Query
135
+	 */
136
+	public function getTypes()
137
+	{
138
+		$findCond = [
139
+			'fields' => [
140
+				'task'
141
+			],
142
+			'group' => [
143
+				'task'
144
+			],
145
+			'keyField' => 'task',
146
+			'valueField' => 'task'
147
+		];
148
+
149
+		return $this->find('list', $findCond);
150
+	}
151
+
152
+	/**
153
+	 * Return some statistics about finished jobs still in the Database.
154
+	 * TO-DO: rewrite as virtual field
155
+	 *
156
+	 * @return \Cake\ORM\Query
157
+	 */
158
+	public function getStats()
159
+	{
160
+		$driverName = $this->_getDriverName();
161
+		$options = [
162
+			'fields' => function (Query $query) use ($driverName) {
163
+				$alltime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)');
164
+				$runtime = $query->func()->avg('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
165
+				$fetchdelay = $query->func()->avg('UNIX_TIMESTAMP(fetched) - IF(not_before is NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))');
166
+				switch ($driverName) {
167
+					case static::DRIVER_SQLSERVER:
168
+						$alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
169
+						$runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
170
+						$fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN not_before IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', not_before) END)");
171
+						break;
172
+				}
173
+				/**
174
+				 *
175
+				 * @var \Cake\ORM\Query
176
+				 */
177
+				return [
178
+					'task',
179
+					'num' => $query->func()->count('*'),
180
+					'alltime' => $alltime,
181
+					'runtime' => $runtime,
182
+					'fetchdelay' => $fetchdelay
183
+				];
184
+			},
185
+			'conditions' => [
186
+				'completed IS NOT' => null
187
+			],
188
+			'group' => [
189
+				'task'
190
+			]
191
+		];
192
+
193
+		return $this->find('all', $options);
194
+	}
195
+
196
+	/**
197
+	 * Returns [
198
+	 * 'Task' => [
199
+	 * 'YYYY-MM-DD' => INT,
200
+	 * ...
201
+	 * ]
202
+	 * ]
203
+	 *
204
+	 * @param string|null $taskName The task name
205
+	 * @return array
206
+	 */
207
+	public function getFullStats($taskName = null)
208
+	{
209
+		$driverName = $this->_getDriverName();
210
+		$fields = function (Query $query) use ($driverName) {
211
+			$runtime = $query->newExpr('UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)');
212
+			switch ($driverName) {
213
+				case static::DRIVER_SQLSERVER:
214
+					$runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
215
+					break;
216
+			}
217
+
218
+			return [
219
+				'task',
220
+				'created',
221
+				'duration' => $runtime
222
+			];
223
+		};
224
+
225
+		$conditions = [
226
+			'completed IS NOT' => null
227
+		];
228
+		if ($taskName) {
229
+			$conditions['task'] = $taskName;
230
+		}
231
+
232
+		$tasks = $this->find()
233
+			->select($fields)
234
+			->where($conditions)
235
+			->enableHydration(false)
236
+			->orderDesc('id')
237
+			->limit(static::STATS_LIMIT)
238
+			->all()
239
+			->toArray();
240
+
241
+		$result = [];
242
+
243
+		$days = [];
244
+
245
+		foreach ($tasks as $task) {
246
+			/** @var \DateTime $created */
247
+			$created = $task['created'];
248
+			$day = $created->format('Y-m-d');
249
+			if (!isset($days[$day])) {
250
+				$days[$day] = $day;
251
+			}
252
+
253
+			$result[$task['task']][$day][] = $task['duration'];
254
+		}
255
+
256
+		foreach ($result as $type => $tasks) {
257
+			foreach ($tasks as $day => $durations) {
258
+				$average = array_sum($durations) / count($durations);
259
+				$result[$type][$day] = (int)$average;
260
+			}
261
+
262
+			foreach ($days as $day) {
263
+				if (isset($result[$type][$day])) {
264
+					continue;
265
+				}
266
+
267
+				$result[$type][$day] = 0;
268
+			}
269
+
270
+			ksort($result[$type]);
271
+		}
272
+
273
+		return $result;
274
+	}
275
+
276
+	/**
277
+	 * Look for a new job that can be processed with the current abilities and
278
+	 * from the specified group (or any if null).
279
+	 *
280
+	 * @param array $capabilities Available QueueWorkerTasks.
281
+	 * @param array $types Request a job from these types (or exclude certain types), or any otherwise.
282
+	 * @return \Queue\Model\Entity\QueuedTask|null
283
+	 */
284
+	public function requestJob(array $capabilities, array $types = [])
285
+	{
286
+		$now = $this->getDateTime();
287
+		$nowStr = $now->toDateTimeString();
288
+		$driverName = $this->_getDriverName();
289
+
290
+		$query = $this->find();
291
+		$age = $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", not_before), 0)');
292
+		switch ($driverName) {
293
+			case static::DRIVER_SQLSERVER:
294
+				$age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), not_before), 0)');
295
+				break;
296
+			case static::DRIVER_POSTGRES:
297
+				$age = $query->newExpr()->add('COALESCE((EXTRACT(EPOCH FROM now()) - EXTRACT(EPOCH FROM not_before)), 0)');
298
+				break;
299
+		}
300
+		$options = [
301
+			'conditions' => [
302
+				'completed IS' => null,
303
+				'OR' => []
304
+			],
305
+			'fields' => [
306
+				'age' => $age
307
+			],
308
+			'order' => [
309
+				'age' => 'ASC',
310
+				'id' => 'ASC'
311
+			]
312
+		];
313
+
314
+		if ($types) {
315
+			$options['conditions'] = $this->addFilter($options['conditions'], 'task', $types);
316
+		}
317
+
318
+		// Generate the task specific conditions.
319
+		foreach ($capabilities as $task) {
320
+			list ($plugin, $name) = pluginSplit($task['name']);
321
+			$timeoutAt = $now->copy();
322
+			$tmp = [
323
+				'task' => $name,
324
+				'AND' => [
325
+					[
326
+						'OR' => [
327
+							'not_before <=' => $nowStr,
328
+							'not_before IS' => null
329
+						]
330
+					],
331
+					[
332
+						'OR' => [
333
+							'fetched <' => $timeoutAt->subSeconds($task['timeout']),
334
+							'fetched IS' => null
335
+						]
336
+					]
337
+				],
338
+				'failed_count <' => ($task['retries'] + 1)
339
+			];
340
+			$options['conditions']['OR'][] = $tmp;
341
+		}
342
+
343
+		/** @var \Queue\Model\Entity\QueuedTask|null $task */
344
+		$task = $this->getConnection()->transactional(function () use ($query, $options, $now) {
345
+			$task = $query->find('all', $options)
346
+				->enableAutoFields(true)
347
+				->epilog('FOR UPDATE')
348
+				->first();
349
+
350
+			if (!$task) {
351
+				return null;
352
+			}
353
+
354
+			$key = sha1(microtime());
355
+			$task = $this->patchEntity($task, [
356
+				'worker_key' => $key,
357
+				'fetched' => $now
358
+			]);
359
+
360
+			return $this->saveOrFail($task);
361
+		});
362
+
363
+		if (!$task) {
364
+			return null;
365
+		}
366
+
367
+		return $task;
368
+	}
369
+
370
+	/**
371
+	 * Mark a task as Completed, removing it from the queue.
372
+	 *
373
+	 * @param \Queue\Model\Entity\QueuedTask $task Task
374
+	 * @return bool Success
375
+	 */
376
+	public function markJobDone(QueuedTask $task)
377
+	{
378
+		$fields = [
379
+			'completed' => $this->getDateTime()
380
+		];
381
+		$task = $this->patchEntity($task, $fields);
382
+
383
+		return (bool)$this->save($task);
384
+	}
385
+
386
+	/**
387
+	 * Mark a job as Failed, incrementing the failed-counter and Requeueing it.
388
+	 *
389
+	 * @param \Queue\Model\Entity\QueuedTask $task Task
390
+	 * @param string|null $failureMessage Optional message to append to the failure_message field.
391
+	 * @return bool Success
392
+	 */
393
+	public function markJobFailed(QueuedTask $task, $failureMessage = null)
394
+	{
395
+		$fields = [
396
+			'failed_count' => $task->failed_count + 1,
397
+			'failure_message' => $failureMessage
398
+		];
399
+		$task = $this->patchEntity($task, $fields);
400
+
401
+		return (bool)$this->save($task);
402
+	}
403
+
404
+	/**
405
+	 * Reset current jobs
406
+	 *
407
+	 * @param int|null $id ID
408
+	 *
409
+	 * @return int Success
410
+	 */
411
+	public function reset($id = null)
412
+	{
413
+		$fields = [
414
+			'completed' => null,
415
+			'fetched' => null,
416
+			'failed_count' => 0,
417
+			'worker_key' => null,
418
+			'failure_message' => null
419
+		];
420
+		$conditions = [
421
+			'completed IS' => null
422
+		];
423
+		if ($id) {
424
+			$conditions['id'] = $id;
425
+		}
426
+
427
+		return $this->updateAll($fields, $conditions);
428
+	}
429
+
430
+	/**
431
+	 *
432
+	 * @param string $taskName Task name
433
+	 *
434
+	 * @return int
435
+	 */
436
+	public function rerun($taskName)
437
+	{
438
+		$fields = [
439
+			'completed' => null,
440
+			'fetched' => null,
441
+			'failed_count' => 0,
442
+			'worker_key' => null,
443
+			'failure_message' => null
444
+		];
445
+		$conditions = [
446
+			'completed IS NOT' => null,
447
+			'task' => $taskName
448
+		];
449
+
450
+		return $this->updateAll($fields, $conditions);
451
+	}
452
+
453
+	/**
454
+	 * Cleanup/Delete Completed Tasks.
455
+	 *
456
+	 * @return void
457
+	 */
458
+	public function cleanOldJobs()
459
+	{
460
+		if (!Configure::read('Queue.cleanuptimeout')) {
461
+			return;
462
+		}
463
+
464
+		$this->deleteAll([
465
+			'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout')
466
+		]);
467
+	}
468
+
469
+	/**
470
+	 *
471
+	 * @param \Queue\Model\Entity\QueuedTask $queuedTask Queued task
472
+	 * @param array $taskConfiguration Task configuration
473
+	 * @return string
474
+	 */
475
+	public function getFailedStatus($queuedTask, array $taskConfiguration)
476
+	{
477
+		$failureMessageRequeued = 'requeued';
478
+
479
+		$queuedTaskName = 'Queue' . $queuedTask->task;
480
+		if (empty($taskConfiguration[$queuedTaskName])) {
481
+			return $failureMessageRequeued;
482
+		}
483
+		$retries = $taskConfiguration[$queuedTaskName]['retries'];
484
+		if ($queuedTask->failed_count <= $retries) {
485
+			return $failureMessageRequeued;
486
+		}
487
+
488
+		return 'aborted';
489
+	}
490
+
491
+	/**
492
+	 * truncate()
493
+	 *
494
+	 * @return void
495
+	 */
496
+	public function truncate()
497
+	{
498
+		$sql = $this->getSchema()->truncateSql($this->_connection);
499
+		foreach ($sql as $snippet) {
500
+			$this->_connection->execute($snippet);
501
+		}
502
+	}
503
+
504
+	/**
505
+	 * get the name of the driver
506
+	 *
507
+	 * @return string
508
+	 */
509
+	protected function _getDriverName()
510
+	{
511
+		$className = explode('\\', $this->getConnection()->config()['driver']);
512
+		$name = end($className);
513
+
514
+		return $name;
515
+	}
516
+
517
+	/**
518
+	 *
519
+	 * @param array $conditions Conditions
520
+	 * @param string $key Key
521
+	 * @param array $values Values
522
+	 * @return array
523
+	 */
524
+	protected function addFilter(array $conditions, $key, array $values)
525
+	{
526
+		$include = [];
527
+		$exclude = [];
528
+		foreach ($values as $value) {
529
+			if (substr($value, 0, 1) === '-') {
530
+				$exclude[] = substr($value, 1);
531
+			} else {
532
+				$include[] = $value;
533
+			}
534
+		}
535
+
536
+		if ($include) {
537
+			$conditions[$key . ' IN'] = $include;
538
+		}
539
+		if ($exclude) {
540
+			$conditions[$key . ' NOT IN'] = $exclude;
541
+		}
542
+
543
+		return $conditions;
544
+	}
545
+
546
+	/**
547
+	 * Returns a DateTime object from different input.
548
+	 *
549
+	 * Without argument this will be "now".
550
+	 *
551
+	 * @param int|string|\Cake\I18n\FrozenTime|\Cake\I18n\Time|null $notBefore Not before time
552
+	 *
553
+	 * @return \Cake\I18n\FrozenTime|\Cake\I18n\Time
554
+	 */
555
+	protected function getDateTime($notBefore = null)
556
+	{
557
+		if (is_object($notBefore)) {
558
+			return $notBefore;
559
+		}
560
+
561
+		return new FrozenTime($notBefore);
562
+	}
563 563
 }
Please login to merge, or discard this patch.
src/Shell/QueueShell.php 1 patch
Indentation   +505 added lines, -505 removed lines patch added patch discarded remove patch
@@ -30,521 +30,521 @@
 block discarded – undo
30 30
 class QueueShell extends Shell
31 31
 {
32 32
 
33
-    /**
34
-     *
35
-     * @var string
36
-     */
37
-    public $modelClass = 'Queue.QueuedTasks';
38
-
39
-    /**
40
-     *
41
-     * @var array|null
42
-     */
43
-    protected $_taskConf;
44
-
45
-    /**
46
-     *
47
-     * @var int
48
-     */
49
-    protected $_time = 0;
50
-
51
-    /**
52
-     *
53
-     * @var bool
54
-     */
55
-    protected $_exit = false;
56
-
57
-    /**
58
-     * Overwrite shell initialize to dynamically load all Queue Related Tasks.
59
-     *
60
-     * @return void
61
-     */
62
-    public function initialize()
63
-    {
64
-        $taskFinder = new TaskFinder();
65
-        $this->tasks = $taskFinder->allAppAndPluginTasks();
66
-
67
-        parent::initialize();
68
-    }
69
-
70
-    /**
71
-     *
72
-     * @return void
73
-     */
74
-    public function startup()
75
-    {
76
-        if ($this->param('quiet')) {
77
-            $this->interactive = false;
78
-        }
79
-
80
-        parent::startup();
81
-    }
82
-
83
-    /**
84
-     *
85
-     * @return string
86
-     */
87
-    public function getDescription()
88
-    {
89
-        $tasks = [];
90
-        foreach ($this->taskNames as $loadedTask) {
91
-            $tasks[] = "\t" . '* ' . $this->_taskName($loadedTask);
92
-        }
93
-        $tasks = implode(PHP_EOL, $tasks);
94
-
95
-        $text = <<<TEXT
33
+	/**
34
+	 *
35
+	 * @var string
36
+	 */
37
+	public $modelClass = 'Queue.QueuedTasks';
38
+
39
+	/**
40
+	 *
41
+	 * @var array|null
42
+	 */
43
+	protected $_taskConf;
44
+
45
+	/**
46
+	 *
47
+	 * @var int
48
+	 */
49
+	protected $_time = 0;
50
+
51
+	/**
52
+	 *
53
+	 * @var bool
54
+	 */
55
+	protected $_exit = false;
56
+
57
+	/**
58
+	 * Overwrite shell initialize to dynamically load all Queue Related Tasks.
59
+	 *
60
+	 * @return void
61
+	 */
62
+	public function initialize()
63
+	{
64
+		$taskFinder = new TaskFinder();
65
+		$this->tasks = $taskFinder->allAppAndPluginTasks();
66
+
67
+		parent::initialize();
68
+	}
69
+
70
+	/**
71
+	 *
72
+	 * @return void
73
+	 */
74
+	public function startup()
75
+	{
76
+		if ($this->param('quiet')) {
77
+			$this->interactive = false;
78
+		}
79
+
80
+		parent::startup();
81
+	}
82
+
83
+	/**
84
+	 *
85
+	 * @return string
86
+	 */
87
+	public function getDescription()
88
+	{
89
+		$tasks = [];
90
+		foreach ($this->taskNames as $loadedTask) {
91
+			$tasks[] = "\t" . '* ' . $this->_taskName($loadedTask);
92
+		}
93
+		$tasks = implode(PHP_EOL, $tasks);
94
+
95
+		$text = <<<TEXT
96 96
 Simple and minimalistic job queue (or deferred-task) system.
97 97
 
98 98
 Available Tasks:
99 99
 $tasks
100 100
 TEXT;
101 101
 
102
-        return $text;
103
-    }
104
-
105
-    /**
106
-     * Look for a Queue Task of hte passed name and try to call add() on it.
107
-     * A QueueTask may provide an add function to enable the user to create new jobs via commandline.
108
-     *
109
-     * @return void
110
-     */
111
-    public function add()
112
-    {
113
-        if (count($this->args) < 1) {
114
-            $this->out('Please call like this:');
115
-            $this->out('       bin/cake queue add <taskname>');
116
-            $this->_displayAvailableTasks();
117
-
118
-            return;
119
-        }
120
-
121
-        $name = Inflector::camelize($this->args[0]);
122
-        if (in_array('Queue' . $name, $this->taskNames, true)) {
123
-            /** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
124
-            $task = $this->{'Queue' . $name};
125
-            if (!($task instanceof AddInterface)) {
126
-                $this->abort('This task does not support adding via CLI call');
127
-            }
128
-            $task->add();
129
-        } else {
130
-            $this->out('Error: Task not found: ' . $name);
131
-            $this->_displayAvailableTasks();
132
-        }
133
-    }
134
-
135
-    /**
136
-     * Output the task without Queue or Task
137
-     * example: QueueImageTask becomes Image on display
138
-     *
139
-     * @param string $task Task name
140
-     * @return string Cleaned task name
141
-     */
142
-    protected function _taskName($task)
143
-    {
144
-        if (strpos($task, 'Queue') === 0) {
145
-            return substr($task, 5);
146
-        }
147
-
148
-        return $task;
149
-    }
150
-
151
-    /**
152
-     * Run a QueueWorker loop.
153
-     * Runs a Queue Worker process which will try to find unassigned jobs in the queue
154
-     * which it may run and try to fetch and execute them.
155
-     *
156
-     * @return void
157
-     */
158
-    public function runworker()
159
-    {
160
-        // Enable Garbage Collector (PHP >= 5.3)
161
-        if (function_exists('gc_enable')) {
162
-            gc_enable();
163
-        }
164
-        if (function_exists('pcntl_signal')) {
165
-            pcntl_signal(SIGTERM, [
166
-                &$this,
167
-                '_exit'
168
-            ]);
169
-            pcntl_signal(SIGINT, [
170
-                &$this,
171
-                '_exit'
172
-            ]);
173
-            pcntl_signal(SIGTSTP, [
174
-                &$this,
175
-                '_exit'
176
-            ]);
177
-            pcntl_signal(SIGQUIT, [
178
-                &$this,
179
-                '_exit'
180
-            ]);
181
-        }
182
-        $this->_exit = false;
183
-
184
-        $startTime = time();
185
-        $types = $this->_stringToArray($this->param('type'));
186
-
187
-        while (!$this->_exit) {
188
-            $this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);
189
-
190
-            $QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);
191
-
192
-            if ($QueuedTask) {
193
-                $this->runJob($QueuedTask);
194
-            } elseif (Configure::read('Queue.exitWhenNothingToDo')) {
195
-                $this->out(__d('queue', 'nothing to do, exiting.'));
196
-                $this->_exit = true;
197
-            } else {
198
-                $this->out(__d('queue', 'nothing to do, sleeping.'));
199
-                sleep(Config::sleepTime());
200
-            }
201
-
202
-            // check if we are over the maximum runtime and end processing if so.
203
-            if (Configure::readOrFail('Queue.workerMaxRuntime') && (time() - $startTime) >= Configure::readOrFail('Queue.workerMaxRuntime')) {
204
-                $this->_exit = true;
205
-                $this->out(__d('queue', 'Reached runtime of ' . (time() - $startTime) . ' Seconds (Max ' . Configure::readOrFail('Queue.workerMaxRuntime') . '), terminating.'));
206
-            }
207
-            if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
208
-                $this->out(__d('queue', 'Performing old job cleanup.'));
209
-                $this->QueuedTasks->cleanOldJobs();
210
-            }
211
-            $this->hr();
212
-        }
213
-
214
-        if ($this->param('verbose')) {
215
-            $this->_log('endworker');
216
-        }
217
-    }
218
-
219
-    /**
220
-     *
221
-     * @param \Queue\Model\Entity\QueuedTask $QueuedTask Queued task
222
-     * @return void
223
-     */
224
-    protected function runJob(QueuedTask $QueuedTask)
225
-    {
226
-        $this->out('Running Job of type "' . $QueuedTask->task . '"');
227
-        $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, null, false);
228
-        $taskName = 'Queue' . $QueuedTask->task;
229
-
230
-        try {
231
-            $this->_time = time();
232
-
233
-            $data = unserialize($QueuedTask->data);
234
-            /** @var \Queue\Shell\Task\QueueTask $task */
235
-            $task = $this->{$taskName};
236
-            if (!$task instanceof QueueTaskInterface) {
237
-                throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
238
-            }
239
-
240
-            $return = $task->run((array)$data, $QueuedTask->id);
241
-            if ($return !== null) {
242
-                trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
243
-            }
244
-            $failureMessage = $taskName . ' failed';
245
-        } catch (Throwable $e) {
246
-            $return = false;
247
-
248
-            $failureMessage = get_class($e) . ': ' . $e->getMessage();
249
-            if (!($e instanceof QueueException)) {
250
-                $failureMessage .= "\n" . $e->getTraceAsString();
251
-            }
252
-
253
-            $this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage);
254
-        } catch (Exception $e) {
255
-            $return = false;
256
-
257
-            $failureMessage = get_class($e) . ': ' . $e->getMessage();
258
-            $this->_logError($taskName . "\n" . $failureMessage);
259
-        }
260
-
261
-        if ($return === false) {
262
-            $this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
263
-            $failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
264
-            $this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus);
265
-            $this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');
266
-
267
-            return;
268
-        }
269
-
270
-        $this->QueuedTasks->markJobDone($QueuedTask);
271
-        $this->out('Job Finished.');
272
-    }
273
-
274
-    /**
275
-     * Manually trigger a Finished job cleanup.
276
-     *
277
-     * @return void
278
-     */
279
-    public function clean()
280
-    {
281
-        if (!Configure::read('Queue.cleanupTimeout')) {
282
-            $this->abort('You disabled cleanuptimout in config. Aborting.');
283
-        }
284
-
285
-        $this->out('Deleting old jobs, that have finished before ' . date('Y-m-d H:i:s', time() - (int)Configure::read('Queue.cleanupTimeout')));
286
-        $this->QueuedTasks->cleanOldJobs();
287
-    }
288
-
289
-    /**
290
-     * Display current settings
291
-     *
292
-     * @return void
293
-     */
294
-    public function settings()
295
-    {
296
-        $this->out('Current Settings:');
297
-        $conf = (array)Configure::read('Queue');
298
-        foreach ($conf as $key => $val) {
299
-            if ($val === false) {
300
-                $val = 'no';
301
-            }
302
-            if ($val === true) {
303
-                $val = 'yes';
304
-            }
305
-            $this->out('* ' . $key . ': ' . print_r($val, true));
306
-        }
307
-
308
-        $this->out();
309
-    }
310
-
311
-    /**
312
-     * Display some statistics about Finished Jobs.
313
-     *
314
-     * @return void
315
-     */
316
-    public function stats()
317
-    {
318
-        $this->out('Jobs currently in the queue:');
319
-
320
-        $types = $this->QueuedTasks->getTypes()->toArray();
321
-        foreach ($types as $type) {
322
-            $this->out('      ' . str_pad($type, 20, ' ', STR_PAD_RIGHT) . ': ' . $this->QueuedTasks->getLength($type));
323
-        }
324
-        $this->hr();
325
-        $this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
326
-        $this->hr();
327
-        $this->out('Finished job statistics:');
328
-        $data = $this->QueuedTasks->getStats();
329
-        foreach ($data as $item) {
330
-            $this->out(' ' . $item['task'] . ': ');
331
-            $this->out('   Finished Jobs in Database: ' . $item['num']);
332
-            $this->out('   Average Job existence    : ' . str_pad(Number::precision($item['alltime']), 8, ' ', STR_PAD_LEFT) . 's');
333
-            $this->out('   Average Execution delay  : ' . str_pad(Number::precision($item['fetchdelay']), 8, ' ', STR_PAD_LEFT) . 's');
334
-            $this->out('   Average Execution time   : ' . str_pad(Number::precision($item['runtime']), 8, ' ', STR_PAD_LEFT) . 's');
335
-        }
336
-    }
337
-
338
-    /**
339
-     * Get option parser method to parse commandline options
340
-     *
341
-     * @return \Cake\Console\ConsoleOptionParser
342
-     */
343
-    public function getOptionParser()
344
-    {
345
-        $subcommandParser = [
346
-            'options' => [
347
-                /*
102
+		return $text;
103
+	}
104
+
105
+	/**
106
+	 * Look for a Queue Task of hte passed name and try to call add() on it.
107
+	 * A QueueTask may provide an add function to enable the user to create new jobs via commandline.
108
+	 *
109
+	 * @return void
110
+	 */
111
+	public function add()
112
+	{
113
+		if (count($this->args) < 1) {
114
+			$this->out('Please call like this:');
115
+			$this->out('       bin/cake queue add <taskname>');
116
+			$this->_displayAvailableTasks();
117
+
118
+			return;
119
+		}
120
+
121
+		$name = Inflector::camelize($this->args[0]);
122
+		if (in_array('Queue' . $name, $this->taskNames, true)) {
123
+			/** @var \Queue\Shell\Task\QueueTask|\Queue\Shell\Task\AddInterface $task */
124
+			$task = $this->{'Queue' . $name};
125
+			if (!($task instanceof AddInterface)) {
126
+				$this->abort('This task does not support adding via CLI call');
127
+			}
128
+			$task->add();
129
+		} else {
130
+			$this->out('Error: Task not found: ' . $name);
131
+			$this->_displayAvailableTasks();
132
+		}
133
+	}
134
+
135
+	/**
136
+	 * Output the task without Queue or Task
137
+	 * example: QueueImageTask becomes Image on display
138
+	 *
139
+	 * @param string $task Task name
140
+	 * @return string Cleaned task name
141
+	 */
142
+	protected function _taskName($task)
143
+	{
144
+		if (strpos($task, 'Queue') === 0) {
145
+			return substr($task, 5);
146
+		}
147
+
148
+		return $task;
149
+	}
150
+
151
+	/**
152
+	 * Run a QueueWorker loop.
153
+	 * Runs a Queue Worker process which will try to find unassigned jobs in the queue
154
+	 * which it may run and try to fetch and execute them.
155
+	 *
156
+	 * @return void
157
+	 */
158
+	public function runworker()
159
+	{
160
+		// Enable Garbage Collector (PHP >= 5.3)
161
+		if (function_exists('gc_enable')) {
162
+			gc_enable();
163
+		}
164
+		if (function_exists('pcntl_signal')) {
165
+			pcntl_signal(SIGTERM, [
166
+				&$this,
167
+				'_exit'
168
+			]);
169
+			pcntl_signal(SIGINT, [
170
+				&$this,
171
+				'_exit'
172
+			]);
173
+			pcntl_signal(SIGTSTP, [
174
+				&$this,
175
+				'_exit'
176
+			]);
177
+			pcntl_signal(SIGQUIT, [
178
+				&$this,
179
+				'_exit'
180
+			]);
181
+		}
182
+		$this->_exit = false;
183
+
184
+		$startTime = time();
185
+		$types = $this->_stringToArray($this->param('type'));
186
+
187
+		while (!$this->_exit) {
188
+			$this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);
189
+
190
+			$QueuedTask = $this->QueuedTasks->requestJob($this->_getTaskConf(), $types);
191
+
192
+			if ($QueuedTask) {
193
+				$this->runJob($QueuedTask);
194
+			} elseif (Configure::read('Queue.exitWhenNothingToDo')) {
195
+				$this->out(__d('queue', 'nothing to do, exiting.'));
196
+				$this->_exit = true;
197
+			} else {
198
+				$this->out(__d('queue', 'nothing to do, sleeping.'));
199
+				sleep(Config::sleepTime());
200
+			}
201
+
202
+			// check if we are over the maximum runtime and end processing if so.
203
+			if (Configure::readOrFail('Queue.workerMaxRuntime') && (time() - $startTime) >= Configure::readOrFail('Queue.workerMaxRuntime')) {
204
+				$this->_exit = true;
205
+				$this->out(__d('queue', 'Reached runtime of ' . (time() - $startTime) . ' Seconds (Max ' . Configure::readOrFail('Queue.workerMaxRuntime') . '), terminating.'));
206
+			}
207
+			if ($this->_exit || mt_rand(0, 100) > (100 - (int)Config::gcprob())) {
208
+				$this->out(__d('queue', 'Performing old job cleanup.'));
209
+				$this->QueuedTasks->cleanOldJobs();
210
+			}
211
+			$this->hr();
212
+		}
213
+
214
+		if ($this->param('verbose')) {
215
+			$this->_log('endworker');
216
+		}
217
+	}
218
+
219
+	/**
220
+	 *
221
+	 * @param \Queue\Model\Entity\QueuedTask $QueuedTask Queued task
222
+	 * @return void
223
+	 */
224
+	protected function runJob(QueuedTask $QueuedTask)
225
+	{
226
+		$this->out('Running Job of type "' . $QueuedTask->task . '"');
227
+		$this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id, null, false);
228
+		$taskName = 'Queue' . $QueuedTask->task;
229
+
230
+		try {
231
+			$this->_time = time();
232
+
233
+			$data = unserialize($QueuedTask->data);
234
+			/** @var \Queue\Shell\Task\QueueTask $task */
235
+			$task = $this->{$taskName};
236
+			if (!$task instanceof QueueTaskInterface) {
237
+				throw new RuntimeException('Task must implement ' . QueueTaskInterface::class);
238
+			}
239
+
240
+			$return = $task->run((array)$data, $QueuedTask->id);
241
+			if ($return !== null) {
242
+				trigger_error('run() should be void and throw exception in error case now.', E_USER_DEPRECATED);
243
+			}
244
+			$failureMessage = $taskName . ' failed';
245
+		} catch (Throwable $e) {
246
+			$return = false;
247
+
248
+			$failureMessage = get_class($e) . ': ' . $e->getMessage();
249
+			if (!($e instanceof QueueException)) {
250
+				$failureMessage .= "\n" . $e->getTraceAsString();
251
+			}
252
+
253
+			$this->_logError($taskName . ' (job ' . $QueuedTask->id . ')' . "\n" . $failureMessage);
254
+		} catch (Exception $e) {
255
+			$return = false;
256
+
257
+			$failureMessage = get_class($e) . ': ' . $e->getMessage();
258
+			$this->_logError($taskName . "\n" . $failureMessage);
259
+		}
260
+
261
+		if ($return === false) {
262
+			$this->QueuedTasks->markJobFailed($QueuedTask, $failureMessage);
263
+			$failedStatus = $this->QueuedTasks->getFailedStatus($QueuedTask, $this->_getTaskConf());
264
+			$this->_log('job ' . $QueuedTask->task . ', id ' . $QueuedTask->id . ' failed and ' . $failedStatus);
265
+			$this->out('Job did not finish, ' . $failedStatus . ' after try ' . $QueuedTask->failed . '.');
266
+
267
+			return;
268
+		}
269
+
270
+		$this->QueuedTasks->markJobDone($QueuedTask);
271
+		$this->out('Job Finished.');
272
+	}
273
+
274
+	/**
275
+	 * Manually trigger a Finished job cleanup.
276
+	 *
277
+	 * @return void
278
+	 */
279
+	public function clean()
280
+	{
281
+		if (!Configure::read('Queue.cleanupTimeout')) {
282
+			$this->abort('You disabled cleanuptimout in config. Aborting.');
283
+		}
284
+
285
+		$this->out('Deleting old jobs, that have finished before ' . date('Y-m-d H:i:s', time() - (int)Configure::read('Queue.cleanupTimeout')));
286
+		$this->QueuedTasks->cleanOldJobs();
287
+	}
288
+
289
+	/**
290
+	 * Display current settings
291
+	 *
292
+	 * @return void
293
+	 */
294
+	public function settings()
295
+	{
296
+		$this->out('Current Settings:');
297
+		$conf = (array)Configure::read('Queue');
298
+		foreach ($conf as $key => $val) {
299
+			if ($val === false) {
300
+				$val = 'no';
301
+			}
302
+			if ($val === true) {
303
+				$val = 'yes';
304
+			}
305
+			$this->out('* ' . $key . ': ' . print_r($val, true));
306
+		}
307
+
308
+		$this->out();
309
+	}
310
+
311
+	/**
312
+	 * Display some statistics about Finished Jobs.
313
+	 *
314
+	 * @return void
315
+	 */
316
+	public function stats()
317
+	{
318
+		$this->out('Jobs currently in the queue:');
319
+
320
+		$types = $this->QueuedTasks->getTypes()->toArray();
321
+		foreach ($types as $type) {
322
+			$this->out('      ' . str_pad($type, 20, ' ', STR_PAD_RIGHT) . ': ' . $this->QueuedTasks->getLength($type));
323
+		}
324
+		$this->hr();
325
+		$this->out('Total unfinished jobs: ' . $this->QueuedTasks->getLength());
326
+		$this->hr();
327
+		$this->out('Finished job statistics:');
328
+		$data = $this->QueuedTasks->getStats();
329
+		foreach ($data as $item) {
330
+			$this->out(' ' . $item['task'] . ': ');
331
+			$this->out('   Finished Jobs in Database: ' . $item['num']);
332
+			$this->out('   Average Job existence    : ' . str_pad(Number::precision($item['alltime']), 8, ' ', STR_PAD_LEFT) . 's');
333
+			$this->out('   Average Execution delay  : ' . str_pad(Number::precision($item['fetchdelay']), 8, ' ', STR_PAD_LEFT) . 's');
334
+			$this->out('   Average Execution time   : ' . str_pad(Number::precision($item['runtime']), 8, ' ', STR_PAD_LEFT) . 's');
335
+		}
336
+	}
337
+
338
+	/**
339
+	 * Get option parser method to parse commandline options
340
+	 *
341
+	 * @return \Cake\Console\ConsoleOptionParser
342
+	 */
343
+	public function getOptionParser()
344
+	{
345
+		$subcommandParser = [
346
+			'options' => [
347
+				/*
348 348
                  * 'dry-run'=> array(
349 349
                  * 'short' => 'd',
350 350
                  * 'help' => 'Dry run the update, no jobs will actually be added.',
351 351
                  * 'boolean' => true
352 352
                  * ),
353 353
                  */
354
-            ]
355
-        ];
356
-        $subcommandParserFull = $subcommandParser;
357
-        $subcommandParserFull['options']['type'] = [
358
-            'short' => 't',
359
-            'help' => 'Type (comma separated list possible)',
360
-            'default' => null
361
-        ];
362
-
363
-        return parent::getOptionParser()->setDescription($this->getDescription())
364
-            ->addSubcommand('clean', [
365
-                'help' => 'Remove old jobs (cleanup)',
366
-                'parser' => $subcommandParser
367
-            ])
368
-            ->addSubcommand('add', [
369
-                'help' => 'Add Job',
370
-                'parser' => $subcommandParser
371
-            ])
372
-            ->addSubcommand('stats', [
373
-                'help' => 'Stats',
374
-                'parser' => $subcommandParserFull
375
-            ])
376
-            ->addSubcommand('settings', [
377
-                'help' => 'Settings',
378
-                'parser' => $subcommandParserFull
379
-            ])
380
-            ->addSubcommand('runworker', [
381
-                'help' => 'Run Worker',
382
-                'parser' => $subcommandParserFull
383
-            ]);
384
-    }
385
-
386
-    /**
387
-     * Timestamped log.
388
-     *
389
-     * @param string $message Log type
390
-     * @param string|null $pid PID of the process
391
-     * @param bool $addDetails Details
392
-     * @return void
393
-     */
394
-    protected function _log($message, $pid = null, $addDetails = true)
395
-    {
396
-        if (!Configure::read('Queue.log')) {
397
-            return;
398
-        }
399
-
400
-        if ($addDetails) {
401
-            $timeNeeded = $this->_timeNeeded();
402
-            $memoryUsage = $this->_memoryUsage();
403
-            $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
404
-        }
405
-
406
-        if ($pid) {
407
-            $message .= ' (pid ' . $pid . ')';
408
-        }
409
-        Log::write('info', $message, [
410
-            'scope' => 'queue'
411
-        ]);
412
-    }
413
-
414
-    /**
415
-     *
416
-     * @param string $message Message
417
-     * @param string|null $pid PID of the process
418
-     * @return void
419
-     */
420
-    protected function _logError($message, $pid = null)
421
-    {
422
-        $timeNeeded = $this->_timeNeeded();
423
-        $memoryUsage = $this->_memoryUsage();
424
-        $message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
425
-
426
-        if ($pid) {
427
-            $message .= ' (pid ' . $pid . ')';
428
-        }
429
-
430
-        Log::write('error', $message);
431
-    }
432
-
433
-    /**
434
-     * Returns a List of available QueueTasks and their individual configurations.
435
-     *
436
-     * @return array
437
-     */
438
-    protected function _getTaskConf()
439
-    {
440
-        if (!is_array($this->_taskConf)) {
441
-            $this->_taskConf = [];
442
-            foreach ($this->tasks as $task) {
443
-                list ($pluginName, $taskName) = pluginSplit($task);
444
-
445
-                $this->_taskConf[$taskName]['name'] = substr($taskName, 5);
446
-                $this->_taskConf[$taskName]['plugin'] = $pluginName;
447
-                if (property_exists($this->{$taskName}, 'timeout')) {
448
-                    $this->_taskConf[$taskName]['timeout'] = $this->{$taskName}->timeout;
449
-                } else {
450
-                    $this->_taskConf[$taskName]['timeout'] = Config::defaultWorkerTimeout();
451
-                }
452
-                if (property_exists($this->{$taskName}, 'retries')) {
453
-                    $this->_taskConf[$taskName]['retries'] = $this->{$taskName}->retries;
454
-                } else {
455
-                    $this->_taskConf[$taskName]['retries'] = Config::defaultWorkerRetries();
456
-                }
457
-                if (property_exists($this->{$taskName}, 'cleanupTimeout')) {
458
-                    $this->_taskConf[$taskName]['cleanupTimeout'] = $this->{$taskName}->cleanupTimeout;
459
-                } else {
460
-                    $this->_taskConf[$taskName]['cleanupTimeout'] = Config::cleanupTimeout();
461
-                }
462
-            }
463
-        }
464
-
465
-        return $this->_taskConf;
466
-    }
467
-
468
-    /**
469
-     * Signal handling to queue worker for clean shutdown
470
-     *
471
-     * @param int $signal The signal
472
-     * @return void
473
-     */
474
-    protected function _exit($signal)
475
-    {
476
-        $this->out(__d('queue', 'Caught %d signal, exiting.', $signal));
477
-        $this->_exit = true;
478
-    }
479
-
480
-    /**
481
-     *
482
-     * @return void
483
-     */
484
-    protected function _displayAvailableTasks()
485
-    {
486
-        $this->out('Available Tasks:');
487
-        foreach ($this->taskNames as $loadedTask) {
488
-            $this->out("\t" . '* ' . $this->_taskName($loadedTask));
489
-        }
490
-    }
491
-
492
-    /**
493
-     *
494
-     * @return string Memory usage in MB.
495
-     */
496
-    protected function _memoryUsage()
497
-    {
498
-        $limit = ini_get('memory_limit');
499
-
500
-        $used = number_format(memory_get_peak_usage(true) / (1024 * 1024), 0) . 'MB';
501
-        if ($limit !== '-1') {
502
-            $used .= '/' . $limit;
503
-        }
504
-
505
-        return $used;
506
-    }
507
-
508
-    /**
509
-     *
510
-     * @return string
511
-     */
512
-    protected function _timeNeeded()
513
-    {
514
-        $diff = $this->_time() - $this->_time($this->_time);
515
-        $seconds = max($diff, 1);
516
-
517
-        return $seconds . 's';
518
-    }
519
-
520
-    /**
521
-     *
522
-     * @param int|null $providedTime Provided time
523
-     *
524
-     * @return int
525
-     */
526
-    protected function _time($providedTime = null)
527
-    {
528
-        if ($providedTime !== null) {
529
-            return $providedTime;
530
-        }
531
-
532
-        return time();
533
-    }
534
-
535
-    /**
536
-     *
537
-     * @param string|null $param String to convert
538
-     * @return array
539
-     */
540
-    protected function _stringToArray($param)
541
-    {
542
-        if (!$param) {
543
-            return [];
544
-        }
545
-
546
-        $array = Text::tokenize($param);
547
-
548
-        return array_filter($array);
549
-    }
354
+			]
355
+		];
356
+		$subcommandParserFull = $subcommandParser;
357
+		$subcommandParserFull['options']['type'] = [
358
+			'short' => 't',
359
+			'help' => 'Type (comma separated list possible)',
360
+			'default' => null
361
+		];
362
+
363
+		return parent::getOptionParser()->setDescription($this->getDescription())
364
+			->addSubcommand('clean', [
365
+				'help' => 'Remove old jobs (cleanup)',
366
+				'parser' => $subcommandParser
367
+			])
368
+			->addSubcommand('add', [
369
+				'help' => 'Add Job',
370
+				'parser' => $subcommandParser
371
+			])
372
+			->addSubcommand('stats', [
373
+				'help' => 'Stats',
374
+				'parser' => $subcommandParserFull
375
+			])
376
+			->addSubcommand('settings', [
377
+				'help' => 'Settings',
378
+				'parser' => $subcommandParserFull
379
+			])
380
+			->addSubcommand('runworker', [
381
+				'help' => 'Run Worker',
382
+				'parser' => $subcommandParserFull
383
+			]);
384
+	}
385
+
386
+	/**
387
+	 * Timestamped log.
388
+	 *
389
+	 * @param string $message Log type
390
+	 * @param string|null $pid PID of the process
391
+	 * @param bool $addDetails Details
392
+	 * @return void
393
+	 */
394
+	protected function _log($message, $pid = null, $addDetails = true)
395
+	{
396
+		if (!Configure::read('Queue.log')) {
397
+			return;
398
+		}
399
+
400
+		if ($addDetails) {
401
+			$timeNeeded = $this->_timeNeeded();
402
+			$memoryUsage = $this->_memoryUsage();
403
+			$message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
404
+		}
405
+
406
+		if ($pid) {
407
+			$message .= ' (pid ' . $pid . ')';
408
+		}
409
+		Log::write('info', $message, [
410
+			'scope' => 'queue'
411
+		]);
412
+	}
413
+
414
+	/**
415
+	 *
416
+	 * @param string $message Message
417
+	 * @param string|null $pid PID of the process
418
+	 * @return void
419
+	 */
420
+	protected function _logError($message, $pid = null)
421
+	{
422
+		$timeNeeded = $this->_timeNeeded();
423
+		$memoryUsage = $this->_memoryUsage();
424
+		$message .= ' [' . $timeNeeded . ', ' . $memoryUsage . ']';
425
+
426
+		if ($pid) {
427
+			$message .= ' (pid ' . $pid . ')';
428
+		}
429
+
430
+		Log::write('error', $message);
431
+	}
432
+
433
+	/**
434
+	 * Returns a List of available QueueTasks and their individual configurations.
435
+	 *
436
+	 * @return array
437
+	 */
438
+	protected function _getTaskConf()
439
+	{
440
+		if (!is_array($this->_taskConf)) {
441
+			$this->_taskConf = [];
442
+			foreach ($this->tasks as $task) {
443
+				list ($pluginName, $taskName) = pluginSplit($task);
444
+
445
+				$this->_taskConf[$taskName]['name'] = substr($taskName, 5);
446
+				$this->_taskConf[$taskName]['plugin'] = $pluginName;
447
+				if (property_exists($this->{$taskName}, 'timeout')) {
448
+					$this->_taskConf[$taskName]['timeout'] = $this->{$taskName}->timeout;
449
+				} else {
450
+					$this->_taskConf[$taskName]['timeout'] = Config::defaultWorkerTimeout();
451
+				}
452
+				if (property_exists($this->{$taskName}, 'retries')) {
453
+					$this->_taskConf[$taskName]['retries'] = $this->{$taskName}->retries;
454
+				} else {
455
+					$this->_taskConf[$taskName]['retries'] = Config::defaultWorkerRetries();
456
+				}
457
+				if (property_exists($this->{$taskName}, 'cleanupTimeout')) {
458
+					$this->_taskConf[$taskName]['cleanupTimeout'] = $this->{$taskName}->cleanupTimeout;
459
+				} else {
460
+					$this->_taskConf[$taskName]['cleanupTimeout'] = Config::cleanupTimeout();
461
+				}
462
+			}
463
+		}
464
+
465
+		return $this->_taskConf;
466
+	}
467
+
468
+	/**
469
+	 * Signal handling to queue worker for clean shutdown
470
+	 *
471
+	 * @param int $signal The signal
472
+	 * @return void
473
+	 */
474
+	protected function _exit($signal)
475
+	{
476
+		$this->out(__d('queue', 'Caught %d signal, exiting.', $signal));
477
+		$this->_exit = true;
478
+	}
479
+
480
+	/**
481
+	 *
482
+	 * @return void
483
+	 */
484
+	protected function _displayAvailableTasks()
485
+	{
486
+		$this->out('Available Tasks:');
487
+		foreach ($this->taskNames as $loadedTask) {
488
+			$this->out("\t" . '* ' . $this->_taskName($loadedTask));
489
+		}
490
+	}
491
+
492
+	/**
493
+	 *
494
+	 * @return string Memory usage in MB.
495
+	 */
496
+	protected function _memoryUsage()
497
+	{
498
+		$limit = ini_get('memory_limit');
499
+
500
+		$used = number_format(memory_get_peak_usage(true) / (1024 * 1024), 0) . 'MB';
501
+		if ($limit !== '-1') {
502
+			$used .= '/' . $limit;
503
+		}
504
+
505
+		return $used;
506
+	}
507
+
508
+	/**
509
+	 *
510
+	 * @return string
511
+	 */
512
+	protected function _timeNeeded()
513
+	{
514
+		$diff = $this->_time() - $this->_time($this->_time);
515
+		$seconds = max($diff, 1);
516
+
517
+		return $seconds . 's';
518
+	}
519
+
520
+	/**
521
+	 *
522
+	 * @param int|null $providedTime Provided time
523
+	 *
524
+	 * @return int
525
+	 */
526
+	protected function _time($providedTime = null)
527
+	{
528
+		if ($providedTime !== null) {
529
+			return $providedTime;
530
+		}
531
+
532
+		return time();
533
+	}
534
+
535
+	/**
536
+	 *
537
+	 * @param string|null $param String to convert
538
+	 * @return array
539
+	 */
540
+	protected function _stringToArray($param)
541
+	{
542
+		if (!$param) {
543
+			return [];
544
+		}
545
+
546
+		$array = Text::tokenize($param);
547
+
548
+		return array_filter($array);
549
+	}
550 550
 }
Please login to merge, or discard this patch.