1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ODM; |
4
|
|
|
|
5
|
|
|
use Doctrine\MongoDB\Query\Builder; |
6
|
|
|
use Dtc\QueueBundle\Doctrine\DoctrineJobManager; |
7
|
|
|
use Doctrine\ODM\MongoDB\DocumentManager; |
8
|
|
|
use Dtc\QueueBundle\Document\Job; |
9
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
10
|
|
|
use Dtc\QueueBundle\Util\Util; |
11
|
|
|
|
12
|
|
|
class JobManager extends DoctrineJobManager |
13
|
|
|
{ |
14
|
|
|
use CommonTrait; |
15
|
|
|
/**
 * JavaScript reduce function used by getStatusByDocument().
 *
 * For all values passed in under one key it adds up, property by property,
 * the numbers they carry and returns the object holding those totals.
 */
const REDUCE_FUNCTION = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';
30
|
|
|
|
31
|
|
|
/**
 * Counts the documents of $objectName whose "status" field equals $status,
 * optionally narrowed to one worker and/or one method.
 *
 * @param string      $objectName document class handed to the query builder
 * @param mixed       $status     value the "status" field must equal
 * @param string|null $workerName restrict to this worker when not null
 * @param string|null $method     restrict to this method when not null
 *
 * @return int
 */
public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $builder = $documentManager->createQueryBuilder($objectName);
    $builder->find();
    $builder->field('status')->equals($status);
    $this->addWorkerNameCriterion($builder, $workerName, $method);

    return $builder->getQuery()->count();
}
45
|
3 |
|
|
46
|
|
|
/**
 * Removes the documents of the job archive class whose status is
 * BaseJob::STATUS_EXCEPTION, optionally limited to one worker and/or method.
 *
 * @param string|null $workerName restrict to this worker when not null
 * @param string|null $method     restrict to this method when not null
 *
 * @return int the "n" entry of the remove result, or 0 when it is not set
 */
public function pruneExceptionJobs($workerName = null, $method = null)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $builder = $documentManager
        ->createQueryBuilder($this->getJobArchiveClass())
        ->remove();
    $builder->field('status')->equals(BaseJob::STATUS_EXCEPTION);
    $this->addWorkerNameCriterion($builder, $workerName, $method);

    $outcome = $builder->getQuery()->execute();

    return isset($outcome['n']) ? $outcome['n'] : 0;
}
67
|
|
|
|
68
|
|
|
/**
 * Adds equality criteria on "workerName" and "method" to the builder,
 * skipping whichever of the two values is null.
 *
 * @param Builder     $builder    query builder to add the criteria to
 * @param string|null $workerName
 * @param string|null $method
 */
protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
{
    $criteria = [
        'workerName' => $workerName,
        'method' => $method,
    ];

    foreach ($criteria as $fieldName => $expected) {
        if (null === $expected) {
            continue;
        }

        $builder->field($fieldName)->equals($expected);
    }
}
83
|
5 |
|
|
84
|
18 |
|
/**
 * Sets the status to Job::STATUS_EXPIRED on every job that is still
 * BaseJob::STATUS_NEW and whose "expiresAt" is at or before the current
 * time, optionally limited to one worker and/or method.
 *
 * @param string|null $workerName restrict to this worker when not null
 * @param string|null $method     restrict to this method when not null
 *
 * @return int the "n" entry of the update result, or 0 when it is not set
 */
protected function updateExpired($workerName = null, $method = null)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $builder = $documentManager
        ->createQueryBuilder($this->getJobClass())
        ->updateMany();

    // Selection criteria.
    $now = Util::getMicrotimeDateTime();
    $builder->field('expiresAt')->lte($now);
    $builder->field('status')->equals(BaseJob::STATUS_NEW);
    $this->addWorkerNameCriterion($builder, $workerName, $method);

    // Modification applied to every selected job.
    $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

    $outcome = $builder->getQuery()->execute();

    return isset($outcome['n']) ? $outcome['n'] : 0;
}
108
|
|
|
|
109
|
|
|
/**
 * Removes archived jobs older than $olderThan.
 *
 * Delegates to removeOlderThan() with the job archive class and the
 * "updatedAt" field.
 *
 * @param \DateTime $olderThan
 *
 * @return int
 */
public function pruneArchivedJobs(\DateTime $olderThan)
{
    $archiveClass = $this->getJobArchiveClass();

    return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
}
120
|
1 |
|
|
121
|
|
View Code Duplication |
/**
 * Counts the jobs of the job class that satisfy the standard predicates
 * (see addStandardPredicates()), optionally limited to one worker and/or
 * method.
 *
 * @param string|null $workerName restrict to this worker when not null
 * @param string|null $method     restrict to this method when not null
 *
 * @return int
 */
