Completed
Pull Request — master (#30)
by Matthew
48:21 queued 02:09
created

JobManager::updateNearestBatch()   B

Complexity

Conditions 4
Paths 5

Size

Total Lines 42
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 26
CRAP Score 4.0008

Importance

Changes 0
Metric Value
dl 0
loc 42
ccs 26
cts 27
cp 0.963
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 27
nc 5
nop 1
crap 4.0008
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
use Dtc\QueueBundle\Model\StallableJob;
12
use Dtc\QueueBundle\Util\Util;
13
14
class JobManager extends DoctrineJobManager
15
{
16
    use CommonTrait;
17
    /**
     * JavaScript reduce function (kept as a string) for the map/reduce query
     * built in getStatusByDocument().
     *
     * For every emitted value object in `vals` it walks the object's
     * properties and sums them per property name into a single result object:
     * a property seen for the first time is copied, a property already present
     * is added to the running total.
     *
     * NOTE(review): the bare number lines inside this literal look like
     * line-number residue from the report this file was extracted from, not
     * JavaScript — confirm against the real source before relying on it.
     */
    const REDUCE_FUNCTION = 'function(k, vals) {
18
            var result = {};
19
            for (var index in vals) {
20
                var val = vals[index];
21
                for (var i in val) {
22
                    if (result.hasOwnProperty(i)) {
23
                        result[i] += val[i];
24
                    }
25
                    else {
26
                        result[i] = val[i];
27
                    }
28
                }
29
            }
30
            return result;
31
        }';
32
33 3
    /**
     * Counts the documents of the given class whose status equals $status,
     * optionally narrowed to one worker name and/or method.
     *
     * @param string      $objectName document class to query
     * @param string      $status     status value to match
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int result of the query's count()
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find()->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
47
48
    /**
     * Removes archived jobs whose status is BaseJob::STATUS_EXCEPTION,
     * optionally narrowed to one worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int the 'n' entry of the query result when present, 0 otherwise
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $removal = $documentManager->createQueryBuilder($this->getJobArchiveClass());
        $removal = $removal->remove();
        $removal->field('status')->equals(BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($removal, $workerName, $method);

        $outcome = $removal->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
69
70
    /**
     * Adds equality criteria on 'workerName' and 'method' to the builder.
     * A criterion is only added when the corresponding argument is not null.
     *
     * @param Builder     $builder
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
85
86
    /**
     * Flags overdue jobs as expired.
     *
     * Selects jobs in BaseJob::STATUS_NEW whose 'expiresAt' is less than or
     * equal to the value of Util::getMicrotimeDateTime() (optionally narrowed
     * by worker name / method) and sets their status to
     * \Dtc\QueueBundle\Model\Job::STATUS_EXPIRED in one updateMany query.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int the 'n' entry of the query result when present, 0 otherwise
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $update = $documentManager->createQueryBuilder($this->getJobClass());
        $update = $update->updateMany();

        // Selection criteria.
        $update->field('expiresAt')->lte(Util::getMicrotimeDateTime());
        $update->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($update, $workerName, $method);

        // Modification applied to every selected document.
        $update->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $update->getQuery()->execute();
        if (!isset($outcome['n'])) {
            return 0;
        }

        return $outcome['n'];
    }
110
111
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() for the archive class, comparing against
     * the 'updatedAt' field.
     *
     * @param \DateTime $olderThan
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
122
123 2
    /**
     * Counts jobs of the job class that have no 'expiresAt' or whose
     * 'expiresAt' is later than the value of Util::getMicrotimeDateTime(),
     * optionally narrowed to one worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int result of the query's count(true)
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getJobClass());
        $builder->find();
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Keep only jobs that are not expired: expiresAt unset OR still ahead.
        $now = Util::getMicrotimeDateTime();
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($notExpired);

        return $builder->getQuery()->count(true);
    }
143
144
    /**
     * Builds a per-worker/method breakdown of job counts by status for one
     * document class, using a map/reduce query.
     *
     * The map step emits one `{status: 1}` object per document under the key
     * "<worker_name>-><method>()"; self::REDUCE_FUNCTION is used as the reduce
     * step. Every known status that did not occur for a key is filled in
     * with 0.
     *
     * @param string $documentName
     *
     * @return array keyed by "<worker_name>-><method>()", each value an array of status => count
     */
    protected function getStatusByDocument($documentName)
    {
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunc)->reduce(self::REDUCE_FUNCTION);
        $rows = $builder->getQuery()->execute();

        // Zero defaults for every status that may be reported.
        $zeroCounts = [
            BaseJob::STATUS_NEW => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
            BaseJob::STATUS_FAILURE => 0,
            BaseJob::STATUS_EXCEPTION => 0,
            StallableJob::STATUS_STALLED => 0,
            \Dtc\QueueBundle\Model\Job::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_FAILURES => 0,
            RetryableJob::STATUS_MAX_EXCEPTIONS => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            StallableJob::STATUS_MAX_STALLS => 0,
        ];

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: counted statuses win, missing ones fall back to 0.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
191
192 3
    /**
     * Returns the status breakdown of live and archived jobs combined.
     *
     * Counts from the archive class are added onto the counts from the job
     * class per worker/method key and per status; the status entries of each
     * key are then sorted by status name.
     *
     * @return array keyed like getStatusByDocument(), each value an array of status => count
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getJobClass());
        $archived = $this->getStatusByDocument($this->getJobArchiveClass());

        foreach ($archived as $workerKey => $counts) {
            foreach ($counts as $statusName => $count) {
                if (!isset($combined[$workerKey][$statusName])) {
                    $combined[$workerKey][$statusName] = $count;
                    continue;
                }

                $combined[$workerKey][$statusName] += $count;
            }
        }

        $sorted = [];
        foreach ($combined as $workerKey => $counts) {
            ksort($counts);
            $sorted[$workerKey] = $counts;
        }

        return $sorted;
    }
214
215
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Runs a findAndUpdate query on top of getJobQueryBuilder() that sets
     * 'startedAt', switches 'status' to BaseJob::STATUS_RUNNING and stores
     * $runId, and asks for the updated document back (returnNew).
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param string|null $runId
     *
     * @return \Dtc\QueueBundle\Model\Job result of the query; presumably empty when nothing matches — TODO confirm
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $claim = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $claim->findAndUpdate()->returnNew();

        $startedAt = Util::getMicrotimeDateTime();

        // Fields written on the claimed job.
        $claim->field('startedAt')->set($startedAt);
        $claim->field('status')->set(BaseJob::STATUS_RUNNING);
        $claim->field('runId')->set($runId);

        return $claim->getQuery()->execute();
    }
245
246
    /**
     * Creates a query builder on the job class that selects runnable jobs.
     *
     * Applies the optional worker/method filter, the sort order (priority
     * descending then whenAt ascending when $prioritize is truthy, whenAt
     * ascending only otherwise) and the predicates from
     * addStandardPredicates().
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $jobQuery = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($jobQuery, $workerName, $methodName);

        if (!$prioritize) {
            $jobQuery->sort('whenAt', 'asc');
        } else {
            $jobQuery->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        $this->addStandardPredicates($jobQuery);

        return $jobQuery;
    }
274
275 1
    /**
     * Folds $job into the earliest pending job that has the same crcHash.
     *
     * Looks up the first job (by whenAt ascending) in BaseJob::STATUS_NEW whose
     * 'crcHash' equals $job's. When one exists, its stored priority is raised
     * to $job's priority if that is higher, and its stored whenAt is moved to
     * $job's whenAt if that is earlier. The same adjustments are then applied
     * to the loaded object before it is returned.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return mixed the matched (and adjusted) existing job, or null when none was found
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $lookup = $documentManager->createQueryBuilder($this->getJobClass());
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW);
        $lookup->field('crcHash')->equals($job->getCrcHash());
        $existing = $lookup->getQuery()->getSingleResult();

        if (!$existing) {
            return null;
        }

        // Each update carries its own comparison in the query criteria, so a
        // concurrent writer's better value is not overwritten.
        $raisePriority = $documentManager->createQueryBuilder($this->getJobClass());
        $raisePriority->findAndUpdate();
        $raisePriority->field('_id')->equals($existing->getId());
        $raisePriority->field('priority')->lt($job->getPriority());
        $raisePriority->field('priority')->set($job->getPriority());
        $raisePriority->getQuery()->execute();

        $pullEarlier = $documentManager->createQueryBuilder($this->getJobClass());
        $pullEarlier->findAndUpdate();
        $pullEarlier->field('_id')->equals($existing->getId());
        $pullEarlier->field('whenAt')->gt($job->getWhenAt());
        $pullEarlier->field('whenAt')->set($job->getWhenAt());
        $pullEarlier->getQuery()->execute();

        // Mirror the same adjustments on the object loaded above.
        if ($existing->getWhenAt() > $job->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($existing->getPriority() < $job->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
317
318
    /**
     * Adds the predicates that define a runnable job to the given builder:
     * 'whenAt' is null or at/before the value of Util::getMicrotimeDateTime(),
     * 'expiresAt' is null or after that same value, and 'status' equals
     * BaseJob::STATUS_NEW.
     *
     * @param mixed $builder object offering expr(), addAnd() and field(); called here with a query
     *                       builder and with an aggregation match stage
     */
    protected function addStandardPredicates($builder)
    {
        $now = Util::getMicrotimeDateTime();

        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $isNotExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder
            ->addAnd($isDue, $isNotExpired)
            ->field('status')->equals(BaseJob::STATUS_NEW);
    }
331
332 1
    /**
     * Lists the methods that currently have runnable jobs, grouped by worker.
     *
     * Uses an aggregation over the job class: matches with
     * addStandardPredicates() and groups on the workerName/method pair.
     * Returns an empty array when the document manager has no
     * createAggregationBuilder() method or when the aggregation yields nothing.
     *
     * @return array worker name => list of method names
     */
    public function getWorkersAndMethods()
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        if (!method_exists($documentManager, 'createAggregationBuilder')) {
            return [];
        }

        $aggregation = $documentManager->createAggregationBuilder($this->getJobClass());
        $this->addStandardPredicates($aggregation->match());

        $grouping = $aggregation->group()->field('id');
        $grouping->expression(
            $aggregation->expr()
                ->field('workerName')->expression('$workerName')
                ->field('method')->expression('$method')
        );

        $rows = $aggregation->execute()->toArray();
        if (!$rows) {
            return [];
        }

        $methodsByWorker = [];
        foreach ($rows as $row) {
            if (!isset($row['_id'])) {
                continue;
            }

            // NOTE(review): the group key is declared as 'workerName' above but
            // read as 'worker_name' here — presumably the ODM maps the field
            // name to its stored name; confirm against a real result.
            $methodsByWorker[$row['_id']['worker_name']][] = $row['_id']['method'];
        }

        return $methodsByWorker;
    }
