Completed
Push — master ( b1ebb8...16b58d )
by Matthew
07:16
created

JobManager::getJobCount()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 1

Importance

Changes 1
Bugs 1 Features 0
Metric Value
dl 0
loc 22
ccs 11
cts 11
cp 1
rs 9.2
c 1
b 1
f 0
cc 1
eloc 13
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14
    use CommonTrait;
15
    const REDUCE_FUNCTION = 'function(k, vals) {
16
            var result = {};
17
            for (var index in vals) {
18
                var val =  vals[index];
19
                for (var i in val) {
20
                    if (result.hasOwnProperty(i)) {
21
                        result[i] += val[i];
22
                    }
23
                    else {
24
                        result[i] = val[i];
25
                    }
26
                }
27
            }
28
            return result;
29
        }';
30
31 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
32
    {
33
        /** @var DocumentManager $objectManager */
34 3
        $objectManager = $this->getObjectManager();
35 3
        $qb = $objectManager->createQueryBuilder($objectName);
36
        $qb
37 3
            ->find()
38 3
            ->field('status')->equals($status);
39
40 3
        $this->addWorkerNameCriterion($qb, $workerName, $method);
41 3
        $query = $qb->getQuery();
42
43 3
        return $query->count();
44
    }
45
46
    /**
47
     * @param string|null $workerName
48
     * @param string|null $method
49
     */
50 1
    public function pruneErroneousJobs($workerName = null, $method = null)
51
    {
52
        /** @var DocumentManager $objectManager */
53 1
        $objectManager = $this->getObjectManager();
54 1
        $qb = $objectManager->createQueryBuilder($this->getJobArchiveClass());
55 1
        $qb = $qb->remove();
56 1
        $qb->field('status')->equals(BaseJob::STATUS_ERROR);
57 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
58
59 1
        $query = $qb->getQuery();
60 1
        $result = $query->execute();
61 1
        if (isset($result['n'])) {
62 1
            return $result['n'];
63
        }
64
65
        return 0;
66
    }
67
68
    /**
69
     * @param Builder     $builder
70
     * @param string|null $workerName
71
     * @param string|null $method
72
     */
73 18
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
74
    {
75 18
        if (null !== $workerName) {
76 6
            $builder->field('workerName')->equals($workerName);
77
        }
78
79 18
        if (null !== $method) {
80 5
            $builder->field('method')->equals($method);
81
        }
82 18
    }
83
84
    /**
85
     * @param null $workerName
86
     * @param null $method
87
     *
88
     * @return int
89
     */
90 2
    protected function updateExpired($workerName = null, $method = null)
91
    {
92
        /** @var DocumentManager $objectManager */
93 2
        $objectManager = $this->getObjectManager();
94 2
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
95 2
        $qb = $qb->updateMany();
96 2
        $qb->field('expiresAt')->lte(new \DateTime());
97 2
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
98 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
99 2
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
100 2
        $query = $qb->getQuery();
101 2
        $result = $query->execute();
102 2
        if (isset($result['n'])) {
103 2
            return $result['n'];
104
        }
105
106
        return 0;
107
    }
108
109
    /**
110
     * Removes archived jobs older than $olderThan.
111
     *
112
     * @param \DateTime $olderThan
113
     *
114
     * @return int
115
     */
116 1
    public function pruneArchivedJobs(\DateTime $olderThan)
117
    {
118 1
        return $this->removeOlderThan($this->getJobArchiveClass(), 'updatedAt', $olderThan);
119
    }
120
121 2
    public function getJobCount($workerName = null, $method = null)
122
    {
123
        /** @var DocumentManager $objectManager */
124 2
        $objectManager = $this->getObjectManager();
125 2
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
126
        $qb
127 2
            ->find();
128
129 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
130
131
        // Filter
132 2
        $date = new \DateTime();
133
        $qb
134 2
            ->addAnd(
135 2
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
136
            )
137 2
            ->field('locked')->equals(null);
138
139 2
        $query = $qb->getQuery();
140
141 2
        return $query->count(true);
142
    }
143
144
    /**
145
     * Get Status Jobs.
146
     *
147
     * @param string $documentName
148
     *
149
     * @return array
150
     */
151 2
    protected function getStatusByDocument($documentName)
152
    {
153
        // Run a map reduce function get worker and status break down
154 2
        $mapFunc = "function() {
155
            var result = {};
156
            result[this.status] = 1;
157
            var key = this.worker_name + '->' + this.method + '()';
158
            emit(key, result);
159
        }";
