1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ODM; |
4
|
|
|
|
5
|
|
|
use Doctrine\ODM\MongoDB\Mapping\ClassMetadata; |
6
|
|
|
use Dtc\QueueBundle\Doctrine\BaseJobManager; |
7
|
|
|
use Doctrine\ODM\MongoDB\DocumentManager; |
8
|
|
|
use Dtc\QueueBundle\Document\Job; |
9
|
|
|
|
10
|
|
|
/**
 * MongoDB ODM implementation of the queue job manager.
 *
 * Every query is built through the DocumentManager returned by
 * getObjectManager(); live jobs are stored under getObjectName() and
 * archived jobs under getArchiveObjectName().
 */
class JobManager extends BaseJobManager
{
    /**
     * Counts the documents of $objectName whose "status" field equals $status.
     *
     * @param string      $objectName Document class to query
     * @param mixed       $status     Value matched against the "status" field
     * @param string|null $workerName Optional worker filter (no criterion is added when falsy)
     * @param string|null $method     Optional method filter (no criterion is added when falsy)
     *
     * @return int Count as reported by the query's count()
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($objectName);
        $qb
            ->find()
            ->field('status')->equals($status);

        $this->applyWorkerMethodFilter($qb, $workerName, $method);

        return $qb->getQuery()->count();
    }

    /**
     * Sets the ID generator type of $objectName's class metadata to
     * ClassMetadata::GENERATOR_TYPE_NONE.
     *
     * @param string $objectName Document class whose metadata is modified
     */
    public function stopIdGenerator($objectName)
    {
        $objectManager = $this->getObjectManager();
        $repository = $objectManager->getRepository($objectName);
        /** @var ClassMetadata $metadata */
        $metadata = $objectManager->getClassMetadata($repository->getClassName());
        $metadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
    }

    /**
     * Removes archived jobs whose status is Job::STATUS_ERROR.
     *
     * @param string|null $workerName Optional worker filter (no criterion is added when falsy)
     * @param string|null $method     Optional method filter (no criterion is added when falsy)
     *
     * @return int The 'n' entry of the remove result, or 0 when it is absent
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getArchiveObjectName());
        $qb
            ->remove()
            ->field('status')->equals(Job::STATUS_ERROR);

        $this->applyWorkerMethodFilter($qb, $workerName, $method);

        return $this->executeRemoval($qb);
    }

    /**
     * Prunes expired jobs: removes live jobs whose expiresAt is at or before now.
     *
     * @param string|null $workerName Optional worker filter (no criterion is added when falsy)
     * @param string|null $method     Optional method filter (no criterion is added when falsy)
     *
     * @return int The 'n' entry of the remove result, or 0 when it is absent
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
        $qb
            ->remove()
            ->field('expiresAt')->lte(new \DateTime());

        $this->applyWorkerMethodFilter($qb, $workerName, $method);

        return $this->executeRemoval($qb);
    }

    /**
     * Removes archived jobs older than $olderThan.
     *
     * @param \DateTime $olderThan Archived jobs with updatedAt strictly before this are removed
     *
     * @return int The 'n' entry of the remove result, or 0 when it is absent
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getArchiveObjectName());
        $qb
            ->remove()
            ->field('updatedAt')->lt($olderThan);

        return $this->executeRemoval($qb);
    }

    /**
     * Counts live jobs that are not locked and have not expired.
     *
     * @param string|null $workerName Optional worker filter (no criterion is added when falsy)
     * @param string|null $method     Optional method filter (no criterion is added when falsy)
     *
     * @return int Count as reported by the query's count(true)
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
        $qb->find();

        $this->applyWorkerMethodFilter($qb, $workerName, $method);

        // Keep only jobs with no expiry or an expiry still in the future, and no lock.
        $now = new \DateTime();
        $notExpired = $qb->expr()->addOr(
            $qb->expr()->field('expiresAt')->equals(null),
            $qb->expr()->field('expiresAt')->gt($now)
        );
        $qb
            ->addAnd($notExpired)
            ->field('locked')->equals(null);

        return $qb->getQuery()->count(true);
    }

    /**
     * Builds a per-worker/method status breakdown for one document class
     * using a map-reduce query.
     *
     * @param string $documentName Document class to aggregate
     *
     * @return array Map of "<worker>-><method>()" to an array of status => count;
     *               the four known Job statuses are always present (0 when unseen)
     */
    protected function getStatusByDocument($documentName)
    {
        // Map: emit {status: 1} keyed by "<worker>-><method>()".
        // NOTE(review): this reads the raw field `worker_name`, while the query
        // builder calls in this class use the property name `workerName` -
        // confirm the Job mapping stores the property under `worker_name`.
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        // Reduce: sum the per-status counters of all values emitted for a key.
        $reduceFunc = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val = vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($documentName);
        $qb->map($mapFunc)
            ->reduce($reduceFunc);
        $results = $qb->getQuery()->execute();

        // Defaults so every entry reports all four statuses.
        $allStatus = [
            Job::STATUS_ERROR => 0,
            Job::STATUS_NEW => 0,
            Job::STATUS_RUNNING => 0,
            Job::STATUS_SUCCESS => 0,
        ];

        $status = [];
        foreach ($results as $info) {
            // Array union: counted values win, missing statuses fall back to 0.
            $status[$info['_id']] = $info['value'] + $allStatus;
        }

        return $status;
    }