367
368
    /**
     * Counts the jobs that satisfy addStandardPredicates(), optionally
     * narrowed to one worker name and/or method name.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     *
     * @return int result of the query's count()
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $liveJobs = $documentManager->createQueryBuilder($this->getJobClass());
        $this->addWorkerNameCriterion($liveJobs, $workerName, $methodName);
        $this->addStandardPredicates($liveJobs);

        $query = $liveJobs->getQuery();

        return $query->count();
    }
384
385
    /**
     * Archives every job selected by getJobQueryBuilder() for the given
     * worker name / method name.
     *
     * Jobs are taken one at a time with a findAndUpdate query that sets their
     * status to Job::STATUS_ARCHIVE; each returned job is then passed to the
     * document manager's remove(). Pending removals are flushed after every
     * tenth job and once more when no job is left.
     *
     * The callback used to be a required parameter placed after two optional
     * ones, which PHP 8.0+ reports as deprecated and which made the earlier
     * defaults unusable. It now defaults to null; existing three-argument
     * calls behave exactly as before.
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback called with the running job count after each flush;
     *                                        skipped when null
     */
    public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
        $builder
            ->findAndUpdate()
            ->returnNew();
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
        $query = $builder->getQuery();

        // Only null disables reporting; any other value is invoked as before.
        $reportProgress = null !== $progressCallback;

        $count = 0;
        // The same query is executed repeatedly until it stops returning a job.
        while ($job = $query->execute()) {
            $documentManager->remove($job);
            ++$count;

            if (0 === $count % 10) {
                $this->flush();
                if ($reportProgress) {
                    $progressCallback($count);
                }
            }
        }

        // Flush whatever is left from the last partial batch and report the total.
        $this->flush();
        if ($reportProgress) {
            $progressCallback($count);
        }
    }
417
}
418