160 2
        $reduceFunc = self::REDUCE_FUNCTION;
161
        /** @var DocumentManager $objectManager */
162 2
        $objectManager = $this->getObjectManager();
163 2
        $qb = $objectManager->createQueryBuilder($documentName);
164 2
        $qb->map($mapFunc)
165 2
            ->reduce($reduceFunc);
166 2
        $query = $qb->getQuery();
167 2
        $results = $query->execute();
168
169
        $allStatus = array(
170 2
            BaseJob::STATUS_ERROR => 0,
171
            BaseJob::STATUS_NEW => 0,
172
            RetryableJob::STATUS_EXPIRED => 0,
173
            RetryableJob::STATUS_MAX_ERROR => 0,
174
            RetryableJob::STATUS_MAX_RETRIES => 0,
175
            RetryableJob::STATUS_MAX_STALLED => 0,
176
            BaseJob::STATUS_RUNNING => 0,
177
            BaseJob::STATUS_SUCCESS => 0,
178
        );
179
180 2
        $status = [];
181
182 2
        foreach ($results as $info) {
183 1
            $status[$info['_id']] = $info['value'] + $allStatus;
184
        }
185
186 2
        return $status;
187
    }
188
189 2
    public function getStatus()
190
    {
191 2
        $result = $this->getStatusByDocument($this->getJobClass());
192 2
        $status2 = $this->getStatusByDocument($this->getJobArchiveClass());
193 2
        foreach ($status2 as $key => $value) {
194 1
            foreach ($value as $k => $v) {
195 1
                if (isset($result[$key][$k])) {
196 1
                    $result[$key][$k] += $v;
197
                } else {
198 1
                    $result[$key][$k] = $v;
199
                }
200
            }
201
        }
202
203 2
        $finalResult = [];
204 2
        foreach ($result as $key => $item) {
205 1
            ksort($item);
206 1
            $finalResult[$key] = $item;
207
        }
208
209 2
        return $finalResult;
210
    }
211
212
    /**
213
     * Get the next job to run (can be filtered by workername and method name).
214
     *
215
     * @param string      $workerName
216
     * @param string      $methodName
217
     * @param bool        $prioritize
218
     * @param string|null $runId
219
     *
220
     * @return \Dtc\QueueBundle\Model\Job
221
     */
222 11
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
223
    {
224 11
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
0 ignored issues
show
Bug introduced by
It seems like $workerName defined by parameter $workerName on line 222 can also be of type string; however, Dtc\QueueBundle\ODM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
Bug introduced by
It seems like $methodName defined by parameter $methodName on line 222 can also be of type string; however, Dtc\QueueBundle\ODM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
225
        $builder
226 11
            ->findAndUpdate()
227 11
            ->returnNew();
228
229 11
        $date = new \DateTime();
230
        // Update
231
        $builder
232 11
            ->field('lockedAt')->set($date) // Set started
233 11
            ->field('locked')->set(true)
234 11
            ->field('status')->set(BaseJob::STATUS_RUNNING)
235 11
            ->field('runId')->set($runId);
236
237 11
        $query = $builder->getQuery();
238
239 11
        $job = $query->execute();
240
241 11
        return $job;
242
    }
243
244
    /**
245
     * @param null $workerName
246
     * @param null $methodName
247
     * @param bool $prioritize
248
     *
249
     * @return Builder
250
     */
251 11
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
252
    {
253
        /** @var DocumentManager $objectManager */
254 11
        $objectManager = $this->getObjectManager();
255 11
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
256
257 11
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
258 11
        if ($prioritize) {
259 11
            $builder->sort([
260 11
                'priority' => 'desc',
261
                'whenAt' => 'asc',
262
            ]);
263
        } else {
264 1
            $builder->sort('whenAt', 'asc');
265
        }
266
267
        // Filter
268 11
        $this->addStandardPredicates($builder);
269
270 11
        return $builder;
271
    }
