1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ODM; |
4
|
|
|
|
5
|
|
|
use Doctrine\MongoDB\Query\Builder; |
6
|
|
|
use Doctrine\ODM\MongoDB\Mapping\ClassMetadata; |
7
|
|
|
use Dtc\QueueBundle\Doctrine\BaseJobManager; |
8
|
|
|
use Doctrine\ODM\MongoDB\DocumentManager; |
9
|
|
|
use Dtc\QueueBundle\Document\Job; |
10
|
|
|
|
11
|
|
|
class JobManager extends BaseJobManager |
12
|
|
|
{ |
13
|
|
|
/**
 * Counts the documents of a class that are in a given status.
 *
 * @param string      $objectName Document class handed to createQueryBuilder()
 * @param string      $status     Value the "status" field must equal
 * @param string|null $workerName When not null, the "workerName" field must equal it
 * @param string|null $method     When not null, the "method" field must equal it
 *
 * @return int Whatever the built query's count() reports
 */
public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $builder = $documentManager->createQueryBuilder($objectName);
    $builder->find()->field('status')->equals($status);

    // Optional narrowing; a null argument means "do not filter on this field".
    if (null !== $workerName) {
        $builder->field('workerName')->equals($workerName);
    }
    if (null !== $method) {
        $builder->field('method')->equals($method);
    }

    return $builder->getQuery()->count();
}
34
|
|
|
|
35
|
|
|
/**
 * Sets the ID generator type of a document class to GENERATOR_TYPE_NONE.
 *
 * The class name is taken from the repository that the object manager
 * returns for $objectName.
 *
 * @param string $objectName
 */
public function stopIdGenerator($objectName)
{
    $className = $this->getObjectManager()->getRepository($objectName)->getClassName();

    /** @var ClassMetadata $classMetadata */
    $classMetadata = $this->getObjectManager()->getClassMetadata($className);
    $classMetadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
}
46
|
|
|
|
47
|
|
|
/**
 * Prunes documents of the archive object class whose status is Job::STATUS_ERROR.
 *
 * @param string|null $workerName Forwarded to pruneJobs() as the worker filter
 * @param string|null $method     Forwarded to pruneJobs() as the method filter
 *
 * @return int The value returned by pruneJobs()
 */
public function pruneErroneousJobs($workerName = null, $method = null)
{
    $onlyErrored = function ($qb) {
        /* @var Builder $qb */
        return $qb->field('status')->equals(Job::STATUS_ERROR);
    };

    return $this->pruneJobs($workerName, $method, $this->getArchiveObjectName(), $onlyErrored);
}
58
|
|
|
|
59
|
|
|
/**
 * Prunes jobs according to a condition function.
 *
 * Builds a remove() query for $objectName, lets $conditionFunc add its
 * criteria, optionally narrows by worker name and method, and executes it.
 *
 * The first two parameters deliberately carry no default value: they are
 * followed by required parameters, so a default could never be used, and
 * PHP 8.0 deprecates optional parameters declared before required ones.
 * Pass null explicitly to skip a filter.
 *
 * @param string|null $workerName    When not null, the "workerName" field must equal it
 * @param string|null $method        When not null, the "method" field must equal it
 * @param string      $objectName    Document class handed to createQueryBuilder()
 * @param callable    $conditionFunc Receives the remove() query builder and must
 *                                   return the builder to continue with
 *
 * @return int The "n" entry of the execute() result when present, otherwise 0
 */
protected function pruneJobs($workerName, $method, $objectName, $conditionFunc)
{
    /** @var DocumentManager $objectManager */
    $objectManager = $this->getObjectManager();
    $qb = $objectManager->createQueryBuilder($objectName);
    $qb = $qb->remove();
    $qb = $conditionFunc($qb);

    if (null !== $workerName) {
        $qb->field('workerName')->equals($workerName);
    }

    if (null !== $method) {
        $qb->field('method')->equals($method);
    }

    $query = $qb->getQuery();
    $result = $query->execute();

    // The result is only trusted when it exposes an "n" entry.
    if (isset($result['n'])) {
        return $result['n'];
    }

    return 0;
}
92
|
|
|
|
93
|
|
|
/**
 * Prunes expired jobs.
 *
 * A document of the live object class counts as expired when its
 * "expiresAt" field is less than or equal to the current time.
 *
 * @param string|null $workerName Forwarded to pruneJobs() as the worker filter
 * @param string|null $method     Forwarded to pruneJobs() as the method filter
 *
 * @return int The value returned by pruneJobs()
 */
