1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\Controller; |
4
|
|
|
|
5
|
|
|
use Dtc\QueueBundle\Doctrine\DoctrineJobManager; |
6
|
|
|
use Dtc\QueueBundle\Exception\UnsupportedException; |
7
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
8
|
|
|
use Dtc\QueueBundle\Model\Worker; |
9
|
|
|
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method; |
10
|
|
|
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route; |
11
|
|
|
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Template; |
12
|
|
|
use Symfony\Bundle\FrameworkBundle\Controller\Controller; |
13
|
|
|
use Symfony\Component\HttpFoundation\Request; |
14
|
|
|
use Symfony\Component\HttpFoundation\StreamedResponse; |
15
|
|
|
|
16
|
|
|
/**
 * Web UI controller for the queue bundle: status summary, job/run grids,
 * worker listing and the streamed bulk maintenance operations
 * (archive, reset stalled, prune stalled).
 *
 * Helper methods such as validateManagerType(), validateRunManager() and
 * addCssJs() are not defined in this class; presumably they are supplied by
 * ControllerTrait - confirm there before relying on their exact behaviour.
 */
class QueueController extends Controller
{
    use ControllerTrait;

    /**
     * Summary stats.
     *
     * Renders whatever the job manager's getStatus() reports.
     *
     * @Route("/")
     * @Route("/status/")
     * @Template("@DtcQueue/Queue/status.html.twig")
     *
     * @return array template parameters
     */
    public function statusAction()
    {
        $params = [];
        $jobManager = $this->get('dtc_queue.manager.job');

        $params['status'] = $jobManager->getStatus();
        // addCssJs() receives the array and the result is returned afterwards,
        // so it is presumably filled in by reference - confirm in the trait.
        $this->addCssJs($params);

        return $params;
    }

    /**
     * Shows two grids side by side: live (non-archived) jobs and archived jobs.
     *
     * @Route("/jobs_all", name="dtc_queue_jobs_all")
     *
     * @return \Symfony\Component\HttpFoundation\Response
     *
     * @throws UnsupportedException|\Exception
     */
    public function jobsAllAction()
    {
        $this->validateManagerType('dtc_queue.manager.job');
        $this->checkDtcGridBundle();

        $params = $this->getDualGridParams(
            $this->container->getParameter('dtc_queue.class.job'),
            $this->container->getParameter('dtc_queue.class.job_archive'),
            'Non-Archived Jobs',
            'Archived Jobs'
        );

        return $this->render('@DtcQueue/Queue/grid.html.twig', $params);
    }

    /**
     * Archives jobs, streaming progress back to the client.
     *
     * The optional "workerName" and "method" request values are forwarded to
     * the job manager (see getStreamFunction()).
     *
     * @Route("/archive", name="dtc_queue_archive")
     * @Method({"POST"})
     *
     * @return StreamedResponse
     *
     * @throws UnsupportedException
     */
    public function archiveAction(Request $request)
    {
        return $this->streamResults($request, 'archiveAllJobs');
    }

    /**
     * Resets stalled jobs, streaming progress back to the client.
     *
     * @Route("/reset-stalled", name="dtc_queue_reset_stalled")
     *
     * @return StreamedResponse
     *
     * @throws UnsupportedException
     */
    public function resetStalledAction(Request $request)
    {
        return $this->streamResults($request, 'resetStalledJobs');
    }

    /**
     * Prunes stalled jobs, streaming progress back to the client.
     *
     * @Route("/prune-stalled", name="dtc_queue_prune_stalled")
     *
     * @return StreamedResponse
     *
     * @throws UnsupportedException
     */
    public function pruneStalledAction(Request $request)
    {
        return $this->streamResults($request, 'pruneStalledJobs');
    }