272
273 1
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
274
    {
275
        /** @var DocumentManager $objectManager */
276 1
        $objectManager = $this->getObjectManager();
277 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
278 1
        $builder->find();
279
280 1
        $builder->sort('whenAt', 'asc');
281 1
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
282 1
            ->field('crcHash')->equals($job->getCrcHash())
283 1
            ->field('locked')->equals(null);
284 1
        $oldJob = $builder->getQuery()->getSingleResult();
285
286 1
        if (!$oldJob) {
287
            return null;
288
        }
289
290
        // Update priority or whenAt
291
        //  This makes sure if someone else is updating at the same time
292
        //  that we don't trounce their changes.
293 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
294 1
        $builder->findAndUpdate();
295 1
        $builder->field('_id')->equals($oldJob->getId());
296 1
        $builder->field('priority')->lt($job->getPriority());
297 1
        $builder->field('priority')->set($job->getPriority());
298 1
        $builder->getQuery()->execute();
299
300 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
301 1
        $builder->findAndUpdate();
302 1
        $builder->field('_id')->equals($oldJob->getId());
303 1
        $builder->field('whenAt')->gt($job->getWhenAt());
304 1
        $builder->field('whenAt')->set($job->getWhenAt());
305 1
        $builder->getQuery()->execute();
306
307 1
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
308 1
            $oldJob->setWhenAt($job->getWhenAt());
309
        }
310 1
        if ($job->getPriority() > $oldJob->getPriority()) {
311 1
            $oldJob->setPriority($job->getPriority());
312
        }
313
314 1
        return $oldJob;
315
    }
316
317
    /**
318
     * @param mixed $builder
319
     */
320 11
    protected function addStandardPredicates($builder)
321
    {
322 11
        $date = new \DateTime();
323
        $builder
324 11
            ->addAnd(
325 11
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
326 11
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
327
            )
328 11
            ->field('status')->equals(BaseJob::STATUS_NEW)
329 11
            ->field('locked')->equals(null);
330 11
    }
331
332
    public function getWorkersAndMethods()
333
    {
334
        /** @var DocumentManager $documentManager */
335
        $documentManager = $this->getObjectManager();
336
337
        if (!method_exists($documentManager, 'createAggregationBuilder')) {
338
            return [];
339
        }
340
341
        $aggregationBuilder = $documentManager->createAggregationBuilder($this->getJobClass());
342
343
        $this->addStandardPredicates($aggregationBuilder->match());
344
345
        $aggregationBuilder->group()
346
            ->field('id')
347
            ->expression(
348
                $aggregationBuilder->expr()
349
                ->field('workerName')->expression('$workerName')
350
                ->field('method')->expression('$method')
351
            );
352
        $results = $aggregationBuilder->execute()->toArray();
353
354
        if (!$results) {
355
            return [];
356
        }
357
358
        $workersMethods = [];
359
        foreach ($results as $result) {
360
            if (isset($result['_id'])) {
361
                $workersMethods[$result['_id']['worker_name']][] = $result['_id']['method'];
362
            }
363
        }
364
365
        return $workersMethods;
366
    }
367
368
    /**
369
     * @param string $workerName
370
     * @param string $methodName
371
     */
372 2
    public function countLiveJobs($workerName = null, $methodName = null)
373
    {
374
        /** @var DocumentManager $objectManager */
375 2
        $objectManager = $this->getObjectManager();
376 2
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
377
378 2
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
379
        // Filter
380 2
        $this->addStandardPredicates($builder);
381
382 2
        return $builder->getQuery()->count();
383
    }
384
385
    /**
386
     * @param string $workerName
387
     * @param string $methodName
388
     * @param \Closure $progressCallback
389
     */
390 1
    public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback)
391
    {
392
        /** @var DocumentManager $documentManager */
393 1
        $documentManager = $this->getObjectManager();
394 1
        $count = 0;
395 1
        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
0 ignored issues
show
Bug introduced by
It seems like $workerName defined by parameter $workerName on line 390 can also be of type string; however, Dtc\QueueBundle\ODM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
Bug introduced by
It seems like $methodName defined by parameter $methodName on line 390 can also be of type string; however, Dtc\QueueBundle\ODM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
396
        $builder
397 1
            ->findAndUpdate()
398 1
            ->returnNew();
399
400 1
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
401 1
        $query = $builder->getQuery();
402
        do {
403 1
            $job = $query->execute();
404 1
            if ($job) {
405 1
                $documentManager->remove($job);
406 1
                ++$count;
407
408 1
                if (0 == $count % 10) {
409
                    $this->flush();
410
                    $progressCallback($count);
411
                }
412
            }
413 1
        } while ($job);
414 1
        $this->flush();
415 1
        $progressCallback($count);
416 1
    }
417
}
418