1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ODM; |
4
|
|
|
|
5
|
|
|
use Doctrine\MongoDB\Query\Builder; |
6
|
|
|
use Dtc\QueueBundle\Doctrine\BaseJobManager; |
7
|
|
|
use Doctrine\ODM\MongoDB\DocumentManager; |
8
|
|
|
use Dtc\QueueBundle\Document\Job; |
9
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
10
|
|
|
|
11
|
|
|
class JobManager extends BaseJobManager |
12
|
|
|
{ |
13
|
3 |
View Code Duplication |
public function countJobsByStatus($objectName, $status, $workerName = null, $method = null) |
|
|
|
|
14
|
|
|
{ |
15
|
|
|
/** @var DocumentManager $objectManager */ |
16
|
3 |
|
$objectManager = $this->getObjectManager(); |
17
|
3 |
|
$qb = $objectManager->createQueryBuilder($objectName); |
18
|
|
|
$qb |
19
|
3 |
|
->find() |
20
|
3 |
|
->field('status')->equals($status); |
21
|
|
|
|
22
|
3 |
|
$this->addWorkerNameCriterion($qb, $workerName, $method); |
23
|
3 |
|
$query = $qb->getQuery(); |
24
|
|
|
|
25
|
3 |
|
return $query->count(); |
26
|
|
|
} |
27
|
|
|
|
28
|
|
|
/** |
29
|
|
|
* @param string $objectName |
30
|
|
|
*/ |
31
|
8 |
|
public function stopIdGenerator($objectName) |
32
|
|
|
{ |
33
|
|
|
// Not needed for ODM |
34
|
8 |
|
} |
35
|
|
|
|
36
|
1 |
|
public function restoreIdGenerator($objectName) |
37
|
|
|
{ |
38
|
|
|
// Not needed for ODM |
39
|
1 |
|
} |
40
|
|
|
|
41
|
|
|
/** |
42
|
|
|
* @param string|null $workerName |
43
|
|
|
* @param string|null $method |
44
|
|
|
*/ |
45
|
|
|
public function pruneErroneousJobs($workerName = null, $method = null) |
46
|
|
|
{ |
47
|
1 |
|
return $this->pruneJobs($workerName, $method, $this->getArchiveObjectName(), function ($qb) { |
48
|
|
|
/* @var Builder $qb */ |
49
|
1 |
|
return $qb->field('status')->equals(BaseJob::STATUS_ERROR); |
50
|
1 |
|
}); |
51
|
|
|
} |
52
|
|
|
|
53
|
|
|
/** |
54
|
|
|
* Prunes jobs according to a condition function. |
55
|
|
|
* |
56
|
|
|
* @param string|null $workerName |
57
|
|
|
* @param string|null $method |
58
|
|
|
* @param $conditionFunc |
59
|
|
|
* |
60
|
|
|
* @return int |
61
|
|
|
*/ |
62
|
1 |
|
protected function pruneJobs($workerName = null, $method = null, $objectName, $conditionFunc) |
63
|
|
|
{ |
64
|
|
|
/** @var DocumentManager $objectManager */ |
65
|
1 |
|
$objectManager = $this->getObjectManager(); |
66
|
1 |
|
$qb = $objectManager->createQueryBuilder($objectName); |
67
|
1 |
|
$qb = $qb->remove(); |
68
|
1 |
|
$qb = $conditionFunc($qb); |
69
|
1 |
|
$this->addWorkerNameCriterion($qb, $workerName, $method); |
70
|
|
|
|
71
|
1 |
|
$query = $qb->getQuery(); |
72
|
1 |
|
$result = $query->execute(); |
73
|
1 |
|
if (isset($result['n'])) { |
74
|
1 |
|
return $result['n']; |
75
|
|
|
} |
76
|
|
|
|
77
|
|
|
return 0; |
78
|
|
|
} |
79
|
|
|
|
80
|
10 |
|
protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null) |
81
|
|
|
{ |
82
|
10 |
|
if (null !== $workerName) { |
83
|
4 |
|
$builder->field('workerName')->equals($workerName); |
84
|
|
|
} |
85
|
|
|
|
86
|
10 |
|
if (null !== $method) { |
87
|
3 |
|
$builder->field('method')->equals($method); |
88
|
|
|
} |
89
|
10 |
|
} |
90
|
|
|
|
91
|
1 |
|
protected function updateExpired($workerName = null, $method = null) |
92
|
|
|
{ |
93
|
|
|
/** @var DocumentManager $objectManager */ |
94
|
1 |
|
$objectManager = $this->getObjectManager(); |
95
|
1 |
|
$qb = $objectManager->createQueryBuilder($this->getObjectName()); |
96
|
1 |
|
$qb = $qb->update()->updateMany(); |
|
|
|
|
97
|
1 |
|
$qb->field('expiresAt')->lte(new \DateTime()); |
98
|
1 |
|
$qb->field('status')->equals(BaseJob::STATUS_NEW); |
99
|
1 |
|
$this->addWorkerNameCriterion($qb, $workerName, $method); |
100
|
1 |
|
$qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED); |
101
|
1 |
|
$query = $qb->getQuery(); |
102
|
1 |
|
$result = $query->execute(); |
103
|
1 |
|
if (isset($result['n'])) { |
104
|
1 |
|
return $result['n']; |
105
|
|
|
} |
106
|
|
|
|
107
|
|
|
return 0; |
108
|
|
|
} |
109
|
|
|
|
110
|
|
|
/** |
111
|
|
|
* Removes archived jobs older than $olderThan. |
112
|
|
|
* |
113
|
|
|
* @param \DateTime $olderThan |
114
|
|
|
*/ |
115
|
1 |
|
public function pruneArchivedJobs(\DateTime $olderThan) |
116
|
|
|
{ |
117
|
|
|
/** @var DocumentManager $objectManager */ |
118
|
1 |
|
$objectManager = $this->getObjectManager(); |
119
|
1 |
|
$qb = $objectManager->createQueryBuilder($this->getArchiveObjectName()); |
120
|
|
|
$qb |
121
|
1 |
|
->remove() |
122
|
1 |
|
->field('updatedAt')->lt($olderThan); |
123
|
|
|
|
124
|
1 |
|
$query = $qb->getQuery(); |
125
|
1 |
|
$result = $query->execute(); |
126
|
1 |
|
if (isset($result['n'])) { |
127
|
1 |
|
return $result['n']; |
128
|
|
|
} |
129
|
|
|
|
130
|
|
|
return 0; |
131
|
|
|
} |
132
|
|
|
|
133
|
1 |
|
public function getJobCount($workerName = null, $method = null) |
134
|
|
|
{ |
135
|
|
|
/** @var DocumentManager $objectManager */ |
136
|
1 |
|
$objectManager = $this->getObjectManager(); |
137
|
1 |
|
$qb = $objectManager->createQueryBuilder($this->getObjectName()); |
138
|
|
|
$qb |
139
|
1 |
|
->find(); |
140
|
|
|
|
141
|
1 |
|
$this->addWorkerNameCriterion($qb, $workerName, $method); |
142
|
|
|
|
143
|
|
|
// Filter |
144
|
1 |
|
$date = new \DateTime(); |
145
|
|
|
$qb |
146
|
1 |
|
->addAnd( |
147
|
1 |
|
$qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date)) |
148
|
|
|
) |
149
|
1 |
|
->field('locked')->equals(null); |
150
|
|
|
|
151
|
1 |
|
$query = $qb->getQuery(); |
152
|
|
|
|
153
|
1 |
|
return $query->count(true); |
154
|
|
|
} |
155
|
|
|
|
156
|
|
|
/** |
157
|
|
|
* Get Status Jobs. |
158
|
|
|
* |
159
|
|
|
* @param string $documentName |
160
|
|
|
*/ |
161
|
|
|
protected function getStatusByDocument($documentName) |
162
|
|
|
{ |
163
|
|
|
// Run a map reduce function get worker and status break down |
164
|
|
|
$mapFunc = "function() { |
165
|
|
|
var result = {}; |
166
|
|
|
result[this.status] = 1; |
167
|
|
|
var key = this.worker_name + '->' + this.method + '()'; |
168
|
|
|
emit(key, result); |
169
|
|
|
}"; |
170
|
|
|
$reduceFunc = 'function(k, vals) { |
171
|
|
|
var result = {}; |
172
|
|
|
for (var index in vals) { |
173
|
|
|
var val = vals[index]; |
174
|
|
|
for (var i in val) { |
175
|
|
|
if (result.hasOwnProperty(i)) { |
176
|
|
|
result[i] += val[i]; |
177
|
|
|
} |
178
|
|
|
else { |
179
|
|
|
result[i] = val[i]; |
180
|
|
|
} |
181
|
|
|
} |
182
|
|
|
} |
183
|
|
|
return result; |
184
|
|
|
}'; |
185
|
|
|
/** @var DocumentManager $objectManager */ |
186
|
|
|
$objectManager = $this->getObjectManager(); |
187
|
|
|
$qb = $objectManager->createQueryBuilder($documentName); |
188
|
|
|
$qb->map($mapFunc) |
189
|
|
|
->reduce($reduceFunc); |
190
|
|
|
$query = $qb->getQuery(); |
191
|
|
|
$results = $query->execute(); |
192
|
|
|
|
193
|
|
|
$allStatus = array( |
194
|
|
|
BaseJob::STATUS_ERROR => 0, |
195
|
|
|
BaseJob::STATUS_NEW => 0, |
196
|
|
|
BaseJob::STATUS_RUNNING => 0, |
197
|
|
|
BaseJob::STATUS_SUCCESS => 0, |
198
|
|
|
); |
199
|
|
|
|
200
|
|
|
$status = []; |
201
|
|
|
|
202
|
|
|
foreach ($results as $info) { |
203
|
|
|
$status[$info['_id']] = $info['value'] + $allStatus; |
204
|
|
|
} |
205
|
|
|
|
206
|
|
|
return $status; |
207
|
|
|
} |
208
|
|
|
|
209
|
|
|
public function getStatus() |
210
|
|
|
{ |
211
|
|
|
$result = $this->getStatusByDocument($this->getObjectName()); |
212
|
|
|
$status2 = $this->getStatusByDocument($this->getArchiveObjectName()); |
213
|
|
|
foreach ($status2 as $key => $value) { |
214
|
|
|
foreach ($value as $k => $v) { |
215
|
|
|
$result[$key][$k] += $v; |
216
|
|
|
} |
217
|
|
|
} |
218
|
|
|
|
219
|
|
|
$finalResult = []; |
220
|
|
|
foreach ($result as $key => $item) { |
221
|
|
|
ksort($item); |
222
|
|
|
$finalResult[$key] = $item; |
223
|
|
|
} |
224
|
|
|
|
225
|
|
|
return $finalResult; |
226
|
|
|
} |
227
|
|
|
|
228
|
|
|
/** |
229
|
|
|
* Get the next job to run (can be filtered by workername and method name). |
230
|
|
|
* |
231
|
|
|
* @param string $workerName |
232
|
|
|
* @param string $methodName |
233
|
|
|
* @param bool $prioritize |
234
|
|
|
* |
235
|
|
|
* @return \Dtc\QueueBundle\Model\Job |
236
|
|
|
*/ |
237
|
5 |
|
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
238
|
|
|
{ |
239
|
|
|
/** @var DocumentManager $objectManager */ |
240
|
5 |
|
$objectManager = $this->getObjectManager(); |
241
|
5 |
|
$qb = $objectManager->createQueryBuilder($this->getObjectName()); |
242
|
|
|
$qb |
243
|
5 |
|
->findAndUpdate() |
244
|
5 |
|
->returnNew(); |
245
|
|
|
|
246
|
5 |
|
$this->addWorkerNameCriterion($qb, $workerName, $methodName); |
247
|
5 |
|
if ($prioritize) { |
248
|
5 |
|
$qb->sort('priority', 'asc'); |
249
|
|
|
} else { |
250
|
|
|
$qb->sort('whenAt', 'asc'); |
251
|
|
|
} |
252
|
|
|
|
253
|
|
|
// Filter |
254
|
5 |
|
$date = new \DateTime(); |
255
|
|
|
$qb |
256
|
5 |
|
->addAnd( |
257
|
5 |
|
$qb->expr()->addOr($qb->expr()->field('whenAt')->equals(null), $qb->expr()->field('whenAt')->lte($date)), |
258
|
5 |
|
$qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date)) |
259
|
|
|
) |
260
|
5 |
|
->field('status')->equals(BaseJob::STATUS_NEW) |
261
|
5 |
|
->field('locked')->equals(null); |
262
|
|
|
|
263
|
|
|
// Update |
264
|
|
|
$qb |
265
|
5 |
|
->field('lockedAt')->set($date) // Set started |
266
|
5 |
|
->field('locked')->set(true) |
267
|
5 |
|
->field('status')->set(BaseJob::STATUS_RUNNING) |
268
|
5 |
|
->field('runId')->set($runId); |
269
|
|
|
|
270
|
5 |
|
$query = $qb->getQuery(); |
271
|
|
|
|
272
|
5 |
|
$job = $query->execute(); |
273
|
|
|
|
274
|
5 |
|
return $job; |
275
|
|
|
} |
276
|
|
|
} |
277
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.