    /**
     * Wraps a job-manager bulk operation in a newline-delimited JSON stream.
     *
     * @param Request $request      source of the "workerName" / "method" filters
     * @param string  $functionName name of the job-manager method to run
     *
     * @return StreamedResponse
     *
     * @throws UnsupportedException when the configured job manager is not a DoctrineJobManager
     */
    protected function streamResults(Request $request, $functionName)
    {
        $jobManager = $this->get('dtc_queue.manager.job');
        if (!$jobManager instanceof DoctrineJobManager) {
            throw new UnsupportedException('$jobManager must be instance of '.DoctrineJobManager::class);
        }

        $streamingResponse = new StreamedResponse($this->getStreamFunction($request, $functionName));
        $streamingResponse->headers->set('Content-Type', 'application/x-ndjson');
        // Header understood by nginx: do not buffer this response in the proxy,
        // so each progress line reaches the browser as it is written.
        $streamingResponse->headers->set('X-Accel-Buffering', 'no');

        return $streamingResponse;
    }

    /**
     * Builds the callback executed by the StreamedResponse.
     *
     * The stream consists of one JSON object per line: a {"total": N} record
     * the first time a total is known, then {"count": N} records for every
     * progress notification the job manager sends.
     *
     * For every name other than "archiveAllJobs" the method is invoked
     * dynamically on the job manager, so callers must only pass names of
     * methods that accept (workerName, methodName, callback).
     *
     * @param Request $request      source of the "workerName" / "method" filters
     * @param string  $functionName name of the job-manager method to run
     *
     * @return \Closure
     */
    protected function getStreamFunction(Request $request, $functionName)
    {
        $jobManager = $this->get('dtc_queue.manager.job');
        $workerName = $request->get('workerName');
        $methodName = $request->get('method');
        $total = null;

        // Single place that defines the wire format: one JSON document,
        // a newline, then push it out immediately.
        $emit = function (array $record) {
            echo json_encode($record);
            echo "\n";
            flush();
        };

        // Progress callback handed to the job manager. $total is shared by
        // reference with the outer closure so that a total already announced
        // there is not announced a second time here.
        $callback = function ($count, $totalCount) use (&$total, $emit) {
            if (null !== $totalCount && null === $total) {
                $total = $totalCount;
                $emit(['total' => $total]);

                return;
            }
            $emit(['count' => $count]);
        };

        return function () use ($jobManager, $callback, $emit, $workerName, $methodName, $functionName, &$total) {
            if ('archiveAllJobs' === $functionName) {
                // Archiving announces the total up front and skips the
                // operation entirely when there is nothing to archive.
                $total = $jobManager->countLiveJobs($workerName, $methodName);
                $emit(['total' => $total]);
                if ($total > 0) {
                    $jobManager->archiveAllJobs($workerName, $methodName, $callback);
                }

                return;
            }

            $jobManager->$functionName($workerName, $methodName, $callback);
        };
    }

    /**
     * Lists waiting jobs in a datatables grid, with the controls for
     * archiving them.
     *
     * @Template("@DtcQueue/Queue/jobs.html.twig")
     * @Route("/jobs", name="dtc_queue_jobs")
     *
     * @return array template parameters
     *
     * @throws UnsupportedException|\Exception
     */
    public function jobsAction()
    {
        $params = $this->buildJobsGridParams('jobs_waiting');

        $params['worker_methods'] = $this->get('dtc_queue.manager.job')->getWorkersAndMethods();
        $params['prompt_message'] = 'This will archive all non-running jobs';

        return $params;
    }

    /**
     * Lists running jobs in a datatables grid, with the controls for
     * pruning stalled ones.
     *
     * @Template("@DtcQueue/Queue/jobs_running.html.twig")
     * @Route("/jobs_running", name="dtc_queue_jobs_running")
     *
     * @return array template parameters
     *
     * @throws UnsupportedException|\Exception
     */
    public function runningJobsAction()
    {
        $params = $this->buildJobsGridParams('jobs_running');

        $params['worker_methods'] = $this->get('dtc_queue.manager.job')->getWorkersAndMethods(BaseJob::STATUS_RUNNING);
        $params['prompt_message'] = 'This will prune all stalled jobs';

        return $params;
    }

