1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ODM; |
4
|
|
|
|
5
|
|
|
use Doctrine\MongoDB\Query\Builder; |
6
|
|
|
use Dtc\QueueBundle\Doctrine\BaseJobManager; |
7
|
|
|
use Doctrine\ODM\MongoDB\DocumentManager; |
8
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
9
|
|
|
use Dtc\QueueBundle\Model\RetryableJob; |
10
|
|
|
|
11
|
|
|
/**
 * MongoDB ODM implementation of the job manager.
 *
 * Builds Doctrine MongoDB queries against the live job collection
 * (getObjectName()) and the archive collection (getArchiveObjectName()).
 */
class JobManager extends BaseJobManager
{
    use CommonTrait;

    /**
     * Counts the documents of $objectName that have the given status.
     *
     * @param string      $objectName document class to query
     * @param string      $status     status value to match
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($objectName);
        $qb
            ->find()
            ->field('status')->equals($status);

        $this->addWorkerNameCriterion($qb, $workerName, $method);

        return $qb->getQuery()->count();
    }

    /**
     * Removes archived jobs whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int the 'n' entry of the driver result, or 0 when it is absent
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getArchiveObjectName());
        $qb = $qb->remove();
        $qb->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($qb, $workerName, $method);

        $result = $qb->getQuery()->execute();
        if (isset($result['n'])) {
            return $result['n'];
        }

        return 0;
    }

    /**
     * Adds equality criteria on 'workerName' and/or 'method' to the builder.
     * A null argument adds no criterion for that field.
     *
     * @param Builder     $builder
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $builder->field('workerName')->equals($workerName);
        }

        if (null !== $method) {
            $builder->field('method')->equals($method);
        }
    }

    /**
     * Sets the status to expired on every job that is still STATUS_NEW and
     * whose 'expiresAt' is less than or equal to the current time.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int the 'n' entry of the driver result, or 0 when it is absent
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
        $qb = $qb->updateMany();
        $qb->field('expiresAt')->lte(new \DateTime());
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($qb, $workerName, $method);
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $result = $qb->getQuery()->execute();
        if (isset($result['n'])) {
            return $result['n'];
        }

        return 0;
    }

    /**
     * Removes archived jobs older than $olderThan (compared on 'updatedAt').
     *
     * @param \DateTime $olderThan
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        return $this->removeOlderThan($this->getArchiveObjectName(), 'updatedAt', $olderThan);
    }

    /**
     * Counts the jobs in the live collection that are not locked and whose
     * 'expiresAt' is either null or later than the current time.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
        $qb
            ->find();

        $this->addWorkerNameCriterion($qb, $workerName, $method);

        // Filter: not expired and not locked
        $date = new \DateTime();
        $qb
            ->addAnd(
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
            )
            ->field('locked')->equals(null);

        $query = $qb->getQuery();

        return $query->count(true);
    }

    /**
     * Get Status Jobs.
     *
     * Runs a map/reduce over $documentName that groups documents by
     * "<worker_name>-><method>()" and counts them per status.
     *
     * @param string $documentName
     *
     * @return array status counts keyed by "<worker>-><method>()"; every known
     *               status is present, defaulting to 0
     */
    protected function getStatusByDocument($documentName)
    {
        // Run a map reduce function get worker and status break down
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        $reduceFunc = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($documentName);
        $qb->map($mapFunc)
            ->reduce($reduceFunc);
        $query = $qb->getQuery();
        $results = $query->execute();

        $allStatus = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $status = [];

        foreach ($results as $info) {
            // Array union: counted statuses win, missing ones are filled with 0.
            $status[$info['_id']] = $info['value'] + $allStatus;
        }

        return $status;
    }

    /**
     * Returns the per-worker/method status counts of the live collection
     * merged with those of the archive collection, each entry sorted by
     * status key.
     *
     * @return array
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archiveStatus = $this->getStatusByDocument($this->getArchiveObjectName());
        foreach ($archiveStatus as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                // A worker/method (or status) may exist only in the archive;
                // start from the archive value instead of adding to an
                // undefined offset.
                if (isset($result[$key][$statusName])) {
                    $result[$key][$statusName] += $count;
                } else {
                    $result[$key][$statusName] = $count;
                }
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }

    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * The matching job is locked in the same find-and-update query: 'lockedAt'
     * is set to the current time, 'locked' to true, 'status' to
     * BaseJob::STATUS_RUNNING and 'runId' to $runId.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId      value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job|null the result of the find-and-update query
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $builder
            ->findAndUpdate()
            ->returnNew();

        // Timestamp recorded as the moment the job was locked.
        $date = new \DateTime();

        // Update
        $builder
            ->field('lockedAt')->set($date) // Set started
            ->field('locked')->set(true)
            ->field('status')->set(BaseJob::STATUS_RUNNING)
            ->field('runId')->set($runId);

        $query = $builder->getQuery();

        $job = $query->execute();

        return $job;
    }

    /**
     * Builds the query selecting runnable jobs: status NEW, not locked,
     * 'whenAt' null or not in the future, and 'expiresAt' null or in the
     * future. Sorted by priority (desc) then whenAt (asc) when $prioritize is
     * true, otherwise by whenAt (asc) only.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $builder = $objectManager->createQueryBuilder($this->getObjectName());

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
        if ($prioritize) {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        } else {
            $builder->sort('whenAt', 'asc');
        }

        // Filter
        $date = new \DateTime();
        $builder
            ->addAnd(
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
            )
            ->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('locked')->equals(null);

        return $builder;
    }

    /**
     * Finds the unlocked NEW job with the same crcHash as $job and the
     * earliest 'whenAt', then raises its priority and/or moves its 'whenAt'
     * earlier to match $job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job|null the existing job with the new
     *                                         values applied in memory, or null
     *                                         when no matching job exists
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $builder = $objectManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        $builder->sort('whenAt', 'asc');
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('crcHash')->equals($job->getCrcHash())
            ->field('locked')->equals(null);
        $oldJob = $builder->getQuery()->getSingleResult();

        if (!$oldJob) {
            return null;
        }

        // Update priority or whenAt
        //  This makes sure if someone else is updating at the same time
        //  that we don't trounce their changes.
        // Only raise the stored priority when it is lower than the new one.
        $builder = $objectManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->field('_id')->equals($oldJob->getId());
        $builder->field('priority')->lt($job->getPriority());
        $builder->field('priority')->set($job->getPriority());
        $builder->getQuery()->execute();

        // Only move the stored whenAt when it is later than the new one.
        $builder = $objectManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->field('_id')->equals($oldJob->getId());
        $builder->field('whenAt')->gt($job->getWhenAt());
        $builder->field('whenAt')->set($job->getWhenAt());
        $builder->getQuery()->execute();

        // Mirror the conditional updates on the in-memory object.
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
            $oldJob->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $oldJob->getPriority()) {
            $oldJob->setPriority($job->getPriority());
        }

        return $oldJob;
    }
}
315
|
|
|
|
This check looks at variables that have been passed in as parameters and are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.