    /**
     * Returns the combined status breakdown of live and archived jobs.
     *
     * @return array Map of "<worker>-><method>()" to status => count, with the
     *               statuses of each entry sorted by key
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            // A worker/method present only in the archive has no live entry to
            // add to; seed it instead of reading an undefined index.
            if (!isset($result[$key])) {
                $result[$key] = $counts;
                continue;
            }

            foreach ($counts as $statusName => $count) {
                if (isset($result[$key][$statusName])) {
                    $result[$key][$statusName] += $count;
                } else {
                    $result[$key][$statusName] = $count;
                }
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }

    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Issues a single find-and-update: the matched job (status NEW, not locked,
     * due and not expired) is marked locked/RUNNING and stamped with $runId,
     * and the updated document is requested back via returnNew().
     *
     * @param string $workerName Optional worker filter (no criterion is added when falsy)
     * @param string $methodName Optional method filter (no criterion is added when falsy)
     * @param bool   $prioritize Sort by priority ascending when true, by whenAt ascending otherwise
     * @param mixed  $runId      Value written to the job's runId field
     *
     * @return \Dtc\QueueBundle\Model\Job Result of the query's execute()
     *                                    (presumably null when nothing matches - confirm)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
        $qb
            ->findAndUpdate()
            ->returnNew();

        $this->applyWorkerMethodFilter($qb, $workerName, $methodName);

        if ($prioritize) {
            $qb->sort('priority', 'asc');
        } else {
            $qb->sort('whenAt', 'asc');
        }

        // Filter: due now (or unscheduled), not expired, still NEW and unlocked.
        $date = new \DateTime();
        $isDue = $qb->expr()->addOr(
            $qb->expr()->field('whenAt')->equals(null),
            $qb->expr()->field('whenAt')->lte($date)
        );
        $notExpired = $qb->expr()->addOr(
            $qb->expr()->field('expiresAt')->equals(null),
            $qb->expr()->field('expiresAt')->gt($date)
        );
        $qb
            ->addAnd($isDue, $notExpired)
            ->field('status')->equals(Job::STATUS_NEW)
            ->field('locked')->equals(null);

        // Update: claim the job for this run.
        $qb
            ->field('lockedAt')->set($date)
            ->field('locked')->set(true)
            ->field('status')->set(Job::STATUS_RUNNING)
            ->field('runId')->set($runId);

        return $qb->getQuery()->execute();
    }

    /**
     * Adds the optional workerName / method equality criteria to a query builder.
     *
     * Uses a truthiness check, so falsy values (null, '', 0, '0') add no criterion.
     *
     * @param \Doctrine\ODM\MongoDB\Query\Builder $qb         Builder to constrain
     * @param string|null                         $workerName Value for the "workerName" field
     * @param string|null                         $method     Value for the "method" field
     */
    private function applyWorkerMethodFilter($qb, $workerName = null, $method = null)
    {
        if ($workerName) {
            $qb->field('workerName')->equals($workerName);
        }

        if ($method) {
            $qb->field('method')->equals($method);
        }
    }

    /**
     * Executes a remove query and reports how many documents it affected.
     *
     * @param \Doctrine\ODM\MongoDB\Query\Builder $qb Builder already configured with remove()
     *
     * @return int The 'n' entry of the execute() result, or 0 when it is absent
     */
    private function executeRemoval($qb)
    {
        $result = $qb->getQuery()->execute();

        return isset($result['n']) ? $result['n'] : 0;
    }
}
277
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.