public function pruneExpiredJobs($workerName = null, $method = null)
{
    $alreadyExpired = function ($qb) {
        /* @var Builder $qb */
        $now = new \DateTime();

        return $qb->field('expiresAt')->lte($now);
    };

    return $this->pruneJobs($workerName, $method, $this->getObjectName(), $alreadyExpired);
}
106
|
|
|
|
107
|
|
|
/**
 * Removes archived jobs older than $olderThan.
 *
 * Matches documents of the archive object class whose "updatedAt" field
 * is strictly less than the given date.
 *
 * @param \DateTime $olderThan
 *
 * @return int The "n" entry of the execute() result when present, otherwise 0
 */
public function pruneArchivedJobs(\DateTime $olderThan)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $removal = $documentManager->createQueryBuilder($this->getArchiveObjectName());
    $removal->remove()->field('updatedAt')->lt($olderThan);

    $outcome = $removal->getQuery()->execute();

    return isset($outcome['n']) ? $outcome['n'] : 0;
}
129
|
|
|
|
130
|
|
|
/**
 * Counts documents of the live object class that are neither expired nor locked.
 *
 * A document matches when "expiresAt" is null or later than now, and
 * "locked" is null.
 *
 * @param string|null $workerName When not null, the "workerName" field must equal it
 * @param string|null $method     When not null, the "method" field must equal it
 *
 * @return int Whatever the built query's count(true) reports
 */
public function getJobCount($workerName = null, $method = null)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $builder = $documentManager->createQueryBuilder($this->getObjectName());
    $builder->find();

    if (null !== $workerName) {
        $builder->field('workerName')->equals($workerName);
    }
    if (null !== $method) {
        $builder->field('method')->equals($method);
    }

    // Filter: keep only jobs that have no expiry or whose expiry is still ahead.
    $now = new \DateTime();
    $notExpired = $builder->expr()->addOr(
        $builder->expr()->field('expiresAt')->equals(null),
        $builder->expr()->field('expiresAt')->gt($now)
    );
    $builder->addAnd($notExpired)->field('locked')->equals(null);

    return $builder->getQuery()->count(true);
}
158
|
|
|
|
159
|
|
|
/**
 * Get Status Jobs.
 *
 * Runs a map/reduce over $documentName: the map step emits one counter per
 * document under the key "<worker_name>-><method>()", and the reduce step
 * sums the counters per status. Every returned entry is padded with a zero
 * count for the ERROR, NEW, RUNNING and SUCCESS statuses it lacks.
 *
 * @param string $documentName
 *
 * @return array Status counts indexed by "<worker_name>-><method>()", then by status
 */
protected function getStatusByDocument($documentName)
{
    // Map: one {status: 1} record per document, grouped by worker and method.
    $mapFunc = "function() {
        var result = {};
        result[this.status] = 1;
        var key = this.worker_name + '->' + this.method + '()';
        emit(key, result);
    }";

    // Reduce: add up the per-status counters emitted for the same key.
    $reduceFunc = 'function(k, vals) {
        var result = {};
        for (var index in vals) {
            var val = vals[index];
            for (var i in val) {
                if (result.hasOwnProperty(i)) {
                    result[i] += val[i];
                }
                else {
                    result[i] = val[i];
                }
            }
        }
        return result;
    }';

    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();
    $builder = $documentManager->createQueryBuilder($documentName);
    $builder->map($mapFunc)->reduce($reduceFunc);
    $rows = $builder->getQuery()->execute();

    // Defaults appended through array union: counts that came back win,
    // missing statuses are filled in with zero.
    $zeroCounts = [
        Job::STATUS_ERROR => 0,
        Job::STATUS_NEW => 0,
        Job::STATUS_RUNNING => 0,
        Job::STATUS_SUCCESS => 0,
    ];

    $breakdown = [];
    foreach ($rows as $row) {
        $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
    }

    return $breakdown;
}
211
|
|
|
|
212
|
|
|
/**
 * Builds the combined status breakdown of the live and archive object classes.
 *
 * The per-key, per-status counts returned by getStatusByDocument() for the
 * archive class are added onto those of the live class; each entry's
 * statuses are then sorted by key.
 *
 * @return array Status counts indexed by the keys getStatusByDocument() produces, then by status
 */
