1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\Controller; |
4
|
|
|
|
5
|
|
|
use Dtc\QueueBundle\Doctrine\DoctrineJobManager; |
6
|
|
|
use Dtc\QueueBundle\Exception\UnsupportedException; |
7
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
8
|
|
|
use Dtc\QueueBundle\Model\Worker; |
9
|
|
|
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method; |
10
|
|
|
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route; |
11
|
|
|
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Template; |
12
|
|
|
use Symfony\Bundle\FrameworkBundle\Controller\Controller; |
13
|
|
|
use Symfony\Component\HttpFoundation\Request; |
14
|
|
|
use Symfony\Component\HttpFoundation\StreamedResponse; |
15
|
|
|
|
16
|
|
|
class QueueController extends Controller |
17
|
|
|
{ |
18
|
|
|
use ControllerTrait; |
19
|
|
|
|
20
|
|
|
/**
 * Render the queue status summary page.
 *
 * Exposes the job manager's status report to the template under the
 * "status" key, then lets addCssJs() contribute its own entries.
 *
 * @Route("/")
 * @Route("/status/")
 * @Template("@DtcQueue/Queue/status.html.twig")
 *
 * @return array variables handed to the status template
 */
public function statusAction()
{
    $templateVars = ['status' => $this->get('dtc_queue.manager.job')->getStatus()];

    // addCssJs() comes from ControllerTrait; presumably it adds asset entries
    // to the array by reference - confirm against the trait.
    $this->addCssJs($templateVars);

    return $templateVars;
}
37
|
|
|
|
38
|
|
|
/**
 * Show non-archived and archived jobs together in the dual-grid view.
 *
 * The two entity classes are read from the container parameters
 * "dtc_queue.class.job" and "dtc_queue.class.job_archive".
 *
 * @Route("/jobs_all", name="dtc_queue_jobs_all")
 *
 * @throws UnsupportedException
 */
public function jobsAllAction()
{
    $this->validateManagerType('dtc_queue.manager.job');

    $gridParams = $this->getDualGridParams(
        $this->container->getParameter('dtc_queue.class.job'),
        $this->container->getParameter('dtc_queue.class.job_archive'),
        'Non-Archived Jobs',
        'Archived Jobs'
    );

    return $this->render('@DtcQueue/Queue/grid.html.twig', $gridParams);
}
57
|
|
|
|
58
|
|
|
/**
 * Archive jobs and stream the progress back to the client.
 *
 * Delegates to streamResults() with the 'archiveAllJobs' operation.
 *
 * @Route("/archive", name="dtc_queue_archive")
 * @Method({"POST"})
 *
 * @param Request $request
 *
 * @return StreamedResponse
 *
 * @throws UnsupportedException
 */
public function archiveAction(Request $request)
{
    $response = $this->streamResults($request, 'archiveAllJobs');

    return $response;
}
68
|
|
|
|
69
|
|
|
/**
 * Reset stalled jobs and stream the progress back to the client.
 *
 * Delegates to streamResults() with the 'resetStalledJobs' operation.
 *
 * NOTE(review): this route declares no HTTP-method restriction even though
 * it triggers a modifying operation - confirm whether it should be POST-only.
 *
 * @Route("/reset-stalled", name="dtc_queue_reset_stalled")
 *
 * @param Request $request
 *
 * @return StreamedResponse
 *
 * @throws UnsupportedException
 */
public function resetStalledAction(Request $request)
{
    $response = $this->streamResults($request, 'resetStalledJobs');

    return $response;
}
82
|
|
|
|
83
|
|
|
/**
 * Prune stalled jobs and stream the progress back to the client.
 *
 * Delegates to streamResults() with the 'pruneStalledJobs' operation.
 *
 * NOTE(review): this route declares no HTTP-method restriction even though
 * it triggers a modifying operation - confirm whether it should be POST-only.
 *
 * @Route("/prune-stalled", name="dtc_queue_prune_stalled")
 *
 * @param Request $request
 *
 * @return StreamedResponse
 *
 * @throws UnsupportedException
 */
public function pruneStalledAction(Request $request)
{
    $response = $this->streamResults($request, 'pruneStalledJobs');

    return $response;
}
96
|
|
|
|
97
|
|
|
/**
 * Wrap a job-manager bulk operation in a streamed NDJSON response.
 *
 * The body of the response is produced by the closure returned from
 * getStreamFunction().
 *
 * @param Request $request      request forwarded to getStreamFunction()
 * @param string  $functionName name of the job-manager method to run
 *
 * @return StreamedResponse
 *
 * @throws UnsupportedException when the configured job manager is not a DoctrineJobManager
 */
protected function streamResults(Request $request, $functionName)
{
    // Only the Doctrine-backed manager is accepted for these operations.
    if (!($this->get('dtc_queue.manager.job') instanceof DoctrineJobManager)) {
        throw new UnsupportedException('$jobManager must be instance of '.DoctrineJobManager::class);
    }

    $response = new StreamedResponse($this->getStreamFunction($request, $functionName));

    // One JSON document per line.
    $response->headers->set('Content-Type', 'application/x-ndjson');
    // Presumably disables proxy (nginx) buffering so each line is delivered
    // as soon as it is flushed - confirm for the deployed front end.
    $response->headers->set('X-Accel-Buffering', 'no');

    return $response;
}
118
|
|
|
|
119
|
|
|
/**
 * Build the closure that runs a job-manager operation and prints progress lines.
 *
 * Every progress line is a JSON object followed by a newline and a flush():
 * an object with a "total" key the first time a total becomes known, and an
 * object with a "count" key for every other progress callback.
 *
 * The worker name and method filter are read from the request parameters
 * "workerName" and "method".
 *
 * @param Request $request
 * @param string  $functionName name of the job-manager method to invoke
 *
 * @return \Closure
 */
protected function getStreamFunction(Request $request, $functionName)
{
    $manager = $this->get('dtc_queue.manager.job');
    $worker = $request->get('workerName');
    $method = $request->get('method');

    // Shared by reference between the progress callback and the runner so
    // that the total is announced at most once.
    $total = null;

    $progress = function ($count, $totalCount) use (&$total) {
        if (null === $total && null !== $totalCount) {
            // First time a total is reported: remember and announce it
            // instead of emitting a count line.
            $total = $totalCount;
            $line = ['total' => $total];
        } else {
            $line = ['count' => $count];
        }

        echo json_encode($line), "\n";
        flush();
    };

    return function () use ($manager, $progress, $worker, $method, $functionName, &$total) {
        // Loose comparison on purpose: it mirrors switch/case matching.
        if ('archiveAllJobs' != $functionName) {
            $manager->$functionName($worker, $method, $progress);

            return;
        }

        // Archiving announces the live-job total up front; because $total is
        // then already set, the callback goes straight to count lines.
        $total = $manager->countLiveJobs($worker, $method);
        echo json_encode(['total' => $total]), "\n";
        flush();

        if ($total > 0) {
            $manager->archiveAllJobs($worker, $method, $progress);
        }
    };
}
162
|
|
|
|
163
|
|
|
/**
 * Show the grid of waiting jobs together with the archive controls.
 *
 * @Template("@DtcQueue/Queue/jobs.html.twig")
 * @Route("/jobs", name="dtc_queue_jobs")
 *
 * @return array variables handed to the jobs template
 *
 * @throws UnsupportedException
 */
public function jobsAction()
{
    $this->validateManagerType('dtc_queue.manager.job');

    // The grid-source service id ends in the manager type, except that the
    // "mongodb" manager maps to the "odm" suffix.
    $managerType = $this->container->getParameter('dtc_queue.manager.job');
    $sourceSuffix = 'mongodb' === $managerType ? 'odm' : $managerType;

    $renderer = $this->get('dtc_grid.renderer.factory')->create('datatables');
    $renderer->bind($this->get('dtc_queue.grid_source.jobs_waiting.'.$sourceSuffix));

    $templateVars = $renderer->getParams();
    $this->addCssJs($templateVars);

    $templateVars['worker_methods'] = $this->get('dtc_queue.manager.job')->getWorkersAndMethods();
    $templateVars['prompt_message'] = 'This will archive all non-running jobs';

    return $templateVars;
}
187
|
|
|
|
188
|
|
|
/**
 * Show the grid of running jobs together with the prune controls.
 *
 * The worker/method list is limited to BaseJob::STATUS_RUNNING.
 *
 * @Template("@DtcQueue/Queue/jobs_running.html.twig")
 * @Route("/jobs_running", name="dtc_queue_jobs_running")
 *
 * @return array variables handed to the running-jobs template
 *
 * @throws UnsupportedException presumably raised by validateManagerType(), as the sibling job-listing actions document - confirm
 */
public function runningJobsAction()
{
    $this->validateManagerType('dtc_queue.manager.job');

    // The grid-source service id ends in the manager type, except that the
    // "mongodb" manager maps to the "odm" suffix.
    $managerType = $this->container->getParameter('dtc_queue.manager.job');
    $sourceSuffix = 'mongodb' === $managerType ? 'odm' : $managerType;

    $renderer = $this->get('dtc_grid.renderer.factory')->create('datatables');
    $renderer->bind($this->get('dtc_queue.grid_source.jobs_running.'.$sourceSuffix));

    $templateVars = $renderer->getParams();
    $this->addCssJs($templateVars);

    $templateVars['worker_methods'] = $this->get('dtc_queue.manager.job')->getWorkersAndMethods(BaseJob::STATUS_RUNNING);
    $templateVars['prompt_message'] = 'This will prune all stalled jobs';

    return $templateVars;
}
210
|
|
|
|
211
|
|
|
/**
 * Build the template variables for a page that shows two datatables grids.
 *
 * The first grid's renderer parameters form the base array; the second
 * grid's "dtc_grid" entry is added under "archive_grid", and the two labels
 * under "dtc_queue_grid_label1" / "dtc_queue_grid_label2".
 *
 * @param string $class1 class whose grid source feeds the primary grid
 * @param string $class2 class whose grid source feeds the secondary ("archive") grid
 * @param string $label1 heading for the primary grid
 * @param string $label2 heading for the secondary grid
 *
 * @return array template variables (not a Response; callers pass the result to render())
 *
 * @throws \Exception
 */
protected function getDualGridParams($class1, $class2, $label1, $label2)
{
    $rendererFactory = $this->get('dtc_grid.renderer.factory');

    // Primary grid: its renderer parameters are the base of the result.
    $primaryRenderer = $rendererFactory->create('datatables');
    $primarySource = $this->get('dtc_grid.manager.source')->get($class1);
    $primaryRenderer->bind($primarySource);
    $params = $primaryRenderer->getParams();

    // Secondary grid: only its "dtc_grid" entry is carried over.
    $secondaryRenderer = $rendererFactory->create('datatables');
    $secondarySource = $this->get('dtc_grid.manager.source')->get($class2);
    $secondaryRenderer->bind($secondarySource);
    $secondaryParams = $secondaryRenderer->getParams();

    $params['archive_grid'] = $secondaryParams['dtc_grid'];

    $params['dtc_queue_grid_label1'] = $label1;
    $params['dtc_queue_grid_label2'] = $label2;
    $this->addCssJs($params);

    return $params;
}
242
|
|
|
|
243
|
|
|
/**
 * Show live and archived runs together in the dual-grid view.
 *
 * The two entity classes are read from the container parameters
 * "dtc_queue.class.run" and "dtc_queue.class.run_archive".
 *
 * @Route("/runs", name="dtc_queue_runs")
 */
public function runsAction()
{
    $this->validateRunManager();

    $gridParams = $this->getDualGridParams(
        $this->container->getParameter('dtc_queue.class.run'),
        $this->container->getParameter('dtc_queue.class.run_archive'),
        'Live Runs',
        'Archived Runs'
    );

    return $this->render('@DtcQueue/Queue/grid.html.twig', $gridParams);
}
260
|
|
|
|
261
|
|
|
/**
 * Show the workers known to the worker manager.
 *
 * The template receives a "workers" map of worker name to the class name
 * of the worker instance.
 *
 * @Route("/workers", name="dtc_queue_workers")
 * @Template("@DtcQueue/Queue/workers.html.twig")
 *
 * @return array variables handed to the workers template
 */
public function workersAction()
{
    $classesByName = [];
    foreach ($this->get('dtc_queue.manager.worker')->getWorkers() as $name => $instance) {
        /* @var Worker $instance */
        $classesByName[$name] = get_class($instance);
    }

    $templateVars = ['workers' => $classesByName];
    $this->addCssJs($templateVars);

    return $templateVars;
}
282
|
|
|
} |
283
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.