public function getWaitingJobCount($workerName = null, $method = null)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $waiting = $documentManager->createQueryBuilder($this->getJobClass());
    $waiting->find();

    $this->addWorkerNameCriterion($waiting, $workerName, $method);
    $this->addStandardPredicates($waiting);

    return $waiting->getQuery()->count(true);
}
136
|
2 |
|
|
137
|
2 |
|
/**
 * Builds a per-worker/method status breakdown for one document class.
 *
 * A map/reduce query emits, for every document, a one-entry counter
 * ({<status>: 1}) under the key "<worker_name>-><method>()"; the reduce
 * step (self::REDUCE_FUNCTION) sums those counters per key.
 *
 * @param string $documentName document class to run the map/reduce on
 *
 * @return array reduced counters indexed by the map key; keys missing from a
 *               counter are filled in from static::getAllStatuses()
 */
protected function getStatusByDocument($documentName)
{
    // Map step: one {status: 1} object per document, keyed by worker and method.
    $mapFunction = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $builder = $documentManager->createQueryBuilder($documentName);
    $builder->map($mapFunction);
    $builder->reduce(self::REDUCE_FUNCTION);
    $rows = $builder->getQuery()->execute();

    // Array union below keeps the reduced values and only adds the keys they lack.
    // NOTE(review): presumably getAllStatuses() maps every status to 0 — confirm.
    $defaults = static::getAllStatuses();

    $breakdown = [];
    foreach ($rows as $row) {
        $breakdown[$row['_id']] = $row['value'] + $defaults;
    }

    return $breakdown;
}
172
|
3 |
|
|
173
|
3 |
|
/**
 * Combines the status breakdown of the job class with that of the job
 * archive class.
 *
 * Counters present in both breakdowns are added together; counters only
 * found in the archive breakdown are copied over. Each entry's counters are
 * finally sorted by key.
 *
 * @return array counters indexed first by the getStatusByDocument() key,
 *               then by status
 */
public function getStatus()
{
    $combined = $this->getStatusByDocument($this->getJobClass());
    $archived = $this->getStatusByDocument($this->getJobArchiveClass());

    foreach ($archived as $workerMethod => $counters) {
        foreach ($counters as $statusName => $amount) {
            if (!isset($combined[$workerMethod][$statusName])) {
                $combined[$workerMethod][$statusName] = $amount;

                continue;
            }

            $combined[$workerMethod][$statusName] += $amount;
        }
    }

    $sorted = [];
    foreach ($combined as $workerMethod => $counters) {
        ksort($counters);
        $sorted[$workerMethod] = $counters;
    }

    return $sorted;
}
195
|
3 |
|
|
196
|
3 |
|
/**
 * Get the next job to run (can be filtered by worker name and method name).
 *
 * The job is selected and modified by one find-and-update query built on
 * getJobQueryBuilder(): "startedAt" is set to the current time, "status" to
 * BaseJob::STATUS_RUNNING and "runId" to $runId. The updated document is
 * requested back (returnNew()).
 *
 * @param string|null $workerName
 * @param string|null $methodName
 * @param bool        $prioritize passed through to getJobQueryBuilder()
 * @param string|null $runId      value stored in the job's "runId" field
 *
 * @return \Dtc\QueueBundle\Model\Job the query result; presumably empty when
 *                                    no job matches — confirm against callers
 */
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
{
    $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
    $builder->findAndUpdate();
    $builder->returnNew();

    // Fields written on the selected job.
    $startedAt = Util::getMicrotimeDateTime();
    $builder->field('startedAt')->set($startedAt);
    $builder->field('status')->set(BaseJob::STATUS_RUNNING);
    $builder->field('runId')->set($runId);

    return $builder->getQuery()->execute();
}
226
|
|
|
|
227
|
11 |
|
/**
 * Creates a query builder on the job class with the worker/method criteria,
 * the sort order and the standard predicates already applied.
 *
 * @param string|null $workerName restrict to this worker when not null
 * @param string|null $methodName restrict to this method when not null
 * @param bool        $prioritize sort by priority (descending) before whenAt
 *                                (ascending); otherwise by whenAt only
 *
 * @return Builder
 */
public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();
    $jobQuery = $documentManager->createQueryBuilder($this->getJobClass());

    $this->addWorkerNameCriterion($jobQuery, $workerName, $methodName);

    if (!$prioritize) {
        $jobQuery->sort('whenAt', 'asc');
    } else {
        $jobQuery->sort([
            'priority' => 'desc',
            'whenAt' => 'asc',
        ]);
    }

    // Filter
    $this->addStandardPredicates($jobQuery);

    return $jobQuery;
}
255
|
|
|
|
256
|
11 |
|
/**
 * Finds the earliest (by whenAt) job with status BaseJob::STATUS_NEW and
 * the same crcHash as $job, then raises its priority and/or moves its
 * whenAt earlier to match $job.
 *
 * @param \Dtc\QueueBundle\Model\Job $job job supplying crcHash, priority and whenAt
 *
 * @return mixed the matching stored job with the same adjustments applied to
 *               the in-memory object, or null when there is no such job
 */
protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $lookup = $documentManager->createQueryBuilder($this->getJobClass());
    $lookup->find();
    $lookup->sort('whenAt', 'asc');
    $lookup->field('status')->equals(BaseJob::STATUS_NEW);
    $lookup->field('crcHash')->equals($job->getCrcHash());
    $existingJob = $lookup->getQuery()->getSingleResult();

    if (!$existingJob) {
        return null;
    }

    $priority = $job->getPriority();
    $whenAt = $job->getWhenAt();

    // Update priority or whenAt
    //  Each change carries its own condition in the query, so a concurrent
    //  update by someone else is not trounced.
    $documentManager->createQueryBuilder($this->getJobClass())
        ->findAndUpdate()
        ->field('_id')->equals($existingJob->getId())
        ->field('priority')->lt($priority)
        ->field('priority')->set($priority)
        ->getQuery()
        ->execute();

    $documentManager->createQueryBuilder($this->getJobClass())
        ->findAndUpdate()
        ->field('_id')->equals($existingJob->getId())
        ->field('whenAt')->gt($whenAt)
        ->field('whenAt')->set($whenAt)
        ->getQuery()
        ->execute();

    // Apply the same adjustments to the object that is handed back.
    if ($whenAt < $existingJob->getWhenAt()) {
        $existingJob->setWhenAt($whenAt);
    }
    if ($priority > $existingJob->getPriority()) {
        $existingJob->setPriority($priority);
    }

    return $existingJob;
}
298
|
1 |
|
|
299
|
1 |
|
/**
 * Adds the predicates shared by the job queries in this class:
 *  - whenAt is null or not after the current time,
 *  - expiresAt is null or after the current time,
 *  - status equals BaseJob::STATUS_NEW.
 *
 * @param mixed $builder object offering expr(), addAnd() and field(); this
 *                       class passes a query builder or an aggregation
 *                       match stage
 */
protected function addStandardPredicates($builder)
{
    $now = Util::getMicrotimeDateTime();

    $isDue = $builder->expr()->addOr(
        $builder->expr()->field('whenAt')->equals(null),
        $builder->expr()->field('whenAt')->lte($now)
    );
    $isNotExpired = $builder->expr()->addOr(
        $builder->expr()->field('expiresAt')->equals(null),
        $builder->expr()->field('expiresAt')->gt($now)
    );

    $builder
        ->addAnd($isDue, $isNotExpired)
        ->field('status')->equals(BaseJob::STATUS_NEW);
}
312
|
1 |
|
|
313
|
1 |
|
/**
 * Lists the worker/method combinations of the jobs matching the standard
 * predicates, using an aggregation grouped on workerName and method.
 *
 * Returns an empty array when the object manager has no
 * createAggregationBuilder() method or when the aggregation yields nothing.
 *
 * @return array method names grouped under their worker name
 */
public function getWorkersAndMethods()
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    if (!method_exists($documentManager, 'createAggregationBuilder')) {
        return [];
    }

    $aggregation = $documentManager->createAggregationBuilder($this->getJobClass());

    $this->addStandardPredicates($aggregation->match());

    $groupStage = $aggregation->group();
    $groupKey = $aggregation->expr()
        ->field('workerName')->expression('$workerName')
        ->field('method')->expression('$method');
    $groupStage->field('id')->expression($groupKey);

    $rows = $aggregation->execute()->toArray();

    if (!$rows) {
        return [];
    }

    $methodsByWorker = [];
    foreach ($rows as $row) {
        if (!isset($row['_id'])) {
            continue;
        }

        $group = $row['_id'];
        $methodsByWorker[$group['worker_name']][] = $group['method'];
    }

    return $methodsByWorker;
}
348
|
1 |
|
|
349
|
1 |
|
/**
 * Counts the jobs of the job class that satisfy the standard predicates
 * (see addStandardPredicates()), optionally limited to one worker and/or
 * method.
 *
 * @param string|null $workerName restrict to this worker when not null
 * @param string|null $methodName restrict to this method when not null
 *
 * @return int
 */
public function countLiveJobs($workerName = null, $methodName = null)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();
    $liveJobs = $documentManager->createQueryBuilder($this->getJobClass());

    $this->addWorkerNameCriterion($liveJobs, $workerName, $methodName);
    // Filter
    $this->addStandardPredicates($liveJobs);

    $query = $liveJobs->getQuery();

    return $query->count();
}
365
|
|
|
|
366
|
|
|
/**
 * Archives, one at a time, every job matched by getJobQueryBuilder().
 *
 * Each round runs a find-and-update query that sets the job's status to
 * Job::STATUS_ARCHIVE and then hands the returned job to the document
 * manager's remove(). After every 10th job, and once more when no job is
 * left, the manager is flushed and updateProgress() is called with the
 * running count.
 *
 * @param string|null   $workerName       restrict to this worker when not null
 * @param string|null   $methodName       restrict to this method when not null
 * @param callable|null $progressCallback forwarded to updateProgress()
 */
public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();
    $archived = 0;

    $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
    $builder->findAndUpdate();
    $builder->returnNew();
    $builder->field('status')->set(Job::STATUS_ARCHIVE);

    // The same query object is executed repeatedly until it returns nothing.
    $query = $builder->getQuery();

    while ($job = $query->execute()) {
        $documentManager->remove($job);
        ++$archived;

        if (0 === $archived % 10) {
            $this->flush();
            $this->updateProgress($progressCallback, $archived);
        }
    }

    $this->flush();
    $this->updateProgress($progressCallback, $archived);
}
398
|
|
|
} |
399
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.