public function getStatus()
{
    $result = $this->getStatusByDocument($this->getObjectName());
    $archived = $this->getStatusByDocument($this->getArchiveObjectName());

    foreach ($archived as $key => $counts) {
        foreach ($counts as $statusName => $count) {
            // A key (or a status) may exist only in the archive; adding onto
            // a missing slot would read an undefined index, so start it here.
            if (isset($result[$key][$statusName])) {
                $result[$key][$statusName] += $count;
            } else {
                $result[$key][$statusName] = $count;
            }
        }
    }

    $finalResult = [];
    foreach ($result as $key => $item) {
        ksort($item);
        $finalResult[$key] = $item;
    }

    return $finalResult;
}
230
|
|
|
|
231
|
|
|
/**
 * Get the next job to run (can be filtered by workername and method name).
 *
 * Issues a findAndUpdate() query with returnNew(): the matched document
 * must have status Job::STATUS_NEW, a null "locked" field, a "whenAt" that
 * is null or not in the future, and an "expiresAt" that is null or still
 * ahead. The update sets "lockedAt" to now, "locked" to true, "status" to
 * Job::STATUS_RUNNING and "runId" to $runId.
 *
 * @param string|null $workerName When not null, the "workerName" field must equal it
 * @param string|null $methodName When not null, the "method" field must equal it
 * @param bool        $prioritize Sort ascending by "priority" when true, by "whenAt" otherwise
 * @param mixed       $runId      Value written to the "runId" field of the returned job
 *
 * @return \Dtc\QueueBundle\Model\Job The result of executing the query
 *                                    (NOTE(review): presumably null when nothing matches - confirm)
 */
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
{
    /** @var DocumentManager $documentManager */
    $documentManager = $this->getObjectManager();

    $builder = $documentManager->createQueryBuilder($this->getObjectName());
    $builder->findAndUpdate()->returnNew();

    if (null !== $workerName) {
        $builder->field('workerName')->equals($workerName);
    }
    if (null !== $methodName) {
        $builder->field('method')->equals($methodName);
    }

    $sortField = $prioritize ? 'priority' : 'whenAt';
    $builder->sort($sortField, 'asc');

    // Filter: due (or unscheduled), not expired, new and not locked.
    $now = new \DateTime();
    $isDue = $builder->expr()->addOr(
        $builder->expr()->field('whenAt')->equals(null),
        $builder->expr()->field('whenAt')->lte($now)
    );
    $notExpired = $builder->expr()->addOr(
        $builder->expr()->field('expiresAt')->equals(null),
        $builder->expr()->field('expiresAt')->gt($now)
    );
    $builder->addAnd($isDue, $notExpired);
    $builder->field('status')->equals(Job::STATUS_NEW);
    $builder->field('locked')->equals(null);

    // Update: mark the job as taken by this run.
    $builder->field('lockedAt')->set($now); // Set started
    $builder->field('locked')->set(true);
    $builder->field('status')->set(Job::STATUS_RUNNING);
    $builder->field('runId')->set($runId);

    return $builder->getQuery()->execute();
}
286
|
|
|
} |
287
|
|
|
|
This check looks at variables that have been passed in as parameters and are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.