1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\Documents; |
4
|
|
|
|
5
|
|
|
use Dtc\QueueBundle\Model\JobManagerInterface; |
6
|
|
|
use Doctrine\ODM\MongoDB\DocumentRepository; |
7
|
|
|
use Doctrine\ODM\MongoDB\DocumentManager; |
8
|
|
|
|
9
|
|
|
/**
 * Job manager backed by Doctrine MongoDB ODM.
 *
 * Stores, fetches, locks and removes queue jobs through the injected
 * DocumentManager, using the configured job document class.
 */
class JobManager implements JobManagerInterface
{
    /**
     * Document manager used for every query and persistence operation.
     *
     * @var DocumentManager
     */
    protected $dm;

    /**
     * Name of the job document class passed to the document manager.
     *
     * @var string
     */
    protected $documentName;

    /**
     * @param DocumentManager $dm           Document manager to operate on
     * @param string          $documentName Job document class name
     */
    public function __construct(DocumentManager $dm, $documentName)
    {
        $this->dm = $dm;
        $this->documentName = $documentName;
    }

    /**
     * @return DocumentManager
     */
    public function getDocumentManager()
    {
        return $this->dm;
    }

    /**
     * @return string
     */
    public function getDocumentName()
    {
        return $this->documentName;
    }

    /**
     * @return DocumentRepository
     */
    public function getRepository()
    {
        return $this->dm->getRepository($this->documentName);
    }

    /**
     * Put every job in error status back into the queue.
     *
     * Matching jobs get their status set to Job::STATUS_NEW and their lock
     * cleared (locked = null).
     *
     * @param string|null $workerName Only reset jobs of this worker (ignored when empty)
     * @param string|null $method     Only reset jobs of this method (ignored when empty)
     */
    public function resetErroneousJobs($workerName = null, $method = null)
    {
        $qb = $this->dm->createQueryBuilder($this->documentName);
        $qb
            ->update()
            ->multiple(true)
            ->field('status')->equals(Job::STATUS_ERROR)
            ->field('locked')->set(null)
            ->field('status')->set(Job::STATUS_NEW);

        $this->addWorkerFilter($qb, $workerName, $method);

        $qb->getQuery()->execute();
    }

    /**
     * Delete every job in error status.
     *
     * @param string|null $workerName Only delete jobs of this worker (ignored when empty)
     * @param string|null $method     Only delete jobs of this method (ignored when empty)
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        $qb = $this->dm->createQueryBuilder($this->documentName);
        $qb
            ->remove()
            ->multiple(true)
            ->field('status')->equals(Job::STATUS_ERROR);

        $this->addWorkerFilter($qb, $workerName, $method);

        $qb->getQuery()->execute();
    }

    /**
     * Count the unlocked jobs that are due.
     *
     * A job is counted when it is not locked (locked = null) and its "when"
     * is either unset or not later than the current time.
     *
     * NOTE(review): unlike getJob(), no status filter is applied here -
     * confirm that counting jobs of every status is intended.
     *
     * @param string|null $workerName Only count jobs of this worker (ignored when empty)
     * @param string|null $method     Only count jobs of this method (ignored when empty)
     *
     * @return int
     */
    public function getJobCount($workerName = null, $method = null)
    {
        $qb = $this->dm->createQueryBuilder($this->documentName);
        $qb->find();

        $this->addWorkerFilter($qb, $workerName, $method);

        // Due filter: no scheduled time, or a scheduled time that has passed.
        $qb
            ->addOr($qb->expr()->field('when')->equals(null))
            ->addOr($qb->expr()->field('when')->lte(new \DateTime()))
            ->field('locked')->equals(null);

        return $qb->getQuery()->count(true);
    }

