Completed
Pull Request — master (#30)
by Matthew
18:57 queued 16:20
created

JobManager::addWorkerNameCriterion()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 4
nop 3
crap 3
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
use Dtc\QueueBundle\Model\StallableJob;
12
13
class JobManager extends DoctrineJobManager
14
{
15
    use CommonTrait;
16
    /**
     * JavaScript reduce function used by getStatusByDocument() for its map/reduce query.
     *
     * For a given key it folds the emitted objects in `vals` into a single object,
     * summing the values property by property (a property seen for the first time
     * is copied, later occurrences are added to it).
     */
    const REDUCE_FUNCTION = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val = vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';
31
32 3
    /**
     * Counts the documents of the given class that have the given status.
     *
     * @param string      $objectName document class to query
     * @param mixed       $status     value the 'status' field must equal
     * @param string|null $workerName when not null, only documents with this workerName are counted
     * @param string|null $method     when not null, only documents with this method are counted
     *
     * @return mixed whatever the query's count() returns
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
46
47
    /**
     * Removes archived jobs whose status is BaseJob::STATUS_EXCEPTION.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are removed
     * @param string|null $method     when not null, only jobs with this method are removed
     *
     * @return mixed the 'n' entry of the query result when present, otherwise 0
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getJobArchiveClass())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $builder->getQuery()->execute();

        // 'n' is read from the result as the number to report; fall back to 0 when it is absent.
        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
68
69
    /**
     * Adds equality criteria on 'workerName' and 'method' to the builder.
     *
     * A criterion is only added when the corresponding argument is not null.
     *
     * @param Builder     $builder    query builder to constrain
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
84
85
    /**
     * Marks jobs as expired.
     *
     * Targets jobs whose status is BaseJob::STATUS_NEW and whose expiresAt is less than
     * or equal to the current time, and sets their status to Job::STATUS_EXPIRED.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are updated
     * @param string|null $method     when not null, only jobs with this method are updated
     *
     * @return int the 'n' entry of the query result when present, otherwise 0
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $now = new \DateTime();

        $builder = $documentManager->createQueryBuilder($this->getJobClass())->updateMany();

        // Selection criteria.
        $builder->field('expiresAt')->lte($now);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Modification.
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
109
110
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() on the job archive class, comparing against
     * the 'updatedAt' field.
     *
     * @param \DateTime $olderThan cut-off passed through to removeOlderThan()
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
121
122 2
    /**
     * Counts jobs that have not expired.
     *
     * A job is counted when its expiresAt is null or later than the current time.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are counted
     * @param string|null $method     when not null, only jobs with this method are counted
     *
     * @return mixed whatever the query's count(true) returns
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getJobClass());
        $builder->find();
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter: expiresAt is either unset or still in the future.
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $builder->addAnd($builder->expr()->addOr($neverExpires, $expiresLater));

        return $builder->getQuery()->count(true);
    }
142
143
    /**
     * Builds a per-worker/method breakdown of job counts by status for one document class.
     *
     * Runs a map/reduce query: the map function emits, for each document, a key of the
     * form "<worker_name>-><method>()" with a one-entry object counting its status, and
     * self::REDUCE_FUNCTION sums those objects per key. Every status listed below that is
     * missing from a reduced row is filled in with 0.
     *
     * @param string $documentName document class to query
     *
     * @return array status counts indexed by the map/reduce key, then by status
     */
    protected function getStatusByDocument($documentName)
    {
        // Zero counts for every known status; used to pad each reduced row.
        $zeroCounts = [
            BaseJob::STATUS_NEW => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
            BaseJob::STATUS_FAILURE => 0,
            BaseJob::STATUS_EXCEPTION => 0,
            StallableJob::STATUS_STALLED => 0,
            \Dtc\QueueBundle\Model\Job::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_FAILURES => 0,
            RetryableJob::STATUS_MAX_EXCEPTIONS => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            StallableJob::STATUS_MAX_STALLS => 0,
        ];

        // Map step: one {status: 1} object per document, keyed by worker and method.
        $mapFunction = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunction)
            ->reduce(self::REDUCE_FUNCTION);
        $rows = $builder->getQuery()->execute();

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: counts from the query win, missing statuses default to 0.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
190
191 3
    /**
     * Returns status counts for live and archived jobs combined.
     *
     * The breakdowns of the job class and the job archive class (see
     * getStatusByDocument()) are merged by adding up counts that share the same
     * key and status; each key's counts are then sorted by status name.
     *
     * @return array status counts indexed by key, then by status
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getJobClass());
        $archived = $this->getStatusByDocument($this->getJobArchiveClass());

        foreach ($archived as $workerMethod => $counts) {
            foreach ($counts as $statusName => $count) {
                if (!isset($combined[$workerMethod][$statusName])) {
                    $combined[$workerMethod][$statusName] = $count;

                    continue;
                }

                $combined[$workerMethod][$statusName] += $count;
            }
        }

        // Sort each key's counts by status name, in place.
        foreach (array_keys($combined) as $workerMethod) {
            ksort($combined[$workerMethod]);
        }

        return $combined;
    }
213
214
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * The job is selected with getJobQueryBuilder() and, in the same find-and-update
     * query, gets its startedAt set to the current time, its status set to
     * BaseJob::STATUS_RUNNING and its runId set to $runId. The updated document is
     * requested back via returnNew().
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize passed through to getJobQueryBuilder()
     * @param string|null $runId
     *
     * @return \Dtc\QueueBundle\Model\Job|null the query result; presumably empty when no job matched
     *                                         (archiveAllJobs() treats a falsy result as "no more jobs")
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $startedAt = new \DateTime();

        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $builder
            ->findAndUpdate()
            ->returnNew();

        // Update
        $builder->field('startedAt')->set($startedAt);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
244
245
    /**
     * Creates the query builder used to select jobs that are ready to run.
     *
     * The builder targets the job class, is restricted by worker/method when given,
     * sorted, and constrained by addStandardPredicates().
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize when truthy, sort by priority (desc) then whenAt (asc);
     *                                otherwise sort by whenAt (asc) only
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $queryBuilder = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);

        if (!$prioritize) {
            $queryBuilder->sort('whenAt', 'asc');
        } else {
            $ordering = [
                'priority' => 'desc',
                'whenAt' => 'asc',
            ];
            $queryBuilder->sort($ordering);
        }

        // Filter
        $this->addStandardPredicates($queryBuilder);

        return $queryBuilder;
    }
273
274 1
    /**
     * Folds $job into an existing new job that has the same crcHash, if there is one.
     *
     * Looks up the job with status BaseJob::STATUS_NEW and a matching crcHash that has
     * the earliest whenAt. When found, its stored priority is raised to $job's priority
     * if that is higher, and its stored whenAt is moved to $job's whenAt if that is
     * earlier. The same adjustments are then applied to the returned object.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job supplying the crcHash, priority and whenAt
     *
     * @return mixed|null the existing job with its priority/whenAt adjusted, or null when none was found
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $lookup = $documentManager->createQueryBuilder($this->getJobClass());
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW);
        $lookup->field('crcHash')->equals($job->getCrcHash());
        $existingJob = $lookup->getQuery()->getSingleResult();

        if (!$existingJob) {
            return null;
        }

        $existingId = $existingJob->getId();
        $newPriority = $job->getPriority();
        $newWhenAt = $job->getWhenAt();

        // Update priority or whenAt
        //  Each update carries its own comparison as a query criterion, so that if
        //  someone else is updating at the same time we don't trounce their changes.
        $raisePriority = $documentManager->createQueryBuilder($this->getJobClass());
        $raisePriority->findAndUpdate();
        $raisePriority->field('_id')->equals($existingId);
        $raisePriority->field('priority')->lt($newPriority);
        $raisePriority->field('priority')->set($newPriority);
        $raisePriority->getQuery()->execute();

        $advanceWhenAt = $documentManager->createQueryBuilder($this->getJobClass());
        $advanceWhenAt->findAndUpdate();
        $advanceWhenAt->field('_id')->equals($existingId);
        $advanceWhenAt->field('whenAt')->gt($newWhenAt);
        $advanceWhenAt->field('whenAt')->set($newWhenAt);
        $advanceWhenAt->getQuery()->execute();

        // Apply the same adjustments to the object handed back to the caller.
        if ($newWhenAt < $existingJob->getWhenAt()) {
            $existingJob->setWhenAt($newWhenAt);
        }
        if ($newPriority > $existingJob->getPriority()) {
            $existingJob->setPriority($newPriority);
        }

        return $existingJob;
    }
316
317
    /**
     * Adds the criteria that select jobs which are ready to run right now.
     *
     * A job matches when all of the following hold:
     *  - whenAt is null or not later than the current time,
     *  - expiresAt is null or later than the current time,
     *  - status equals BaseJob::STATUS_NEW.
     *
     * @param mixed $builder a query builder or an aggregation match stage (both are passed in
     *                       within this class); it must offer expr(), addAnd() and field()
     */
    protected function addStandardPredicates($builder)
    {
        $now = new \DateTime();

        $unscheduled = $builder->expr()->field('whenAt')->equals(null);
        $alreadyDue = $builder->expr()->field('whenAt')->lte($now);
        $isDue = $builder->expr()->addOr($unscheduled, $alreadyDue);

        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder
            ->addAnd($isDue, $notExpired)
            ->field('status')->equals(BaseJob::STATUS_NEW);
    }
330
331 1
    /**
     * Lists the worker names and methods that currently have jobs ready to run.
     *
     * Uses an aggregation over the job class: jobs are filtered with
     * addStandardPredicates() and grouped by workerName and method. When the object
     * manager has no createAggregationBuilder() method, or the aggregation yields
     * nothing, an empty array is returned.
     *
     * @return array method names grouped under their worker name
     */
    public function getWorkersAndMethods()
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();

        if (!method_exists($objectManager, 'createAggregationBuilder')) {
            return [];
        }

        $aggregation = $objectManager->createAggregationBuilder($this->getJobClass());
        $this->addStandardPredicates($aggregation->match());

        $groupStage = $aggregation->group();
        $groupKey = $aggregation->expr()
            ->field('workerName')->expression('$workerName')
            ->field('method')->expression('$method');
        $groupStage
            ->field('id')
            ->expression($groupKey);

        $rows = $aggregation->execute()->toArray();
        if (!$rows) {
            return [];
        }

        $methodsByWorker = [];
        foreach ($rows as $row) {
            if (!isset($row['_id'])) {
                continue;
            }

            // NOTE(review): the group key is declared as 'workerName' but read back as
            // 'worker_name' - presumably the builder maps it to the stored field name; confirm.
            $groupId = $row['_id'];
            $methodsByWorker[$groupId['worker_name']][] = $groupId['method'];
        }

        return $methodsByWorker;
    }
366
367
    /**
     * Counts the jobs that are ready to run, as defined by addStandardPredicates().
     *
     * @param string|null $workerName when not null, only jobs with this workerName are counted
     * @param string|null $methodName when not null, only jobs with this method are counted
     *
     * @return mixed whatever the query's count() returns
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $queryBuilder = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
        // Filter
        $this->addStandardPredicates($queryBuilder);

        $liveJobsQuery = $queryBuilder->getQuery();

        return $liveJobsQuery->count();
    }
383
384
    /**
     * Archives every job matched by getJobQueryBuilder(), optionally limited to one
     * worker and/or method.
     *
     * Jobs are fetched one at a time with a find-and-update query that sets their
     * status to Job::STATUS_ARCHIVE; each returned job is then passed to the document
     * manager's remove(). Pending work is flushed after every 10 jobs and once more at
     * the end, and the progress callback (if any) is invoked with the running count
     * after each flush.
     *
     * $progressCallback now defaults to null: a required parameter following optional
     * ones is deprecated as of PHP 8.0 and made the earlier defaults unusable. Callers
     * that pass all three arguments are unaffected.
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback receives the number of jobs processed so far;
     *                                        skipped when null
     */
    public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
        $builder
            ->findAndUpdate()
            ->returnNew();
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
        $query = $builder->getQuery();

        $count = 0;
        // Each execution claims one more matching job; a falsy result means none are left.
        while ($job = $query->execute()) {
            $documentManager->remove($job);
            ++$count;

            // Flush in batches of 10 and report progress after each batch.
            if (0 === $count % 10) {
                $this->flush();
                if (null !== $progressCallback) {
                    $progressCallback($count);
                }
            }
        }

        // Final flush covers the last partial batch; the final count is always reported.
        $this->flush();
        if (null !== $progressCallback) {
            $progressCallback($count);
        }
    }
416
}
417