    /**
     * Shared pipeline behind jobsAction() and runningJobsAction(): validate
     * the setup, bind the matching grid source to a datatables renderer and
     * return the renderer's template parameters.
     *
     * The grid source service id is
     * "dtc_queue.grid_source.<name>.<type>", where <type> is the configured
     * manager type with "mongodb" mapped to "odm".
     *
     * @param string $gridSourceName middle segment of the grid source service id
     *
     * @return array template parameters
     *
     * @throws UnsupportedException|\Exception
     */
    private function buildJobsGridParams($gridSourceName)
    {
        $this->validateManagerType('dtc_queue.manager.job');
        $this->checkDtcGridBundle();

        $managerType = $this->container->getParameter('dtc_queue.manager.job');
        $renderer = $this->get('dtc_grid.renderer.factory')->create('datatables');

        $typeSuffix = 'mongodb' === $managerType ? 'odm' : $managerType;
        $gridSource = $this->get('dtc_queue.grid_source.'.$gridSourceName.'.'.$typeSuffix);
        $renderer->bind($gridSource);

        $params = $renderer->getParams();
        $this->addCssJs($params);

        return $params;
    }

    /**
     * Builds template parameters for a page showing two datatables grids.
     *
     * The first grid's renderer parameters form the base of the result; the
     * second grid is attached under the "archive_grid" key.
     *
     * @param string $class1 entity/document class shown in the first grid
     * @param string $class2 entity/document class shown in the second grid
     * @param string $label1 heading for the first grid
     * @param string $label2 heading for the second grid
     *
     * @return array
     *
     * @throws \Exception
     */
    protected function getDualGridParams($class1, $class2, $label1, $label2)
    {
        $rendererFactory = $this->get('dtc_grid.renderer.factory');

        $renderer = $rendererFactory->create('datatables');
        $gridSource = $this->get('dtc_grid.manager.source')->get($class1);
        $renderer->bind($gridSource);
        $params = $renderer->getParams();

        $renderer2 = $rendererFactory->create('datatables');
        $gridSource = $this->get('dtc_grid.manager.source')->get($class2);
        $renderer2->bind($gridSource);
        $params2 = $renderer2->getParams();

        $params['archive_grid'] = $params2['dtc_grid'];

        $params['dtc_queue_grid_label1'] = $label1;
        $params['dtc_queue_grid_label2'] = $label2;
        $this->addCssJs($params);

        return $params;
    }

    /**
     * Shows two grids side by side: live runs and archived runs.
     *
     * @Route("/runs", name="dtc_queue_runs")
     *
     * @return \Symfony\Component\HttpFoundation\Response
     *
     * @throws UnsupportedException|\Exception
     */
    public function runsAction()
    {
        $this->validateRunManager();
        $this->checkDtcGridBundle();

        $params = $this->getDualGridParams(
            $this->container->getParameter('dtc_queue.class.run'),
            $this->container->getParameter('dtc_queue.class.run_archive'),
            'Live Runs',
            'Archived Runs'
        );

        return $this->render('@DtcQueue/Queue/grid.html.twig', $params);
    }

    /**
     * List registered workers in the system.
     *
     * The template receives a map of worker name to the worker's class name.
     *
     * @Route("/workers", name="dtc_queue_workers")
     * @Template("@DtcQueue/Queue/workers.html.twig")
     *
     * @return array template parameters
     */
    public function workersAction()
    {
        $workerManager = $this->get('dtc_queue.manager.worker');
        $workers = $workerManager->getWorkers();

        $workerList = [];
        foreach ($workers as $workerName => $worker) {
            /** @var Worker $worker */
            $workerList[$workerName] = get_class($worker);
        }
        $params = ['workers' => $workerList];
        $this->addCssJs($params);

        return $params;
    }

    /**
     * Validates that DtcGridBundle exists.
     *
     * @throws UnsupportedException when the DtcGridBundle class cannot be loaded
     */
    protected function checkDtcGridBundle()
    {
        if (!class_exists('Dtc\GridBundle\DtcGridBundle')) {
            throw new UnsupportedException('DtcGridBundle (mmucklo/grid-bundle) needs to be installed.');
        }
    }
}
298
|
|
|
|
Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.
The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.
This check looks for comments that seem to be mostly valid code and reports them.