    /**
     * Get a per worker/method breakdown of job counts by status.
     *
     * Runs a map-reduce over the job collection. Each result is keyed by
     * "<worker_name>-><method>()" and holds a status => count map in which
     * Job::STATUS_ERROR, Job::STATUS_NEW and Job::STATUS_SUCCESS are always
     * present (0 when no job has that status).
     *
     * @return array
     */
    public function getStatus()
    {
        // Map: emit {status: 1} for every job, keyed by worker and method.
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        // Reduce: sum the per-status counters emitted for one key.
        $reduceFunc = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val = vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        $qb = $this->dm->createQueryBuilder($this->documentName)
            ->map($mapFunc)
            ->reduce($reduceFunc);
        $results = $qb->getQuery()->execute();

        $allStatus = [
            Job::STATUS_ERROR => 0,
            Job::STATUS_NEW => 0,
            Job::STATUS_SUCCESS => 0,
        ];

        $status = [];
        foreach ($results as $info) {
            // Array union: keep the reduced counts, fill missing statuses with 0.
            $status[$info['_id']] = $info['value'] + $allStatus;
        }

        return $status;
    }

    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Finds one new, unlocked, due job and, in the same find-and-update
     * operation, marks it as locked and stamps lockedAt.
     *
     * @param string $workerName Only consider jobs of this worker (ignored when empty)
     * @param string $methodName Only consider jobs of this method (ignored when empty)
     * @param bool   $prioritize Sort by priority (ascending) when true, by "when" (ascending) otherwise
     *
     * @return \Dtc\QueueBundle\Model\Job|null The locked job; the query result is returned
     *                                         unchecked, so presumably null when nothing matches
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true)
    {
        // One instant for both the due filter and the lock timestamp.
        $now = new \DateTime();

        $qb = $this->dm->createQueryBuilder($this->documentName);
        $qb
            ->findAndUpdate()
            ->returnNew();

        $this->addWorkerFilter($qb, $workerName, $methodName);

        if ($prioritize) {
            $qb->sort('priority', 'asc');
        } else {
            $qb->sort('when', 'asc');
        }

        // Filter: new, unlocked jobs with no scheduled time or one that has passed.
        $qb
            ->addOr($qb->expr()->field('when')->equals(null))
            ->addOr($qb->expr()->field('when')->lte($now))
            ->field('status')->equals(Job::STATUS_NEW)
            ->field('locked')->equals(null);

        // Update: lock the job and record when it was locked.
        $qb
            ->field('lockedAt')->set($now)
            ->field('locked')->set(true);

        return $qb->getQuery()->execute();
    }

    /**
     * Remove a job and flush the document manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $this->dm->remove($job);
        $this->dm->flush();
    }

    /**
     * Store a job for history purposes; delegates to save().
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        $this->save($job);
    }

    /**
     * Save a job, merging it into an existing new job when batching is on.
     *
     * The job's crcHash is always recomputed. When $job->getBatch() is
     * strictly true and a job with the same hash and Job::STATUS_NEW already
     * exists, that existing job is updated and returned instead of
     * persisting $job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job The persisted job, or the existing job it was merged into
     */
    public function save(\Dtc\QueueBundle\Model\Job $job)
    {
        // Todo: Serialize args

        // Despite its name, the "crc" hash is a SHA-256 digest of the
        // serialized class name, method, worker name and arguments.
        $hashValues = [$job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs()];
        $crcHash = hash('sha256', serialize($hashValues));
        $job->setCrcHash($crcHash);

        if ($job->getBatch() === true) {
            // See if a similar job that hasn't run yet exists.
            $criteria = ['crcHash' => $crcHash, 'status' => Job::STATUS_NEW];
            $oldJob = $this->getRepository()->findOneBy($criteria);

            if ($oldJob) {
                // Merge into the existing job: keep the larger priority value
                // and the earlier "when".
                $oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority()));
                $oldJob->setWhen(min($job->getWhen(), $oldJob->getWhen()));
                $oldJob->setBatch(true);
                $oldJob->setUpdatedAt(new \DateTime());

                $this->dm->flush();

                return $oldJob;
            }
        }

        // Just save a new job.
        $this->dm->persist($job);
        $this->dm->flush();

        return $job;
    }

    /**
     * Restrict a query to a worker name and/or method when they are given.
     *
     * Empty (falsy) values add no criterion, so the query stays unrestricted
     * on that field.
     *
     * @param object      $qb         Query builder obtained from DocumentManager::createQueryBuilder()
     * @param string|null $workerName Value matched against the "workerName" field
     * @param string|null $method     Value matched against the "method" field
     */
    private function addWorkerFilter($qb, $workerName, $method)
    {
        if ($workerName) {
            $qb->field('workerName')->equals($workerName);
        }

        if ($method) {
            $qb->field('method')->equals($method);
        }
    }
}
256
|
|
|
|
This method has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.